Source code for tensorwaves.optimizer.callbacks

"""Collection of loggers that can be inserted into an optimizer as callback."""

import csv
import os
from abc import ABC, abstractmethod
from datetime import datetime
from typing import IO, Any, Dict, Iterable, List, Optional, Union

import numpy as np
import tensorflow as tf
import yaml


[docs]class Loadable(ABC):
[docs] @staticmethod @abstractmethod def load_latest_parameters(filename: str) -> dict: pass
[docs]class Callback(ABC): """Interface for callbacks such as `.CSVSummary`. .. seealso:: :ref:`usage/step3:Custom callbacks` """
[docs] @abstractmethod def on_optimize_start(self, logs: Optional[Dict[str, Any]] = None) -> None: pass
[docs] @abstractmethod def on_optimize_end(self, logs: Optional[Dict[str, Any]] = None) -> None: pass
[docs] @abstractmethod def on_iteration_end( self, iteration: int, logs: Optional[Dict[str, Any]] = None ) -> None: pass
[docs] @abstractmethod def on_function_call_end( self, function_call: int, logs: Optional[Dict[str, Any]] = None ) -> None: pass
[docs]class CallbackList(Callback): """Class for combining `Callback` s. Combine different `Callback` classes in to a chain as follows: >>> from tensorwaves.optimizer.callbacks import ( ... CallbackList, TFSummary, YAMLSummary ... ) >>> from tensorwaves.optimizer.minuit import Minuit2 >>> optimizer = Minuit2( ... callback=CallbackList([TFSummary(), YAMLSummary("result.yml")]) ... ) """ def __init__(self, callbacks: Iterable[Callback]) -> None: self.__callbacks: List[Callback] = list() for callback in callbacks: self.__callbacks.append(callback)
[docs] def on_optimize_start(self, logs: Optional[Dict[str, Any]] = None) -> None: for callback in self.__callbacks: callback.on_optimize_start(logs)
[docs] def on_optimize_end(self, logs: Optional[Dict[str, Any]] = None) -> None: for callback in self.__callbacks: callback.on_optimize_end(logs)
[docs] def on_iteration_end( self, iteration: int, logs: Optional[Dict[str, Any]] = None ) -> None: for callback in self.__callbacks: callback.on_iteration_end(iteration, logs)
[docs] def on_function_call_end( self, function_call: int, logs: Optional[Dict[str, Any]] = None ) -> None: for callback in self.__callbacks: callback.on_function_call_end(function_call, logs)
[docs]class CSVSummary(Callback, Loadable): def __init__( self, filename: str, function_call_step_size: int = 1, iteration_step_size: Optional[int] = None, ) -> None: """Log fit parameters and the estimator value to a CSV file.""" if iteration_step_size is None: iteration_step_size = 0 if function_call_step_size <= 0 and iteration_step_size <= 0: raise ValueError( "either function call or interaction step size should > 0." ) self.__function_call_step_size = function_call_step_size self.__iteration_step_size = iteration_step_size self.__latest_function_call: Optional[int] = None self.__latest_iteration: Optional[int] = None self.__writer: Optional[csv.DictWriter] = None self.__filename = filename self.__stream: IO = open(os.devnull, "w")
[docs] def on_optimize_start(self, logs: Optional[Dict[str, Any]] = None) -> None: if logs is None: raise ValueError( f"{self.__class__.__name__} requires logs on optimize start" " to determine header names" ) if self.__function_call_step_size > 0: self.__latest_function_call = 0 if self.__iteration_step_size > 0: self.__latest_iteration = 0 self.__stream = open(self.__filename, "w", newline="") self.__writer = csv.DictWriter( self.__stream, fieldnames=list(self.__log_to_rowdict(logs)), quoting=csv.QUOTE_NONNUMERIC, ) self.__writer.writeheader()
[docs] def on_optimize_end(self, logs: Optional[Dict[str, Any]] = None) -> None: if logs is not None: self.__latest_function_call = None self.__latest_iteration = None self.__write(logs) if self.__stream: self.__stream.close()
[docs] def on_iteration_end( self, iteration: int, logs: Optional[Dict[str, Any]] = None ) -> None: self.__latest_iteration = iteration if logs is None: return if ( self.__iteration_step_size is None or self.__latest_iteration % self.__iteration_step_size != 0 ): return self.__write(logs)
[docs] def on_function_call_end( self, function_call: int, logs: Optional[Dict[str, Any]] = None ) -> None: self.__latest_function_call = function_call if logs is None: return if ( self.__function_call_step_size is None or self.__latest_function_call % self.__function_call_step_size != 0 ): return self.__write(logs)
def __write(self, logs: Dict[str, Any]) -> None: if self.__writer is None: raise ValueError( f"{csv.DictWriter.__name__} has not been initialized" ) row_dict = self.__log_to_rowdict(logs) self.__writer.writerow(row_dict) def __log_to_rowdict(self, logs: Dict[str, Any]) -> Dict[str, Any]: output = { "time": logs["time"], "estimator_type": logs["estimator"]["type"], "estimator_value": logs["estimator"]["value"], **logs["parameters"], } if self.__latest_function_call is not None: output = { "function_call": self.__latest_function_call, **output, } if self.__latest_iteration is not None: output = { "iteration": self.__latest_iteration, **output, } return output
[docs] @staticmethod def load_latest_parameters(filename: str) -> dict: def cast_non_numeric(value: str) -> Union[complex, float, str]: # https://docs.python.org/3/library/csv.html#csv.QUOTE_NONNUMERIC # does not work well for complex numbers try: return complex(value) except ValueError: try: return float(value) except ValueError: return value with open(filename, "r") as stream: reader = csv.DictReader(stream) last_line = list(reader)[-1] return { name: cast_non_numeric(value) for name, value in last_line.items() }
[docs]class TFSummary(Callback): def __init__( self, logdir: str = "logs", step_size: int = 10, subdir: Optional[str] = None, ) -> None: """Log fit parameters and the estimator value to a `tf.summary`. The logs can be viewed with `TensorBoard <https://www.tensorflow.org/tensorboard>`_ via: .. code-block:: shell tensorboard --logdir logs """ self.__logdir = logdir self.__subdir = subdir self.__step_size = step_size self.__file_writer = open(os.devnull, "w")
[docs] def on_optimize_start(self, logs: Optional[Dict[str, Any]] = None) -> None: # pylint: disable=no-member output_dir = ( self.__logdir + "/" + datetime.now().strftime("%Y%m%d-%H%M%S") ) if self.__subdir is not None: output_dir += "/" + self.__subdir self.__file_writer = tf.summary.create_file_writer(output_dir) self.__file_writer.set_as_default() # type: ignore
[docs] def on_optimize_end(self, logs: Optional[Dict[str, Any]] = None) -> None: if self.__file_writer: self.__file_writer.close()
[docs] def on_iteration_end( self, iteration: int, logs: Optional[Dict[str, Any]] = None ) -> None: pass
[docs] def on_function_call_end( self, function_call: int, logs: Optional[Dict[str, Any]] = None ) -> None: # pylint: disable=no-member if logs is None: return if function_call % self.__step_size != 0: return parameters = logs["parameters"] for par_name, value in parameters.items(): tf.summary.scalar(par_name, value, step=function_call) estimator_value = logs.get("estimator", {}).get("value", None) if estimator_value is not None: tf.summary.scalar("estimator", estimator_value, step=function_call) self.__file_writer.flush()
[docs]class YAMLSummary(Callback, Loadable): def __init__(self, filename: str, step_size: int = 10) -> None: """Log fit parameters and the estimator value to a `tf.summary`. The logs can be viewed with `TensorBoard <https://www.tensorflow.org/tensorboard>`_ via: .. code-block:: shell tensorboard --logdir logs """ self.__step_size = step_size self.__filename = filename self.__stream: IO = open(os.devnull, "w")
[docs] def on_optimize_start(self, logs: Optional[Dict[str, Any]] = None) -> None: self.__stream = open(self.__filename, "w")
[docs] def on_optimize_end(self, logs: Optional[Dict[str, Any]] = None) -> None: if logs is None: return self.__dump_to_yaml(logs) self.__stream.close()
[docs] def on_iteration_end( self, iteration: int, logs: Optional[Dict[str, Any]] = None ) -> None: pass
[docs] def on_function_call_end( self, function_call: int, logs: Optional[Dict[str, Any]] = None ) -> None: if logs is None: return if function_call % self.__step_size != 0: return self.__dump_to_yaml(logs)
def __dump_to_yaml(self, logs: Dict[str, Any]) -> None: _empty_file(self.__stream) cast_logs = dict(logs) cast_logs["parameters"] = { p: _cast_value(v) for p, v in logs["parameters"].items() } yaml.dump( cast_logs, self.__stream, sort_keys=False, Dumper=_IncreasedIndent, default_flow_style=False, )
[docs] @staticmethod def load_latest_parameters(filename: str) -> dict: with open(filename) as stream: fit_stats = yaml.load(stream, Loader=yaml.Loader) return fit_stats["parameters"]
def _cast_value(value: Any) -> Union[complex, float]: # cspell:ignore iscomplex if np.iscomplex(value) or isinstance(value, complex): return complex(value) return float(value) class _IncreasedIndent(yaml.Dumper): # pylint: disable=too-many-ancestors def increase_indent(self, flow=False, indentless=False): # type: ignore return super().increase_indent(flow, False) def _empty_file(stream: IO) -> None: stream.seek(0) stream.truncate()