Source code for sntd.models

import sncosmo
import numpy as np
from copy import copy
from astropy.table import Table
from scipy.interpolate import CubicSpline, interp1d
from sncosmo.utils import integration_grid
from sncosmo.constants import HC_ERG_AA, MODEL_BANDFLUX_SPACING
from scipy.stats import exponnorm
from collections import Counter

__all__ = ['unresolvedMISN']


class unresolvedSource(sncosmo.Source):
    def __init__(self, model_list):
        super(unresolvedSource, self).__init__()
        self.source_list = [model._source for model in model_list]
        self.name = self.source_list[0].name
        self.version = self.source_list[0].version
        self._phase = self.source_list[0]._phase
        self._wave = self.source_list[0]._wave
        self._parameters = self.source_list[0]._parameters
        self._param_names = self.source_list[0]._param_names
        self.param_names_latex = self.source_list[0].param_names_latex
        try:
            self._model_flux = self.source_list[0]._model_flux
            self._zero_before = self.source_list[0]._zero_before
        except:
            pass

    def _flux(self, phase, wave):
        return(np.sum([source._flux(phase, wave) for source in self.source_list], axis=0))


[docs]class unresolvedMISN(sncosmo.Model): """ Wrapper class of SNCosmo.Model to provide support for unresolved lensed SN fitting. """ def __init__(self, curve_models, delays=None, magnifications=None): """ Constructor for the unresolvedMISN class. Parameters ---------- curve_models: list of `~sncosmo.Model` List of models representing each (unresolved) image delays: list of float A list of relative delays between models magnifications: list of float A list of relative magnifications between models """ self.nimages = len(curve_models) super(unresolvedMISN, self).__init__(curve_models[0]._source) for model in curve_models: for effect_name,effect_frame in zip(model.effect_names,model._effect_frames): if effect_frame != 'free': continue effect_params = [x for x in model.param_names if effect_name in x] self._param_names = list(np.append(self._param_names,effect_params)) print(list(zip(effect_params,[model.get(p) for p in effect_params]))) self._parameters = np.append(self._parameters,[model.get(p) for p in effect_params]) self._param_names = list(np.append(self._param_names,[['dt_%i'%(i+1),'mu_%i'%(i+1)] for i in range(len(curve_models))]).flatten()) self._parameters = np.append(self._parameters,[[0,1] for i in range(len(curve_models))]).flatten() self.model_list = curve_models self._source = unresolvedSource(self.model_list) if delays is not None: self.set_delays(delays) if magnifications is not None: self.set_magnifications(magnifications) self.delays = delays self.magnifications = magnifications self.nparams = len(self.param_names) self.current_parameters = np.array(list(self._parameters)) self.param_indices = {p: self.param_names.index( p) for p in self.param_names} def __copy__(self): new = unresolvedMISN( [copy(model) for model in self.model_list]) new._parameters = self._parameters.copy() new.current_parameters = np.array(list(new._parameters)) new.set_delays(self.delays,base_t0=self.parameters[1]) new.set_magnifications(self.magnifications,base_x0=self.parameters[2]) return new
[docs] def set_delays(self, delays, base_t0=0): """ Set the relative delays between blended image models. Parameters ---------- delays: list of float A list of relative delays between models """ if len(delays) != len(self.model_list): print('Cannot set delays, must match length of model list.') for i in range(len(delays)): self.model_list[i].parameters[1] = delays[i] + base_t0 self.parameters[self.param_indices['dt_%i'%(i+1)]] = delays[i] self.current_parameters[self.param_indices['dt_%i'%(i+1)]] = delays[i] self.delays = delays
[docs] def set_magnifications(self, magnifications,base_x0=1): """ Set the relative magnifications between blended image models. Parameters ---------- magnifications: list of float A list of relative magnifications between models """ if len(magnifications) != len(self.model_list): print('Cannot set magnifications, must match length of model list.') for i in range(len(magnifications)): self.model_list[i].parameters[2] = magnifications[i] * base_x0 self.parameters[self.param_indices['mu_%i'%(i+1)]] = magnifications[i] self.current_parameters[self.param_indices['mu_%i'%(i+1)]] = magnifications[i] self.magnifications = magnifications
def _flux(self, phase, wave): param_dict = {self.param_names[i]: self.parameters[i] for i in range(self.nparams) if self.parameters[i] != self.current_parameters[i]} if len(param_dict) > 0: self.set(**param_dict) return(np.sum([model._flux(phase, wave) for model in self.model_list], axis=0))
[docs] def set(self, **param_dict): do_delays = False do_mags = False for key in param_dict.keys(): if key == 't0': for model in self.model_list: model.parameters[1] += (param_dict[key] - self.current_parameters[self.param_indices[key]]) elif key == self.param_names[2]: for model in self.model_list: model.parameters[2] *= (param_dict[key] / self.current_parameters[self.param_indices[key]]) elif 'dt_' in key: do_delays = True pass elif 'mu_' in key: do_mags = True pass else: for model in self.model_list: if key in model.param_names: model.update({key: param_dict[key]}) self.current_parameters[self.param_indices[key]] = param_dict[key] if do_delays: self.set_delays([param_dict['dt_%i'%(i+1)] if 'dt_%i'%(i+1) in param_dict.keys() else 0 for i in range(self.nimages)], base_t0=self.current_parameters[1]) if do_mags: self.set_magnifications([param_dict['mu_%i'%(i+1)] if 'mu_%i'%(i+1) in param_dict.keys() else 0 for i in range(self.nimages)], base_x0=self.current_parameters[2]) self.update(param_dict)
[docs] def add_effect(self, effect, name, frame): """ Add a PropagationEffect to the model. Parameters ---------- effect : `~sncosmo.PropagationEffect` Propagation effect. name : str Name of the effect. frame : {'rest', 'obs', 'free'} """ self._add_effect_partial(effect, name, frame) self._sync_parameter_arrays_effects() self._update_description() self.current_parameters = np.array(list(self._parameters)) self.param_indices = {p: self.param_names.index( p) for p in self.param_names} for model in self.model_list: model.add_effect(effect, name, frame)
def _sync_parameter_arrays_effects(self): # save a reference to old parameter values, in case there are # effect redshifts that have been set. old_parameters = self._parameters # Calculate total length of model's parameter array l = len(old_parameters) for effect, frame in zip(self._effects, self._effect_frames): l += (frame == 'free') + len(effect._parameters) # allocate new array (zeros so that new 'free' effects redshifts # initialize to 0) self._parameters = np.zeros(l, dtype=np.float) # copy old parameters: we do this to make sure we copy # non-default values of any parameters that the model alone # holds, such as z, t0 and effect redshifts. self._parameters[0:len(old_parameters)] = old_parameters # cross-reference source's parameters pos = 2 l = len(self._source._parameters) self._parameters[pos:pos+l] = self._source._parameters # copy self._source._parameters = self._parameters[pos:pos+l] # reference pos = len(old_parameters)#+= l # initialize a list of ints that keeps track of where the redshift # parameter of each effect is. Value is 0 if effect_frame is not 'free' self._effect_zindicies = [] # for each effect, cross-reference the effect's parameters for i in range(len(self._effects)): effect = self._effects[i] # for 'free' effects, add a redshift parameter if self._effect_frames[i] == 'free': self._effect_zindicies.append(pos) pos += 1 else: self._effect_zindicies.append(-1) # add all of this effect's parameters l = len(effect._parameters) self._parameters[pos:pos+l] = effect._parameters # copy effect._parameters = self._parameters[pos:pos+l] # reference pos += l
def _param_to_source(source, wave, color_curve=None, band1=None, band2=None, ref_color=False): band = None for b in np.unique(source.lc['band']): temp = sncosmo.get_bandpass(b) if temp.minwave() <= min(wave) and temp.maxwave() >= max(wave): band = sncosmo.get_bandpass(b) if band is None: raise RuntimeError( "Hmm, your data do not contain the band you want to fit.") finalPhase = source._phase if color_curve is not None and not ref_color: zp1 = source.lc['zp'][source.lc['band'] == band1][0] zp2 = source.lc['zp'][source.lc['band'] == band2][0] flux1 = sncosmo.Model(source._ts_sources[band1]).bandflux(band1, source._phase, zp1, source.lc['zpsys'][source.lc['band'] == band1][0]) temp_flux = flux1/10**(-.4*(color_curve(source._phase)-(zp1-zp2))) else: temp_flux = np.ones(len(finalPhase)) zpnorm = 10.**(0.4 * source.lc['zp'][np.array([x.lower() for x in source.lc['band']]) == band.name][0]) wave, dwave = integration_grid(band.minwave(), band.maxwave(), MODEL_BANDFLUX_SPACING) ms = sncosmo.get_magsystem(source.lc['zpsys'][0]) zpnorm = zpnorm / ms.zpbandflux(band) flux = temp_flux*HC_ERG_AA/(dwave*np.sum(wave*band(wave))*zpnorm) finalWave = np.arange(wave[0]-dwave*10, wave[-1]+dwave*10, dwave) finalFlux = np.zeros((len(finalPhase), len(finalWave))) for i in range(len(finalPhase)): # if finalPhase[i]>= np.min(timeArr) and finalPhase[i] <= np.max(timeArr): for j in range(len(finalWave)): if finalWave[j] >= wave[0] and finalWave[j] <= wave[-1]: finalFlux[i][j] = flux[i] # offset=np.zeros(len(finalPhase)) out = sncosmo.TimeSeriesSource(np.array(finalPhase), np.array( finalWave), np.array(finalFlux), zero_before=False) return (out) class PierelSource(sncosmo.Source): _param_names = ['amplitude', 'k', 'sigma', 's'] param_names_latex = ['A', 'K', '\sigma', 'Shift'] def __init__(self, data, name='PierelSource', version=None, tstep=1): super(sncosmo.Source, self).__init__() # init for the super class self.name = name self.version = version self._model = {} self.lc = _removeDupes(data) wave = [] for b in np.unique(data['band']): wave = np.append(wave, sncosmo.get_bandpass(b).wave) wave = np.sort(np.unique(wave)) wave = np.append([.99*wave[0]], wave) wave = np.append(wave, [1.01*wave[-1]]) self._wave = wave # self._phase=np.arange(0,np.max(data['time'])-np.min(data['time']),1) # self._phase=np.arange(-(np.max(data['time'])-np.min(data['time'])),np.max(data['time'])-np.min(data['time']),tstep) self._phase = np.arange(-800, 800, 1) self._parameters = np.array([1., 1., 1., 0.]) self._tstep = tstep self._ts_sources = {b: _param_to_source(self, self._phase, sncosmo.get_bandpass( b).wave) for b in np.unique(self.lc['band'])} def _param_flux(self, phase): temp = exponnorm.pdf( phase, self._parameters[1], scale=self._parameters[2]) if np.max(temp) == 0: return(np.zeros(len(phase))) pierelFlux = self._parameters[0]*exponnorm.pdf(phase, self._parameters[1], loc=-phase[temp == np.max( temp)], scale=self._parameters[2])/np.max(temp)+self._parameters[3] # print(phase[0],self._parameters[4]) if np.inf in pierelFlux or np.any(np.isnan(pierelFlux)): # print(self._parameters) return(np.zeros(len(phase))) return(pierelFlux) def _flux(self, phase, wave): band = [x for x in np.unique(self.lc['band']) if sncosmo.get_bandpass( x).wave[0] <= wave[0] and sncosmo.get_bandpass(x).wave[-1] >= wave[-1]][0] src = self._ts_sources[band] return(src._flux(phase, wave)*(self._param_flux(phase)[:, None])) class BazinSource(sncosmo.Source): _param_names = ['amplitude', 'B', 'fall', 'rise'] param_names_latex = ['A', 'B', 't_{fall}', 't_{rise}'] def __init__(self, data, name='BazinSource', version=None, tstep=1, colorCurve=None): super(sncosmo.Source, self).__init__() # init for the super class self.name = name self.version = version self._model = {} self.lc = _removeDupes(data) wave = [] for b in np.unique(data['band']): wave = np.append(wave, sncosmo.get_bandpass(b).wave) wave = np.sort(np.unique(wave)) wave = np.append([.99*wave[0]], wave) wave = np.append(wave, [1.01*wave[-1]]) self._wave = wave # self._phase=np.arange(-(np.max(data['time'])-np.min(data['time'])),np.max(data['time'])-np.min(data['time']),tstep) self._phase = np.arange(-300, 300, 1) self._parameters = np.array([1., 0., 30., 15.]) self._tstep = tstep if colorCurve is not None: color = [x for x in colorCurve.colnames if x != 'time'][0] curve = interp1d( colorCurve['time'], colorCurve[color], fill_value=0., bounds_error=False) self._ts_sources = dict([]) i = 0 for b in [color[0:color.find('-')], color[color.find('-')+1:]]: self._ts_sources[b] = _param_to_source(self, sncosmo.get_bandpass(b).wave, curve, color[0:color.find('-')], color[color.find('-')+1:], ref_color=i == 0) i += 1 else: self._ts_sources = {b: _param_to_source( self, sncosmo.get_bandpass(b).wave) for b in np.unique(self.lc['band'])} def _constantBazin(self, length, B): return(np.ones(length)*B) def _param_flux(self, phase): if self._parameters[0] == 0: return(self._constantBazin(len(phase), self._parameters[1])) elif self._parameters[2] <= self._parameters[3]: return(np.zeros(len(phase))) else: temp = ( np.exp(-phase/self._parameters[2])/(1+np.exp(-phase/self._parameters[3]))) if np.max(temp) == 0: return(np.zeros(len(phase))) bazinFlux = self._parameters[0]*(np.exp(-(phase+np.median(phase[np.where(temp == np.max(temp))[0]]))/self._parameters[2]) / (1+np.exp(-(phase+np.median(phase[np.where(temp == np.max(temp))[0]]))/self._parameters[3])))/np.max(temp) + self._parameters[1] if np.inf in bazinFlux or np.any(np.isnan(bazinFlux)): return(np.zeros(len(phase))) return(bazinFlux) def _flux(self, phase, wave): band = [x for x in np.unique(self.lc['band']) if sncosmo.get_bandpass( x).wave[0] <= wave[0] and sncosmo.get_bandpass(x).wave[-1] >= wave[-1]][0] src = self._ts_sources[band] return(src._flux(phase, wave)*(self._param_flux(phase)[:, None])) class NewlingSource(sncosmo.Source): _param_names = ['A', 'psi', 'sigma', 'k', 'phi'] param_names_latex = ['A', '\psi', '\sigma', 'k', 'phi'] def __init__(self, data, name='NewlingSource', version=None, tstep=1, flip=False): super(sncosmo.Source, self).__init__() # init for the super class self.name = name self.version = version self._model = {} self.lc = _removeDupes(data) wave = [] for b in np.unique(data['band']): wave = np.append(wave, sncosmo.get_bandpass(b).wave) wave = np.sort(np.unique(wave)) wave = np.append([.99*wave[0]], wave) wave = np.append(wave, [1.01*wave[-1]]) self._wave = wave self._phase = np.arange(-200, 500, 5) self._parameters = np.array([1., 0., 1., 1., -1.]) self._tstep = tstep def _param_flux(self, phase): # self._parameters[4]=np.min([np.min(phase),self._parameters[4]]) # self._parameters[4]=-self._parameters[3]*self._parameters[2] splPhase = phase[phase >= self._parameters[4]] splPhase = splPhase[splPhase <= ( self._parameters[3]*self._parameters[2]+self._parameters[4])] spl = CubicSpline([self._parameters[4], 0.], [0, self._parameters[1]]) Psi = np.zeros(len(phase)) for i in range(len(Psi)): if phase[i] in splPhase: Psi[i] = spl(phase[i]) elif phase[i] >= (self._parameters[3]*self._parameters[2]+self._parameters[4]): Psi[i] = self._parameters[1] # Psi[phase==splPhase]=spl(splPhase) # print(((phase+self._parameters[4])/self._parameters[2]),((phase+self._parameters[4])/self._parameters[2])**self._parameters[3],self._parameters,np.any(np.isnan(phase))) newlingFlux = (self._parameters[0]*((phase-self._parameters[4])/self._parameters[2])**self._parameters[3])*np.exp(-( phase-self._parameters[4])/self._parameters[2])*(self._parameters[3]**(-self._parameters[3]))*(np.exp(self._parameters[3]))+Psi # print(phase[0],self._parameters[4]) if np.inf in newlingFlux or np.any(np.isnan(newlingFlux)): # print(self._parameters) return(np.zeros(len(phase))) return(newlingFlux) def _flux(self, phase, wave): # if self._parameters[2]<=self._parameters[3]: # return np.ones((len(phase),len(wave)))*(-9999) mod = _param_to_source(self, phase, wave) return(mod._flux(phase, wave)) class KarpenkaSource(sncosmo.Source): _param_names = ['A', 'B', 't1', 'rise', 'fall'] param_names_latex = ['A', 'B', 't_1', 't_{rise}', 't_{fall}'] def __init__(self, data, name='KarpenkaSource', version=None, tstep=1): super(sncosmo.Source, self).__init__() # init for the super class self.name = name self.version = version self._model = {} self.lc = _removeDupes(data) wave = [] for b in np.unique(data['band']): wave = np.append(wave, sncosmo.get_bandpass(b).wave) wave = np.sort(np.unique(wave)) wave = np.append([.99*wave[0]], wave) wave = np.append(wave, [1.01*wave[-1]]) self._wave = wave self._phase = np.arange(-50, 150, 1) self._parameters = np.array([1., 0., 0., 1., 1., ]) self._tstep = tstep def _param_flux(self, phase): karpenkaFlux = (self._parameters[0]*(1+self._parameters[1]*((phase+self._parameters[2])**2)))*( np.exp(-phase/self._parameters[4])/(1+np.exp(-phase/self._parameters[3]))) if np.inf in karpenkaFlux or np.any(np.isnan(karpenkaFlux)): return(np.zeros(len(phase))) return(karpenkaFlux) def _flux(self, phase, wave): # if self._parameters[2]<=self._parameters[3]: # return np.ones((len(phase),len(wave)))*(-9999) mod = _param_to_source(self, phase, wave) return(mod._flux(phase, wave)) def _removeDupes(data): tempTable = Table(names=data.colnames, dtype=[ data.dtype[x] for x in data.colnames]) for b in np.unique(data['band']): dupes = [item for item, count in Counter( data['time'][data['band'] == b]).items() if count > 1] duped = [] temp = data[data['band'] == b] for row in temp: if row['time'] not in dupes: tempTable.add_row(row) else: if row['time'] in duped: continue row['flux'] = np.average(temp['flux'][temp['time'] == row['time']], weights=1./(temp['fluxerr'][temp['time'] == row['time']])**2) row['fluxerr'] = np.sqrt( 1./np.sum(1./(temp['fluxerr'][temp['time'] == row['time']])**2)) duped.append(row['time']) tempTable.add_row(row) return (tempTable)