"""Collection of loggers that can be inserted into an optimizer as callback."""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import IO, Any, Dict, Iterable, List, Optional
import pandas as pd
import tensorflow as tf
import yaml
class Loadable(ABC):
    """Interface for callbacks whose log file can be reloaded afterwards."""

    @staticmethod
    @abstractmethod
    def load_latest_parameters(filename: str) -> dict:
        """Read the most recent parameter values from a written log file."""
class Callback(ABC):
    """Interface for hooks that an optimizer invokes during a fit."""

    @abstractmethod
    def on_iteration_end(
        self, function_call: int, logs: Optional[Dict[str, Any]] = None
    ) -> None:
        """Called after each iteration, with the current fit status in ``logs``."""

    @abstractmethod
    def on_function_call_end(self) -> None:
        """Called once when the optimization has finished."""
class CallbackList(Callback):
    """Chain several `Callback` instances into a single one.

    Combine different `Callback` classes in to a chain as follows:

    >>> from tensorwaves.optimizer.callbacks import (
    ...     CallbackList, TFSummary, YAMLSummary
    ... )
    >>> from tensorwaves.optimizer.minuit import Minuit2
    >>> optimizer = Minuit2(
    ...     callback=CallbackList([TFSummary(), YAMLSummary("result.yml")])
    ... )
    """

    def __init__(self, callbacks: Iterable[Callback]) -> None:
        # Materialize the iterable once so the callbacks can be replayed
        # on every iteration.
        self.__callbacks: List[Callback] = list(callbacks)

    def on_iteration_end(
        self, function_call: int, logs: Optional[Dict[str, Any]] = None
    ) -> None:
        """Forward the iteration event to every registered callback."""
        for registered_callback in self.__callbacks:
            registered_callback.on_iteration_end(function_call, logs)

    def on_function_call_end(self) -> None:
        """Forward the end-of-fit event to every registered callback."""
        for registered_callback in self.__callbacks:
            registered_callback.on_function_call_end()
class CSVSummary(Callback, Loadable):
    """Log fit parameters and the estimator value to a CSV file."""

    def __init__(self, filename: str, step_size: int = 10) -> None:
        """Open ``filename`` for writing and prepare for logging.

        Args:
            filename: Path of the CSV file to write to. Existing content
                is overwritten.
            step_size: Write a row only every ``step_size``-th function
                call, to limit file size and I/O overhead.
        """
        self.__step_size = step_size
        self.__first_call = True  # the CSV header is written only once
        # newline="" is required when writing CSV through a file handle:
        # without it the platform translates the line terminators that
        # pandas writes itself, producing "\r\r\n" rows on Windows.
        self.__stream = open(filename, "w", newline="")
        _empty_file(self.__stream)

    def on_iteration_end(
        self, function_call: int, logs: Optional[Dict[str, Any]] = None
    ) -> None:
        """Append one row with the current estimator value and parameters."""
        if logs is None:
            return
        if function_call % self.__step_size != 0:
            return
        output_dict = {
            "function_call": function_call,
            "time": logs["time"],
            "estimator_type": logs["estimator"]["type"],
            "estimator_value": logs["estimator"]["value"],
            **logs["parameters"],
        }
        data_frame = pd.DataFrame(output_dict, index=[function_call])
        data_frame.to_csv(
            self.__stream,
            mode="a",
            header=self.__first_call,  # column names only above the first row
            index=False,
        )
        self.__first_call = False

    def on_function_call_end(self) -> None:
        """Flush and close the underlying CSV file."""
        self.__stream.close()

    @staticmethod
    def load_latest_parameters(filename: str) -> dict:
        """Return the parameter values of the last logged row in ``filename``."""
        fit_traceback = pd.read_csv(filename)
        # The first four columns are bookkeeping (function_call, time,
        # estimator_type, estimator_value); everything after is a parameter.
        parameter_traceback = fit_traceback[fit_traceback.columns[4:]]
        parameter_names = parameter_traceback.columns
        latest_parameter_values = parameter_traceback.iloc[-1]
        return dict(zip(parameter_names, latest_parameter_values))
class TFSummary(Callback):
    """Log fit parameters and the estimator value to a `tf.summary`.

    The logs can be viewed with `TensorBoard
    <https://www.tensorflow.org/tensorboard>`_ via:

    .. code-block:: bash

        tensorboard --logdir logs
    """

    def __init__(
        self,
        logdir: str = "logs",
        step_size: int = 10,
        subdir: Optional[str] = None,
    ) -> None:
        # Each run gets its own timestamped directory below ``logdir``.
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        output_dir = logdir + "/" + timestamp
        if subdir is not None:
            output_dir += "/" + subdir
        self.__file_writer = tf.summary.create_file_writer(output_dir)
        self.__file_writer.set_as_default()
        self.__step_size = step_size

    def on_iteration_end(
        self, function_call: int, logs: Optional[Dict[str, Any]] = None
    ) -> None:
        """Write one scalar per parameter, plus the estimator value."""
        if logs is None:
            return
        if function_call % self.__step_size != 0:
            return
        for par_name, par_value in logs["parameters"].items():
            tf.summary.scalar(par_name, par_value, step=function_call)
        estimator_value = logs.get("estimator", {}).get("value", None)
        if estimator_value is not None:
            tf.summary.scalar("estimator", estimator_value, step=function_call)
        self.__file_writer.flush()

    def on_function_call_end(self) -> None:
        """Close the summary file writer."""
        self.__file_writer.close()
class YAMLSummary(Callback, Loadable):
    """Log fit parameters and the estimator value to a YAML file.

    The file is rewritten on every logged iteration, so it always holds
    the latest fit status and can be read back with
    `load_latest_parameters`.
    """

    def __init__(self, filename: str, step_size: int = 10) -> None:
        """
        Args:
            filename: Path of the YAML file to write to. Existing content
                is overwritten.
            step_size: Dump the logs only every ``step_size``-th function
                call, to limit I/O overhead.
        """
        self.__step_size = step_size
        self.__stream = open(filename, "w")

    def on_iteration_end(
        self, function_call: int, logs: Optional[Dict[str, Any]] = None
    ) -> None:
        """Overwrite the YAML file with the latest fit status."""
        if logs is None:  # nothing to log; consistent with the other callbacks
            return
        if function_call % self.__step_size != 0:
            return
        _empty_file(self.__stream)
        yaml.dump(
            logs,
            self.__stream,
            sort_keys=False,
            Dumper=_IncreasedIndent,
            default_flow_style=False,
        )

    def on_function_call_end(self) -> None:
        """Close the underlying YAML file."""
        self.__stream.close()

    @staticmethod
    def load_latest_parameters(filename: str) -> dict:
        """Read back the parameter values from a written YAML log file."""
        with open(filename) as stream:
            fit_stats = yaml.load(stream, Loader=yaml.SafeLoader)
        return fit_stats["parameters"]
class _IncreasedIndent(yaml.Dumper):
    # pylint: disable=too-many-ancestors
    # Dumper that indents block sequences underneath their parent key,
    # instead of PyYAML's default flush-left sequence style.
    def increase_indent(self, flow=False, indentless=False):  # type: ignore
        # Force indentless=False so nested lists always get an indent.
        return super().increase_indent(flow, False)
def _empty_file(stream: IO) -> None:
stream.seek(0)
stream.truncate()