"""Source code for sntd.survey_cosmo."""

from matplotlib import pyplot as plt
import numpy as np
import scipy
import math
from scipy.integrate import quad
from matplotlib import ticker, rcParams
from matplotlib.lines import Line2D
from scipy.misc import derivative as deriv
import pandas

from copy import deepcopy, copy
import sncosmo
import nestle
import corner


from .coetools import *
from .util import *

__all__ = ['Survey', 'Fisher']


def EA1(z1, z2, cosmo):
    """Cumulative integrals of the inverse normalized Hubble parameter.

    Returns (int_0^z1{dz'/E(z')}, int_0^z2{dz'/E(z')}) for paired arrays of
    lens and source redshifts (Eq 8 of Coe & Moustakas 2009, non-flat case).

    cosmo must contain 'Om0', 'Ok', 'Ode0' and either 'w' (constant EoS)
    or 'w0'/'wa' (CPL parameterization).
    """

    # E^2(z): constant-w form if 'w' is supplied, otherwise CPL (w0, wa).
    if 'w' in cosmo.keys():
        def Ez2(z): return (cosmo['Om0']*(1+z)**3 + cosmo['Ok']*(1+z)**2 +
                            cosmo['Ode0']*(1+z)**(3*(1+cosmo['w'])))
    else:
        def Ez2(z): return (cosmo['Om0']*(1+z)**3 + cosmo['Ok']*(1+z)**2 +
                            cosmo['Ode0']*(1+z)**(3*(1+cosmo['w0']+cosmo['wa'])) *
                            np.exp(-3*cosmo['wa']*z/(1+z)))

    def Ezinv(z): return 1.0 / np.sqrt(Ez2(z))

    # Integrate segment-by-segment in ascending redshift order so each quad
    # call only spans the gap to the previous redshift; the running sum then
    # gives int_0^z for every entry.
    redshifts = np.append(z1, z2)
    order = np.argsort(redshifts)
    nlens = len(z1)
    integrals = np.zeros(2 * nlens)

    running = 0.0
    lower = 0.0
    for idx in order:
        segment, _ = quad(Ezinv, lower, redshifts[idx])
        running += segment
        integrals[idx] = running
        lower = redshifts[idx]

    return integrals[:nlens], integrals[nlens:]


def EA2(z1, z2, cosmo):
    """Integral of the inverse normalized Hubble parameter from z1 to z2.

    Computes int_z1^z2{dz'/E(z')} element-wise for paired redshift arrays
    (Eq 8 of Coe & Moustakas 2009). Assumes a flat universe.

    cosmo must contain 'Om0', 'Ok', 'Ode0' and either 'w' (constant EoS)
    or 'w0'/'wa' (CPL parameterization).
    """

    # E^2(z): constant-w form if 'w' is supplied, otherwise CPL (w0, wa).
    if 'w' in cosmo.keys():
        def Ez2(z): return (cosmo['Om0']*(1+z)**3 + cosmo['Ok']*(1+z)**2 +
                            cosmo['Ode0']*(1+z)**(3*(1+cosmo['w'])))
    else:
        def Ez2(z): return (cosmo['Om0']*(1+z)**3 + cosmo['Ok']*(1+z)**2 +
                            cosmo['Ode0']*(1+z)**(3*(1+cosmo['w0']+cosmo['wa'])) *
                            np.exp(-3*cosmo['wa']*z/(1+z)))

    def Ezinv(z): return 1.0 / np.sqrt(Ez2(z))

    # One quad call per (lens, source) pair.
    return np.array([quad(Ezinv, lo, hi)[0] for lo, hi in zip(z1, z2)])


def Eratio(zl, zs, cosmo):
    """Time delay expansion function ratio.

    Script E, Eq 12 of Coe & Moustakas 2009: EL*ES/ELS, scaled by 1/H0
    (in units of h*100). For curved universes the comoving integrals are
    mapped through sin (Ok < 0, closed) or sinh (Ok > 0, open); within
    |Ok| <= 1e-4 the flat-universe shortcut ELS = ES - EL is used.
    """

    EL, ES = EA1(zl, zs, cosmo)
    curvature = cosmo['Ok']

    if curvature < -.0001:
        # Closed universe: apply sin(sqrt(|Ok|)*x)/sqrt(|Ok|).
        rootk = np.sqrt(np.abs(curvature))
        ELS = EA2(zl, zs, cosmo)
        EL = np.sin(rootk * EL) / rootk
        ES = np.sin(rootk * ES) / rootk
        ELS = np.sin(rootk * ELS) / rootk
    elif curvature > .0001:
        # Open universe: apply sinh(sqrt(|Ok|)*x)/sqrt(|Ok|).
        rootk = np.sqrt(np.abs(curvature))
        ELS = EA2(zl, zs, cosmo)
        EL = np.sinh(rootk * EL) / rootk
        ES = np.sinh(rootk * ES) / rootk
        ELS = np.sinh(rootk * ELS) / rootk
    else:
        # Effectively flat: lens-to-source distance is just the difference.
        ELS = ES - EL

    return (EL * ES / ELS) / (cosmo['h'] * 100)


def Evariance(Erat, dTc):
    """Variance of the E ratio from delay/lens-model uncertainty.

    Parameters
    ----------
    Erat: float or array
        The time delay expansion function ratio.
    dTc: float
        Percent uncertainty on the time delay distance ratio (time delay
        measurement combined with lens modeling error); pass 5 for 5%.
    """
    fractional = dTc / 100.
    return Erat ** 2 * fractional ** 2


def loglikelihoodE(testVars, testPoint, zl, zs, dTc=2, set_cosmo={}, Eratio_true=None,
                   Om0true=0.3, Oktrue=0, Ode0true=.7, w0true=-1., watrue=0., htrue=.7,
                   return_ratios=False, P=1):
    """log(likelihood) for a time delay distance 
    ratio constraint, comparing 
    a given test point to the true position. 

    testVars: the test variables (parameter names being varied)
    testPoint: the test point (values matching testVars, in order)
    zl : lens redshift(s)
    zs : source redshift(s)
    dTc : percent precision on cosmological distance ratio, combining
       uncertainty from the time delay measurement + lens modeling
    set_cosmo: dictionary of constants to set (if not defaults)
    Eratio_true: The true ratio if computed beforehand for speed
    Om0true: True value of Om0
    Oktrue: True value of Ok
    Ode0true: True value of Ode0
    w0true: True value of w0
    watrue: True value of wa
    htrue: True value of h
    return_ratios: if true, return the actual ratios in addition to the likelihood
    P: The probability of each source/lens redshift combo (see C&M 2009 equation 17)
    """

    # Build the trial cosmology from the varied parameters.
    cosmo = {testVars[i]: testPoint[i] for i in range(len(testVars))}
    # If a constant-w model is NOT being tested, fill in CPL defaults (w0, wa)
    # and build the matching true cosmology; otherwise the true cosmology uses
    # a single 'w' (taken from w0true).
    if 'w' not in testVars:
        if 'w0' not in cosmo.keys():
            cosmo['w0'] = w0true
        if 'wa' not in cosmo.keys():
            cosmo['wa'] = watrue
        true_cosmo = {'Om0': Om0true, 'Ok': Oktrue,
                      'Ode0': Ode0true, 'w0': w0true, 'wa': watrue, 'h': htrue}
    else:
        true_cosmo = {'Om0': Om0true, 'Ok': Oktrue,
                      'Ode0': Ode0true, 'w': w0true, 'h': htrue}
    # User-supplied constants only fill gaps; varied parameters win.
    for k in set_cosmo.keys():
        if k not in cosmo.keys():
            cosmo[k] = set_cosmo[k]

    # Complete the density parameters using the closure relation
    # Om0 + Ode0 + Ok = 1, falling back to the "true" values when a
    # parameter is neither varied nor derivable.
    if 'Om0' not in cosmo.keys():
        if 'Ode0' in cosmo.keys():
            if 'Ok' in cosmo.keys():
                cosmo['Om0'] = 1-cosmo['Ode0']-cosmo['Ok']
            else:
                cosmo['Om0'] = 1-cosmo['Ode0']-Oktrue
        elif 'Ok' in cosmo.keys():
            cosmo['Om0'] = 1-Ode0true-cosmo['Ok']
        else:
            cosmo['Om0'] = Om0true
    if 'Ode0' not in cosmo.keys():
        if 'Ok' in cosmo.keys():
            cosmo['Ode0'] = 1-cosmo['Om0']-cosmo['Ok']
        else:
            cosmo['Ode0'] = Ode0true
    if 'Ok' not in cosmo.keys():
        cosmo['Ok'] = 1-cosmo['Om0']-cosmo['Ode0']

    # Normalize the Hubble parameter: H0 (if varied) takes precedence over h.
    if 'H0' in cosmo.keys():
        cosmo['h'] = cosmo['H0']/100.
    else:
        if 'h' not in cosmo.keys():
            cosmo['h'] = htrue

    if Eratio_true is None:
        Eratio_true = Eratio(zl, zs, true_cosmo)
    Eratio_test = Eratio(zl, zs, cosmo)

    # Gaussian chi-square in the distance ratio, weighted by the per-system
    # probability P; note the variance is evaluated at the TEST ratio.
    # if np.isfinite(Eratio_w0wa):
    loglike = -0.5 * np.sum(P*(Eratio_true-Eratio_test)**2 /
                            Evariance(Eratio_test, dTc))
    if return_ratios:
        return(Eratio_true, Eratio_test, loglike)
    # else:
    #    loglike = -np.inf
    return(loglike)


def rescale_likelihood(a):
    """
    Rescale a likelihood array with a Sorted Cumulative Sum
    (e.g. for drawing confidence contours).

    Each cell of the output holds the sum of all input cells whose value
    is greater than or equal to that cell's value, normalized so the
    maximum is 1. The input array is not modified.
    """
    # Work on a flat copy so the caller's array is untouched.
    flat = np.asarray(a).flatten()

    # Indices of cells in descending order of likelihood.
    descending = np.argsort(flat)[::-1]

    # Replace each cell with the cumulative sum of all higher-valued cells
    # (itself included), then normalize to [0, 1].
    flat[descending] = np.cumsum(flat[descending])
    flat /= flat.max()

    # Restore the original shape.
    return flat.reshape(np.shape(a))


def deriv_like(p1, p2, names, zl, zs, dTc, Om0, Ok, Ode0, w0true, watrue, htrue, P):
    """
    Helper for building the Fisher matrix: returns -2*log(likelihood)
    at a one- or two-parameter test point (p2 may be None for the
    single-parameter case).
    """
    point = [p1] if p2 is None else [p1, p2]
    return -2 * loglikelihoodE(names, point, zl, zs, dTc=dTc,
                               Om0true=Om0, Oktrue=Ok, Ode0true=Ode0,
                               w0true=w0true, watrue=watrue, htrue=htrue, P=P)


class Survey(object):
    """A Survey class that enables cosmology tests with assumptions of
    redshift distributions and time delay/lens model precision.

    Parameters
    ----------
    N: int
        The number of discovered glSN (overruled by zl/zs below)
    dTL: float
        The percent precision on each lens model.
    dTT: float
        The percent precision on each time delay measurement
    zl: float or list
        Redshift(s) of the lens(es). If a float, assumes you want N identical SN.
    zs: float or list
        Redshift(s) of the source(s). If a float, assumes you want N identical SN.
    P: float or list
        The probability of each source/lens redshift combo, defaults to 1
        (i.e. equal weight)
    calc_ensemble_P: bool
        If True, probabilities are calculated based on gaussian distributions
    name: str
        Name of your survey
    """

    def __init__(self, N=10, dTL=5, dTT=5, zl=0.3, zs=0.8, P=1,
                 calc_ensemble_P=False, name='mySurvey', sys_dTL=0, **kwargs):
        # Scalars become single-element lists so downstream code can iterate.
        if not isinstance(zl, (list, tuple, np.ndarray)):
            zl = [zl]
        if not isinstance(zs, (list, tuple, np.ndarray)):
            zs = [zs]
        assert(len(zl) == len(zs))

        # Explicit redshift lists override the requested N.
        self.N = len(zl) if len(zl) > 1 else N
        self.dTL = dTL
        self.dTT = dTT
        self.sys_dTL = sys_dTL
        self.zl = zl
        self.zs = zs

        if calc_ensemble_P:
            # Weight each system by the probability of its lens and source
            # redshifts under Gaussians fit to the supplied ensembles
            # (integrated over a +/-0.01 window around each redshift).
            lens_dist = scipy.stats.norm(np.mean(zl), np.std(zl))
            src_dist = scipy.stats.norm(np.mean(zs), np.std(zs))

            def _window_weight(dist, z):
                return scipy.integrate.simps(dist.pdf([z-.01, z+.01]),
                                             [z-.01, z+.01])

            self.P = (np.array([_window_weight(lens_dist, z) for z in zl]) *
                      np.array([_window_weight(src_dist, z) for z in zs]))
        else:
            self.P = P

        self.grid_likelihood = None
        self.nestle_result = None

        # Fiducial ("true") cosmology used by the likelihood machinery.
        self.w0 = -1
        self.wa = 0
        self.Om0 = 0.3
        self.Ok = 0
        self.Ode0 = 0.7
        self.w = -1
        self.h = .7
        self.H0 = 70
        self.cosmo_truths = {'h': self.h, 'w': self.w, 'Ode0': self.Ode0,
                             'Ok': self.Ok, 'w0': self.w0, 'wa': self.wa,
                             'Om0': self.Om0, 'H0': self.H0}
        self.name = name

    @property
    def dTc(self):
        """
        Combined lens/delay percent uncertainty, assuming the statistical
        terms average down with the number of systems, plus any systematic
        lens-model floor.
        """
        return np.sqrt((self.dTL**2 + self.dTT**2)/self.N + self.sys_dTL**2)
    def survey_grid(self, vparam_names, bounds={}, npoints=100,
                    grad_param=None, constants={}, grad_param_bounds=None,
                    ngrad=10, grid_param1=None, grid_param2=None, **kwargs):
        """Calculate cosmological contours by varying 2 parameters in a grid.

        Parameters
        ----------
        vparam_names: list
            The names of parameters to vary (exactly 2)
        bounds: dict
            Dictionary with param names as keys and list/tuple/array of
            bounds as values
        npoints: int
            The number of sample points
        grad_param: str
            Parameter to assume we're to have measured wrong
            (see C&M 2009 Figure 4)
        constants: dict
            Constants that are not the defaults
        grad_param_bounds: dict
            Bounds for grad_param, same format as bounds
        ngrad: int
            Number of grid points to vary grad_param
        grid_param1: iterable
            Optional choice of grid param 1 list (default uniform based on bounds)
        grid_param2: iterable
            Optional choice of grid param 2 list (default uniform based on bounds)

        Returns
        -------
        Adds to class attribute "grid" (a dictionary), with a comma-separated
        list of the vparam_names as the key and grid values as the value.
        """
        if len(vparam_names) != 2:
            print('For grid mode, must provide exactly 2 parameters.')
            return

        # Build the sampling axis for each parameter: either the user-supplied
        # list or a uniform grid across the given bounds.
        if grid_param1 is None:
            if vparam_names[0] not in bounds.keys():
                print('Must provide bounds for %s.' % vparam_names[0])
                return
            param1_list = np.linspace(
                bounds[vparam_names[0]][0], bounds[vparam_names[0]][1], npoints)
        else:
            param1_list = grid_param1
        if grid_param2 is None:
            if vparam_names[1] not in bounds.keys():
                print('Must provide bounds for %s.'
                      % vparam_names[1])
                return
            param2_list = np.linspace(
                bounds[vparam_names[1]][0], bounds[vparam_names[1]][1], npoints)
        else:
            param2_list = grid_param2
        assert len(param1_list) == len(param2_list)
        p1grid, p2grid = np.meshgrid(param1_list, param2_list)

        # Precompute the fiducial distance ratio once for speed.
        true_ratio = Eratio(self.zl, self.zs, self.cosmo_truths)

        if grad_param is None:
            # Plain 2-D likelihood grid.
            loglE = np.array([[loglikelihoodE(vparam_names, [param1_list[i], param2_list[j]],
                                              self.zl, self.zs, dTc=kwargs.get('dTc', self.dTc),
                                              set_cosmo=constants,
                                              Om0true=self.cosmo_truths['Om0'],
                                              Oktrue=self.cosmo_truths['Ok'],
                                              Eratio_true=true_ratio,
                                              Ode0true=self.cosmo_truths['Ode0'],
                                              w0true=self.cosmo_truths['w0'],
                                              watrue=self.cosmo_truths['wa'],
                                              htrue=self.cosmo_truths['h'])
                               for i in range(len(param1_list))]
                              for j in range(len(param2_list))])[:, :]
            likelihood = np.exp(loglE)
            likelihood[np.isnan(likelihood)] = 0
            like_rescaled = rescale_likelihood(likelihood)
            # Lazily create the per-parameter-pair result dictionaries.
            if self.grid_likelihood is None:
                self.grid_likelihood = {}
                self.grid_loglikelihood = {}
                self.grid_samples = {}
                self.param1_list = {}
                self.param2_list = {}
            self.grid_likelihood[','.join(vparam_names)] = like_rescaled
            self.grid_loglikelihood[','.join(vparam_names)] = loglE
        else:
            # Repeat the grid while sliding grad_param across its bounds.
            all_res = {}
            grad_param_range = np.linspace(
                grad_param_bounds[0], grad_param_bounds[1], ngrad)
            for g in grad_param_range:
                # NOTE(review): the trailing [:, :, 0] assumes a 3-D result,
                # but loglikelihoodE returns a scalar per grid point here —
                # confirm this branch still runs as intended.
                loglE = np.array([[loglikelihoodE(vparam_names, [param1_list[i], param2_list[j]],
                                                  self.zl, self.zs, dTc=kwargs.get('dTc', self.dTc),
                                                  set_cosmo={grad_param: g},
                                                  Om0true=self.cosmo_truths['Om0'],
                                                  Oktrue=self.cosmo_truths['Ok'],
                                                  Ode0true=self.cosmo_truths['Ode0'],
                                                  w0true=self.cosmo_truths['w0'],
                                                  watrue=self.cosmo_truths['wa'],
                                                  htrue=self.cosmo_truths['h'])
                                   for i in range(len(param1_list))]
                                  for j in range(len(param2_list))])[:, :, 0]
                likelihood = np.exp(loglE)
                likelihood[np.isnan(likelihood)] = 0
                like_rescaled = rescale_likelihood(likelihood)
                all_res[g] = like_rescaled
            # NOTE(review): these three assignments are overwritten by the
            # keyed-dictionary versions below when the dictionaries are
            # (re)initialized — preserved as in the original.
            self.grid_grad_res = all_res
            self.grid_grad_param = grad_param
            self.grid_grad_param_range = grad_param_range
            if self.grid_likelihood is None:
                self.param1_list = {}
                self.param2_list = {}
                self.grid_samples = {}
                self.grid_likelihood = {}
                self.grid_grad_res = {}
                self.grid_grad_param = {}
                self.grid_grad_param_range = {}
            self.grid_grad_res[','.join(vparam_names)] = all_res
            self.grid_grad_param[','.join(vparam_names)] = grad_param
            self.grid_grad_param_range[','.join(
                vparam_names)] = grad_param_range
            # NOTE(review): np.median without axis collapses to a scalar —
            # presumably axis=0 was intended; verify before relying on this.
            self.grid_likelihood[','.join(vparam_names)] = np.median(
                [all_res[g] for g in grad_param_range])
        self.param1_list[','.join(vparam_names)] = param1_list
        self.param2_list[','.join(vparam_names)] = param2_list
        self.grid_samples[','.join(vparam_names)] = [p1grid, p2grid]
[docs] def survey_nestle(self, vparam_names, bounds, constants={}, npoints=100, **kwargs): """Calculate cosmological contours in an MCMC-like fashion. Parameters ------- vparam_names: list The names of parameters to vary bounds: dict Dictionary with param names as keys and list/tuple/array of bounds as values npoints: int The number of sample points grad_param: str Parameter to assume we're to have measured wrong (see C&M 2009 Figure 4) constants: dict Constants that are not the defaults grad_param_bounds: dict Bounds for grad_param, same format as bounds ngrad: int Number of grid points to vary grad_param Returns ------- Adds to class attribute "grid" (a dictionary), with a comma-separated list of the vparam_names as the key and grid values as the value. """ ppfs = {} if bounds is not None: for key, val in bounds.items(): a, b = val f = sncosmo.utils.Interp1D(0., 1., np.array([a, b])) ppfs[key] = f iparam_names = [key for key in vparam_names if key in ppfs] ppflist = [ppfs[key] for key in iparam_names] npdim = len(iparam_names) # length of u ndim = len(vparam_names) # length of v def prior_transform(u): d = {} for i in range(npdim): d[iparam_names[i]] = ppflist[i](u[i]) v = np.empty(ndim, dtype=np.float) for i in range(ndim): key = vparam_names[i] v[i] = d[key] return v def likelihood(parameters): newParams = {} newParams = {iparam_names[i]: parameters[i] for i in range(len(parameters))} for param in constants.keys(): newParams[param] = constants[param] return(loglikelihoodE(vparam_names, [newParams[p] for p in iparam_names], self.zl, self.zs, dTc=kwargs.get( 'dTc', self.dTc), Om0true=self.cosmo_truths['Om0'], Oktrue=self.cosmo_truths['Ok'], Ode0true=self.cosmo_truths['Ode0'], w0true=self.cosmo_truths['w0'], watrue=self.cosmo_truths['wa'], htrue=self.cosmo_truths['h'])) res = nestle.sample(likelihood, prior_transform, ndim, npdim=npdim, npoints=npoints, method=kwargs.get('method', 'single'), maxiter=kwargs.get('maxiter', None), maxcall=kwargs.get('maxcall', None), 
rstate=kwargs.get('rstate', None), callback=(nestle.print_progress if kwargs.get('verbose', False) else None), **kwargs) if self.nestle_result is None: self.nestle_result = {} self.nestle_cosmology_fit = {} self.nestle_result[','.join(vparam_names)] = res self.nestle_cosmology_fit[','.join(vparam_names)] = {p: weighted_quantile( res.samples[:, iparam_names.index(p)], [.16, .5, .84], res.weights) for p in iparam_names}
    def survey_fisher(self, params, dx=1e-6):
        """
        Create a fisher matrix using params, based on the overarching
        survey parameters.

        Parameters
        ----------
        params: list
            List of parameter names to be included in fisher matrix
        dx: float
            The dx used for calculating numerical derivatives
        """
        # Inner derivative for the off-diagonal (mixed) second derivatives.
        # NOTE(review): the argument names suggest p1/p2 may be swapped
        # relative to deriv_like's signature — confirm against deriv_like.
        def _take_deriv(p2, p2name, p1, p1name):
            return deriv(deriv_like, p1, dx=dx,
                         args=(p2, [p1name, p2name], self.zl, self.zs,
                               self.dTc, self.cosmo_truths['Om0'],
                               self.cosmo_truths['Ok'],
                               self.cosmo_truths['Ode0'],
                               self.cosmo_truths['w0'],
                               self.cosmo_truths['wa'],
                               self.cosmo_truths['h'], self.P))

        fisher_matrix = np.zeros((len(params), len(params)))
        for i in range(len(params)):
            for j in range(len(params)):
                if i == j:
                    # Diagonal: half the second derivative of -2 ln L at the
                    # fiducial value (n=2 requests the second derivative).
                    fisher_matrix[i][j] = .5*np.abs(deriv(deriv_like, self.cosmo_truths[params[i]], dx=dx,
                                                          args=(None, [params[i]], self.zl, self.zs, self.dTc,
                                                                self.cosmo_truths['Om0'], self.cosmo_truths[
                                                                    'Ok'], self.cosmo_truths['Ode0'],
                                                                self.cosmo_truths['w0'], self.cosmo_truths['wa'],
                                                                self.cosmo_truths['h'], self.P), n=2))
                else:
                    # Off-diagonal: nested first derivatives in each parameter.
                    fisher_matrix[i][j] = .5*deriv(_take_deriv, self.cosmo_truths[params[i]], dx=dx,
                                                   args=(params[i], self.cosmo_truths[params[j]], params[j]))
        self.fisher_matrix = Fisher(
            data=fisher_matrix, params=params, name=self.name,
            cosmo_truths=self.cosmo_truths)
    def plot_survey_gradient(self, params, math_labels=None):
        """
        Plots the gradient survey grid, where one parameter is assumed
        to be systematically biased

        Parameters
        ----------
        params: list
            2 parameters to plot that have been included in a survey grid
        math_labels: list
            list of latex labels for params (for labeling plot)
        """
        # Accept the parameter pair in either order.
        if self.grid_likelihood is None or (','.join(params) not in self.grid_likelihood.keys()
                                            and ','.join([params[1], params[0]]) not in self.grid_likelihood.keys()):
            print('Cannot plot survey gradient without running grid first.')
            return
        if ','.join(params) not in self.grid_likelihood.keys():
            params = [params[1], params[0]]
        if math_labels is None:
            # NOTE(review): grid_vparam_names is not set anywhere in this
            # module — confirm this attribute exists before relying on it.
            math_labels = self.grid_vparam_names
        # plot() comes from coetools (star import).
        ax = plot('scatter', [], [], x_lab=math_labels[0],
                  y_lab=math_labels[1])
        ind = -1
        contours = []
        # One labeled contour per value of the biased (gradient) parameter.
        for g in self.grid_grad_param_range:
            contours.append(ax.contour(self.grid_samples[','.join(params)][0],
                                       self.grid_samples[','.join(params)][1],
                                       self.grid_grad_res[','.join(params)][g],
                                       levels=[.4]))
            ind += 1
            strs = ['%.2f' % g]
            fmt = {}
            for l, s in zip(contours[ind].levels, strs):
                fmt[l] = s
            ax.clabel(contours[ind], contours[ind].levels,
                      inline=True, fmt=fmt, fontsize=10)
        plt.show()
    def plot_survey_contour(self, params, math_labels=None, color='#1f77b4',
                            filled=True, confidence=[.68, .95], fom=False,
                            ax=None, alphas=[.9, .3], show_legend=False,
                            show_nestle=True, use_H0=False, **kwargs):
        """
        Plots the contours of a nestle or gridded survey

        Parameters
        ----------
        params: list
            2 parameters to plot that have been included in a survey grid
            or nestle survey
        math_labels: list
            list of latex labels for params (for labeling plot)
        color: str or tuple
            color for plotting in matplotlib
        filled: bool
            filled vs. outlined (if false) contours
        confidence: list or float
            confidence interval(s) to plot
        fom: bool
            if True, FOM is calculated. MAJOR WARNING: This will be
            enormously biased if the full contour is not being drawn,
            set limits accordingly
        ax: :class:`~plt.axes`
            If you want the contours drawn onto existing axes
        alphas: list or float
            draws confidence intervals with this alpha, must match up with
            "confidence" parameter
        show_legend: bool
            If True, a legend is shown
        show_nestle: bool
            If both nestle and grid have been completed, choose nestle if
            true to show
        use_H0: bool
            If true and one of the params is 'h', multiply by 100
        """
        # Bail out unless either a nestle result or a likelihood grid exists
        # for this parameter pair (in either order).
        if self.nestle_result is None and self.grid_likelihood is None \
                or (self.nestle_result is not None and
                    (','.join(params) not in self.nestle_result.keys() and
                     ','.join([params[1], params[0]]) not in self.nestle_result.keys()) and
                    (self.grid_likelihood is not None and
                     ','.join(params) not in self.grid_likelihood.keys() and
                     ','.join([params[1], params[0]]) not in self.grid_likelihood.keys())):
            print('Cannot plot nestle result without running nestle or grid first.')
            return
        if self.nestle_result is not None:
            # Nestle branch: delegate to a corner plot of weighted samples.
            if ','.join(params) not in self.nestle_result.keys():
                params = [params[1], params[0]]
            if math_labels is None:
                math_labels = params
            param_inds = []
            param_inds = [0, 1]
            fig = corner.corner(self.nestle_result[','.join(params)].samples[:, param_inds],
                                weights=self.nestle_result[','.join(params)].weights,
                                labels=math_labels,
                                truths=[self.cosmo_truths[p]
                                        for p in params],
                                no_fill_contours=filled, fill_contours=filled,
                                levels=[.68])
        else:
            # Grid branch: draw confidence contours on (possibly new) axes.
            if ax is None:
                fig = plt.figure(figsize=[9, 6])
                ax = fig.gca()
            plt.setp(ax.xaxis.get_ticklabels(), fontsize='large')
            plt.setp(ax.yaxis.get_ticklabels(), fontsize='large')
            if ','.join(params) not in self.grid_likelihood.keys():
                params = [params[1], params[0]]
            if math_labels is None:
                math_labels = params
            if isinstance(confidence, (float, int)):
                confidence = [confidence]
            alphas = np.linspace(.9, .3, len(confidence)
                                 ) if alphas is None else alphas
            if len(alphas) != len(confidence):
                alphas = [alphas[0]]*len(confidence)
            # Optionally rescale h onto the H0 axis.
            if params[0] == 'h' and use_H0:
                constx = 100
            else:
                constx = 1
            if params[1] == 'h' and use_H0:
                consty = 100
            else:
                consty = 1
            for i in range(len(confidence)):
                if filled:
                    CS = ax.contourf(self.grid_samples[','.join(params)][0]*constx,
                                     self.grid_samples[','.join(params)][1]*consty,
                                     self.grid_likelihood[','.join(params)],
                                     colors=color,
                                     levels=[0, np.sort(confidence)[i]],
                                     alpha=alphas[i], zorder=10, **kwargs)
                    lines = Line2D([0], [0], color=color, alpha=alphas[0],
                                   linewidth=10, label=self.name)
                else:
                    CS = ax.contour(self.grid_samples[','.join(params)][0]*constx,
                                    self.grid_samples[','.join(params)][1]*consty,
                                    self.grid_likelihood[','.join(params)],
                                    colors=color,
                                    levels=[0, np.sort(confidence)[i]],
                                    **kwargs)
                    lines = Line2D([0], [0], color=color, alpha=alphas[0],
                                   linewidth=10, marker='.', label=self.name)
            if fom:
                # Figure of merit: pi over the area of the 95.4% contour
                # (drawn invisibly just to extract its vertices).
                CS954 = ax.contour(self.grid_samples[','.join(params)][0]*constx,
                                   self.grid_samples[','.join(params)][1]*consty,
                                   self.grid_likelihood[','.join(params)],
                                   alpha=0, levels=[.954])
                contour = CS954.collections[0]
                vs = contour.get_paths()[0].vertices
                x = vs[:, 0]
                y = vs[:, 1]

                # Shoelace formula for the polygon area of the contour path.
                def PolyArea(x, y):
                    return 0.5*np.abs(np.dot(x, np.roll(y, 1))-np.dot(y, np.roll(x, 1)))
                ax.scatter(x, y)
                #area=0.5*np.sum(y[:-1]*np.diff(x) - x[:-1]*np.diff(y))
                area = np.abs(PolyArea(x, y))
                line_name = self.name+': FOM=%.2f' % (np.pi/area)
            else:
                line_name = self.name
            # Mark the fiducial cosmology.
            ax.axhline(self.cosmo_truths[params[1]] *
                       consty, ls='--', color='0.6', lw=3.5)
            ax.axvline(self.cosmo_truths[params[0]] *
                       constx, ls='--', color='0.6', lw=3.5)
            ax.plot(self.cosmo_truths[params[0]], self.cosmo_truths[params[1]],
                    marker='o', mec='k', mfc='k', ms=10)
            ax.set_xlabel(math_labels[0], fontsize=20)
            ax.set_ylabel(math_labels[1], fontsize=20)
            # NOTE(review): when show_legend is True only the axes are
            # returned; otherwise a (ax, lines, line_name) tuple — confirm
            # callers expect this asymmetric return.
            if show_legend:
                ax.legend(fontsize=14)
                return ax
            return(ax, lines, line_name)
class Fisher:
    """
    Fisher class is more or less copied from Dan Coe's arxiv paper about
    Fisher matrices: https://arxiv.org/pdf/0906.4123.pdf
    Only some minor adjustment from me.

    Parameters
    ----------
    inroot: str
        base directory to read fisher matrices from if using "load" function
    xvar: str
        can optionally set an x variable as the "default for other functions"
    yvar: str
        can optionally set a y variable as the "default for other functions"
    fixes: str
        comma-separated str of parameters to fix (if fix is called)
    margs: list
        list of parameters to marginalize over (if marg is called)
    data: np.ndarray
        an input fisher matrix (NxN where N is the length of your params)
    data_is_cov: bool
        If true, assumes that "data" is actually a covariance matrix
    params: list
        Parameter names
    silent: bool
        verbosity flag for some functions
    name: str
        Name to plot in legend for figures
    cosmo_truths: dict
        True parameters to plot as dashed lines in plots
    """

    def __init__(self, inroot='', xvar='', yvar='', fixes='', margs=None,
                 data=None, data_is_cov=False, params=None, silent=False,
                 name='my_fisher', cosmo_truths=None):
        # None sentinels replace mutable default arguments ([] / dict),
        # which would otherwise be shared across every Fisher instance.
        margs = [] if margs is None else margs
        data = [] if data is None else data
        params = [] if params is None else params
        if cosmo_truths is None:
            # NOTE(review): these defaults ('w': 0, 'wa': -1) look swapped
            # relative to Survey's truths ('w': -1, 'wa': 0); values are
            # preserved from the original — confirm intent.
            cosmo_truths = {'h': .7, 'w': 0, 'Ode0': .7,
                            'w0': 0, 'wa': -1, 'Om0': .3}

        self.name = name
        self.xvar = xvar
        self.yvar = yvar
        self.fixes = fixes
        self.margs = margs
        if data_is_cov:
            # Fisher matrix is the inverse of the covariance matrix.
            self.data = np.linalg.inv(data)
        else:
            self.data = data
        self.params = params
        self.silent = silent
        self.nFish = 1  # number of matrices combined into this one
        self.inroot = inroot
        self.cosmo_truths = cosmo_truths
        if len(self.params) > 0 and len(self.data) > 0:
            # Human-readable labeled view of the matrix.
            self.pretty_fish = pandas.DataFrame.from_dict(
                {k: list(self.data[self.params.index(k)][:])
                 for k in self.params},
                orient='index', columns=self.params)
        else:
            self.pretty_fish = None
        self.fish_list = [self]
    def load(self, fishdir=''):  # DETFast join
        """
        Loads existing Fisher matrix from disk (inroot + '.fisher').

        Parameters
        ----------
        fishdir: str
            The directory (default inroot)
        """
        # loadfile/loaddata come from coetools (star import).
        txt = loadfile(self.inroot+'.fisher', dir=fishdir, silent=self.silent)
        # Header line 0 carries the parameter count; names start at line 4.
        nparam = int(txt[0].split()[1])
        self.params = []
        for ivar in range(nparam):
            param = txt[ivar+4].split()[0]
            self.params.append(param)
        self.data = loaddata(self.inroot+'.fisher+', dir=fishdir,
                             headlines=4+nparam, silent=1)
def _ii(self): self.ix = None self.iy = None if self.xvar: self.ix = self.params.index(self.xvar) if self.yvar: self.iy = self.params.index(self.yvar) self.ifixes = [] for i, param in enumerate(self.params): if param in self.fixes: self.ifixes.append(i) self.imargs = [] for i, param in enumerate(self.params): if param in self.margs: self.imargs.append(i)
[docs] def pindex(self, param): """ Index of parameter Parameters ---------- param: str Parameter you want the index of Returns ------- index: int The index of param """ return self.params.index(param)
def _take(self, iparams): self.data = self.data._take(iparams, 0) self.data = self.data._take(iparams, 1) self.params = list(_take(self.params, iparams))
[docs] def reorder(self, params): """ Matrix with params in new order Parameters ---------- params: list new list of parameters """ self._repar(params) iparams = list(map(self.pindex, params)) self._take(iparams)
    def rename(self, pdict1=None):
        """
        Rename parameters given a dictionary of names & nicknames

        Parameters
        ----------
        pdict1: dict
            Dictionary containing old parameters as keys and new as vals
        """
        # Falls back to the module-level pdict (from coetools' star import)
        # when no mapping is supplied.
        pdict1 = pdict1 or pdict
        for i, param in enumerate(self.params):
            # Unmapped names are kept as-is.
            self.params[i] = pdict1.get(param, param)
        # Rebuild the labeled view with the new names.
        self.pretty_fish = pandas.DataFrame.from_dict({k: list(self.data[self.params.index(k)][:]) for k in self.params},
                                                      orient='index', columns=self.params)
    def fix(self, fixes=[]):
        """
        Fix parameters constant <==> Remove them from the Fisher matrix

        Parameters
        ----------
        fixes: list
            List of parameters to fix, otherwise uses fixes attribute
        """
        self.fixes = fixes or self.fixes
        # strspl (coetools) splits a comma-separated string into names.
        self.fixes = strspl(self.fixes)
        self._ii()
        # arange comes from coetools' star import of numpy.
        iall = arange(len(self.params))
        ikeep = set(iall) - set(self.ifixes)  # Sorts result
        ikeep = list(ikeep)
        self._take(ikeep)
[docs] def marg(self, margs=[]): """ Marginalize over variables: Remove them from the covariance matrix Parameters ---------- margs: list List of parameters to marginalize over, otherwise uses margs attribute """ self.margs = margs or self.margs self._ii() #ikeep = invertselection(arange(len(self.params)), self.fixes) iall = arange(len(self.params)) ikeep = set(iall) - set(self.imargs) # Sorts result ikeep = list(ikeep) C = self.cov() C = C._take(ikeep, 0) C = C._take(ikeep, 1) self.data = np.linalg.inv(C) self.params = list(_take(self.params, ikeep))
    def transform(self, params, M):
        """
        Transform to new set of parameters using matrix provided
        (see Coe 2009 above)

        Parameters
        ----------
        params: list
            New list of parameters
        M: :class:`~numpy.ndarray`
            The new matrix
        """
        # F' = M^T F M; matrix_multiply/transpose come from coetools.
        self.data = matrix_multiply([transpose(M), self.data, M])
        self.params = params
[docs] def prior(self, param, sig): """Set a prior of sig on param""" ivar = self.pindex(param) self.data[ivar, ivar] = self.data[ivar, ivar] + 1 / sig**2
[docs] def cov(self): """ Covariance matrix, by definition the inverse of the Fisher matrix """ return np.linalg.inv(self.data)
[docs] def dxdyp(self, xvar='', yvar=''): # , fixes=None """ Return uncertainty in two parameters and their correlation xvar: str x variable yvar: str y variable Returns ------- dx: float x uncertainty dy: float y uncertainty p: float rho (correlation parameter) """ self.xvar = xvar if len(xvar) > 0 else self.xvar self.yvar = yvar if len(yvar) > 0 else self.yvar self._ii() C = self.cov() C = C.take((self.ix, self.iy), 0) C = C.take((self.ix, self.iy), 1) dx = np.sqrt(C[0, 0]) dy = np.sqrt(C[1, 1]) dxy = C[0, 1] p = dxy / (dx * dy) self.C = C return dx, dy, p
    def dx(self, xvar=''):  # , fixes=None
        """
        Return uncertainty in parameter (if marginalizing over others)
        """
        self.xvar = xvar or self.xvar
        #self.fixes = strspl(fixes or self.fixes)
        self._ii()
        #dx = 1 / sqrt(self.data[self.ix,self.ix])
        # sqrt here is the bare name from coetools' star import (numpy.sqrt).
        self.C = C = self.cov()
        dx = sqrt(C[self.ix, self.ix])
        return dx
[docs] def addpar(self, param): """ Add a parameter to the list of parameters Parameters ---------- param: str The parameter to add """ npar = len(self.params) data = np.zeros((npar+1, npar+1)) data[:npar, :npar] = self.data self.data = data self.params.append(param)
def _repar(self, params): for param in params: if param not in self.params: self.addpar(param)
[docs] def pr(self): """ Print contents """ print(self.params) if self.pretty_fish is not None: print(self.pretty_fish) else: pint(self.data)
def __add__(self, sel2): """ Add Fisher matrices together, combining constraints (lets you use "+") Parameters ---------- sel2: The second fisher matrix to add Returns ------- :class:~sntd.survey_cosmo.Fisher A new fisher matrix class, with constraints from self and sel2 (still holds old matrices in "fish_list") """ pl1 = self.params pl2 = sel2.params n1 = len(pl1) n2 = len(pl2) F1 = self.data F2 = sel2.data if pl1 == pl2: pl = pl1 else: params1 = set(pl1) params2 = set(pl2) params = params1 | params2 pl = list(params) #pl = list(sort(pl)) n = len(pl) # Put F1 in merged F join _ii = [] for p in pl1: i = pl.index(p) _ii.append(i) FF1 = np.zeros((n, n)) for i1, i in enumerate(_ii): FF1[i].put(_ii, F1[i1]) # Put F2 in merged F join _ii = [] for p in pl2: i = pl.index(p) _ii.append(i) FF2 = np.zeros((n, n)) for i2, i in enumerate(_ii): FF2[i].put(_ii, F2[i2]) # Add new = deepcopy(self) new.data = FF1 + FF2 new.params = pl new.xvar = self.xvar new.yvar = self.yvar new.fixes = self.fixes new.name = self.name+'+'+sel2.name new.pretty_fish = pandas.DataFrame.from_dict({k: list(self.data[self.params.index(k)][:]) for k in self.params}, orient='index', columns=self.params) new.nFish = self.nFish+sel2.nFish new.fish_list = [new, sel2] new.fish_list = np.append(new.fish_list, self.fish_list) new.fish_list = [x for x in new.fish_list if x.nFish == 1 or x.nFish == new.nFish] return new def __mul__(self, fac): """ Multiply Fisher matrix by some factor (lets you use "*") Parameters ---------- fac: float The multiplicative factor Returns ------- :class:~sntd.survey_cosmo.Fisher A new fisher matrix class multiplied by fac """ new = Fisher() new.data = self.data * fac new.params = self.params new.xvar = self.xvar new.yvar = self.yvar new.fixes = self.fixes new.pretty_fish = pandas.DataFrame.from_items([(self.params[i], list(self.data[i][:])) for i in range(len(self.params))], orient='index', columns=self.params) return new def __str__(self): """ Replacement for print function """ if 
self.pretty_fish is not None: print(self.pretty_fish) else: print(self.data) return('')
[docs] def merit(self, param1, param2): """ Calculates the figure of merit for a pair of parameters. Parameters ---------- param1: str Parameter 1 to use param2: str Parameter 2 to use Returns ------- FOM: float The Figure of Merit """ dx, dy, p = self.dxdyp(param1, param2) # a,b,_=setell(dx,dy,p) # return(1./(6.17*a*b)) return(1./np.sqrt(np.linalg.det(self.C)))
    def plot(self, param1, param2, x_limits, y_limits, math_label1=None,
             math_label2=None, bestfit1=None, bestfit2=None, alpha=0.9,
             color_list=None, print_merit=True, col_order=True,
             show_uncertainty=False):
        """
        Plot contours from fisher matrix. This will plot all contours from
        matrices that have been added together to make this matrix.

        Parameters
        ----------
        param1: str
            Parameter 1 to plot
        param2: str
            Parameter 2 to plot
        x_limits: list or tuple or :class:`~numpy.ndarray`
            The x parameter limits for plotting
        y_limits: list or tuple or :class:`~numpy.ndarray`
            The y parameter limits for plotting
        math_label1: str
            A latex label for axis 1
        math_label2: str
            A latex label for axis 2
        bestfit1: float
            The true/best fit value for parameter 1 (default self.cosmo_truths)
        bestfit2: float
            The true/best fit value for parameter 2 (default self.cosmo_truths)
        alpha: float
            The alpha for plotting
        color_list: list
            List of colors to use for plotting
        print_merit: bool
            If True, the figure of merit is calculated and added to the legend
        col_order: bool
            If True, colors are assigned in order of increasing FOM
        show_uncertainty: bool
            If true, the final uncertainty in each parameter is printed
            on the plot
        """
        xo = bestfit1 if bestfit1 is not None else self.cosmo_truths[param1]
        yo = bestfit2 if bestfit2 is not None else self.cosmo_truths[param2]
        if color_list is not None and not isinstance(color_list, (tuple, list)):
            color_list = [color_list]
        if color_list is None or len(color_list) != len(self.fish_list):
            # Default palette names (blues, reds, ...) come from coetools.
            color_list = [blues, reds, purples, yellows, oranges, darkoranges,
                          greens, greys, lightblues, lightreds, lightgreens,
                          lightyellows, lightgreys]
        fig = plt.figure()
        ax = fig.gca()
        i = 0
        patches = []
        # One ellipse per component matrix, drawn worst-FOM first so the
        # tightest constraint ends up on top.
        merits = [x.merit(param1, param2) for x in self.fish_list]
        if col_order:
            original_order = np.argsort(merits)
        else:
            original_order = np.arange(0, len(self.fish_list), 1)
        for fish in np.array(self.fish_list)[np.argsort(merits)]:
            dx, dy, p = fish.dxdyp(param1, param2)
            # plotellsp (coetools) draws the confidence ellipses.
            plotellsp(xo, yo, dx, dy, p,
                      colors=color_list[original_order[i]], alpha=alpha)
            if print_merit:
                m = ': FOM=%.1f' % np.sort(merits)[i]
            else:
                m = ''
            patches.append(plt.plot([], [], 's', ms=10, label=fish.name + m,
                                    color=color_list[original_order[i]][0])[0])
            i += 1
        if show_uncertainty:
            # Round each 1-sigma uncertainty to its leading decimal place.
            param1_prec = str(np.round(self.dx(param1), abs(
                int(np.round(math.log10(self.dx(param1)))))))
            param2_prec = str(np.round(self.dx(param2), abs(
                int(np.round(math.log10(self.dx(param2)))))))
            ax.annotate(r'$\delta $'+'%s=%s' % (param1, param1_prec),
                        (.05, .1), xycoords='axes fraction')
            ax.annotate(r'$\delta $'+'%s=%s' % (param2, param2_prec),
                        (.05, .05), xycoords='axes fraction')
        if x_limits is not None:
            plt.xlim(x_limits)
        if y_limits is not None:
            plt.ylim(y_limits)
        if math_label1 is None:
            plt.xlabel(param1, fontsize=16)
        else:
            plt.xlabel(math_label1, fontsize=16)
        if math_label2 is None:
            plt.ylabel(param2, fontsize=16)
        else:
            plt.ylabel(math_label2, fontsize=16)
        plt.figlegend(handles=patches, fontsize=14,
                      bbox_to_anchor=(.88, .88))
        return ax