Source code for madminer.likelihood

from __future__ import absolute_import, division, print_function, unicode_literals

import logging
import numpy as np
import time
from scipy.stats import poisson, norm, chi2
from scipy.optimize import minimize
from itertools import product

from madminer.analysis import DataAnalyzer
from madminer.utils.various import mdot, less_logging, math_commands
from madminer.ml import ParameterizedRatioEstimator, ScoreEstimator, Ensemble, LikelihoodEstimator, load_estimator
from madminer.utils.histo import Histo
from madminer.sampling import SampleAugmenter
from madminer import sampling

logger = logging.getLogger(__name__)


[docs]class BaseLikelihood(DataAnalyzer):
[docs] def create_negative_log_likelihood(self, *args, **kwargs): raise NotImplementedError
[docs] def create_expected_negative_log_likelihood(self, *args, **kwargs): raise NotImplementedError
def _asimov_data(self, theta, test_split=0.2, sample_only_from_closest_benchmark=True, n_asimov=None): # get data start_event, end_event, correction_factor = self._train_test_split(False, test_split) x, weights_benchmarks = next( self.event_loader( start=start_event, end=end_event, batch_size=n_asimov, generated_close_to=theta if sample_only_from_closest_benchmark else None, ) ) weights_benchmarks *= correction_factor # morphing theta_matrix = self._get_theta_benchmark_matrix(theta) weights_theta = mdot(theta_matrix, weights_benchmarks) weights_theta /= np.sum(weights_theta) return x, weights_theta def _log_likelihood(self, *args, **kwargs): raise NotImplementedError def _log_likelihood_kinematic(self, *args, **kwargs): raise NotImplementedError def _log_likelihood_poisson( self, n_observed, theta, nu, luminosity=300000.0, weights_benchmarks=None, total_weights=None ): if total_weights is not None: theta_matrix = self._get_theta_benchmark_matrix(theta) xsec = mdot(theta_matrix, total_weights) if weights_benchmarks is None: xsec = self.xsecs(thetas=[theta], nus=[nu], partition="train", generated_close_to=theta)[0][0] else: weights = self._weights([theta], [nu], weights_benchmarks)[0] xsec = sum(weights) n_predicted = xsec * luminosity if xsec < 0: logger.warning("Total cross section is negative (%s pb) at theta=%s)", xsec, theta) n_predicted = 10 ** -5 n_observed_rounded = int(np.round(n_observed, 0)) log_likelihood = poisson.logpmf(k=n_observed_rounded, mu=n_predicted) logger.debug( "Poisson log likelihood: %s (%s expected, %s observed at theta=%s)", log_likelihood, n_predicted, n_observed_rounded, theta, ) return log_likelihood def _log_likelihood_constraint(self, nu): log_p = np.sum(norm.logpdf(nu)) logger.debug("Constraint log likelihood: %s", log_p) return log_p
[docs]class NeuralLikelihood(BaseLikelihood):
[docs] def create_negative_log_likelihood( self, model_file, x_observed, n_observed=None, x_observed_weights=None, include_xsec=True, luminosity=300000.0, mode="weighted", n_weighted=10000, ): estimator = load_estimator(model_file) if n_observed is None: n_observed = len(x_observed) # Weighted sampled if mode == "weighted": weights_benchmarks = self._get_weights_benchmarks(n_toys=n_weighted, test_split=None) else: weights_benchmarks = None def nll(params): # Just return the expected Length if params is None: return self.n_nuisance_parameters + self.n_parameters # Process input if len(params) != self.n_nuisance_parameters + self.n_parameters: logger.warning( "Number of parameters is %s, expected %s physical parameters and %s nuisance paramaters", len(params), self.n_parameters, self.n_nuisance_parameters, ) theta = params[: self.n_parameters] nu = params[self.n_parameters :] if len(nu) == 0: nu = None # Compute Log Likelihood log_likelihood = self._log_likelihood( estimator, n_observed, x_observed, theta, nu, include_xsec, luminosity, x_observed_weights, weights_benchmarks, ) return -log_likelihood return nll
[docs] def create_expected_negative_log_likelihood( self, model_file, theta_true, nu_true, include_xsec=True, luminosity=300000.0, n_asimov=None, mode="sampled", n_weighted=10000, ): x_asimov, x_weights = self._asimov_data(theta_true, n_asimov=n_asimov) n_observed = luminosity * self.xsecs([theta_true], [nu_true])[0] return self.create_negative_log_likelihood( model_file, x_asimov, n_observed, x_weights, include_xsec, luminosity, mode, n_weighted )
def _log_likelihood( self, estimator, n_events, xs, theta, nu, include_xsec=True, luminosity=300000.0, x_weights=None, weights_benchmarks=None, ): """ Low-level function which calculates the value of the log-likelihood ratio. See create_negative_log_likelihood for options. """ log_likelihood = 0.0 if include_xsec: log_likelihood = log_likelihood + self._log_likelihood_poisson( n_events, theta, nu, luminosity, weights_benchmarks ) if x_weights is None: x_weights = n_events / float(len(xs)) * np.ones(len(xs)) else: x_weights = x_weights * n_events / np.sum(x_weights) log_likelihood_events = self._log_likelihood_kinematic(estimator, xs, theta, nu) log_likelihood = log_likelihood + np.dot(x_weights, log_likelihood_events) if nu is not None: log_likelihood = log_likelihood + self._log_likelihood_constraint(nu) logger.debug("Total log likelihood: %s", log_likelihood) return log_likelihood def _log_likelihood_kinematic(self, estimator, xs, theta, nu): """ Low-level function which calculates the value of the kinematic part of the log-likelihood. See create_negative_log_likelihood for options. """ if nu is not None: theta = np.concatenate((theta, nu), axis=0) if isinstance(estimator, ParameterizedRatioEstimator): with less_logging(): log_r, _ = estimator.evaluate_log_likelihood_ratio( x=xs, theta=theta.reshape((1, -1)), test_all_combinations=True, evaluate_score=False ) elif isinstance(estimator, LikelihoodEstimator): with less_logging(): log_r, _ = estimator.evaluate_log_likelihood( x=xs, theta=theta.reshape((1, -1)), test_all_combinations=True, evaluate_score=False ) elif isinstance(estimator, Ensemble) and estimator.estimator_type == "parameterized_ratio": with less_logging(): log_r, _ = estimator.evaluate_log_likelihood_ratio( x=xs, theta=theta.reshape((1, -1)), test_all_combinations=True, evaluate_score=False, calculate_covariance=False, ) elif isinstance(estimator, Ensemble) and estimator.estimator_type == "likelihood": with less_logging(): log_r, _ = estimator.evaluate_log_likelihood( x=xs, theta=theta.reshape((1, -1)), test_all_combinations=True, evaluate_score=False, calculate_covariance=False, ) else: raise NotImplementedError( "Likelihood (ratio) estimation is currently only implemented for " "ParameterizedRatioEstimator and LikelihoodEstimator and Ensemble instancees" ) logger.debug("Kinematic log likelihood (ratio): %s", log_r.flatten()) log_r = log_r.flatten() log_r = log_r.astype(np.float64) log_r = self._clean_nans(log_r) return log_r def _get_weights_benchmarks(self, n_toys, test_split=None): """ Low-level function that creates weighted events and returns weights """ start_event, end_event, correction_factor = self._train_test_split(True, test_split) x, weights_benchmarks = self.weighted_events(start_event=start_event, end_event=end_event, n_draws=n_toys) weights_benchmarks *= self.n_samples / n_toys return weights_benchmarks @staticmethod def _clean_nans(array): not_finite = np.any(~np.isfinite(array), axis=0) if np.sum(not_finite) > 0: logger.warning("Removing %s inf / nan results from calculation") array[:, not_finite] = 0.0 return array
[docs]class HistoLikelihood(BaseLikelihood):
[docs] def create_negative_log_likelihood( self, x_observed, observables=None, score_components=None, n_observed=None, x_observed_weights=None, include_xsec=True, luminosity=300000.0, mode="sampled", n_histo_toys=100000, model_file=None, hist_bins=None, thetas_binning=None, test_split=None, ): """ Returns a function which calculates the negative log likelihood for a given parameter point, evaulated with a dataset (x_observed,n_observed,x_observed_weights). Parameters ---------- x_observed : list of ndarray Set of event observables with shape `(n_events, n_observables)`. observables : list of str or None , optional Kinematic variables used in the histograms. The names are the same as used for instance in `DelphesReader`. score_components : None or list of int, optional Defines the score components used. Default value: None. n_observed : int or None , optional If int, number of observed events. If None, n_observed is defined by the length of x_observed. Default: None. x_observed_weights : list of float or None , optional List of event weights with shape `(n_events)`. If None, all events have equal weights. Default: None. include_xsec : bool, optional Whether the Poisson likelihood representing the total number of events is included in the analysis. Default value: True. luminosity : float, optional Integrated luminosity in pb^{-1} assumed in the analysis. Default value: 300000. mode : {"weighted" , "sampled", "histo"} , optional If "sampled", for each evaulation of the likelihood function, a separate set of events are sampled and histogram is created to construct the likelihood function. If "weighted", first a set of weighted events is sampled which is then used to create histograms. Default value: "sampled" n_histo_toys : int or None, optional Number of events drawn to construct the histograms used. If None and weighted_histo is True, all events in the training fraction of the MadMiner file are used. If None and weighted_histo is False, 100000 events are used. Default value: 100000. model_file : str or None, optional Filename of a saved neural network estimating the score. Required if score_components is not None. Default value: None. hist_bins : int or list of (int or ndarray) or None, optional Defines the histogram binning. If int, gives the number of bins automatically chosen for each summary statistic. If list, each entry corresponds to one summary statistic (e.g. kinematic variable specified by hist_vars); an int entry corresponds to the number of automatically chosen bins, an ndarray specifies the bin edges along this dimension explicitly. If None, the bins are chosen according to the defaults: for one summary statistic the default is 25 bins, for 2 it's 8 bins along each direction, for more it's 5 per dimension. Default value: None. thetas_binning : list of ndarray or None Specifies the parameter points used to determine the optimal binning. This is requires if hist_bins doesn't already fully specify the binning of the histogram. Default value : None test_split : Returns ------- negative_log_likelihood : likelihood Function that evaluates the negative log likelihood for a given parameter point """ # Check input and join observables and score components - nothing interesting if observables is None: observables = list([]) if score_components is None: score_components = list([]) if observables == [] and score_components == []: logger.info("No observables and scores provided. Calculate LLR due to rate and set include_xsec=True.") include_xsec = True observables = list(observables) + list(score_components) if n_observed is None: n_observed = len(x_observed) supported_modes = ["sampled", "weighted", "histo"] if mode not in supported_modes: raise ValueError("Mode %s unknown. Choose one of the following methods: %s", mode, supported_modes) # Load model - nothing interesting if score_components != []: assert all([isinstance(score_component, int) for score_component in score_components]) if model_file is None: raise ValueError("You need to provide a model_file!") model = load_estimator(model_file) else: model = None # Create summary function logger.info("Setting up standard summary statistics") summary_function = None if observables != []: summary_function = self._make_summary_statistic_function(observables=observables, model=model) # Weighted sampled data, summary_stats, weights_benchmarks = None, None, None if mode == "weighted" or mode == "histo": logger.info("Getting weighted data") data, weights_benchmarks = self._make_histo_data_weighted( summary_function=summary_function, n_toys=n_histo_toys, test_split=test_split ) if (mode == "weighted" or mode == "histo") and observables != []: summary_stats = summary_function(x_observed) # find binning logger.info("Setting up binning") if observables != [] and ( hist_bins is None or not all([hasattr(hist_bin, "__len__") for hist_bin in hist_bins]) ): if thetas_binning is None: raise ValueError("Your input requires adaptive binning: thetas_binning can not be None.") hist_bins = self._find_bins(hist_bins=hist_bins, n_summary_stats=len(observables)) hist_bins = self._fixed_adaptive_binning( thetas_binning=thetas_binning, x_bins=hist_bins, data=data, weights_benchmarks=weights_benchmarks, n_toys=n_histo_toys, summary_function=summary_function, ) logger.info("Use binning: %s", hist_bins) if mode == "histo": if hist_bins is not None: benchmark_histograms, _ = self._get_benchmark_histograms(data, weights_benchmarks, hist_bins) total_weights = np.array( [sum(benchmark_histogram.flatten()) for benchmark_histogram in benchmark_histograms] ) else: benchmark_histograms = None total_weights = np.array([sum(weights_benchmark) for weights_benchmark in weights_benchmarks.T]) else: total_weights, benchmark_histograms = None, None # define negative likelihood function def nll(params): # Just return the expected Length if params is None: return self.n_nuisance_parameters + self.n_parameters # Process input if len(params) != self.n_nuisance_parameters + self.n_parameters: logger.warning( "Number of parameters is %s, expected %s physical parameters and %s nuisance paramaters", len(params), self.n_parameters, self.n_nuisance_parameters, ) theta = params[: self.n_parameters] nu = params[self.n_parameters :] if len(nu) == 0: nu = None # Compute Log Likelihood log_likelihood = self._log_likelihood( n_events=n_observed, xs=x_observed, theta=theta, nu=nu, include_xsec=include_xsec, luminosity=luminosity, x_weights=x_observed_weights, mode=mode, n_histo_toys=n_histo_toys, hist_bins=hist_bins, summary_function=summary_function, data=data, summary_stats=summary_stats, weights_benchmarks=weights_benchmarks, benchmark_histograms=benchmark_histograms, total_weights=total_weights, ) return -log_likelihood return nll
[docs] def create_expected_negative_log_likelihood( self, theta_true, nu_true, observables=None, score_components=None, include_xsec=True, luminosity=300000.0, n_asimov=None, mode="sampled", n_histo_toys=100000, model_file=None, hist_bins=None, thetas_binning=None, test_split=None, ): """ Returns a function which calculates the expected negative log likelihood for a given parameter point, evaulated with test data sampled according to theta_true. Parameters ---------- theta_true : ndarray Specifies the physical paramaters according to which the test data is sampled. nu_true : ndarray Specifies the nuisance paramaters according to which the test data is sampled. observables : list of str or None , optional Kinematic variables used in the histograms. The names are the same as used for instance in `DelphesReader`. score_components : None or list of int, optional Defines the score components used. Default value: None. include_xsec : bool, optional Whether the Poisson likelihood representing the total number of events is included in the analysis. Default value: True. luminosity : float, optional Integrated luminosity in pb^{-1} assumed in the analysis. Default value: 300000. n_asimov : int or None, optional Size of the Asimov sample. If None, all weighted events in the MadMiner file are used. Default value: None. mode : {"weighted" , "sampled"} , optional If "sampled", for each evaulation of the likelihood function, a separate set of events are sampled and histogram is created to construct the likelihood function. If "weighted", first a set of weighted events is sampled which is then used to create histograms. Default value: "sampled" n_histo_toys : int or None, optional Number of events drawn to construct the histograms used. If None and weighted_histo is True, all events in the training fraction of the MadMiner file are used. If None and weighted_histo is False, 100000 events are used. Default value: 100000. model_file : str or None, optional Filename of a saved neural network estimating the score. Required if score_components is not None. Default value: None. hist_bins : int or list of (int or ndarray) or None, optional Defines the histogram binning. If int, gives the number of bins automatically chosen for each summary statistic. If list, each entry corresponds to one summary statistic (e.g. kinematic variable specified by hist_vars); an int entry corresponds to the number of automatically chosen bins, an ndarray specifies the bin edges along this dimension explicitly. If None, the bins are chosen according to the defaults: for one summary statistic the default is 25 bins, for 2 it's 8 bins along each direction, for more it's 5 per dimension. Default value: None. thetas_binning : list of ndarray or None Specifies the parameter points used to determine the optimal binning. If none, theta_true will be used. Default value : None test_split : Returns ------- negative_log_likelihood : likelihood Function that evaluates the negative log likelihood for a given parameter point """ if thetas_binning is None: thetas_binning = [theta_true] x_asimov, x_weights = self._asimov_data(theta_true, n_asimov=n_asimov) n_observed = luminosity * self.xsecs([theta_true], [nu_true])[0] return self.create_negative_log_likelihood( observables=observables, score_components=score_components, x_observed=x_asimov, n_observed=n_observed, x_observed_weights=x_weights, include_xsec=include_xsec, luminosity=luminosity, mode=mode, n_histo_toys=n_histo_toys, model_file=model_file, hist_bins=hist_bins, thetas_binning=thetas_binning, )
def _log_likelihood( self, n_events, xs, theta, nu, include_xsec=True, luminosity=300000.0, x_weights=None, mode="sampled", n_histo_toys=100000, hist_bins=None, summary_function=None, data=None, summary_stats=None, weights_benchmarks=None, benchmark_histograms=None, total_weights=None, ): """ Low-level function which calculates the value of the log-likelihood ratio. See create_negative_log_likelihood for options. """ log_likelihood = 0.0 if include_xsec: log_likelihood = log_likelihood + self._log_likelihood_poisson( n_events, theta, nu, luminosity, weights_benchmarks, total_weights ) if summary_function is not None: if x_weights is None: x_weights = n_events / float(len(xs)) * np.ones(len(xs)) else: x_weights = x_weights * n_events / np.sum(x_weights) log_likelihood_events = self._log_likelihood_kinematic( xs, theta, nu, mode, n_histo_toys, hist_bins, summary_function, data, summary_stats, weights_benchmarks, benchmark_histograms, ) log_likelihood = log_likelihood + np.dot(x_weights, log_likelihood_events) if nu is not None: log_likelihood = log_likelihood + self._log_likelihood_constraint(nu) logger.debug("Total log likelihood: %s", log_likelihood) return log_likelihood def _log_likelihood_kinematic( self, xs, theta, nu, mode="sampled", n_histo_toys=100000, hist_bins=None, summary_function=None, data=None, summary_stats=None, weights_benchmarks=None, benchmark_histograms=None, ): """ Low-level function which calculates the value of the kinematic part of the log-likelihood. See create_negative_log_likelihood for options. """ # shape of theta if nu is not None: theta = np.concatenate((theta, nu), axis=0) # Calculate summary statistics if summary_stats is None: summary_stats = summary_function(xs) # Make histograms if mode == "sampled": data = self._make_histo_data_sampled( summary_function=summary_function, theta=theta, n_histo_toys=n_histo_toys ) histo = Histo(data, weights=None, bins=hist_bins, epsilon=1.0e-12) elif mode == "weighted": weights = self._weights([theta], [nu], weights_benchmarks)[0] histo = Histo(data, weights=weights, bins=hist_bins, epsilon=1.0e-12) elif mode == "histo": bin_centers = [np.array([(bins[i] + bins[i + 1]) / 2 for i in range(len(bins) - 1)]) for bins in hist_bins] bin_centers = np.array(list(product(*bin_centers))) histo = self._histogram_morphing(theta, benchmark_histograms, hist_bins, bin_centers) # calculate log-likelihood from histogram log_p = histo.log_likelihood(summary_stats) return log_p def _make_summary_statistic_function(self, observables=None, model=None): """ Low-level function that returns a function "summary_function" which evaluates the summary statistic for an event. """ variables = math_commands() x_indices = self._find_x_indices(observables) def summary_function(xs): # only prefined observables - very fast if not "score" in x_indices and not "function" in x_indices: return xs[:, x_indices] # evaulate some observables using eval() - more slow data_events = [] for x in xs: data_event = [] if "function" in x_indices: for observable_name, observable_value in zip(self.observables, x): variables[observable_name] = observable_value if "score" in x_indices: if isinstance(model, ScoreEstimator): score = model.evaluate_score(x=np.array([x]))[0] elif isinstance(model, Ensemble) and model.estimator_type == "score": score, _ = model.evaluate_score(x=np.array([x]), calculate_covariance=False)[0] else: raise RuntimeError("Model has to be 'ScoreEstimator' or Ensemble thereof.") for observable, x_indice in zip(observables, x_indices): if x_indice == "function": data_event.append(float(eval(observable, variables))) elif x_indice == "score": data_event.append(score[observable]) else: data_event.append(x[x_indice]) data_events.append(data_event) return np.array(data_events) return summary_function def _find_x_indices(self, observables): """ Low-level function that finds the indices corresponding to the observables and returns them as a list. """ x_names = list(self.observables.keys()) x_indices = [] for obs in observables: if isinstance(obs, int): x_indices.append("score") else: try: x_indices.append(x_names.index(obs)) except: x_indices.append("function") logger.debug("Using x indices %s", x_indices) return x_indices def _make_histo_data_sampled(self, summary_function, theta, n_histo_toys=1000): """ Low-level function that creates histogram data sampled from one benchmark """ # Get unweighted events with less_logging(): sampler = SampleAugmenter(self.madminer_filename, include_nuisance_parameters=True) x, theta, _ = sampler.sample_train_plain( theta=sampling.morphing_point(theta), n_samples=n_histo_toys, test_split=False, filename=None, folder=None, ) # Calculate summary stats data = summary_function(x) return data def _make_histo_data_weighted(self, summary_function, n_toys, test_split=None): """ Low-level function that creates weighted histogram data """ # Get weighted events start_event, end_event, correction_factor = self._train_test_split(True, test_split) x, weights_benchmarks = self.weighted_events(start_event=start_event, end_event=end_event, n_draws=n_toys) weights_benchmarks *= self.n_samples / n_toys # Calculate summary stats if summary_function is not None: data = summary_function(x) else: data = None return data, weights_benchmarks def _find_bins(self, hist_bins, n_summary_stats): """ Low-level function that sets up the binning of the histograms (I) """ if hist_bins is None: if n_summary_stats == 1: hist_bins = [25] elif n_summary_stats == 2: hist_bins = [8, 8] else: hist_bins = [5 for _ in range(n_summary_stats)] elif isinstance(hist_bins, int): # hist_bins = tuple([hist_bins] * n_summary_stats) hist_bins = [hist_bins for _ in range(n_summary_stats)] return hist_bins def _fixed_adaptive_binning( self, thetas_binning, x_bins, data=None, weights_benchmarks=None, n_toys=None, test_split=None, summary_function=None, ): """ Low-level function that sets up the binning of the histograms (II) """ # Get weighted data if data is None: data, weights_benchmarks = self._make_histo_data_weighted( summary_function=summary_function, n_toys=n_toys, test_split=test_split ) # Calculate weights for thetas weights = self._weights(thetas_binning, None, weights_benchmarks) weights = np.asarray(weights) weights = np.mean(weights, axis=0) # Histogram histo = Histo(data, weights, x_bins, epsilon=1.0e-12) x_bins = histo.edges return x_bins def _get_benchmark_histograms(self, data, weights_benchmarks, hist_bins, epsilon=1.0e-12): """ Low-level function that returns histograms for morphing benchmarks """ # get histogram bins histo_benchmarks = [] for weights in weights_benchmarks.T: # histo = Histo(data, weights=weights, bins=hist_bins, epsilon=1.0e-12) ranges = [(edges[0], edges[-1]) for edges in hist_bins] histo, _ = np.histogramdd(data, bins=hist_bins, range=ranges, normed=False, weights=weights) histo[:] += epsilon histo_benchmarks.append(histo) # get bin centers bin_centers = [np.array([(bins[i] + bins[i + 1]) / 2 for i in range(len(bins) - 1)]) for bins in hist_bins] bin_centers = np.array(list(product(*bin_centers))) return histo_benchmarks, bin_centers def _histogram_morphing(self, theta, histogram_benchmarks, hist_bins, bin_centers): """ Low-level function that morphes histograms """ # get binning hist_nbins = [len(bins) - 1 for bins in hist_bins] # get array of flattened histograms flattened_histo_weights = [] for histo in histogram_benchmarks: flattened_histo_weights.append(histo.flatten()) flattened_histo_weights = np.array(flattened_histo_weights).T # calculate dot product theta_matrix = self._get_theta_benchmark_matrix(theta) histo_weights_theta = mdot(theta_matrix, flattened_histo_weights) # create histogram histo = Histo(bin_centers, weights=histo_weights_theta, bins=hist_bins, epsilon=1.0e-12) return histo def _clean_nans(self, array): not_finite = np.any(~np.isfinite(array), axis=0) if np.sum(not_finite) > 0: logger.warning("Removing %s inf / nan results from calculation") array[:, not_finite] = 0.0 return array
[docs]def fix_params(negative_log_likelihood, theta, fixed_components): """ Function that reduces the dimensionality of a likelihood function by fixing some of the components. Parameters ---------- negative_log_likelihood : likelihood Function returned by Likelihood class (for example NeuralLikelihood.create_expected_negative_log_likelihood()`) which takes an n-dimensional input parameter. theta : list of float m-dimensional vector of coordinate which will be fixed. fixed_components : list of int m-dimensional vector of coordinate indices provided in theta. Example: fixed_components=[0,1] will fix the 1st and 2nd component of the paramater point. Returns ------- constrained_nll_negative_log_likelihood : likelihood Constrained likelihood function which takes an n-m dimensional input parameter. """ def constrained_nll(params): # Just return the expected Length n_dimension = negative_log_likelihood(None) if params is None: return n_dimension - len(fixed_components) # Process input if len(theta) != len(fixed_components): logger.warning("Length of fixed_components and theta should be the same") if len(params) + len(fixed_components) != n_dimension: logger.warning("Length of params should be %s", n_dimension - len(fixed_components)) # Initialize full paramaters params_full = np.zeros(n_dimension) # fill fixed components for icomp, thetacomp in zip(fixed_components, theta): params_full[icomp] = thetacomp # fill other components iparam = 0 for i in range(len(params_full)): if i not in fixed_components: params_full[i] = params[iparam] iparam += 1 # Return params_full = np.array(params_full) return negative_log_likelihood(params_full) return constrained_nll
[docs]def project_log_likelihood( negative_log_likelihood, remaining_components=None, grid_ranges=None, grid_resolutions=25, dof=None, thetas_eval=None, ): """ Takes a likelihood function depending on N parameters, and evaluates for a set of M-dimensional parameter points (either grid or explicitly specified) while the remaining N-M paramters are set to zero. Parameters ---------- negative_log_likelihood : likelihood Function returned by Likelihood class (for example NeuralLikelihood.create_expected_negative_log_likelihood()`). remaining_components : list of int or None , optional List with M entries, each an int with 0 <= remaining_components[i] < N. Denotes which parameters are kept, and their new order. All other parameters or projected out (set to zero). If None, all components are kept. Default: None grid_ranges : list of (tuple of float) or None, optional Specifies the boundaries of the parameter grid on which the p-values are evaluated. It should be `[(min, max), (min, max), ..., (min, max)]`, where the list goes over all parameters and `min` and `max` are float. If None, thetas_eval has to be given. Default: None. grid_resolutions : int or list of int, optional Resolution of the parameter space grid on which the p-values are evaluated. If int, the resolution is the same along every dimension of the hypercube. If list of int, the individual entries specify the number of points along each parameter individually. Doesn't have any effect if grid_ranges is None. Default value: 25. dof : int or None, optional If not None, sets the number of parameters for the calculation of the p-values. If None, the overall number of parameters is used. Default value: None. thetas_eval : ndarray or None , optional Manually specifies the parameter point at which the likelihood and p-values are evaluated. If None, grid_ranges and resolution are used instead to construct a regular grid. Default value: None. Returns ------- parameter_grid : ndarray Parameter points at which the p-values are evaluated with shape `(n_grid_points, n_parameters)`. p_values : ndarray Observed p-values for each parameter point on the grid, with shape `(n_grid_points,)`. mle : int Index of the parameter point with the best fit (largest p-value / smallest -2 log likelihood ratio). log_likelihood_ratio : ndarray or None log likelihood ratio based only on kinematics for each point of the grid, with shape `(n_grid_points,)`. """ # Components n_parameters = negative_log_likelihood(None) if remaining_components is None: remaining_components = range(n_parameters) m_paramaters = len(remaining_components) # DOF if dof is None: dof = m_paramaters # Theta grid if thetas_eval is None and grid_resolutions is None: raise ValueError("thetas_eval and grid_resolutions cannot both be None") elif thetas_eval is not None and grid_resolutions is not None: raise ValueError("thetas_eval and grid_resolutions cannot both be set, make up your mind!") elif thetas_eval is None: if isinstance(grid_resolutions, int): grid_resolutions = [grid_resolutions for _ in range(grid_ranges)] if len(grid_resolutions) != m_paramaters: raise ValueError("Dimension of grid should be the same as number of remaining components!") theta_each = [] for resolution, (theta_min, theta_max) in zip(grid_resolutions, grid_ranges): theta_each.append(np.linspace(theta_min, theta_max, resolution)) theta_grid_each = np.meshgrid(*theta_each, indexing="ij") theta_grid_each = [theta.flatten() for theta in theta_grid_each] theta_grid_mdim = np.vstack(theta_grid_each).T else: theta_grid_mdim = thetas_eval # Obtain a theta_grid in n dimensions theta_grid_ndim = [] for theta_mdim in theta_grid_mdim: theta_ndim = np.zeros([n_parameters]) for i, theta in zip(remaining_components, theta_mdim): theta_ndim[i] = theta theta_grid_ndim.append(theta_ndim) # evaluate -2 E[log r] log_r = np.array([-1.0 * negative_log_likelihood(theta) for theta in theta_grid_ndim]) i_ml = np.argmax(log_r) log_r = log_r[:] - log_r[i_ml] m2_log_r = -2.0 * log_r p_value = chi2.sf(x=m2_log_r, df=dof) return theta_grid_mdim, p_value, i_ml, log_r
[docs]def profile_log_likelihood( negative_log_likelihood, remaining_components=None, grid_ranges=None, grid_resolutions=25, thetas_eval=None, theta_start=None, dof=None, method="TNC", ): """ Takes a likelihood function depending on N parameters, and evaluates for a set of M-dimensional parameter points (either grid or explicitly specified) while the remaining N-M paramters are profiled over. Parameters ---------- negative_log_likelihood : likelihood Function returned by Likelihood class (for example NeuralLikelihood.create_expected_negative_log_likelihood()`). remaining_components : list of int or None , optional List with M entries, each an int with 0 <= remaining_components[i] < N. Denotes which parameters are kept, and their new order. All other parameters or projected out (set to zero). If None, all components are kept. Default: None grid_ranges : list of (tuple of float) or None, optional Specifies the boundaries of the parameter grid on which the p-values are evaluated. It should be `[(min, max), (min, max), ..., (min, max)]`, where the list goes over all parameters and `min` and `max` are float. If None, thetas_eval has to be given. Default: None. grid_resolutions : int or list of int, optional Resolution of the parameter space grid on which the p-values are evaluated. If int, the resolution is the same along every dimension of the hypercube. If list of int, the individual entries specify the number of points along each parameter individually. Doesn't have any effect if grid_ranges is None. Default value: 25. thetas_eval : ndarray or None , optional Manually specifies the parameter point at which the likelihood and p-values are evaluated. If None, grid_ranges and resolution are used instead to construct a regular grid. Default value: None. theta_start : ndarray or None , optional Manually specifies a parameter point which is the starting point for the minimization algorithm which find the maximum likelihood point. If None, theta_start = 0 is used. Default is None. dof : int or None, optional If not None, sets the number of parameters for the calculation of the p-values. If None, the overall number of parameters is used. Default value: None. method : {"TNC", " L-BFGS-B"} , optional Mimization method used. Default value: "TNC" Returns ------- parameter_grid : ndarray Parameter points at which the p-values are evaluated with shape `(n_grid_points, n_parameters)`. p_values : ndarray Observed p-values for each parameter point on the grid, with shape `(n_grid_points,)`. mle : int Index of the parameter point with the best fit (largest p-value / smallest -2 log likelihood ratio). log_likelihood_ratio : ndarray or None log likelihood ratio based only on kinematics for each point of the grid, with shape `(n_grid_points,)`. """ # Components n_parameters = negative_log_likelihood(None) if remaining_components is None: remaining_components = range(n_parameters) m_paramaters = len(remaining_components) # DOF if dof is None: dof = m_paramaters # Method supported_methods = ["TNC", " L-BFGS-B"] if method not in supported_methods: raise ValueError("Method %s unknown. Choose one of the following methods: %s", method, supported_methods) # Initial guess for theta if theta_start is None: theta_start = np.zeros(n_parameters) # Theta grid if thetas_eval is None and grid_resolutions is None: raise ValueError("thetas_eval and grid_resolutions cannot both be None") elif thetas_eval is not None and grid_resolutions is not None: raise ValueError("thetas_eval and grid_resolutions cannot both be set, make up your mind!") elif thetas_eval is None: if isinstance(grid_resolutions, int): grid_resolutions = [grid_resolutions for _ in range(grid_ranges)] if len(grid_resolutions) != m_paramaters: raise ValueError("Dimension of grid should be the same as number of remaining components!") theta_each = [] for resolution, (theta_min, theta_max) in zip(grid_resolutions, grid_ranges): theta_each.append(np.linspace(theta_min, theta_max, resolution)) theta_grid_each = np.meshgrid(*theta_each, indexing="ij") theta_grid_each = [theta.flatten() for theta in theta_grid_each] theta_grid_mdim = np.vstack(theta_grid_each).T else: theta_grid_mdim = thetas_eval # Obtain global minimum - Eq.(59) in 1805.00020 result = minimize(negative_log_likelihood, x0=theta_start, method=method) best_fit_global = result.x # scan over grid log_r = [] pscan = 0.01 start_time = time.time() for iscan, theta_mdim in enumerate(theta_grid_mdim): # logger output if iscan / len(theta_grid_mdim) > pscan: elapsed_time = time.time() - start_time logger.info("Processed %s %% of parameter points in %.1f seconds.", pscan * 100, elapsed_time) while iscan / len(theta_grid_mdim) > pscan: if pscan > 0.095: pscan += 0.1 else: pscan += 0.01 # fix some parameters constrained_negative_log_likelihood = fix_params( negative_log_likelihood, theta=theta_mdim, fixed_components=remaining_components ) # obtain starting point theta0 = [] for i, theta in enumerate(theta_start): if i not in remaining_components: theta0.append(theta) # obtain local minimum - Eq.(58) in 1805.00020 result = minimize(constrained_negative_log_likelihood, x0=np.array(theta0), method=method) best_fit_constrained = result.x # Expected Log Likelihood - Eq.(57) in 1805.00020 profiled_logr = -1.0 * ( constrained_negative_log_likelihood(best_fit_constrained) - negative_log_likelihood(best_fit_global) ) log_r.append(profiled_logr) # evaluate p_value and best fit point logr = np.array(log_r) i_ml = np.argmax(log_r) log_r = log_r[:] - log_r[i_ml] m2_log_r = -2.0 * log_r p_value = chi2.sf(x=m2_log_r, df=dof) return theta_grid_mdim, p_value, i_ml, log_r