Source code for tensorwaves.physics.helicity_formalism.amplitude

"""Amplitude module for the helicity formalism.

Its responsibility is the construction of complicated helicity formalism
amplitude models using a recipe (see `IntensityBuilder`). These models are
encapsulated in an `IntensityTF` class, which can be evaluated as a regular
callable.
"""

import logging
from typing import (
    Callable,
    Dict,
    List,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Type,
)

import amplitf.interface as atfi
import expertsystem.amplitude.model as es
import numpy as np
import tensorflow as tf
from amplitf.dynamics import (
    blatt_weisskopf_ff_squared,
    relativistic_breit_wigner,
)
from amplitf.kinematics import two_body_momentum_squared, wigner_capital_d
from expertsystem.particle import Particle, ParticleCollection
from sympy.physics.quantum.cg import CG

from tensorwaves.interfaces import Function

from .kinematics import HelicityKinematics, SubSystem


[docs]class IntensityTF(Function): """Implementation of the `~.Function` interface using tensorflow. Initialize the intensity based on a tensorflow model. Args: tf_model: A callable with potential tensorflow code. parameters: The collection of parameters of the model. """ def __init__(self, tf_model: Callable, parameters: Dict[str, tf.Variable]): self._model = tf_model self.__parameters = parameters
[docs] def __call__(self, dataset: Dict[str, np.ndarray]) -> np.ndarray: """Evaluate the Intensity. Args: dataset: Contains all required kinematic variables. Returns: List of intensity values. """ # it is crucial to convert the input data to tensors # otherwise the memory footprint can increase dramatically newdataset = {x: tf.constant(y) for x, y in dataset.items()} return self._model(newdataset).numpy()
@property def parameters(self) -> Dict[str, tf.Variable]: return {x: y.value().numpy() for x, y in self.__parameters.items()}
[docs] def update_parameters(self, new_parameters: dict) -> None: for name, value in new_parameters.items(): self.__parameters[name].assign(value)
[docs]class IntensityBuilder: """Builds Intensities from helicity formalism recipe files. Args: particles: Contains info of various particles. kinematics: A helicity kinematics instance. Note that this kinematics instance will be modified in the process. phsp_data: A phase space event collection, required if a normalization of the Intensity is performed. """ def __init__( self, particles: ParticleCollection, kinematics: HelicityKinematics, phsp_data: Optional[np.ndarray] = None, ): self._particles = particles self._dynamics: Optional[es.ParticleDynamics] = None self._kinematics = kinematics self._parameters: Dict[str, tf.Variable] = {} if phsp_data is None: phsp_data = np.array([]) self._phsp_data = phsp_data self._registered_element_builders: Dict[Type[es.Node], Callable] = { es.NormalizedIntensity: _create_normalized_intensity, es.StrengthIntensity: _create_strength_intensity, es.IncoherentIntensity: _create_incoherent_intensity, es.CoherentIntensity: _create_coherent_intensity, es.CoefficientAmplitude: _create_coefficient_amplitude, es.SequentialAmplitude: _create_sequential_amplitude, es.HelicityDecay: _create_helicity_decay, es.CanonicalDecay: _create_helicity_decay, } self._registered_dynamics_builders: Dict[ Type[es.Dynamics], Callable ] = { es.NonDynamic: _NonDynamic, es.RelativisticBreitWigner: _RelativisticBreitWigner, }
[docs] def create_intensity(self, model: es.AmplitudeModel) -> IntensityTF: """Create an `IntensityTF` instance based on a recipe. Args: model: Contains builder instructions. These recipe files can be generated via the expert system (see :doc:`expertsystem:usage/workflow`). """ self._dynamics = model.dynamics self._initialize_parameters(model) return IntensityTF( self.create_element(model.intensity), self._parameters )
[docs] def create_element(self, intensity_node: es.Node) -> Callable: """Create a computation element from the recipe. The recipe can only contain names registered in the pool of known element builders. """ element_class = type(intensity_node) logging.debug("creating %s", element_class) if element_class not in self._registered_element_builders: raise Exception(f"Unknown element {element_class.__name__}!") return self._registered_element_builders[element_class]( self, intensity_node, kinematics=self._kinematics )
[docs] def create_dynamics( self, decaying_state: Particle, dynamics_properties: "DynamicsProperties", ) -> Callable: """Create a dynamics function callable.""" if self._dynamics is None: raise ValueError("Dynamics has not yet been set") decay_dynamics = self._dynamics[decaying_state.name] kwargs = {} form_factor = getattr(decay_dynamics, "form_factor", None) if ( form_factor is not None and not dynamics_properties.orbit_angular_momentum.is_integer() ): raise ValueError( "Model invalid! Using a non integer value for the orbital" " angular momentum L. Seems like you are using the helicity" " formalism, but should be using the canonical formalism" ) if isinstance(form_factor, es.BlattWeisskopf): kwargs.update({"form_factor": "BlattWeisskopf"}) meson_radius_val = form_factor.meson_radius.value meson_radius_par = self.register_parameter( f"MesonRadius_{decaying_state.name}", meson_radius_val, ) dynamics_properties = dynamics_properties._replace( meson_radius=meson_radius_par ) dynamics_builder = self._get_dynamics_builder(decaying_state.name) return dynamics_builder(dynamics_properties, **kwargs)
[docs] def register_dynamics_builder( self, dynamics_name: Type[es.Dynamics], builder: Callable[[str, "DynamicsProperties"], Callable], ) -> None: """Register custom dynamics function builders.""" if dynamics_name in self._registered_dynamics_builders: logging.warning( "Overwriting previously defined builder for %s", dynamics_name ) self._registered_dynamics_builders[dynamics_name] = builder
def _get_dynamics_builder( self, decaying_state_name: str ) -> Callable[..., Callable]: if self._dynamics is None: raise ValueError("Dynamics has not been set") dynamics = self._dynamics[decaying_state_name] if type(dynamics) not in self._registered_dynamics_builders: raise ValueError( f"Dynamics ({dynamics.__class__.__name__}) unknown. " f"Use one of the following: \n" f"{list(self._registered_dynamics_builders.keys())}" ) return self._registered_dynamics_builders[type(dynamics)]
[docs] def register_parameter(self, name: str, value: float) -> tf.Variable: if name not in self._parameters: self._parameters[name] = tf.Variable( value, name=name, dtype=tf.float64 ) return self._parameters[name]
[docs] def get_parameter(self, name: str) -> tf.Variable: if name not in self._parameters: raise Exception(f'Parameter "{name}" not registered') return self._parameters[name]
def _initialize_parameters(self, model: es.AmplitudeModel) -> None: parameters: List[es.FitParameter] = list(model.parameters.values()) for par in parameters: self._parameters[par.name] = tf.Variable( par.value, name=par.name, dtype=tf.float64 )
[docs] def get_normalization_data(self) -> Tuple[dict, float]: """Return phase space dataset and its volume.""" if self._phsp_data.size == 0: raise Exception( "No phase space sample given! This is required for the " "normalization." ) return ( self._kinematics.convert(self._phsp_data), self._kinematics.phase_space_volume, )
class _NormalizedIntensity: def __init__( self, unnormalized_intensity: Callable, norm_dataset: dict, norm_volume: float = 1.0, ) -> None: self._model = unnormalized_intensity self._norm_dataset = norm_dataset self._norm_volume = norm_volume @tf.function def __call__(self, dataset: dict) -> tf.Tensor: normalization = tf.multiply( self._norm_volume, tf.reduce_mean(self._model(self._norm_dataset)), ) return tf.divide(self._model(dataset), normalization) def _create_normalized_intensity( builder: IntensityBuilder, node: es.Node, **_: dict ) -> Callable: if not isinstance(node, es.NormalizedIntensity): raise TypeError( f"Requires {es.NormalizedIntensity.__class__.__name__}" ) model = builder.create_element(node.intensity) dataset, volume = builder.get_normalization_data() # its important to convert the dataset to tf tensors (memory footprint) dataset = {x: tf.constant(y) for x, y in dataset.items()} return _NormalizedIntensity(model, dataset, atfi.const(volume)) class _StrengthIntensity: def __init__(self, intensity: Callable, strength: tf.Variable) -> None: self._strength = strength self._intensity = intensity def __call__(self, dataset: dict) -> tf.Tensor: return self._strength * self._intensity(dataset) def _create_strength_intensity( builder: IntensityBuilder, node: es.Node, **_: dict ) -> Callable: if not isinstance(node, es.StrengthIntensity): raise TypeError strength = builder.get_parameter(node.strength.name) intensity = builder.create_element(node.intensity) return _StrengthIntensity(intensity, strength) class _IncoherentIntensity: def __init__(self, intensities: List[Callable]) -> None: self._intensities = intensities def __call__(self, dataset: dict) -> tf.Tensor: return tf.math.accumulate_n([y(dataset) for y in self._intensities]) def _create_incoherent_intensity( builder: IntensityBuilder, node: es.Node, **_: dict ) -> Callable: if not isinstance(node, es.IncoherentIntensity): raise TypeError intensities = [builder.create_element(x) for x in node.intensities] return _IncoherentIntensity(intensities) class _CoherentIntensity: def __init__(self, amplitudes: List[Callable]) -> None: self._amps = amplitudes def __call__(self, dataset: dict) -> tf.Tensor: return tf.pow( tf.cast( # pylint: disable=no-value-for-parameter,unexpected-keyword-arg tf.abs(tf.add_n([amp(dataset) for amp in self._amps])), dtype=tf.float64, ), tf.constant(2.0, dtype=tf.float64), ) def _create_coherent_intensity( builder: IntensityBuilder, node: es.Node, **_: dict ) -> Callable: if not isinstance(node, es.CoherentIntensity): raise TypeError amplitudes = [builder.create_element(x) for x in node.amplitudes] return _CoherentIntensity(amplitudes) class _CoefficientAmplitude: def __init__( self, amplitude: Callable, mag: tf.Variable, phase: tf.Variable, pre_factor: Optional[float] = None, ): self._mag = mag self._phase = phase self._amp = amplitude if pre_factor is None: self._pre_factor = tf.constant(1.0, dtype=tf.float64) else: self._pre_factor = tf.constant(pre_factor, dtype=tf.float64) def __call__(self, dataset: dict) -> tf.Tensor: coefficient = atfi.polar(self._pre_factor * self._mag, self._phase) return coefficient * self._amp(dataset) def _create_coefficient_amplitude( builder: IntensityBuilder, node: es.Node, **_: dict ) -> Callable: if not isinstance(node, es.CoefficientAmplitude): raise TypeError magnitude = builder.get_parameter(node.magnitude.name) phase = builder.get_parameter(node.phase.name) amplitude = builder.create_element(node.amplitude) return _CoefficientAmplitude(amplitude, magnitude, phase, node.prefactor) class _SequentialAmplitude: def __init__(self, amplitudes: List[Callable]) -> None: self._seq_amps = amplitudes def __call__(self, dataset: dict) -> tf.Tensor: seq_amp = atfi.complex(atfi.const(1.0), atfi.const(0.0)) for amp in self._seq_amps: seq_amp = seq_amp * amp(dataset) return seq_amp def _create_sequential_amplitude( builder: IntensityBuilder, node: es.Node, **_: dict ) -> Callable: if not isinstance(node, es.SequentialAmplitude): raise TypeError if len(node.amplitudes) == 0: raise Exception( "Sequential Amplitude requires a non-empty list of amplitudes!" ) return _SequentialAmplitude( [builder.create_element(x) for x in node.amplitudes] ) class _AngularProperties(NamedTuple): j: float m: float mprime: float theta_name: str phi_name: str
[docs]class DynamicsProperties(NamedTuple): """Data structure representing dynamic properties.""" orbit_angular_momentum: float resonance_mass: float resonance_width: float inv_mass_name: str inv_mass_name_prod1: str inv_mass_name_prod2: str meson_radius: Optional[float]
class _RelativisticBreitWigner: def __init__( self, dynamics_props: DynamicsProperties, form_factor: Optional[es.FormFactor] = None, ) -> None: self._dynamics_props = dynamics_props self._call_wrapper = self._without_form_factor if isinstance(form_factor, es.BlattWeisskopf): self._call_wrapper = self._with_form_factor def __call__(self, dataset: dict) -> tf.Tensor: return self._call_wrapper(dataset) def _without_form_factor(self, dataset: dict) -> tf.Tensor: mass0 = self._dynamics_props.resonance_mass gamma0 = self._dynamics_props.resonance_width return ( relativistic_breit_wigner( dataset[self._dynamics_props.inv_mass_name], self._dynamics_props.resonance_mass, self._dynamics_props.resonance_width, ) * atfi.complex(mass0 * gamma0, atfi.const(0.0)) ) def _with_form_factor(self, dataset: dict) -> tf.Tensor: inv_mass_squared = dataset[self._dynamics_props.inv_mass_name] inv_mass = atfi.sqrt(inv_mass_squared) mass0 = self._dynamics_props.resonance_mass gamma0 = self._dynamics_props.resonance_width m_a = atfi.sqrt(dataset[self._dynamics_props.inv_mass_name_prod1]) m_b = atfi.sqrt(dataset[self._dynamics_props.inv_mass_name_prod2]) meson_radius = self._dynamics_props.meson_radius l_orbit = self._dynamics_props.orbit_angular_momentum q_squared = two_body_momentum_squared(inv_mass, m_a, m_b) q0_squared = two_body_momentum_squared(mass0, m_a, m_b) ff2 = blatt_weisskopf_ff_squared(q_squared, meson_radius, l_orbit) ff02 = blatt_weisskopf_ff_squared(q0_squared, meson_radius, l_orbit) width = gamma0 * (mass0 / inv_mass) * (ff2 / ff02) # So far its all in float64, # but for the sqrt operation it has to be converted to complex width = atfi.complex( width, tf.constant(0.0, dtype=tf.float64) ) * atfi.sqrt( atfi.complex( (q_squared / q0_squared), tf.constant(0.0, dtype=tf.float64), ) ) return relativistic_breit_wigner( inv_mass_squared, mass0, width ) * atfi.complex(mass0 * gamma0 * atfi.sqrt(ff2), atfi.const(0.0)) class _NonDynamic: def __init__( self, dynamics_props: DynamicsProperties, form_factor: Optional[es.FormFactor] = None, ) -> None: self._dynamics_props = dynamics_props self._call_wrapper: Callable[ [dict], tf.Tensor ] = self._without_form_factor if isinstance(form_factor, es.BlattWeisskopf): self._call_wrapper = self._with_form_factor def __call__(self, dataset: dict) -> tf.Tensor: return self._call_wrapper(dataset) @staticmethod def _without_form_factor(_: dict) -> tf.Tensor: return tf.complex( tf.constant(1.0, dtype=tf.float64), tf.constant(0.0, dtype=tf.float64), ) def _with_form_factor(self, dataset: dict) -> tf.Tensor: inv_mass = atfi.sqrt(dataset[self._dynamics_props.inv_mass_name]) m_a = atfi.sqrt(dataset[self._dynamics_props.inv_mass_name_prod1]) m_b = atfi.sqrt(dataset[self._dynamics_props.inv_mass_name_prod2]) meson_radius = self._dynamics_props.meson_radius l_orbit = self._dynamics_props.orbit_angular_momentum q_squared = two_body_momentum_squared(inv_mass, m_a, m_b) return atfi.complex( atfi.sqrt( blatt_weisskopf_ff_squared(q_squared, meson_radius, l_orbit) ), atfi.const(0.0), ) class _HelicityDecay: def __init__( self, angular_params: "_AngularProperties", dynamics_function: Callable, prefactor: float = 1.0, ) -> None: self._params = angular_params self._dynamics_function = dynamics_function self._prefactor = prefactor def __call__(self, dataset: dict) -> tf.Tensor: return ( self._prefactor * wigner_capital_d( dataset[self._params.phi_name], dataset[self._params.theta_name], 0.0, int(2 * self._params.j), int(2 * self._params.m), int(2 * self._params.mprime), ) * self._dynamics_function(dataset) ) def _clebsch_gordan_coefficient(clebsch_gordan: es.ClebschGordan) -> float: return ( CG( j1=clebsch_gordan.j_1, m1=clebsch_gordan.m_1, j2=clebsch_gordan.j_2, m2=clebsch_gordan.m_2, j3=clebsch_gordan.J, m3=clebsch_gordan.M, ) .doit() .evalf() ) def _determine_canonical_prefactor(node: es.CanonicalDecay) -> float: l_s = _clebsch_gordan_coefficient(node.l_s) s2s3 = _clebsch_gordan_coefficient(node.s2s3) return float(l_s * s2s3) def _create_helicity_decay( # pylint: disable=too-many-locals builder: IntensityBuilder, node: es.Node, kinematics: HelicityKinematics, ) -> Callable: if not isinstance(node, (es.HelicityDecay, es.CanonicalDecay)): raise TypeError decaying_state = node.decaying_particle decay_products = node.decay_products dec_prod_fs_ids = [x.final_state_ids for x in decay_products] recoil_final_state = [] parent_recoil_final_state = [] recoil_system = node.recoil_system if recoil_system is not None: recoil_final_state = recoil_system.recoil_final_state if recoil_system.parent_recoil_final_state is not None: parent_recoil_final_state = recoil_system.parent_recoil_final_state inv_mass_name, theta_name, phi_name = kinematics.register_subsystem( SubSystem( dec_prod_fs_ids, recoil_final_state, parent_recoil_final_state, ) ) particle = decaying_state.particle j = particle.spin prefactor = 1.0 if isinstance(node, es.CanonicalDecay): prefactor = _determine_canonical_prefactor(node) dynamics = _create_dynamics( builder, node, dec_prod_fs_ids, decaying_state, inv_mass_name, kinematics, ) return _HelicityDecay( _AngularProperties( j=j, m=decaying_state.helicity, mprime=decay_products[0].helicity - decay_products[1].helicity, theta_name=theta_name, phi_name=phi_name, ), dynamics, prefactor=prefactor, ) def _create_dynamics( builder: IntensityBuilder, amplitude_node: es.AmplitudeNode, dec_prod_fs_ids: Sequence, decaying_state: es.HelicityParticle, inv_mass_name: str, kinematics: HelicityKinematics, ) -> Callable: particle = decaying_state.particle orbit_angular_momentum = particle.spin if isinstance(amplitude_node, es.CanonicalDecay): orbit_angular_momentum = amplitude_node.l_s.j_1 dynamics = builder.create_dynamics( particle, DynamicsProperties( orbit_angular_momentum=orbit_angular_momentum, resonance_mass=builder.register_parameter( f"Position_{particle.name}", particle.mass, ), resonance_width=builder.register_parameter( f"Width_{particle.name}", particle.width, ), inv_mass_name=inv_mass_name, inv_mass_name_prod1=kinematics.register_invariant_mass( dec_prod_fs_ids[0] ), inv_mass_name_prod2=kinematics.register_invariant_mass( dec_prod_fs_ids[1] ), meson_radius=None, ), ) return dynamics