In [None]:
%%capture
%config Completer.use_jedi = False
%config InlineBackend.figure_formats = ['svg']
import os

STATIC_WEB_PAGE = {"EXECUTE_NB", "READTHEDOCS"}.intersection(os.environ)

# Install on Google Colab
import subprocess
import sys

from IPython import get_ipython

install_packages = "google.colab" in str(get_ipython())
if install_packages:
    for package in ["tensorwaves[doc]", "graphviz"]:
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", package]
        )

# Core ideas illustrated

The {doc}`/usage` page illustrates how to use {mod}`tensorwaves` in combination with {mod}`ampform`. At core, however, {mod}`tensorwaves` is a package that can 'fit' arbitrary models to a data set using different backends, as well as generate toy data samples.

These pages illustrate what's going on behind the scenes with some simple 1-dimensional and 2-dimensional models. Since we don't have a data sample to fit a model to, we follow the the same procedure as in the {doc}`/usage` page:

1. Formulate a mathematical description with {mod}`sympy`.
2. Generate a 'toy' data sample with hit-and-miss.
3. Convert the model to some backend with {mod}`tensorwaves`.
4. Tweak the parameters and fit the modified to the generated data sample.

In [None]:
!pip install black

In [None]:
from typing import Dict, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sympy as sp
from sympy.plotting import plot3d

from tensorwaves.estimator import UnbinnedNLL
from tensorwaves.interfaces import Function
from tensorwaves.model import LambdifiedFunction, SympyModel
from tensorwaves.optimizer.callbacks import CSVSummary
from tensorwaves.optimizer.minuit import Minuit2

## Formulate model

First of all, we'll formulate a 1-dimensional model with {mod}`sympy`. In this example, we take a sum of [Gaussians](https://en.wikipedia.org/wiki/Gaussian_function) plus some [Poisson](https://en.wikipedia.org/wiki/Poisson_distribution) distribution.

In [None]:
def gaussian(x: sp.Symbol, mu: sp.Symbol, sigma: sp.Symbol) -> sp.Expr:
    return sp.exp(-(((x - mu) / sigma) ** 2) / 2)


def poisson(x: sp.Symbol, k: sp.Symbol) -> sp.Expr:
    return x ** k * sp.exp(-x) / sp.factorial(k)

In [None]:
x, mu, sigma = sp.symbols("x, mu, sigma", real=True)
k = sp.Symbol("k", integer=True)
lam = sp.Symbol("lambda", positive=True)
style = "<style>#output-body{display:flex; flex-direction: row;}</style>"
display(
    gaussian(x, mu, sigma),
    poisson(lam, k),
)

In [None]:
x, a, b, c, mu1, mu2, sigma1, sigma2 = sp.symbols(
    "x, (a:c), mu_(:2), sigma_(:2)", real=True
)
expression_1d = (
    a * gaussian(x, mu1, sigma1)
    + b * gaussian(x, mu2, sigma2)
    + c * poisson(x, k=2)
)
expression_1d

The expression above consists of a number of {class}`~sympy.core.symbol.Symbol`s that we want to identify as **parameters** (that we want to optimize with regard to a certain data sample) and **variables** (in which the data sample is expressed). Let's say $x$ is the variable and that the rest of the {class}`~sympy.core.symbol.Symbol`s are the parameters.

Here, we'll pick some default values for the parameter and use them to plot the model with regard to the variable $x$ (see {meth}`~sympy.core.basic.Basic.subs`). The default values are used later on as well when we generate data.

```{margin}
We use {attr}`~sympy.core.basic.Basic.args` here to extract the components of the sum that forms this expression. See {doc}`sympy:tutorial/manipulation`.
```

In [None]:
parameter_defaults = {
    a: 0.15,
    b: 0.05,
    c: 0.3,
    mu1: 1.0,
    sigma1: 0.3,
    mu2: 2.7,
    sigma2: 0.5,
}
x_range = (x, 0, 5)
substituted_expr_1d = expression_1d.subs(parameter_defaults)
p1 = sp.plot(substituted_expr_1d, x_range, show=False, line_color="red")
p2 = sp.plot(*substituted_expr_1d.args, x_range, show=False, line_color="gray")
p2.append(p1[0])
p2.show()

## Convert to backend

So far, all we did was using {mod}`sympy` to mathematically formulate a model. We now need to {func}`~sympy.utilities.lambdify.lambdify` that expression to some computational backend, so that we can efficiently generate data and/or optimize the parameters to 'fit' the model to some data sample.

{mod}`tensorwaves` does this in two intermediate stages. First, the expression and parameter defaults are expressed in terms of a {class}`.SympyModel` (interface: {class}`.Model`) that serves as a template for different computational backends.

In [None]:
model_1d = SympyModel(
    expression=expression_1d,
    parameters=parameter_defaults,
)

Next, we use this {class}`.SympyModel` as a template to construct a {class}`.LambdifiedFunction` (interface: {class}`.Function`). We choose {mod}`numpy` as computational backend, as we're not doing any optimizing yet (for optimizing it's better to use {obj}`jax <jax.jit>`).

In [None]:
function_1d = LambdifiedFunction(model_1d, backend="numpy")

Internally, the {class}`.LambdifiedFunction` carries some source code that numpy understands. And it indeed looks similar to the expression that we formulated in {ref}`usage/basics:Formulate model`:

In [None]:
import inspect

from black import FileMode, format_str

print(
    format_str(
        inspect.getsource(function_1d._LambdifiedFunction__lambdified_model),
        mode=FileMode(),
    )
)

The {class}`.LambdifiedFunction` also carries the original default values for the {attr}`~.LambdifiedFunction.parameters` that we defined earlier on.

In [None]:
function_1d.parameters

The {meth}`.LambdifiedFunction.__call__` takes a {class}`dict` of variable names (here, `"x"` only) to the value(s) that should be used in their place.

In [None]:
function_1d({"x": 0})

This is where we move to the data generation â€• the input values are usually a list of values (expressed in the backend):

In [None]:
rng = np.random.default_rng()
x_values = np.linspace(0, 5, num=20)
y_values = function_1d({"x": x_values})
y_values

In [None]:
plt.scatter(x_values, y_values)
plt.gca().set_xlabel("$x$")
plt.gca().set_ylabel("$f(x)$");

## Generate data

So, we now have a function $f$ of $x$ expressed in some computational backend. This function is to describe a distribution of $x$ values. In the real world, these $x$-values are some observables from a process you measure. But sometimes, it's useful to generate a 'toy' data sample for your model function as well, to try it out.

### Hit & miss

The challenge is to generate values of $x$ with a density that is proportional to the value of the function evaluated at that point. To do this, we use a **hit & miss** approach:

1. Generate a random value for $x$ within the domain $(x_\mathrm{min}, x_\mathrm{max})$ on which you want to generate the data sample.
2. Generate a random value $y$ between $0$ and the maximum value $y_\mathrm{max}$ of the function over the domain of $x$.
3. Check if $y$ lies below $f(x)$ ("hit") or above ("miss").
4. If there is a "hit", accept this value of $x$ and add it to the data sample.

We keep performing this until the sample of $x$ values contains the desired number of events.

In [None]:
x_domain = np.linspace(0, 5, num=200)
y_values = function_1d({"x": x_domain})
fig = plt.figure(figsize=(8, 5))
plt.plot(x_domain, y_values)
plt.gca().set_xlabel("$x$")
plt.gca().set_ylabel("$f(x)$")

x_min = x_range[1]
x_max = x_range[2]
y_max = 0.21
x_value = 1.5

line_segment = [[0, 0], [0, y_max]]
plt.plot(*line_segment, color="black")
plt.text(
    -0.22,
    y_max / 2 * 0.5,
    "uniform sample $(0, y_{max})$",
    rotation="vertical",
)
plt.axhline(y=y_max, linestyle="dotted", color="black")
plt.text(
    x_min + 0.1,
    y_max - 0.01,
    "$y_{max}$",
)

line_segment = [[x_min, x_max], [0, 0]]
plt.plot(*line_segment, color="black")
plt.text(
    (x_max - x_min) / 2 - 0.22,
    0.005,
    R"uniform sample $(x_\mathrm{min}, x_\mathrm{max})$",
)
plt.scatter(x_value, function_1d({"x": x_value}))
plt.axvline(x=x_value, linestyle="dotted")


def draw_y_hit(x_random, y_random):
    y_value = function_1d({"x": x_random})
    color = "green" if y_random < y_value else "red"
    text = "hit" if y_random < y_value else "miss"
    plt.scatter(0, y_random, color=color)
    plt.arrow(
        x=0,
        y=y_random,
        dx=x_random,
        dy=0,
        head_length=0.15,
        length_includes_head=True,
        color=color,
        linestyle="dotted",
    )
    plt.text(x_value + 0.05, y_random, text)


draw_y_hit(x_random=x_value, y_random=0.05)
draw_y_hit(x_random=x_value, y_random=0.17)

There is one problem though: _how to determine $y_\mathrm{max}$?_ In this example, we can just read off the value of $y_\mathrm{max}$, or even compute it analytically from the original {class}`sympy.Expr <sympy.core.expr.Expr>`. This is not the case generally though, so we need to apply a trick.

Since we are generating **uniformly distributed** random values values of $x$ and computing their $f(x)$ values, we can keep track of which values of $f(x)$ is the highest. Starting with $y_\mathrm{max} = 0$ we just set $y_\mathrm{max} = f(x)$ once $f(x) > y_\mathrm{max}$ and _completely restart_ the generate loop. Eventually, some value of $x$ will lie near the absolute maximum of $f$ and the data generation will happily continue until the requested number of events has been reached.

```{warning}
There are two caveats:
1. The domain sample (here: the uniformly distributed values of $x$) has to be large in order for the data sample to accurately describe the original function.
2. The the function displays narrow structures, like some sharp global maximum containing $y_\mathrm{max}$, changes are smaller that the value of $x$ will lie within this peak. The domain sample will therefore have to be even larger. It will also take longer before $y_\mathrm{max}$ is found.
```

### Domain distribution

First of all, we need to randomly generate values of $x$. In this simple, 1-dimensional example, we could just use a random generator like {class}`numpy.random.Generator` feed its output to the {meth}`.LambdifiedFunction.__call__`. Generally, though, we want to cover $n$-dimensional cases. So here's a small example function that generates a **uniform** distribution for each variable within a certain range:

In [None]:
def generate_domain(
    size: int,
    boundaries: Dict[str, Tuple[float, float]],
    rng: Optional[np.random.Generator] = None,
) -> Dict[str, np.ndarray]:
    if rng is None:
        rng = np.random.default_rng()
    return {
        var_name: rng.uniform(size=size, low=low, high=high)
        for var_name, (low, high) in boundaries.items()
    }

We can see that this works correctly, by feeding the sample generated by this to the {class}`.LambdifiedFunction`, then using its output values as weights to the histogram of the uniform sample.

In [None]:
domain_sample = generate_domain(1_000_000, {"x": (0, 5)})

In [None]:
plt.hist(
    domain_sample["x"],
    bins=200,
    density=True,
    alpha=0.5,
    label="uniform",
)
plt.hist(
    domain_sample["x"],
    weights=function_1d(domain_sample),
    bins=200,
    alpha=0.5,
    density=True,
    label="weighted with $f$",
)
plt.legend();

```{note}
In PWA, the sample on which we perform hit-and-miss is not uniform, because the available space is limited by the masses of the initial and final state (phase space). See {func}`.generate_phsp`.
```

### Intensity distribution

With a {ref}`usage/basics:Domain distribution` in hand, we can work out an implementation for the {ref}`usage/basics:Hit & miss` approach:

In [None]:
def generate_data(
    size: int,
    boundaries: Dict[str, Tuple[float, float]],
    function: Function,
    rng: Optional[np.random.Generator] = None,
    bunch_size: int = 10_000,
) -> Dict[str, np.ndarray]:
    if rng is None:
        rng = np.random.default_rng()
    output = {var: np.array([]) for var in boundaries}
    some_variable = next(iter(boundaries))
    while len(output[some_variable]) < size:
        phsp = generate_domain(bunch_size, boundaries, rng)
        y_values = function(phsp)
        y_max = np.max(y_values)
        random_y_values = rng.uniform(size=bunch_size, high=y_max)
        hit_and_miss_sample = {
            var: phsp[var][random_y_values < y_values] for var in boundaries
        }
        output = {
            var: np.concatenate([output[var], hit_and_miss_sample[var]])
            for var in boundaries
        }
    output = {var: output[var][:size] for var in boundaries}
    return output

And indeed, it results in the correct distribution!

In [None]:
data = generate_data(
    1_000_000,
    function=function_1d,
    boundaries={"x": (0, 5)},
)

In [None]:
plt.hist(data["x"], bins=200);

## Optimize the model

For the rest, the procedure is really just the same as that sketched in {doc}`/usage/step3`.

We tweak the parameters a bit, then use {meth}`.LambdifiedFunction.update_parameters` to change the function...

In [None]:
initial_parameters = {
    "a": 0.2,
    "b": 0.1,
    "c": 0.2,
    "mu_0": 0.9,
    "sigma_0": 0.4,
    "sigma_1": 0.4,
}
function_1d.update_parameters(initial_parameters)

...compare what this looks like compared to the data...

In [None]:
plt.hist(data["x"], bins=200, density=True)
plt.hist(
    domain_sample["x"],
    weights=function_1d(domain_sample),
    bins=200,
    histtype="step",
    color="red",
    density=True,
);

...define an {class}`.Estimator` and choose {obj}`jax <jax.jit>` as backend...

In [None]:
estimator = UnbinnedNLL(model_1d, data, domain_sample, backend="jax")

...optimize with {class}`.Minuit2`.

In [None]:
minuit2 = Minuit2(callback=CSVSummary("traceback.csv"))
result = minuit2.optimize(estimator, initial_parameters)
result

And again, we have a look at the resulting fit, as well as what happened during the optimization.

In [None]:
optimized_parameters = result.parameter_values
function_1d.update_parameters(optimized_parameters)

In [None]:
plt.hist(data["x"], bins=200, density=True)
plt.hist(
    domain_sample["x"],
    weights=function_1d(domain_sample),
    bins=200,
    histtype="step",
    color="red",
    density=True,
);

In [None]:
fit_traceback = pd.read_csv("traceback.csv")
fig, (ax1, ax2) = plt.subplots(
    2, figsize=(7, 9), sharex=True, gridspec_kw={"height_ratios": [1, 2]}
)
fit_traceback.plot("function_call", "estimator_value", ax=ax1)
fit_traceback.plot("function_call", sorted(initial_parameters), ax=ax2)
fig.tight_layout()
ax2.set_xlabel("function call");

## Example in 2D

The idea illustrated above works for any number of dimensions. Let's create multiply the expression we had with some $\cos$ as a function of $y$:

In [None]:
y, omega = sp.symbols("y, omega", real=True)
expression_2d = expression_1d * sp.cos(y * omega) ** 2
expression_2d

In [None]:
parameter_defaults[omega] = 0.5

In [None]:
y_range = (y, -sp.pi, +sp.pi)
substituted_expr_2d = expression_2d.subs(parameter_defaults)
plot3d(substituted_expr_2d, x_range, y_range);

In [None]:
model_2d = SympyModel(
    expression=expression_2d,
    parameters=parameter_defaults,
)
function_2d = LambdifiedFunction(model_2d, backend="numpy")

### Generate 2D data

In [None]:
boundaries = {"x": (0, 5), "y": (-np.pi, +np.pi)}
domain_sample = generate_domain(
    size=300_000,
    boundaries=boundaries,
)
data = generate_data(
    30_000,
    function=function_2d,
    boundaries=boundaries,
)

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
intensities = function_2d(domain_sample)
kwargs = {
    "weights": intensities,
    "bins": 100,
    "density": True,
}
axes[0].hist(domain_sample["x"], **kwargs)
axes[1].hist(domain_sample["y"], **kwargs)
axes[0].set_xlabel("$x$")
axes[1].set_xlabel("$y$")
axes[0].set_ylabel("$f(x, y)$")
axes[1].set_ylabel("$f(x, y)$")
axes[0].set_yticks([])
axes[1].set_yticks([]);

### Perform fit

In [None]:
initial_parameters = {
    "a": 0.1,
    "b": 0.1,
    "c": 0.2,
    "mu_0": 0.9,
    "omega": 0.35,
    "sigma_0": 0.4,
    "sigma_1": 0.4,
}
function_2d.update_parameters(initial_parameters)

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(9, 4), sharey=True, tight_layout=True)
axes[0].hist2d(**data, bins=50)
axes[1].hist2d(**domain_sample, weights=function_2d(domain_sample), bins=50)
axes[0].set_xlabel("$x$")
axes[0].set_ylim([-3, +3])
axes[1].set_xlabel("$x$")
axes[0].set_ylabel("$y$")
axes[0].set_title("Data sample")
axes[1].set_title("Function with optimized parameters");

In [None]:
estimator = UnbinnedNLL(model_2d, data, domain_sample, backend="jax")
minuit2 = Minuit2(callback=CSVSummary("traceback.csv"))
result = minuit2.optimize(estimator, initial_parameters)
result

### Analyze result

In [None]:
optimized_parameters = result.parameter_values
function_2d.update_parameters(optimized_parameters)

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(9, 4), sharey=True, tight_layout=True)
axes[0].hist2d(**data, bins=50)
axes[1].hist2d(**domain_sample, weights=function_2d(domain_sample), bins=50)
axes[0].set_xlabel("$x$")
axes[0].set_ylim([-3, +3])
axes[1].set_xlabel("$x$")
axes[0].set_ylabel("$y$")
axes[0].set_title("Data sample")
axes[1].set_title("Function with initial parameters");

In [None]:
fit_traceback = pd.read_csv("traceback.csv")
fig, (ax1, ax2) = plt.subplots(
    2, figsize=(7, 9), sharex=True, gridspec_kw={"height_ratios": [1, 2]}
)
fit_traceback.plot("function_call", "estimator_value", ax=ax1)
fit_traceback.plot("function_call", sorted(initial_parameters), ax=ax2)
fig.tight_layout()
ax2.set_xlabel("function call");