Source code for madminer.limits

from __future__ import absolute_import, division, print_function, unicode_literals

import logging
import numpy as np
import os
from scipy.stats import chi2, poisson

from madminer.analysis import DataAnalyzer
from madminer.utils.various import mdot
from madminer.ml import ParameterizedRatioEstimator, Ensemble
from madminer.utils.histo import Histo
from madminer.sampling import SampleAugmenter
from madminer import sampling
from madminer.ml import ScoreEstimator

logger = logging.getLogger(__name__)


class AsymptoticLimits(DataAnalyzer):
    """
    Functions to calculate observed and expected constraints, using asymptotic properties of the likelihood
    ratio as test statistics.

    Parameters
    ----------
    filename : str
        Path to MadMiner file (for instance the output of `madminer.delphes.DelphesProcessor.save()`).

    include_nuisance_parameters : bool, optional
        If True, nuisance parameters are taken into account. Default value: False.
    """

    def __init__(self, filename=None, include_nuisance_parameters=False):
        super(AsymptoticLimits, self).__init__(filename, False, include_nuisance_parameters)

    def observed_limits(
        self,
        x_observed,
        theta_ranges,
        mode="ml",
        model_file=None,
        hist_vars=None,
        hist_bins=20,
        include_xsec=True,
        resolutions=25,
        luminosity=300000.0,
        n_toys_per_theta=10000,
        returns="pval",
        dof=None,
    ):
        theta_grid, return_values, i_ml = self._analyse(
            len(x_observed),
            x_observed,
            theta_ranges,
            resolutions,
            mode,
            model_file,
            hist_vars,
            hist_bins,
            include_xsec,
            None,
            luminosity,
            n_toys_per_theta,
            returns=returns,
            dof=dof,
        )
        return theta_grid, return_values, i_ml

    def expected_limits(
        self,
        theta_true,
        theta_ranges,
        mode="ml",
        model_file=None,
        hist_vars=None,
        hist_bins=20,
        include_xsec=True,
        resolutions=25,
        luminosity=300000.0,
        n_toys_per_theta=10000,
        returns="pval",
        dof=None,
    ):
        x_asimov, x_weights = self._asimov_data(theta_true)
        n_observed = luminosity * self._calculate_xsecs([theta_true])[0]
        theta_grid, return_values, i_ml = self._analyse(
            n_observed,
            x_asimov,
            theta_ranges,
            resolutions,
            mode,
            model_file,
            hist_vars,
            hist_bins,
            include_xsec,
            x_weights,
            luminosity,
            n_toys_per_theta,
            returns=returns,
            dof=dof,
        )
        return theta_grid, return_values, i_ml

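    # Both observed_limits and expected_limits delegate to _analyse below and return a tuple of
    # (theta_grid, p-values or log likelihood ratios, index of the most likely theta on the grid).
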
    def asymptotic_p_value(self, log_likelihood_ratio, dof=None):
        if dof is None:
            dof = self.n_parameters
        q = -2.0 * log_likelihood_ratio
        p_value = chi2.sf(x=q, df=dof)
        return p_value

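    # For example, with dof=1 a log likelihood ratio of -1.92 gives q = 3.84 and
    # chi2.sf(3.84, df=1) ≈ 0.05, the familiar 95% CL exclusion threshold.
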
    def _analyse(
        self,
        n_events,
        x,
        theta_ranges,
        theta_resolutions,
        mode="ml",
        model_file=None,
        hist_vars=None,
        hist_bins=20,
        include_xsec=True,
        obs_weights=None,
        luminosity=300000.0,
        n_toys_per_theta=10000,
        returns="pval",
        dof=None,
    ):
        logger.debug("Calculating p-values for %s expected events", n_events)

        assert returns in ["pval", "llr"], "returns has to be either 'pval' or 'llr'!"

        # Observation weights
        if obs_weights is None:
            obs_weights = np.ones(len(x))
        obs_weights /= np.sum(obs_weights)
        obs_weights = obs_weights.astype(np.float64)

        # Theta grid
        theta_grid = self._make_theta_grid(theta_ranges, theta_resolutions)

        # Kinematic part
        if mode == "rate":
            log_r_kin = 0.0

        elif mode == "ml":
            assert model_file is not None
            logger.info("Loading kinematic likelihood ratio estimator")
            model = self._load_ratio_model(model_file)

            logger.info("Calculating kinematic log likelihood ratio with estimator")
            log_r_kin = self._calculate_log_likelihood_ratio_kinematics(x, theta_grid, model)
            log_r_kin = log_r_kin.astype(np.float64)
            log_r_kin = self._clean_nans(log_r_kin)
            logger.debug("Raw mean -2 log r: %s", np.mean(-2.0 * log_r_kin, axis=1))
            log_r_kin = n_events * np.sum(log_r_kin * obs_weights[np.newaxis, :], axis=1)
            logger.debug("Rescaled -2 log r: %s", -2.0 * log_r_kin)

        elif mode == "histo":
            if hist_vars is not None:
                logger.info("Setting up standard summary statistics")
                summary_function = self._make_summary_statistic_function("observables", observables=hist_vars)
            elif model_file is not None:
                logger.info("Loading score estimator and setting it up as summary statistics")
                model = self._load_score_model(model_file)
                summary_function = self._make_summary_statistic_function("sally", model=model)
            else:
                raise RuntimeError("For 'histo' mode, either provide hist_vars or model_file!")
            summary_stats = summary_function(x)

            logger.info("Creating histogram with %s bins for the summary statistics", hist_bins)
            histo = self._make_histo(summary_function, hist_bins, theta_grid, theta_resolutions, n_toys_per_theta)

            logger.info("Calculating kinematic log likelihood with histograms")
            log_r_kin = self._calculate_log_likelihood_histo(summary_stats, theta_grid, histo)
            log_r_kin = log_r_kin.astype(np.float64)
            log_r_kin = self._clean_nans(log_r_kin)
            log_r_kin = n_events * np.sum(log_r_kin * obs_weights[np.newaxis, :], axis=1)

        else:
            raise ValueError("Unknown mode {}, has to be 'rate', 'ml', or 'histo'".format(mode))

        # xsec part
        if include_xsec:
            logger.info("Calculating rate log likelihood")
            log_p_xsec = self._calculate_log_likelihood_xsec(n_events, theta_grid, luminosity)
            logger.debug("Rate -2 log p: %s", -2.0 * log_p_xsec)
        else:
            log_p_xsec = 0.0

        # Combine and get p-values
        logger.info("Calculating p-values")
        log_r = log_r_kin + log_p_xsec
        logger.debug("Combined -2 log r: %s", -2.0 * log_r)
        log_r, i_ml = self._subtract_ml(log_r)
        logger.debug("Min-subtracted -2 log r: %s", -2.0 * log_r)
        p_values = self.asymptotic_p_value(log_r, dof=dof)

        if returns == "llr":
            return theta_grid, log_r, i_ml
        return theta_grid, p_values, i_ml

    def _make_summary_statistic_function(self, mode, model=None, observables=None):
        if mode == "observables":
            assert observables is not None
            x_indices = self._find_x_indices(observables)

            logger.debug("Preparing observables %s as summary statistic function", x_indices)

            def summary_function(x):
                return x[:, x_indices]

        elif mode == "sally":
            assert isinstance(model, ScoreEstimator)

            logger.debug("Preparing score estimator as summary statistic function")

            def summary_function(x):
                score = model.evaluate_score(x)
                score = score[:, : self.n_parameters]
                return score

        else:
            raise RuntimeError("Unknown mode {}, has to be 'observables' or 'sally'".format(mode))

        return summary_function

    @staticmethod
    def _load_ratio_model(filename):
        if os.path.isdir(filename):
            model = Ensemble()
            model.load(filename)
        else:
            model = ParameterizedRatioEstimator()
            model.load(filename)
        return model

    @staticmethod
    def _load_score_model(filename):
        if os.path.isdir(filename):
            model = Ensemble()
            model.load(filename)
        else:
            model = ScoreEstimator()
            model.load(filename)
        return model

    def _calculate_xsecs(self, thetas, test_split=0.2):
        # Test split
        start_event, end_event, correction_factor = self._train_test_split(False, test_split)

        # Total xsecs for benchmarks
        xsecs_benchmarks = 0.0
        for observations, weights in self.event_loader(start=start_event, end=end_event):
            xsecs_benchmarks += np.sum(weights, axis=0)

        # xsecs at thetas
        xsecs = []
        for theta in thetas:
            theta_matrix = self._get_theta_benchmark_matrix(theta)
            xsecs.append(mdot(theta_matrix, xsecs_benchmarks) * correction_factor)
        return np.asarray(xsecs)

    def _asimov_data(self, theta, test_split=0.2):
        start_event, end_event, correction_factor = self._train_test_split(False, test_split)
        x, weights_benchmarks = next(self.event_loader(start=start_event, end=end_event, batch_size=None))
        weights_benchmarks *= correction_factor

        theta_matrix = self._get_theta_benchmark_matrix(theta)
        weights_theta = mdot(theta_matrix, weights_benchmarks)
        weights_theta /= np.sum(weights_theta)

        return x, weights_theta

    @staticmethod
    def _make_theta_grid(theta_ranges, resolutions):
        if isinstance(resolutions, int):
            resolutions = [resolutions for _ in range(len(theta_ranges))]
        theta_each = []
        for resolution, (theta_min, theta_max) in zip(resolutions, theta_ranges):
            theta_each.append(np.linspace(theta_min, theta_max, resolution))
        theta_grid_each = np.meshgrid(*theta_each, indexing="ij")
        theta_grid_each = [theta.flatten() for theta in theta_grid_each]
        theta_grid = np.vstack(theta_grid_each).T
        return theta_grid

    def _make_histo(self, summary_function, x_bins, theta_grid, theta_bins, n_toys_per_theta=1000):
        logger.info("Building histogram with %s bins per parameter and %s bins per observable", theta_bins, x_bins)
        histo = Histo(theta_bins, x_bins)
        logger.debug("Generating histo data")
        theta, x = self._make_histo_data(theta_grid, n_toys_per_theta * len(theta_grid))
        logger.debug("Histo data has theta dimensions %s and x dimensions %s", theta.shape, x.shape)
        logger.debug("Calculating summary statistics")
        summary_stats = summary_function(x)
        logger.debug("Summary stats have dimensions %s", summary_stats.shape)
        logger.debug("Filling histogram with summary statistics")
        histo.fit(theta, summary_stats, fill_empty_bins=True)
        return histo

    def _make_histo_data(self, thetas, n_samples, test_split=0.2):
        sampler = SampleAugmenter(self.madminer_filename, include_nuisance_parameters=self.include_nuisance_parameters)
        x, theta, _ = sampler.sample_train_plain(
            theta=sampling.morphing_points(thetas),
            n_samples=n_samples,
            test_split=test_split,
            filename=None,
            folder=None,
        )
        return theta, x

    def _find_x_indices(self, observables):
        x_names = list(self.observables.keys())
        x_indices = []
        for obs in observables:
            try:
                x_indices.append(x_names.index(obs))
            except ValueError:
                raise RuntimeError("Unknown observable {}, has to be one of {}".format(obs, x_names))
        logger.debug("Using x indices %s", x_indices)
        return x_indices

    @staticmethod
    def _calculate_log_likelihood_histo(x, theta_grid, histo):
        log_p = []
        for theta in theta_grid:
            log_p.append(histo.log_likelihood(theta, x))
        log_p = np.asarray(log_p)
        return log_p

    def _calculate_log_likelihood_xsec(self, n_observed, theta_grid, luminosity=300000.0):
        n_observed_rounded = int(np.round(n_observed, 0))
        n_predicted = self._calculate_xsecs(theta_grid) * luminosity
        logger.debug("Observed events: %s", n_observed)
        logger.debug("Expected events: %s", n_predicted)
        log_p = poisson.logpmf(k=n_observed_rounded, mu=n_predicted)
        return log_p

    def _calculate_log_likelihood_ratio_kinematics(self, x_observed, theta_grid, model, theta1=None):
        if isinstance(model, ParameterizedRatioEstimator):
            log_r, _ = model.evaluate_log_likelihood_ratio(
                x=x_observed, theta=theta_grid, test_all_combinations=True, evaluate_score=False
            )
        elif isinstance(model, Ensemble) and model.estimator_type == "parameterized_ratio":
            log_r, _ = model.evaluate_log_likelihood_ratio(
                x=x_observed,
                theta=theta_grid,
                test_all_combinations=True,
                evaluate_score=False,
                calculate_covariance=False,
            )
        else:
            raise NotImplementedError(
                "Likelihood ratio estimation is currently only implemented for "
                "ParameterizedRatioEstimator instances"
            )
        return log_r

    @staticmethod
    def _subtract_ml(log_r):
        i_ml = np.argmax(log_r)
        log_r_subtracted = log_r[:] - log_r[i_ml]
        return log_r_subtracted, i_ml

    @staticmethod
    def _clean_nans(array):
        not_finite = np.any(~np.isfinite(array), axis=0)
        if np.sum(not_finite) > 0:
            logger.warning("Removing %s inf / nan results from calculation", np.sum(not_finite))
            array[:, not_finite] = 0.0
        return array
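
A minimal usage sketch, assuming a MadMiner data file "madminer_example.h5" and a trained ParameterizedRatioEstimator saved under "models/alices" (both hypothetical names); expected limits on a single parameter could then be computed along these lines:

    from madminer.limits import AsymptoticLimits

    # Hypothetical file and model paths; replace with your own analysis setup
    limits = AsymptoticLimits("madminer_example.h5")
    theta_grid, p_values, i_ml = limits.expected_limits(
        theta_true=[0.0],
        theta_ranges=[(-1.0, 1.0)],
        mode="ml",
        model_file="models/alices",
        resolutions=25,
        luminosity=300000.0,
    )
    # p_values[i] is the asymptotic p-value at theta_grid[i]; i_ml indexes the best-fit point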