Source code for madminer.fisherinformation
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import numpy as np
import os
from madminer.analysis import DataAnalyzer
from madminer.utils.various import math_commands, weighted_quantile, sanitize_array, mdot
from madminer.utils.various import separate_information_blocks
from madminer.ml import ScoreEstimator, Ensemble
logger = logging.getLogger(__name__)
[docs]class FisherInformation(DataAnalyzer):
"""
Functions to calculate expected Fisher information matrices.
After inializing a `FisherInformation` instance with the filename of a MadMiner file, different information matrices
can be calculated:
* `FisherInformation.calculate_fisher_information_full_truth()` calculates the full truth-level Fisher information.
This is the information in an idealized measurement where all parton-level particles with their charges, flavours,
and four-momenta can be accessed with perfect accuracy.
* `FisherInformation.calculate_fisher_information_full_detector()` calculates the full Fisher information in
realistic detector-level observations, estimated with neural networks. In addition to the MadMiner file, this
requires a trained SALLY or SALLINO estimator as well as an unweighted evaluation sample.
* `FisherInformation.calculate_fisher_information_rate()` calculates the Fisher information in the total cross
section.
* `FisherInformation.calculate_fisher_information_hist1d()` calculates the Fisher information in the histogram of
one (parton-level or detector-level) observable.
* `FisherInformation.calculate_fisher_information_hist2d()` calculates the Fisher information in a two-dimensional
histogram of two (parton-level or detector-level) observables.
* `FisherInformation.histogram_of_fisher_information()` calculates the full truth-level Fisher information in
different slices of one observable (the "distribution of the Fisher information").
Finally, don't forget that in the presence of nuisance parameters the constraint terms also affect the Fisher
information. This term is given by `FisherInformation.calculate_fisher_information_nuisance_constraints()`.
Parameters
----------
filename : str
Path to MadMiner file (for instance the output of `madminer.delphes.DelphesProcessor.save()`).
include_nuisance_parameters : bool, optional
If True, nuisance parameters are taken into account. Default value: True.
"""
def __init__(self, filename, include_nuisance_parameters=True):
super(FisherInformation, self).__init__(filename, False, include_nuisance_parameters)
[docs] def calculate_fisher_information_full_truth(
self, theta, luminosity=300000.0, cuts=None, efficiency_functions=None, include_nuisance_parameters=True
):
"""
Calculates the full Fisher information at parton / truth level. This is the information in an idealized
measurement where all parton-level particles with their charges, flavours, and four-momenta can be accessed with
perfect accuracy, i.e. the latent variables `z_parton` can be measured directly.
Parameters
----------
theta : ndarray
Parameter point `theta` at which the Fisher information matrix `I_ij(theta)` is evaluated.
luminosity : float
Luminosity in pb^-1.
cuts : None or list of str, optional
Cuts. Each entry is a parseable Python expression that returns a bool (True if the event should pass a cut,
False otherwise). Default value: None.
efficiency_functions : list of str or None
Efficiencies. Each entry is a parseable Python expression that returns a float for the efficiency of one
component. Default value: None.
include_nuisance_parameters : bool, optional
If True, nuisance parameters are taken into account. Default value: True.
Returns
-------
fisher_information : ndarray
Expected full truth-level Fisher information matrix with shape `(n_parameters, n_parameters)`.
fisher_information_uncertainty : ndarray
Covariance matrix of the Fisher information matrix with shape
`(n_parameters, n_parameters, n_parameters, n_parameters)`, calculated with plain Gaussian error
propagation.
"""
# Input
if cuts is None:
cuts = []
if efficiency_functions is None:
efficiency_functions = []
include_nuisance_parameters = include_nuisance_parameters and (self.nuisance_parameters is not None)
# Loop over batches
n_all_parameters = self.n_parameters
if include_nuisance_parameters:
n_all_parameters += self.n_nuisance_parameters
fisher_info = np.zeros((n_all_parameters, n_all_parameters))
covariance = np.zeros((n_all_parameters, n_all_parameters, n_all_parameters, n_all_parameters))
for observations, weights in self.event_loader():
# Cuts
cut_filter = [self._pass_cuts(obs_event, cuts) for obs_event in observations]
observations = observations[cut_filter]
weights = weights[cut_filter]
# Efficiencies
efficiencies = np.array(
[self._eval_efficiency(obs_event, efficiency_functions) for obs_event in observations]
)
weights *= efficiencies[:, np.newaxis]
# Fisher information
this_fisher_info, this_covariance = self._calculate_fisher_information(
theta,
weights,
luminosity,
sum_events=True,
calculate_uncertainty=True,
include_nuisance_parameters=include_nuisance_parameters,
)
fisher_info += this_fisher_info
covariance += this_covariance
return fisher_info, covariance
[docs] def calculate_fisher_information_full_detector(
self,
theta,
model_file,
unweighted_x_sample_file=None,
luminosity=300000.0,
include_xsec_info=True,
mode="score",
calculate_covariance=True,
batch_size=100000,
test_split=0.2,
):
"""
Calculates the full Fisher information in realistic detector-level observations, estimated with neural networks.
In addition to the MadMiner file, this requires a trained SALLY or SALLINO estimator.
Nuisance parameter are taken into account automatically if the SALLY / SALLINO model was trained with them.
Parameters
----------
theta : ndarray
Parameter point `theta` at which the Fisher information matrix `I_ij(theta)` is evaluated.
model_file : str
Filename of a trained local score regression model that was trained on samples from `theta` (see
`madminer.ml.Estimator`).
unweighted_x_sample_file : str or None
Filename of an unweighted x sample that is sampled according to theta and obeys the cuts
(see `madminer.sampling.SampleAugmenter.extract_samples_train_local()`). If None, the Fisher information
is instead calculated on the full, weighted samples (the data in the MadMiner file). Default value: None.
luminosity : float, optional
Luminosity in pb^-1. Default value: 300000.
include_xsec_info : bool, optional
Whether the rate information is included in the returned Fisher information. Default value: True.
mode : {"score", "information"}, optional
How the ensemble uncertainty on the kinematic Fisher information is calculated. If mode is "information",
the Fisher information for each estimator is calculated individually and only then
are the sample mean and covariance calculated. If mode is "score", the sample mean is
calculated for the score for each event. Default value: "score".
calculate_covariance : bool, optional
If True, the covariance between the different estimators is calculated. Default value: True.
batch_size : int, optional
Batch size. Default value: 100000.
test_split : float or None, optional
If unweighted_x_sample_file is None, this determines the fraction of weighted events used for evaluation.
If None, all events are used (this will probably include events used during training!). Default value: 0.2.
Returns
-------
fisher_information : ndarray or list of ndarray
Estimated expected full detector-level Fisher information matrix with shape `(n_parameters, n_parameters)`.
If more then one value ensemble_vote_expectation_weight is given, this is a list with results for all
entries in ensemble_vote_expectation_weight.
fisher_information_uncertainty : ndarray or list of ndarray or None
Covariance matrix of the Fisher information matrix with shape
`(n_parameters, n_parameters, n_parameters, n_parameters)`. If more then one value
ensemble_vote_expectation_weight is given, this is a list with results for all entries in
ensemble_vote_expectation_weight.
"""
# Check input
if mode not in ["score", "information"]:
raise ValueError("Unknown mode {}, has to be 'score' or 'information'!".format(mode))
# Load SALLY model
if os.path.isdir(model_file) and os.path.exists(model_file + "/ensemble.json"):
model_is_ensemble = True
model = Ensemble()
model.load(model_file)
else:
model_is_ensemble = False
model = ScoreEstimator()
model.load(model_file)
# Nuisance parameters?
if model.n_parameters == self.n_parameters:
logger.info(
"Found %s parameters in SALLY model, matching %s physical parameters in MadMiner file",
model.n_parameters,
self.n_parameters,
)
include_nuisance_parameters = False
elif model.n_parameters == self.n_parameters + self.n_nuisance_parameters:
logger.info(
"Found %s parameters in SALLY model, matching %s physical parameters + %s nuisance parameters"
+ " in MadMiner file",
model.n_parameters,
self.n_parameters,
self.n_nuisance_parameters,
)
include_nuisance_parameters = True
else:
raise RuntimeError(
"Inconsistent numbers of parameters! Found %s in SALLY model, %s physical parameters in "
"MadMiner file, and %s nuisance parameters in MadMiner file.",
model.n_parameters,
self.n_parameters,
self.n_nuisance_parameters,
)
if include_nuisance_parameters:
logger.debug("Including nuisance parameters")
else:
logger.debug("Not including nuisance parameters")
# Total xsec
total_xsec = self._calculate_xsec(theta=theta)
logger.debug("Total cross section: %s pb", total_xsec)
# Rate part of Fisher information
fisher_info_rate = 0.0
rate_covariance = 0.0
if include_xsec_info:
logger.info("Evaluating rate Fisher information")
fisher_info_rate, rate_covariance = self.calculate_fisher_information_rate(
theta=theta, luminosity=luminosity, include_nuisance_parameters=include_nuisance_parameters
)
# Evaluation from weighted events
if unweighted_x_sample_file is None:
# Which events to sum over
if test_split is None or test_split <= 0.0 or test_split >= 1.0:
start_event = 0
else:
start_event = int(round((1.0 - test_split) * self.n_samples, 0)) + 1
if start_event > 0:
total_sum_weights_theta = self._calculate_xsec(theta=theta, start_event=start_event)
else:
total_sum_weights_theta = total_xsec
# Theta morphing matrix
theta_matrix = self._get_theta_benchmark_matrix(theta)
# Prepare output
fisher_info_kin = None
covariance = None
# Number of batches
n_batches = int(np.ceil((self.n_samples - start_event) / batch_size))
n_batches_verbose = max(int(round(n_batches / 10, 0)), 1)
for i_batch, (observations, weights_benchmarks) in enumerate(
self.event_loader(
batch_size=batch_size, start=start_event, include_nuisance_parameters=include_nuisance_parameters
)
):
if (i_batch + 1) % n_batches_verbose == 0:
logger.info("Evaluating kinematic Fisher information on batch %s / %s", i_batch + 1, n_batches)
else:
logger.debug("Evaluating kinematic Fisher information on batch %s / %s", i_batch + 1, n_batches)
weights_theta = mdot(theta_matrix, weights_benchmarks)
# Calculate Fisher info on this batch
if model_is_ensemble:
this_fisher_info, this_covariance = model.calculate_fisher_information(
x=observations,
obs_weights=weights_theta,
n_events=luminosity * total_xsec * np.sum(weights_theta) / total_sum_weights_theta,
calculate_covariance=calculate_covariance,
mode=mode,
)
else:
this_fisher_info = model.calculate_fisher_information(
x=observations,
weights=weights_theta,
n_events=luminosity * total_xsec * np.sum(weights_theta) / total_sum_weights_theta,
)
this_covariance = None
# Sum up results
if fisher_info_kin is None:
fisher_info_kin = this_fisher_info
elif isinstance(fisher_info_kin, list):
for i in range(len(fisher_info_kin)):
fisher_info_kin[i] += this_fisher_info[i]
else:
fisher_info_kin += this_fisher_info
if this_covariance is not None:
if covariance is None:
covariance = this_covariance
elif isinstance(covariance, list):
for i in range(len(covariance)):
covariance[i] += this_covariance[i]
else:
covariance += this_covariance
# Evaluation from unweighted event sample
else:
if model_is_ensemble:
fisher_info_kin, covariance = model.calculate_fisher_information(
x=unweighted_x_sample_file,
n_events=luminosity * total_xsec,
mode=mode,
calculate_covariance=calculate_covariance,
)
else:
fisher_info_kin = model.calculate_fisher_information(
x=unweighted_x_sample_file, n_events=luminosity * total_xsec
)
covariance = None
# Returns
if model_is_ensemble:
return fisher_info_rate + fisher_info_kin, rate_covariance + covariance
return fisher_info_rate + fisher_info_kin, rate_covariance
[docs] def calculate_fisher_information_rate(
self, theta, luminosity, cuts=None, efficiency_functions=None, include_nuisance_parameters=True
):
"""
Calculates the Fisher information in a measurement of the total cross section (without any kinematic
information).
Parameters
----------
theta : ndarray
Parameter point `theta` at which the Fisher information matrix `I_ij(theta)` is evaluated.
luminosity : float
Luminosity in pb^-1.
cuts : None or list of str, optional
Cuts. Each entry is a parseable Python expression that returns a bool (True if the event should pass a cut,
False otherwise). Default value: None.
efficiency_functions : list of str or None
Efficiencies. Each entry is a parseable Python expression that returns a float for the efficiency of one
component. Default value: None.
include_nuisance_parameters : bool, optional
If True, nuisance parameters are taken into account. Default value: True.
Returns
-------
fisher_information : ndarray
Expected Fisher information in the total cross section with shape `(n_parameters, n_parameters)`.
fisher_information_uncertainty : ndarray
Covariance matrix of the Fisher information matrix with shape
`(n_parameters, n_parameters, n_parameters, n_parameters)`, calculated with plain Gaussian error
propagation.
"""
include_nuisance_parameters = include_nuisance_parameters and (self.nuisance_parameters is not None)
# Get weights at benchmarks
weights_benchmarks, weights_benchmark_uncertainties = self._calculate_xsec(
cuts=cuts,
efficiency_functions=efficiency_functions,
return_benchmark_xsecs=True,
return_error=True,
include_nuisance_parameters=include_nuisance_parameters,
)
weights_benchmarks = weights_benchmarks.reshape((1, -1))
weights_benchmark_uncertainties = weights_benchmark_uncertainties.reshape((1, -1))
# Get Fisher information
fisher_info, covariance = self._calculate_fisher_information(
theta=theta,
weights_benchmarks=weights_benchmarks,
luminosity=luminosity,
sum_events=True,
calculate_uncertainty=True,
weights_benchmark_uncertainties=weights_benchmark_uncertainties,
include_nuisance_parameters=include_nuisance_parameters,
)
return fisher_info, covariance
[docs] def calculate_fisher_information_hist1d(
self,
theta,
luminosity,
observable,
bins,
histrange=None,
cuts=None,
efficiency_functions=None,
n_events_dynamic_binning=None,
):
"""
Calculates the Fisher information in the one-dimensional histogram of an (parton-level or detector-level,
depending on how the observations in the MadMiner file were calculated) observable.
Parameters
----------
theta : ndarray
Parameter point `theta` at which the Fisher information matrix `I_ij(theta)` is evaluated.
luminosity : float
Luminosity in pb^-1.
observable : str
Expression for the observable to be histogrammed. The str will be parsed by Python's `eval()` function
and can use the names of the observables in the MadMiner files.
bins : int or ndarray
If int: number of bins in the histogram, excluding overflow bins. Otherwise, defines the bin boundaries
(excluding overflow bins).
histrange : tuple of float or None, optional
Minimum and maximum value of the histogram in the form `(min, max)`. Overflow bins are always added. If
None and bins is an int, variable-width bins with equal cross section are constructed automatically.
Default value: None.
cuts : None or list of str, optional
Cuts. Each entry is a parseable Python expression that returns a bool (True if the event should pass a cut,
False otherwise). Default value: None.
efficiency_functions : list of str or None
Efficiencies. Each entry is a parseable Python expression that returns a float for the efficiency of one
component. Default value: None.
n_events_dynamic_binning : int or None, optional
Number of events used to calculate the dynamic binning (if histrange is None). If None, all events are used.
Note that these events are not shuffled, so if the events in the MadMiner file are sorted, using a value
different from None can cause issues. Default value: None.
Returns
-------
fisher_information : ndarray
Expected Fisher information in the histogram with shape `(n_parameters, n_parameters)`.
fisher_information_uncertainty : ndarray
Covariance matrix of the Fisher information matrix with shape
`(n_parameters, n_parameters, n_parameters, n_parameters)`, calculated with plain Gaussian error
propagation.
"""
# Input
if cuts is None:
cuts = []
if efficiency_functions is None:
efficiency_functions = []
# Binning
bin_boundaries, n_bins_total = self._calculate_binning(
bins, cuts, efficiency_functions, histrange, n_events_dynamic_binning, observable, theta
)
# Loop over batches
weights_benchmarks = np.zeros((n_bins_total, self.n_benchmarks))
weights_squared_benchmarks = np.zeros((n_bins_total, self.n_benchmarks))
for observations, weights in self.event_loader():
# Cuts
cut_filter = [self._pass_cuts(obs_event, cuts) for obs_event in observations]
observations = observations[cut_filter]
weights = weights[cut_filter]
# Efficiencies
efficiencies = np.array(
[self._eval_efficiency(obs_event, efficiency_functions) for obs_event in observations]
)
weights *= efficiencies[:, np.newaxis]
# Evaluate histogrammed observable
histo_observables = np.asarray([self._eval_observable(obs_event, observable) for obs_event in observations])
# Find bins
i_bins = np.searchsorted(bin_boundaries, histo_observables)
assert ((0 <= i_bins) & (i_bins < n_bins_total)).all(), "Wrong bin {}".format(i_bins)
# Add up
for i in range(n_bins_total):
if len(weights[i_bins == i]) > 0:
weights_benchmarks[i] += np.sum(weights[i_bins == i], axis=0)
weights_squared_benchmarks[i] += np.sum(weights[i_bins == i] ** 2, axis=0)
weights_benchmark_uncertainties = weights_squared_benchmarks ** 0.5
# Check cross sections per bin
self._check_binning_stats(weights_benchmarks, weights_benchmark_uncertainties, theta)
# Calculate Fisher information in histogram
fisher_info, covariance = self._calculate_fisher_information(
theta,
weights_benchmarks,
luminosity,
sum_events=True,
weights_benchmark_uncertainties=weights_benchmark_uncertainties,
calculate_uncertainty=True,
)
return fisher_info, covariance
def _check_binning_stats(
self, weights_benchmarks, weights_benchmark_uncertainties, theta, report=5, n_bins_first_axis=None
):
theta_matrix = self._get_theta_benchmark_matrix(theta, zero_pad=False) # (n_benchmarks_phys,)
sigma = mdot(theta_matrix, weights_benchmarks) # Shape (n_bins,)
sigma_uncertainties = mdot(theta_matrix, weights_benchmark_uncertainties) # Shape (n_bins,)
rel_uncertainties = sigma_uncertainties / np.maximum(sigma, 1.0e-12)
order = np.argsort(rel_uncertainties)[::-1]
logger.info("Bins with largest statistical uncertainties on rates:")
for i_bin in order[:report]:
bin_nd = i_bin + 1
if n_bins_first_axis is not None:
bin_nd = (i_bin // n_bins_first_axis + 1, i_bin % n_bins_first_axis + 1)
logger.info(
" Bin %s: (%.5f +/- %.5f) fb (%.0f %%)",
bin_nd,
1000.0 * sigma[i_bin],
1000.0 * sigma_uncertainties[i_bin],
100.0 * rel_uncertainties[i_bin],
)
def _calculate_binning(
self, bins, cuts, efficiency_functions, histrange, n_events_dynamic_binning, observable, theta
):
dynamic_binning = histrange is None and isinstance(bins, int)
if dynamic_binning:
n_bins_total = bins
bin_boundaries = self._calculate_dynamic_binning(
observable, theta, bins, n_events_dynamic_binning, cuts, efficiency_functions
)
logger.debug("Automatic dynamic binning: bin boundaries %s", bin_boundaries)
elif isinstance(bins, int):
n_bins_total = bins + 2
bin_boundaries = np.linspace(histrange[0], histrange[1], num=bins + 1)
else:
bin_boundaries = bins
n_bins_total = len(bins) + 1
return bin_boundaries, n_bins_total
[docs] def calculate_fisher_information_hist2d(
self,
theta,
luminosity,
observable1,
bins1,
observable2,
bins2,
histrange1=None,
histrange2=None,
cuts=None,
efficiency_functions=None,
n_events_dynamic_binning=None,
):
"""
Calculates the Fisher information in a two-dimensional histogram of two (parton-level or detector-level,
depending on how the observations in the MadMiner file were calculated) observables.
Parameters
----------
theta : ndarray
Parameter point `theta` at which the Fisher information matrix `I_ij(theta)` is evaluated.
luminosity : float
Luminosity in pb^-1.
observable1 : str
Expression for the first observable to be histogrammed. The str will be parsed by Python's `eval()` function
and can use the names of the observables in the MadMiner files.
bins1 : int or ndarray
If int: number of bins along the first axis in the histogram in the histogram, excluding overflow bins.
Otherwise, defines the bin boundaries along the first axis in the histogram (excluding overflow bins).
observable2 : str
Expression for the first observable to be histogrammed. The str will be parsed by Python's `eval()` function
and can use the names of the observables in the MadMiner files.
bins2 : int or ndarray
If int: number of bins along the second axis in the histogram in the histogram, excluding overflow bins.
Otherwise, defines the bin boundaries along the second axis in the histogram (excluding overflow bins).
histrange1 : tuple of float or None, optional
Minimum and maximum value of the first axis of the histogram in the form `(min, max)`. Overflow bins are
always added. If None, variable-width bins with equal cross section are constructed automatically. Default
value: None.
histrange2 : tuple of float or None, optional
Minimum and maximum value of the first axis of the histogram in the form `(min, max)`. Overflow bins are
always added. If None, variable-width bins with equal cross section are constructed automatically. Default
value: None.
cuts : None or list of str, optional
Cuts. Each entry is a parseable Python expression that returns a bool (True if the event should pass a cut,
False otherwise). Default value: None.
efficiency_functions : list of str or None
Efficiencies. Each entry is a parseable Python expression that returns a float for the efficiency of one
component. Default value: None.
n_events_dynamic_binning : int or None, optional
Number of events used to calculate the dynamic binning (if histrange is None). If None, all events are used.
Note that these events are not shuffled, so if the events in the MadMiner file are sorted, using a value
different from None can cause issues. Default value: None.
Returns
-------
fisher_information : ndarray
Expected Fisher information in the histogram with shape `(n_parameters, n_parameters)`.
fisher_information_uncertainty : ndarray
Covariance matrix of the Fisher information matrix with shape
`(n_parameters, n_parameters, n_parameters, n_parameters)`, calculated with plain Gaussian error
propagation.
"""
# Input
if cuts is None:
cuts = []
if efficiency_functions is None:
efficiency_functions = []
# Binning
bin1_boundaries, n_bins1_total = self._calculate_binning(
bins1, cuts, efficiency_functions, histrange1, n_events_dynamic_binning, observable1, theta
)
bin2_boundaries, n_bins2_total = self._calculate_binning(
bins2, cuts, efficiency_functions, histrange2, n_events_dynamic_binning, observable2, theta
)
# Loop over batches
weights_benchmarks = np.zeros((n_bins1_total, n_bins2_total, self.n_benchmarks))
weights_squared_benchmarks = np.zeros((n_bins1_total, n_bins2_total, self.n_benchmarks))
for observations, weights in self.event_loader():
# Cuts
cut_filter = [self._pass_cuts(obs_event, cuts) for obs_event in observations]
observations = observations[cut_filter]
weights = weights[cut_filter]
# Efficiencies
efficiencies = np.array(
[self._eval_efficiency(obs_event, efficiency_functions) for obs_event in observations]
)
weights *= efficiencies[:, np.newaxis]
# Evaluate histogrammed observable
histo1_observables = np.asarray(
[self._eval_observable(obs_event, observable1) for obs_event in observations]
)
histo2_observables = np.asarray(
[self._eval_observable(obs_event, observable2) for obs_event in observations]
)
# Find bins
i_bins1 = np.searchsorted(bin1_boundaries, histo1_observables)
i_bins2 = np.searchsorted(bin2_boundaries, histo2_observables)
assert ((0 <= i_bins1) & (i_bins1 < n_bins1_total)).all(), "Wrong bin {}".format(i_bins1)
assert ((0 <= i_bins2) & (i_bins2 < n_bins1_total)).all(), "Wrong bin {}".format(i_bins2)
# Add up
for i in range(n_bins1_total):
for j in range(n_bins2_total):
if len(weights[(i_bins1 == i) & (i_bins2 == j)]) > 0:
weights_benchmarks[i, j] += np.sum(weights[(i_bins1 == i) & (i_bins2 == j)], axis=0)
weights_squared_benchmarks[i, j] += np.sum(
weights[(i_bins1 == i) & (i_bins2 == j)] ** 2, axis=0
)
weights_benchmark_uncertainties = weights_squared_benchmarks ** 0.5
# Calculate Fisher information in histogram
weights_benchmarks = weights_benchmarks.reshape(-1, self.n_benchmarks)
weights_benchmark_uncertainties = weights_benchmark_uncertainties.reshape(-1, self.n_benchmarks)
self._check_binning_stats(
weights_benchmarks, weights_benchmark_uncertainties, theta, n_bins_first_axis=n_bins1_total
)
fisher_info, covariance = self._calculate_fisher_information(
theta,
weights_benchmarks,
luminosity,
sum_events=True,
weights_benchmark_uncertainties=weights_benchmark_uncertainties,
calculate_uncertainty=True,
)
return fisher_info, covariance
[docs] def histogram_of_fisher_information(
self,
theta,
observable,
nbins,
histrange,
model_file=None,
luminosity=300000.0,
cuts=None,
efficiency_functions=None,
batch_size=100000,
test_split=0.2,
):
"""
Calculates the full and rate-only Fisher information in slices of one observable. For the full
information, it will return the truth-level information if model_file is None, and otherwise the
detector-level information based on the SALLY-type score estimator saved in model_file.
Parameters
----------
theta : ndarray
Parameter point `theta` at which the Fisher information matrix `I_ij(theta)` is evaluated.
observable : str
Expression for the observable to be sliced. The str will be parsed by Python's `eval()` function
and can use the names of the observables in the MadMiner files.
nbins : int
Number of bins in the slicing, excluding overflow bins.
histrange : tuple of float
Minimum and maximum value of the slicing in the form `(min, max)`. Overflow bins are always added.
model_file : str or None, optional
If None, the truth-level Fisher information is calculated. If str, filename of a trained local score
regression model that was trained on samples from `theta` (see `madminer.ml.Estimator`). Default value:
None.
luminosity : float, optional
Luminosity in pb^-1. Default value: 300000.
cuts : None or list of str, optional
Cuts. Each entry is a parseable Python expression that returns a bool (True if the event should pass a cut,
False otherwise). Default value: None.
efficiency_functions : list of str or None
Efficiencies. Each entry is a parseable Python expression that returns a float for the efficiency of one
component. Default value: None.
batch_size : int, optional
If model_file is not None: Batch size. Default value: 100000.
test_split : float or None, optional
If model_file is not None: If unweighted_x_sample_file is None, this determines the fraction of weighted
events used for evaluation.
If None, all events are used (this will probably include events used during training!). Default value: 0.2.
Returns
-------
bin_boundaries : ndarray
Observable slice boundaries.
sigma_bins : ndarray
Cross section in pb in each of the slices.
fisher_infos_rate : ndarray
Expected rate-only Fisher information for each slice. Has shape `(n_slices, n_parameters, n_parameters)`.
fisher_infos_full : ndarray
Expected full Fisher information for each slice. Has shape
`(n_slices, n_parameters, n_parameters)`.
"""
# Input
if cuts is None:
cuts = []
if efficiency_functions is None:
efficiency_functions = []
# Theta morphing matrix
theta_matrix = self._get_theta_benchmark_matrix(theta)
# Number of bins
n_bins_total = nbins + 2
bin_boundaries = np.linspace(histrange[0], histrange[1], num=nbins + 1)
# Prepare output
weights_benchmarks_bins = np.zeros((n_bins_total, self.n_benchmarks))
fisher_info_full_bins = np.zeros((n_bins_total, self.n_parameters, self.n_parameters))
# Main loop: truth-level case
if model_file is None:
for observations, weights in self.event_loader():
# Cuts
cut_filter = [self._pass_cuts(obs_event, cuts) for obs_event in observations]
observations = observations[cut_filter]
weights = weights[cut_filter]
# Efficiencies
efficiencies = np.array(
[self._eval_efficiency(obs_event, efficiency_functions) for obs_event in observations]
)
weights *= efficiencies[:, np.newaxis]
# Fisher info per event
fisher_info_events = self._calculate_fisher_information(theta, weights, luminosity, sum_events=False)
# Evaluate histogrammed observable
histo_observables = np.asarray(
[self._eval_observable(obs_event, observable) for obs_event in observations]
)
# Get rid of nuisance parameters
fisher_info_events = fisher_info_events[:, : self.n_parameters, : self.n_parameters]
# Find bins
bins = np.searchsorted(bin_boundaries, histo_observables)
assert ((0 <= bins) & (bins < n_bins_total)).all(), "Wrong bin {}".format(bins)
# Add up
for i in range(n_bins_total):
if len(weights[bins == i]) > 0:
weights_benchmarks_bins[i] += np.sum(weights[bins == i], axis=0)
fisher_info_full_bins[i] += np.sum(fisher_info_events[bins == i], axis=0)
# ML case
else:
# Load SALLY model
if os.path.isdir(model_file) and os.path.exists(model_file + "/ensemble.json"):
model_is_ensemble = True
model = Ensemble()
model.load(model_file)
else:
model_is_ensemble = False
model = ScoreEstimator()
model.load(model_file)
# Nuisance parameters?
if model.n_parameters == self.n_parameters:
logger.debug(
"Found %s parameters in SALLY model, matching %s physical parameters in MadMiner file",
model.n_parameters,
self.n_parameters,
)
include_nuisance_parameters = False
elif model.n_parameters == self.n_parameters + self.n_nuisance_parameters:
logger.debug(
"Found %s parameters in SALLY model, matching %s physical parameters + %s nuisance parameters"
+ " in MadMiner file",
model.n_parameters,
self.n_parameters,
self.n_nuisance_parameters,
)
include_nuisance_parameters = True
else:
raise RuntimeError(
"Inconsistent numbers of parameters! Found %s in SALLY model, %s physical parameters in "
"MadMiner file, and %s nuisance parameters in MadMiner file.",
model.n_parameters,
self.n_parameters,
self.n_nuisance_parameters,
)
# Total xsec
total_xsec = self._calculate_xsec(theta=theta)
logger.debug("Total cross section: %s pb", total_xsec)
# Which events to sum over
if test_split is None or test_split <= 0.0 or test_split >= 1.0:
start_event = 0
else:
start_event = int(round((1.0 - test_split) * self.n_samples, 0)) + 1
if start_event > 0:
total_sum_weights_theta = self._calculate_xsec(theta=theta, start_event=start_event)
else:
total_sum_weights_theta = total_xsec
# Number of batches
n_batches = int(np.ceil((self.n_samples - start_event) / batch_size))
n_batches_verbose = max(int(round(n_batches / 10, 0)), 1)
# ML main loop
for i_batch, (observations, weights_benchmarks) in enumerate(
self.event_loader(
batch_size=batch_size, start=start_event, include_nuisance_parameters=include_nuisance_parameters
)
):
if (i_batch + 1) % n_batches_verbose == 0:
logger.info("Evaluating kinematic Fisher information on batch %s / %s", i_batch + 1, n_batches)
else:
logger.debug("Evaluating kinematic Fisher information on batch %s / %s", i_batch + 1, n_batches)
# Cuts
cut_filter = [self._pass_cuts(obs_event, cuts) for obs_event in observations]
observations = observations[cut_filter]
weights_benchmarks = weights_benchmarks[cut_filter]
# Efficiencies
efficiencies = np.array(
[self._eval_efficiency(obs_event, efficiency_functions) for obs_event in observations]
)
weights_benchmarks *= efficiencies[:, np.newaxis]
# Rescale for test_split
if test_split is not None:
correction = np.array([1.0 / test_split for obs_event in observations])
weights_benchmarks *= correction[:, np.newaxis]
weights_theta = mdot(theta_matrix, weights_benchmarks)
# Calculate Fisher info on this batch
if model_is_ensemble:
fisher_info_events, _ = model.calculate_fisher_information(
x=observations,
obs_weights=weights_theta,
n_events=luminosity * np.sum(weights_theta),
mode="score",
calculate_covariance=False,
sum_events=False,
)
else:
fisher_info_events = model.calculate_fisher_information(
x=observations,
weights=weights_theta,
n_events=luminosity * np.sum(weights_theta),
sum_events=False,
)
# Get rid of nuisance parameters
if include_nuisance_parameters:
fisher_info_events = fisher_info_events[:, : self.n_parameters, : self.n_parameters]
# Evaluate histogrammed observable
histo_observables = np.asarray(
[self._eval_observable(obs_event, observable) for obs_event in observations]
)
# Find bins
bins = np.searchsorted(bin_boundaries, histo_observables)
assert ((0 <= bins) & (bins < n_bins_total)).all(), "Wrong bin {}".format(bins)
# Add up
for i in range(n_bins_total):
if len(weights_benchmarks[bins == i]) > 0:
weights_benchmarks_bins[i] += np.sum(weights_benchmarks[bins == i], axis=0)
fisher_info_full_bins[i] += np.sum(fisher_info_events[bins == i], axis=0)
# Calculate xsecs in bins
sigma_bins = mdot(theta_matrix, weights_benchmarks_bins) # (n_bins,)
# Calculate rate-only Fisher informations in bins
fisher_info_rate_bins = self._calculate_fisher_information(
theta, weights_benchmarks_bins, luminosity, sum_events=False
)
# Get rid of nuisance parameters
fisher_info_rate_bins = fisher_info_rate_bins[:, : self.n_parameters, : self.n_parameters]
# If ML: xsec info is still missing !
if model_file is not None:
fisher_info_full_bins += fisher_info_rate_bins
return bin_boundaries, sigma_bins, fisher_info_rate_bins, fisher_info_full_bins
[docs] def histogram_of_sigma_dsigma(self, theta, observable, nbins, histrange, cuts=None, efficiency_functions=None):
"""
Fills events into histograms and calculates the cross section and first derivative for each bin
Parameters
----------
theta : ndarray
Parameter point `theta` at which the Fisher information matrix `I_ij(theta)` is evaluated.
observable : str
Expression for the observable to be sliced. The str will be parsed by Python's `eval()` function
and can use the names of the observables in the MadMiner files.
nbins : int
Number of bins in the slicing, excluding overflow bins.
histrange : tuple of float
Minimum and maximum value of the slicing in the form `(min, max)`. Overflow bins are always added.
cuts : None or list of str, optional
Cuts. Each entry is a parseable Python expression that returns a bool (True if the event should pass a cut,
False otherwise). Default value: None.
efficiency_functions : list of str or None
Efficiencies. Each entry is a parseable Python expression that returns a float for the efficiency of one
component. Default value: None.
Returns
-------
bin_boundaries : ndarray
Observable slice boundaries.
sigma_bins : ndarray
Cross section in pb in each of the slices.
dsigma_bins : ndarray
Cross section in pb in each of the slices.
"""
# Input
if cuts is None:
cuts = []
if efficiency_functions is None:
efficiency_functions = []
# Binning
dynamic_binning = histrange is None
if dynamic_binning:
n_bins_total = nbins
bin_boundaries = self._calculate_dynamic_binning(observable, theta, nbins, None, cuts, efficiency_functions)
else:
n_bins_total = nbins + 2
bin_boundaries = np.linspace(histrange[0], histrange[1], num=nbins + 1)
# # Number of bins
# n_bins_total = nbins + 2
# bin_boundaries = np.linspace(histrange[0], histrange[1], num=nbins + 1)
# Prepare output
weights_benchmarks_bins = np.zeros((n_bins_total, self.n_benchmarks))
# Main loop: truth-level case
for observations, weights in self.event_loader():
# Cuts
cut_filter = [self._pass_cuts(obs_event, cuts) for obs_event in observations]
observations = observations[cut_filter]
weights = weights[cut_filter]
# Efficiencies
efficiencies = np.array(
[self._eval_efficiency(obs_event, efficiency_functions) for obs_event in observations]
)
weights *= efficiencies[:, np.newaxis]
# Evaluate histogrammed observable
histo_observables = np.asarray([self._eval_observable(obs_event, observable) for obs_event in observations])
# Find bins
bins = np.searchsorted(bin_boundaries, histo_observables)
assert ((0 <= bins) & (bins < n_bins_total)).all(), "Wrong bin {}".format(bins)
# Add up
for i in range(n_bins_total):
if len(weights[bins == i]) > 0:
weights_benchmarks_bins[i] += np.sum(weights[bins == i], axis=0)
# Get morphing matrices
theta_matrix = self._get_theta_benchmark_matrix(theta, zero_pad=False) # (n_benchmarks_phys,)
dtheta_matrix = self._get_dtheta_benchmark_matrix(theta, zero_pad=False) # (n_parameters, n_benchmarks_phys)
# Calculate xsecs in bins
sigma_bins = mdot(theta_matrix, weights_benchmarks_bins) # (n_bins,)
dsigma_bins = mdot(dtheta_matrix, weights_benchmarks_bins) # (n_parameters,n_bins,)
return bin_boundaries, sigma_bins, dsigma_bins
[docs] def calculate_fisher_information_nuisance_constraints(self):
""" Builds the Fisher information term representing the Gaussian constraints on the nuisance parameters """
diagonal = np.array([0.0 for _ in range(self.n_parameters)] + [1.0 for _ in range(self.n_nuisance_parameters)])
return np.diag(diagonal)
def _calculate_fisher_information(
self,
theta,
weights_benchmarks,
luminosity=300000.0,
include_nuisance_parameters=True,
sum_events=False,
calculate_uncertainty=False,
weights_benchmark_uncertainties=None,
):
"""
Low-level function that calculates a list of full Fisher information matrices for a given parameter point and
benchmark weights. Do not use this function directly, instead use the other `FisherInformation` functions.
Parameters
----------
theta : ndarray
Parameter point.
weights_benchmarks : ndarray
Benchmark weights. Shape (n_events, n_benchmark).
luminosity : float, optional
Luminosity in pb^-1. Default value: 300000.
include_nuisance_parameters : bool, optional
If True, nuisance parameters are taken into account. Default value: True.
sum_events : bool, optional
If True, returns the summed FIsher information. Otherwise, a list of Fisher
information matrices for each event. Default value: False.
calculate_uncertainty : bool, optional
Whether an uncertainty of the result is calculated. Note that this uncertainty is currently only
implemented for the "physical" part of the FIsher information, not for the nuisance parameters. Default
value: False.
weights_benchmark_uncertainties : ndarray or None, optional
If calculate_uncertainty is True, weights_benchmark_uncertainties sets the uncertainties on each entry of
weights_benchmarks. If None, weights_benchmark_uncertainties = weights_benchmarks is assumed.
Returns
-------
fisher_information : ndarray
If sum_events is True, the return value is an nxn matrix, the total Fisher information
summed over all events. Otherwise, a n_events x n_parameters x n_parameters tensor is returned that
includes the Fisher information matrices for each event separately.
fisher_information_uncertainty : ndarray
Only returned if calculate_uncertainty is True. Covariance matrix of the Fisher information. Note that this
does not take into account any uncertainty on the nuisance parameter part of the Fisher information, and
correlations between events are neglected. Note that independent of sum_events, the covariance matrix is
always summed over the events.
"""
include_nuisance_parameters = include_nuisance_parameters and self.include_nuisance_parameters
# Get morphing matrices
theta_matrix = self._get_theta_benchmark_matrix(theta, zero_pad=False) # (n_benchmarks_phys,)
dtheta_matrix = self._get_dtheta_benchmark_matrix(theta, zero_pad=False) # (n_parameters, n_benchmarks_phys)
# Get differential xsec per event, and the derivative wrt to theta
sigma = mdot(theta_matrix, weights_benchmarks) # Shape (n_events,)
total_xsec = np.sum(sigma)
inv_sigma = sanitize_array(1.0 / sigma) # Shape (n_events,)
dsigma = mdot(dtheta_matrix, weights_benchmarks) # Shape (n_parameters, n_events)
# Calculate physics Fisher info for this event
fisher_info_phys = luminosity * np.einsum("n,in,jn->nij", inv_sigma, dsigma, dsigma)
# Nuisance parameter Fisher info
if include_nuisance_parameters:
nuisance_a = self.nuisance_morpher.calculate_a(weights_benchmarks) # Shape (n_nuisance_params, n_events)
# grad_i dsigma(x), where i is a nuisance parameter, is given by
# sigma[np.newaxis, :] * a
fisher_info_nuisance = luminosity * np.einsum("n,in,jn->nij", sigma, nuisance_a, nuisance_a)
fisher_info_mix = luminosity * np.einsum("in,jn->nij", dsigma, nuisance_a)
fisher_info_mix_transposed = luminosity * np.einsum("in,jn->nji", dsigma, nuisance_a)
n_all_parameters = self.n_parameters + self.n_nuisance_parameters
fisher_info = np.zeros((fisher_info_phys.shape[0], n_all_parameters, n_all_parameters))
fisher_info[:, : self.n_parameters, : self.n_parameters] = fisher_info_phys
fisher_info[:, : self.n_parameters, self.n_parameters :] = fisher_info_mix
fisher_info[:, self.n_parameters :, : self.n_parameters] = fisher_info_mix_transposed
fisher_info[:, self.n_parameters :, self.n_parameters :] = fisher_info_nuisance
else:
n_all_parameters = self.n_parameters
fisher_info = fisher_info_phys
# Error propagation
if calculate_uncertainty:
if weights_benchmarks.shape[1] > self.n_benchmarks_phys:
weights_benchmarks_phys = weights_benchmarks[:, np.logical_not(self.benchmark_is_nuisance)]
else:
weights_benchmarks_phys = weights_benchmarks
n_events = weights_benchmarks_phys.shape[0]
# Input uncertainties
if weights_benchmark_uncertainties is None:
weights_benchmark_uncertainties = weights_benchmarks_phys # Shape (n_events, n_benchmarks_phys)
# Build covariance matrix of inputs
# We assume full correlation between weights_benchmarks[i, b1] and weights_benchmarks[i, b2]
covariance_inputs = np.zeros((n_events, self.n_benchmarks_phys, self.n_benchmarks_phys))
for i in range(n_events):
for b1 in range(self.n_benchmarks_phys):
for b2 in range(self.n_benchmarks_phys):
if b1 == b2: # Diagonal
covariance_inputs[i, b1, b2] = weights_benchmark_uncertainties[i, b1] ** 2
else: # Off-diagonal, same event
covariance_inputs[i, b1, b2] = (
weights_benchmark_uncertainties[i, b1] * weights_benchmark_uncertainties[i, b2]
)
# Jacobian
temp1 = np.einsum("ib,jn,n->ijnb", dtheta_matrix, dsigma, inv_sigma)
temp2 = np.einsum("jb,in,n->ijnb", dtheta_matrix, dsigma, inv_sigma)
temp3 = np.einsum("b,in,jn,n,n->ijnb", theta_matrix, dsigma, dsigma, inv_sigma, inv_sigma)
temp1, temp2, temp3 = sanitize_array(temp1), sanitize_array(temp2), sanitize_array(temp3)
jacobian = luminosity * (temp1 + temp2 + temp3) # (n_parameters, n_parameters, n_events, n_benchmarks_phys)
# Covariance of information
covariance_information_phys = np.einsum("ijnb,nbc,klnc->ijkl", jacobian, covariance_inputs, jacobian)
if include_nuisance_parameters:
covariance_information = np.zeros(
(n_all_parameters, n_all_parameters, n_all_parameters, n_all_parameters)
)
covariance_information[
: self.n_parameters, : self.n_parameters, : self.n_parameters, : self.n_parameters
] = covariance_information_phys
else:
covariance_information = covariance_information_phys
if sum_events:
return np.sum(fisher_info, axis=0), covariance_information
return fisher_info, covariance_information
if sum_events:
return np.sum(fisher_info, axis=0)
return fisher_info
def _pass_cuts(self, observations, cuts=None):
"""
Checks if an event, specified by a list of observations, passes a set of cuts.
Parameters
----------
observations : list of float
list of float. Values of the observables for a single event.
cuts : list of str or None, optional
Each entry is a parseable Python expression that returns a bool (True if the event should pass a cut,
False otherwise). Default value: None.
Returns
-------
passes : bool
True if the event passes all cuts, False otherwise.
"""
# Check inputs
if cuts is None:
cuts = []
assert len(observations) == len(self.observables), "Mismatch between observables and observations"
# Variables that can be used in cuts
variables = math_commands()
for observable_name, observable_value in zip(self.observables, observations):
variables[observable_name] = observable_value
# Check cuts
for cut in cuts:
if not bool(eval(cut, variables)):
return False
return True
def _eval_efficiency(self, observations, efficiency_functions=None):
"""
Calculates the efficiency for an event.
Parameters
----------
observations : list of float
Values of the observables.
efficiency_functions : list of str or None
Each entry is a parseable Python expression that returns a float for the efficiency of one component.
Default value: None.
Returns
-------
efficiency : float
Efficiency (0. <= efficiency <= 1.), product of the results of the calls to all entries in
efficiency_functions.
"""
# Check inputs
if efficiency_functions is None:
efficiency_functions = []
assert len(observations) == len(self.observables), "Mismatch between observables and observations"
# Variables that can be used in efficiency functions
variables = math_commands()
for observable_name, observable_value in zip(self.observables, observations):
variables[observable_name] = observable_value
# Check cuts
efficiency = 1.0
for efficency_function in efficiency_functions:
efficiency *= float(eval(efficency_function, variables))
return efficiency
def _eval_observable(self, observations, observable_definition):
"""
Calculates an observable expression for an event.
Parameters
----------
observations : ndarray
Values of the observables for an event, should have shape `(n_observables,)`.
observable_definition : str
A parseable Python expression that returns the value of the observable to be calculated.
Returns
-------
observable_value : float
Value of the observable defined in observable_definition.
"""
assert len(observations) == len(self.observables), "Mismatch between observables and observations"
# Variables that can be used in efficiency functions
variables = math_commands()
for observable_name, observable_value in zip(self.observables, observations):
variables[observable_name] = observable_value
# Check cuts
return float(eval(observable_definition, variables))
def _calculate_xsec(
self,
theta=None,
cuts=None,
efficiency_functions=None,
return_benchmark_xsecs=False,
return_error=False,
include_nuisance_parameters=True,
start_event=0,
):
"""
Calculates the total cross section for a parameter point.
Parameters
----------
theta : ndarray or None, optional
The parameter point. If None, return_benchmark_xsecs should be True. Default value: None.
cuts : list of str or None, optional
Cuts. Each entry is a parseable Python expression that returns a bool (True if the event should pass a cut,
False otherwise). Default value: None.
efficiency_functions : list of str or None
Efficiencies. Each entry is a parseable Python expression that returns a float for the efficiency of one
component. Default value: None.
return_benchmark_xsecs : bool, optional
If True, this function returns the benchmark xsecs. Otherwise, it returns the xsec at theta. Default value:
False.
return_error : bool, optional
If True, this function also returns the square root of the summed squared weights.
include_nuisance_parameters : bool, optional
If True and if return_benchmark_xsecs is True, the nuisance benchmarks are included in the output. Default
value: True.
start_event : int, optional
Index of first event in MadMiner file to consider. Default value: 0.
Returns
-------
xsec : ndarray or float
If return_benchmark_xsecs is True, an ndarray of benchmark xsecs in pb is returned. Otherwise, the cross
section at theta in pb is returned.
xsec_uncertainty : ndarray or float
Only returned if return_error is True. Uncertainty (square root of the summed squared weights) on xsec.
"""
logger.debug("Calculating total cross section for theta = %s", theta)
# Input
if cuts is None:
cuts = []
if efficiency_functions is None:
efficiency_functions = []
assert (theta is not None) or return_benchmark_xsecs, "Please supply theta or set return_benchmark_xsecs=True"
# Total xsecs for benchmarks
xsecs_benchmarks = None
xsecs_uncertainty_benchmarks = None
for observations, weights in self.event_loader(
start=start_event, include_nuisance_parameters=include_nuisance_parameters
):
# Cuts
cut_filter = [self._pass_cuts(obs_event, cuts) for obs_event in observations]
observations = observations[cut_filter]
weights = weights[cut_filter]
# Efficiencies
efficiencies = np.array(
[self._eval_efficiency(obs_event, efficiency_functions) for obs_event in observations]
)
weights *= efficiencies[:, np.newaxis]
# xsecs
if xsecs_benchmarks is None:
xsecs_benchmarks = np.sum(weights, axis=0)
xsecs_uncertainty_benchmarks = np.sum(weights ** 2, axis=0)
else:
xsecs_benchmarks += np.sum(weights, axis=0)
xsecs_uncertainty_benchmarks += np.sum(weights ** 2, axis=0)
assert xsecs_benchmarks is not None, "No events passed cuts"
xsecs_uncertainty_benchmarks = xsecs_uncertainty_benchmarks ** 0.5
logger.debug("Benchmarks xsecs [pb]: %s", xsecs_benchmarks)
if return_benchmark_xsecs:
if return_error:
return xsecs_benchmarks, xsecs_uncertainty_benchmarks
return xsecs_benchmarks
# Translate to xsec for theta
theta_matrix = self._get_theta_benchmark_matrix(theta)
xsec = mdot(theta_matrix, xsecs_benchmarks)
xsec_error = mdot(theta_matrix, xsecs_uncertainty_benchmarks)
logger.debug("Theta matrix: %s", theta_matrix)
logger.debug("Cross section at theta: %s pb", xsec)
if return_error:
return xsec, xsec_error
return xsec
def _calculate_dynamic_binning(
self, observable, theta, n_bins, n_events=None, cuts=None, efficiency_functions=None
):
if cuts is None:
cuts = []
if efficiency_functions is None:
efficiency_functions = []
# Quantile values
quantile_values = np.linspace(0.0, 1.0, n_bins + 1)
# Get data
x_pilot, weights_pilot = next(self.event_loader(batch_size=n_events))
# Cuts
cut_filter = [self._pass_cuts(x, cuts) for x in x_pilot]
x_pilot = x_pilot[cut_filter]
weights_pilot = weights_pilot[cut_filter]
# Efficiencies
efficiencies = np.array([self._eval_efficiency(x, efficiency_functions) for x in x_pilot])
weights_pilot *= efficiencies[:, np.newaxis]
# Evaluate histogrammed observable
histo_observables_pilot = np.asarray([self._eval_observable(x, observable) for x in x_pilot])
# Weights at theta
theta_matrix = self._get_theta_benchmark_matrix(theta)
weight_theta_pilot = mdot(theta_matrix, weights_pilot)
# Bin boundaries
bin_boundaries = weighted_quantile(histo_observables_pilot, quantile_values, weight_theta_pilot)
bin_boundaries = bin_boundaries[1:-1]
return bin_boundaries
[docs]def project_information(fisher_information, remaining_components, covariance=None):
"""
Calculates projections of a Fisher information matrix, that is, "deletes" the rows and columns corresponding to
some parameters not of interest.
Parameters
----------
fisher_information : ndarray
Original n x n Fisher information.
remaining_components : list of int
List with m entries, each an int with 0 <= remaining_compoinents[i] < n. Denotes which parameters are kept, and
their new order. All other parameters or projected out.
covariance : ndarray or None, optional
The covariance matrix of the original Fisher information with shape (n, n, n, n). If None, the error on the
profiled information is not calculated. Default value: None.
Returns
-------
projected_fisher_information : ndarray
Projected m x m Fisher information, where the `i`-th row or column corresponds to the
`remaining_components[i]`-th row or column of fisher_information.
profiled_fisher_information_covariance : ndarray
Covariance matrix of the projected Fisher information matrix with shape (m, m, m, m). Only returned if
covariance is not None.
"""
n_new = len(remaining_components)
fisher_information_new = np.zeros([n_new, n_new])
# Project information
for xnew, xold in enumerate(remaining_components):
for ynew, yold in enumerate(remaining_components):
fisher_information_new[xnew, ynew] = fisher_information[xold, yold]
# Project covariance matrix
if covariance is not None:
covariance_new = np.zeros([n_new, n_new, n_new, n_new])
for xnew, xold in enumerate(remaining_components):
for ynew, yold in enumerate(remaining_components):
for znew, zold in enumerate(remaining_components):
for zznew, zzold in enumerate(remaining_components):
covariance_new[xnew, ynew, znew, zznew] = covariance[xold, yold, zold, zzold]
return fisher_information_new, covariance_new
return fisher_information_new
[docs]def profile_information(
fisher_information,
remaining_components,
covariance=None,
error_propagation_n_ensemble=1000,
error_propagation_factor=1.0e-3,
):
"""
Calculates the profiled Fisher information matrix as defined in Appendix A.4 of arXiv:1612.05261.
Parameters
----------
fisher_information : ndarray
Original n x n Fisher information.
remaining_components : list of int
List with m entries, each an int with 0 <= remaining_compoinents[i] < n. Denotes which parameters are kept, and
their new order. All other parameters or profiled out.
covariance : ndarray or None, optional
The covariance matrix of the original Fisher information with shape (n, n, n, n). If None, the error on the
profiled information is not calculated. Default value: None.
error_propagation_n_ensemble : int, optional
If covariance is not None, this sets the number of Fisher information matrices drawn from a normal distribution
for the Monte-Carlo error propagation. Default value: 1000.
error_propagation_factor : float, optional
If covariance is not None, this factor multiplies the covariance of the distribution of Fisher information
matrices. Smaller factors can avoid problems with ill-behaved Fisher information matrices. Default value: 1.e-3.
Returns
-------
profiled_fisher_information : ndarray
Profiled m x m Fisher information, where the `i`-th row or column corresponds to the
`remaining_components[i]`-th row or column of fisher_information.
profiled_fisher_information_covariance : ndarray
Covariance matrix of the profiled Fishere information matrix with shape (m, m, m, m).
"""
logger.debug("Profiling Fisher information")
n_components = len(fisher_information)
n_remaining_components = len(remaining_components)
_, information_phys, information_mix, information_nuisance = separate_information_blocks(
fisher_information, remaining_components
)
# Error propagation
if covariance is not None:
# Central value
profiled_information = profile_information(
fisher_information, remaining_components=remaining_components, covariance=None
)
# Draw toys
information_toys = np.random.multivariate_normal(
mean=fisher_information.reshape((-1,)),
cov=error_propagation_factor * covariance.reshape(n_components ** 2, n_components ** 2),
size=error_propagation_n_ensemble,
)
information_toys = information_toys.reshape(-1, n_components, n_components)
# Profile each toy
profiled_information_toys = np.array(
[
profile_information(info, remaining_components=remaining_components, covariance=None)
for info in information_toys
]
)
# Calculate ensemble covariance
toy_covariance = np.cov(profiled_information_toys.reshape(-1, n_remaining_components ** 2).T)
toy_covariance = toy_covariance.reshape(
(n_remaining_components, n_remaining_components, n_remaining_components, n_remaining_components)
)
profiled_information_covariance = toy_covariance / error_propagation_factor
# Cross-check: toy mean
toy_mean = np.mean(profiled_information_toys, axis=0)
logger.debug("Central Fisher info:\n%s\nToy mean Fisher info:\n%s", profiled_information, toy_mean)
return profiled_information, profiled_information_covariance
# Calculate profiled information
inverse_information_nuisance = np.linalg.inv(information_nuisance)
profiled_information = information_phys - information_mix.T.dot(inverse_information_nuisance.dot(information_mix))
return profiled_information