import sncosmo
import os
from copy import deepcopy, copy
from collections import OrderedDict
from astropy.io import ascii
import numpy as np
from astropy.table import Table
from scipy.interpolate import interp1d
from sncosmo.utils import alias_map
from .util import _filedir_
from .curve_io import image_lc, MISN
from .ml import *
__all__ = ['createMultiplyImagedSN']
OBSERVATIONS_REQUIRED_ALIASES = ('time', 'band', 'zp', 'zpsys', 'gain',
'skynoise')
OBSERVATIONS_ALIASES = OrderedDict([
('time', set(['time', 'date', 'jd', 'mjd', 'mjdobs', 'mjd_obs'])),
('band', set(['band', 'bandpass', 'filter', 'flt'])),
('zp', set(['zp', 'zpt', 'zeropoint', 'zero_point'])),
('zpsys', set(['zpsys', 'zpmagsys', 'magsys'])),
('gain', set(['gain'])),
('skynoise', set(['skynoise']))
])
def _getAbsoluteDist():
absolutes = ascii.read(os.path.join(
_filedir_, 'sim', 'data', 'absolutes.ref'))
total = float(np.sum(absolutes['N'][absolutes['type'] != 'Ia']))
absDict = dict([])
for row in absolutes:
if row['type'] == 'Ia':
frac = 1
else:
frac = float(row['N'])/total
absDict[row['type']] = {
'dist': (row['mean'], row['sigma']), 'frac': frac}
return(absDict)
def _getAbsFromDist(dist):
mu, sigma = dist
return(np.random.normal(mu, sigma))
[docs]def createMultiplyImagedSN(
sourcename, snType, redshift, z_lens=None, telescopename='telescope',
objectName='object', time_delays=[10., 50.], magnifications=[2., 1.], sn_params={}, av_dists={},
numImages=2, cadence=5, epochs=30, clip_time=[-30, 150], bands=['F105W', 'F160W'], start_time=None,
gain=200., skynoiseRange=(1, 1.1), timeArr=None, zpsys='ab', zp=None, hostr_v=3.1, lensr_v=3.1,
microlensing_type=None, microlensing_params=[], ml_loc=[None, None],
dust_model='CCM89Dust', av_host=.3, av_lens=0, fix_luminosity=False,
minsnr=0.0, scatter=True, snrFunc=None):
"""
Generate a multiply-imaged SN light curve set, with user-specified time
delays and magnifications.
Parameters
----------
sourcename: :class:`~sncosmo.Source` or str
The model for the spectral evolution of the source. If a string
is given, it is used to retrieve a :class:`~sncosmo.Source` from
the registry.
snType : str
The classification of the supernova
redshift : float
Redshift of the source
z_lens : float
Redshift of the lens
telescopename : str
The name of the telescope used for observations
objectName : str
The name of the simulated supernova
time_delays : :class:`~list` of :class:`~float`
The relative time delays for the multiple images of the supernova. Must
be same length as numImages
magnifications : :class:`~list` of :class:`~float`
The relative magnifications for the multiple images of hte supernova. Must
be same length as numImages
sn_params: :class:`~dict`
A dictionary with SN model parameters as keys, and some sort of pdf or
other callable function that returns the model parameter.
av_dists: :class:`~dict`
A dictionary with 'host' or 'lens' as keys, and some sort of pdf or
other callable function that returns the relevant av parameter.
numImages : int
The number of images to simulate
cadence : float
The cadence of the simulated observations (if timeArr is not defined)
epochs : int
The number of simulated observations (if timeArr is not defined)
clip_time: :class:`~list`
Rest frame phase start and end time, will clip output table to these values.
bands : :class:`~list` of :class:`~sncosmo.Bandpass` or :class:`~str`
The bandpass(es) used for simulated observations
start_time: float
Start time for the leading image. If None, start will be the first value in
the time array, and the peak will be this ± the
relative time delay for the leading image.
gain : float
Gain of the telescope "obtaining" the simulated observations (if snrFunc
not defined)
skynoiseRange : :class:`~list` of :class:`~float`
The left and right bounds of sky noise used to define observational noise
(if snrFunc not defined)
timeArr : :class:`~list` of :class:`~float`
A list of times that define the simulated observation epochs
zpsys : str or :class:`~sncosmo.MagSystem`
The zero-point system used to define the photometry
zp : float or :class:`~list` of :class:`~float`
The zero-point used to define the photometry, list if simulating multiple
bandpasses. Then this list must be the same length as bands
hostr_v: float
The r_v parameter for the host.
lensr_v: float
The r_v parameter for the lens.
microlensing_type : str
If microlensing is to be included, defines whether it is
"AchromaticSplineMicrolensing" or "AchromaticMicrolensing"
microlensing_params: :class:`~numpy.array` or :class:`~list` of :class:`~int`
If using AchromaticSplineMicrolensing, then this params list must give
three values for [nanchor, sigmadm, nspl]. If using AchromaticMicrolensing,
then this must be a microcaustic defined by a 2D numpy array
ml_loc: :class:`~list`
List containing tuple locations of SN on microlensing map (random if Nones)
dust_model : str
The dust model to be used for simulations, see sncosmo documentation for options
av_host : float
The A<sub>V</sub> parameter for the simulated dust effect in the source plane
av_lens : float
The A<sub>V</sub> parameter for the simulated dust effect in the lens plane
fix_luminosity: bool
Set the luminosity of every SN to be the peak of the distribution
minsnr : float
A minimum SNR threshold for observations when defining uncertainty
scatter : bool
Boolean that decides whether Gaussian scatter is applied to simulated
observations
snrFunc : :class:`~scipy.interpolate.interp1d` or :class:`~dict`
An interpolation function that defines the signal to noise ratio (SNR)
as a function of magnitude in the AB system. Used to define the
observations instead of telescope parameters like gain and skynoise.
This can be a dictionary, so that it's different for each filter with
filters as keys and interpolation functions as values.
Returns
-------
MISN: :class:`~sntd.MISN`
A MISN object containing each of the multiply-imaged SN light curves
and the simulation parameters.
Examples
--------
>>> myMISN = sntd.createMultiplyImagedSN('salt2', 'Ia', 1.33,z_lens=.53, bands=['F110W'],
zp=[26.8], cadence=5., epochs=35.,skynoiseRange=(.001,.005),gain=70. , time_delays=[10., 78.],
magnifications=[7,3.5], objectName='My Type Ia SN', telescopename='HST',minsnr=5.0)
"""
if timeArr is not None:
times = timeArr
else:
times = np.linspace(0, int(cadence*epochs), int(epochs))
if start_time is not None:
times += start_time
leading_peak = times[0]
bandList = np.array([np.tile(b, len(times)) for b in bands]).flatten()
ms = sncosmo.get_magsystem(zpsys)
if zp is None:
zpList = [ms.band_flux_to_mag(1, b) for b in bandList]
elif isinstance(zp, (list, tuple)):
zpList = np.array([np.tile(z, len(times)) for z in zp]).flatten()
else:
zpList = [zp for i in range(len(bandList))]
# set up object to be filled by simulations
curve_obj = MISN(telescopename=telescopename, object_name=objectName)
curve_obj.bands = set(bandList)
# make sncosmo obs table
obstable = Table({'time': np.tile(times, len(bands)), 'band': bandList,
'zpsys': [zpsys.upper() for i in range(len(bandList))],
'zp': zpList,
'skynoise': np.random.uniform(
skynoiseRange[0], skynoiseRange[1], len(bandList)),
'gain': [gain for i in range(len(bandList))]})
absolutes = _getAbsoluteDist()
# Set up the dust_model extinction effects in the host galaxy and lens plane
# TODO allow additional dust screens, not in the host or lens plane?
# TODO sample from a prior for host and lens-plane dust A_V?
# TODO : allow different lens-plane dust_model for each image?
RV_lens = lensr_v
RV_host = hostr_v
dust_frames = []
dust_names = []
dust_effect_list = []
if dust_model and (av_lens or av_host or len(av_dists) > 0):
dust_effect = {'CCM89Dust': sncosmo.CCM89Dust,
'OD94Dust': sncosmo.OD94Dust,
'F99Dust': sncosmo.F99Dust}[dust_model]()
if av_host or 'host' in av_dists.keys():
dust_frames.append('rest')
dust_names.append('host')
dust_effect_list.append(dust_effect)
if av_lens or 'lens' in av_dists.keys():
dust_frames.append('free')
dust_names.append('lens')
dust_effect_list.append(dust_effect)
# The following is not needed, but may be resurrected when we allow user
# to provide additional dust screens.
# if not isinstance(dust_names, (list, tuple)):
# dust_names=[dust_names]
# if not isinstance(dust_frames, (list, tuple)):
# dust_frames=[dust_frames]
# The sncosmo Model is initially set up with only dust effects, because
# as currently constructed, dust has the same effect on all images.
# Microlensing effects are added separately for each SN image below.
model = sncosmo.Model(source=sourcename, effects=dust_effect_list,
effect_names=dust_names, effect_frames=dust_frames)
model.set(z=redshift)
# set absolute magnitude in b or r band based on literature
if snType in ['IIP', 'IIL', 'IIn']:
absBand = 'bessellb'
else:
absBand = 'bessellr'
if model.param_names[2] not in sn_params.keys():
if fix_luminosity:
model.set_source_peakabsmag(absolutes[snType]['dist'][0],
absBand, zpsys)
else:
model.set_source_peakabsmag(_getAbsFromDist(absolutes[snType]['dist']),
absBand, zpsys)
else:
model.parameters[2] = sn_params[model.param_names[2]]()
t0 = leading_peak
if snType == 'Ia':
x0 = model.get('x0')
params = {'z': redshift, 't0': t0, 'x0': x0}
if 'x1' not in sn_params.keys():
params['x1'] = np.random.normal(0., 1.)
else:
params['x1'] = sn_params['x1']()
if 'c' not in sn_params.keys():
params['c'] = np.random.normal(0., .1)
else:
params['c'] = sn_params['c']()
else:
amp = model.get('amplitude')
params = {'z': redshift, 't0': t0, 'amplitude': amp}
model.set(**params)
if av_host or 'host' in av_dists.keys():
if 'host' in av_dists.keys():
av_host = av_dists['host']()
ebv_host = av_host/RV_host
model.set(hostebv=ebv_host, hostr_v=RV_host)
else:
ebv_host = 0
if av_lens or 'lens' in av_dists.keys():
if z_lens is None:
print('No z_lens set, assuming half of source z...')
z_lens = redshift / 2.
if 'lens' in av_dists.keys():
av_lens = av_dists['lens']()
ebv_lens = av_lens/RV_lens
model.set(lensz=z_lens, lensebv=ebv_lens, lensr_v=RV_lens)
else:
ebv_lens = 0
# Step through each of the multiple SN images, adding time delays,
# macro magnifications, and microlensing effects.
for imnum, td, mu in zip(range(numImages), time_delays, magnifications):
# Make a separate model_i for each SN image, so that lensing effects
# can be reflected in the model_i parameters and propagate correctly
# into realize_lcs for flux uncertainties
model_i = deepcopy(model)
model_i._flux = _mlFlux
params_i = deepcopy(params)
if snType == 'Ia':
params_i['x0'] *= mu
else:
params_i['amplitude'] *= mu
params_i['t0'] += td
if microlensing_type is not None:
# add microlensing effect
if 'spline' in microlensing_type.lower():
# Initiate a spline-based mock ml effect (at this point,
# a set of random splines is generated and the microlensing
# magnification curve is fixed)
nanchor, sigmadm, nspl = microlensing_params
if microlensing_type.lower().startswith('achromatic'):
ml_spline_func = sncosmo.AchromaticSplineMicrolensing
else:
ml_spline_func = ChromaticSplineMicrolensing
ml_effect = ml_spline_func(nanchor=nanchor, sigmadm=sigmadm,
nspl=nspl)
else:
# get magnification curve from the defined microcaustic
mlTime = np.arange(
0, times[-1]/(1+redshift)-model_i._source._phase[0]+5, .1)
time, dmag = microcaustic_field_to_curve(
microlensing_params, mlTime, z_lens, redshift, loc=ml_loc[imnum])
dmag /= np.mean(dmag) # to remove overall magnification
ml_effect = AchromaticMicrolensing(
time+model_i._source._phase[0], dmag, magformat='multiply')
model_i.add_effect(ml_effect, 'microlensing', 'rest')
else:
ml_effect = None
# Generate the simulated SN light curve observations, make a `curve`
# object, and store the simulation metadata
model_i.set(**params_i)
table_i = realize_lcs(
obstable, model_i, [params_i],
trim_observations=True, scatter=scatter, thresh=minsnr, snrFunc=snrFunc)
tried = 0
while (len(table_i) == 0 or len(table_i[0]) < numImages) and tried < 50:
table_i = realize_lcs(
obstable, model_i, [params_i],
trim_observations=True, scatter=scatter, thresh=minsnr, snrFunc=snrFunc)
tried += 1
if tried == 50:
# this arbitrary catch is here in case your minsnr and observation parameters
# result in a "non-detection"
print("Your survey parameters detected no supernovae.")
return None
table_i = table_i[0]
if timeArr is None:
table_i = table_i[table_i['time'] < model_i.get(
't0')+clip_time[1]*(1+model_i.get('z'))]
table_i = table_i[table_i['time'] > model_i.get(
't0')+clip_time[0]*(1+model_i.get('z'))]
# create is curve with all parameters and add it to the overall MISN object from above
curve_i = image_lc()
curve_i.object_name = None
curve_i.zpsys = zpsys
curve_i.table = deepcopy(table_i)
curve_i.bands = list(set(table_i['band']))
curve_i.simMeta = deepcopy(table_i.meta)
curve_i.simMeta['sourcez'] = redshift
curve_i.simMeta['model'] = copy(model_i)
curve_i.simMeta['hostebv'] = ebv_host
curve_i.simMeta['lensebv'] = ebv_lens
curve_i.simMeta['lensz'] = z_lens
curve_i.simMeta['mu'] = mu
curve_i.simMeta['td'] = td
curve_i.simMeta['microlensing'] = ml_effect
curve_i.simMeta['microlensing_type'] = microlensing_type
if microlensing_type == 'AchromaticSplineMicrolensing':
curve_i.simMeta['microlensing_params'] = microlensing_params
elif microlensing_type is not None:
curve_i.simMeta['microlensing_params'] = interp1d(
time+model_i._source._phase[0], dmag, fill_value=1,bounds_error=False)
curve_obj.add_image_lc(curve_i)
# Store the un-lensed model as a component of the lensed SN object.
model.set(**params)
curve_obj.model = model
return(curve_obj)
def realize_lcs(observations, model, params, thresh=None,
trim_observations=False, scatter=True, snrFunc=None):
"""***A copy of SNCosmo's function, just to add a SNR function
Realize data for a set of SNe given a set of observations.
Parameters
----------
observations : :class:`~astropy.table.Table` or :class:`~numpy.ndarray`
Table of observations. Must contain the following column names:
``band``, ``time``, ``zp``, ``zpsys``, ``gain``, ``skynoise``.
model : `sncosmo.Model`
The model to use in the simulation.
params : :class:`~list` (or generator) of dict
List of parameters to feed to the model for realizing each light curve.
thresh : float, optional
If given, light curves are skipped (not returned) if none of the data
points have signal-to-noise greater than ``thresh``.
trim_observations : bool, optional
If True, only observations with times between
``model.mintime()`` and ``model.maxtime()`` are included in
result table for each SN. Default is False.
scatter : bool, optional
If True, the ``flux`` value of the realized data is calculated by
adding a random number drawn from a Normal Distribution with a
standard deviation equal to the ``fluxerror`` of the observation to
the bandflux value of the observation calculated from model. Default
is True.
Returns
-------
sne : list of :class:`~astropy.table.Table`
Table of realized data for each item in ``params``.
Notes
-----
``skynoise`` is the image background contribution to the flux measurement
error (in units corresponding to the specified zeropoint and zeropoint
system). To get the error on a given measurement, ``skynoise`` is added
in quadrature to the photon noise from the source.
It is left up to the user to calculate ``skynoise`` as they see fit as the
details depend on how photometry is done and possibly how the PSF is
is modeled. As a simple example, assuming a Gaussian PSF, and perfect
PSF photometry, ``skynoise`` would be ``4 * pi * sigma_PSF * sigma_pixel``
where ``sigma_PSF`` is the standard deviation of the PSF in pixels and
``sigma_pixel`` is the background noise in a single pixel in counts.
"""
RESULT_COLNAMES = ('time', 'band', 'flux', 'fluxerr', 'zp', 'zpsys')
lcs = []
# Copy model so we don't mess up the user's model.
model = copy(model)
# get observations as a Table
if not isinstance(observations, Table):
if isinstance(observations, np.ndarray):
observations = Table(observations)
else:
raise ValueError("observations not understood")
# map column name aliases
colname = alias_map(observations.colnames, OBSERVATIONS_ALIASES,
required=OBSERVATIONS_REQUIRED_ALIASES)
# result dtype used when there are no observations
band_dtype = observations[colname['band']].dtype
zpsys_dtype = observations[colname['zpsys']].dtype
result_dtype = ('f8', band_dtype, 'f8', 'f8', 'f8', zpsys_dtype)
for p in params:
model.set(**p)
# Select times for output that fall within tmin amd tmax of the model
if trim_observations:
mask = ((observations[colname['time']] > model.mintime()) &
(observations[colname['time']] < model.maxtime()))
snobs = observations[mask]
else:
snobs = observations
# explicitly detect no observations and add an empty table
if len(snobs) == 0:
if thresh is None:
lcs.append(Table(names=RESULT_COLNAMES,
dtype=result_dtype, meta=p))
continue
flux = model.bandflux(snobs[colname['band']],
snobs[colname['time']],
zp=snobs[colname['zp']],
zpsys=snobs[colname['zpsys']])
if snrFunc is not None:
if isinstance(snrFunc, dict):
fluxerr = np.ones(len(flux))
for b in snobs[colname['band']]:
inds = np.where(snobs[colname['band']] == b)[0]
fluxerr[inds] = np.abs(
flux[inds]/snrFunc[b](-2.5*np.log10(flux[inds])+snobs[colname['zp']][inds]))
else:
fluxerr = np.abs(
flux/snrFunc(-2.5*np.log10(flux)+snobs[colname['zp']]))
else:
fluxerr = np.sqrt(snobs[colname['skynoise']]**2 +
np.abs(flux) / snobs[colname['gain']])
# Scatter fluxes by the fluxerr
# np.atleast_1d is necessary here because of an apparent bug in
# np.random.normal: when the inputs are both length 1 arrays,
# the output is a Python float!
if scatter:
flux = np.atleast_1d(np.random.normal(flux, fluxerr))
# Check if any of the fluxes are significant
if thresh is not None:
if not np.any(flux/fluxerr > thresh):
continue
else:
inds = np.where(flux/fluxerr > thresh)[0]
data = [snobs[colname['time']][inds], snobs[colname['band']][inds], flux[inds], fluxerr[inds],
snobs[colname['zp']][inds], snobs[colname['zpsys']][inds]]
lcs.append(Table(data, names=RESULT_COLNAMES, meta=p))
return lcs