Source code for sntd.curve_io

import numpy as np
import os
import string
import sncosmo
import sys
import corner
import math
from collections import OrderedDict as odict
from astropy.io import ascii
from astropy.table import Table, vstack, Column
from scipy.stats import mode
from copy import deepcopy, copy
import matplotlib.pyplot as plt
from sncosmo.snanaio import read_snana_fits
from mpl_toolkits.axes_grid1.axes_divider import make_axes_locatable
import matplotlib.gridspec as gridspec

try:
    import pickle
except:
    import cPickle

from .util import *

__all__ = ['image_lc', 'MISN', 'read_data', 'write_data', 'table_factory']

_comment_char = {'#', '='}
_meta__ = {'@', '$', '%', '!', '&'}


def _sntd_deepcopy(obj):
    newMISN = MISN()
    for k in obj:
        setattr(newMISN, k, obj[k])
    return(newMISN)


[docs]class image_lc(dict): """ SNTD class that describes each image light curve of a MISN """ def __init__(self, zpsys='AB'): """ Constructor for curve class """ # todo populate param documentation super(image_lc, self).__init__() self.meta = {'info': ''} """@type: :class:`dict` The metadata for the MISN object, intialized with an empty "info" key value pair. It's populated by added _metachar__ characters into the header of your data file. """ self.table = None """@type: :class:`astropy.table.Table` A table containing the data, used for SNCosmo functions, etc. """ self.bands = [] """@type: str band names used """ self.zpsys = zpsys """@type: str The zero-point system for this curve object """ self.simMeta = dict([]) """@type: :class:`dict` A dictionary containing simulation metadata if this is a simulated curve object""" self.fits = None """@type: :class:`~sntd.fitting.newDict` Contains fit information from fit_data""" self.microlensing = newDict() # these three functions allow you to access the MISN via "dot" notation __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ __getattr__ = dict.__getitem__ def __getstate__(self): """ A function necessary for pickling :return: self """ return self def __setstate__(self, d): """ A function necessary for pickling :param d: A value :return: self.__dict__ """ self.__dict__ = d @property def chisq(self): """ A property that calculates the chisq based on your best fit model. """ if self.fits is None: return None model_observations = self.fits.model.bandflux(self.table['band'], self.table['time'], zp=self.table['zp'], zpsys=self.table['zpsys']) chi = (self.table['flux']-model_observations)/self.table['fluxerr'] chisq = np.dot(chi, chi) return chisq @property def reduced_chisq(self): """ A property that calculates the reduced chisq based on your best fit model. """ if self.fits is None: return None return self.chisq/(len(self.table)+len(self.fits.res.vparam_names)-1) def __str__(self): """ A replacement for the print method of the class, so that when you run print(image_lc()), this is how it shows up. """ print('SNTD MISN Image') print('------------------') print('Number of bands: %d' % len(self.bands)) print('Bands: {}'.format(self.bands)) print('Date Range: %.5f->%.5f' % ( min(self.table[get_default_prop_name('time')]), max(self.table[get_default_prop_name('time')]))) print('Number of points: %d' % len(self.table)) if self.simMeta.keys(): print('') print('Metadata:') print('\n'.join(' {}:{}'.format(*t) for t in zip(self.simMeta.keys( ), self.simMeta.values()) if isinstance(t[1], (str, float, int)))) if self.fits is not None: print('Best fit model: %s (reduced chisq = %.2f)'%(self.fits.model._source.name,self.reduced_chisq)) print('Parameters:') for i in range(len(self.fits.model.parameters)): print(' %s: %.2f'%(self.fits.model.param_names[i], self.fits.model.parameters[i])) return ''
[docs]class MISN(dict): """ The main object for SNTD. This organizes a MISN, containing the multiple light curves in self.images, all the fits, etc. """ def __deepcopy__(self, memo): return deepcopy(dict(self)) def __init__(self, telescopename="Unknown", object_name="Unknown"): """ Constructor for MISN class. Inherits from the dictionary class, and is the main object of organization used by SNTD. Parameters ---------- telescopename : str Name of the telescope that the data were gathered from object_name : str Name of object of interest Returns ------- MISN : :class:`~sntd.MISN` """ super(MISN, self).__init__() # init for the super class self.meta = {'info': ''} """@type: :class:`dict` The metadata for the MISN object, intialized with an empty "info" key value pair. It's populated by added _metachar__ characters into the header of your data file. """ self.bands = set() """ @type: :class:`list` The list of bands contained inside this MISN """ self.table = None """ @type: :class:`~astropy.table.Table` The astropy table containing all of the data in your data file """ self.telescopename = telescopename """ @type: str Name of the telescope that the data were gathered from """ self.object = object_name """ @type: str Object of interest """ self.images = dict([]) self.parallel = image_lc() self.series = image_lc() self.color = image_lc() self.constants = dict([]) # these three functions allow you to access the MISN via "dot" notation __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ __getattr__ = dict.__getitem__ def __getstate__(self): """ A function necessary for pickling :return: self """ return self def __setstate__(self, d): """ A function necessary for pickling :param d: A value :return: self.__dict__ """ self.__dict__ = d def __str__(self): """ A replacement for the print method of the class, so that when you run print(MISN()), this is how it shows up. """ print('SNTD MISN') print('------------------') print('Telescope: %s' % self.telescopename) print('Object: %s' % self.object) print('Number of bands: %d' % len(self.bands)) print('') for c in np.sort([x for x in self.images.keys()]): print('------------------') print('Image: %s:' % c) print('Bands: {}'.format(self.images[c].bands)) print('Date Range: %.5f->%.5f' % ( min(self.images[c].table[get_default_prop_name('time')]), max(self.images[c].table[get_default_prop_name('time')]))) print('Number of points: %d' % len(self.images[c].table)) if self.images[c].simMeta.keys(): print('') print('Metadata:') print('\n'.join(' {}:{}'.format(*t) for t in zip(self.images[c].simMeta.keys( ), self.images[c].simMeta.values()) if isinstance(t[1], (str, float, int)))) return '------------------' @property def series_chisq(self): """ A property that calculates the chisq based on your best fit series model. """ if self.series is None: return None model_observations = self.series.fits.model.bandflux(self.series.table['band'], self.series.table['time'], zp=self.series.table['zp'], zpsys=self.series.table['zpsys']) chi = (self.series.table['flux']-model_observations)/self.series.table['fluxerr'] chisq = np.dot(chi, chi) return chisq @property def series_reduced_chisq(self): """ A property that calculates the reduced chisq based on your best fit series model. """ if self.series is None: return None return self.chisq/(len(self.series.table)+len(self.series.fits.res.vparam_names)-1) @property def color_chisq(self): """ A property that calculates the chisq based on your best fit color model. """ if self.color is None: return None model_observations = self.color.fits.model.bandflux(self.color.table['band'], self.color.table['time'], zp=self.color.table['zp'], zpsys=self.color.table['zpsys']) chi = (self.color.table['flux']-model_observations)/self.color.table['fluxerr'] chisq = np.dot(chi, chi) return chisq @property def color_reduced_chisq(self): """ A property that calculates the reduced chisq based on your best fit color model. """ if self.color is None: return None return self.chisq/(len(self.color.table)+len(self.color.fits.res.vparam_names)-1)
[docs] def add_image_lc(self, myImage, key=None): """Adds an image_lc object to the existing MISN (i.e. adds an image to a MISN) Parameters ---------- myImage: :class:`~sntd.image_lc` The curve to add to self. key: str The key you want to save this as, default is 'image_1,image_2,etc.' Returns ------- self: :class:`sntd.curve_io.MISN` """ self.bands.update([x for x in myImage.bands if x not in self.bands]) if key is None: myImage.object = 'image_'+str(len(self.images)+1) else: myImage.object = key if 'image' not in myImage.table.colnames: myImage.table.add_column( Column([myImage.object for i in range(len(myImage.table))], name='image')) if 'zpsys' not in myImage.table.colnames: print('Assuming AB magsystem...') myImage.table['zpsys'] = 'AB' else: myImage.zpsys = myImage.table['zpsys'][0] tempMISN = copy(myImage) self.images[myImage.object] = tempMISN if self.table: self.table = vstack([tempMISN.table]) else: self.table = copy(tempMISN.table) return(self)
[docs] def combine_curves(self, time_delays=None, magnifications=None, referenceImage='image_1', static=False, model=None, minsnr=0): """ Takes the multiple images in self.images and combines the data into a single light curve using defined time delays and magnifications or best (quick) guesses. Parameters ---------- time_delays: :class:`dict` Dictionary with image names as keys and relative time delays as values (e.g. {'image_1':0,'image_2':20}). Guessed if None. magnifications: :class:`dict` Dictionary with image names as keys and relative magnifications as values (e.g. {'image_1':0,'image_2':20}). Guessed if None. referenceImage: str The image you want to be the reference (e.g. image_1, image_2, etc.) ignore_images: :class:`~list` List of images you do not want to include in the color curve. static: bool Make the color curve, don't shift the data model: :class:`~sncosmo.Model` If you want to use an sncosmo Model (and the guess_t0_amplitude method) to guess time delays minsnr: float Cut data that don't meet this threshold before making the color curve. Returns ------- self: :class:`sntd.curve_io.MISN` """ if len(self.images) < 2: print("Not enough curves to combine!") return(self) if time_delays is None: if model is not None: time_delays = {} magnifications = {} model = sncosmo.Model(model) if isinstance( model, str) else model ref_t0, ref_amp = sncosmo.fitting.guess_t0_and_amplitude(sncosmo.photdata.photometric_data( self.images[referenceImage].table), model, minsnr) self.series.meta['reft0'] = ref_t0 self.series.meta['refamp'] = ref_amp time_delays[referenceImage] = 0 magnifications[referenceImage] = 1 for k in self.images.keys(): if k == referenceImage: continue guess_t0, guess_amp = sncosmo.fitting.guess_t0_and_amplitude(sncosmo.photdata.photometric_data( self.images[k].table), model, minsnr) time_delays[k] = guess_t0-ref_t0 magnifications[k] = guess_amp/ref_amp else: # TODO fix these guessing functions time_delays = guess_time_delays(self, referenceImage) if magnifications is None: magnifications = guess_magnifications(self, referenceImage) self.series.table = Table(names=self.table.colnames, dtype=[ self.table.dtype[x] for x in self.table.colnames]) for k in np.sort(list(self.images.keys())): temp = deepcopy(self.images[k].table) if not static: temp['time'] -= time_delays[k] temp['flux'] /= magnifications[k] temp.meta = dict([]) self.series.table = vstack([self.series.table, temp]) self.series.table.sort('time') self.series.bands = self.bands self.series.meta['td'] = { k: float(time_delays[k]) for k in time_delays.keys()} self.series.meta['mu'] = { k: float(magnifications[k]) for k in magnifications.keys()} return(self)
[docs] def color_table(self, band1s, band2s, time_delays=None, referenceImage='image_1', ignore_images=[], static=False, model=None, minsnr=0.0): """ Takes the multiple images in self.images and combines the data into a single color curve using defined time delays and magnifications or best (quick) guesses. Parameters ---------- band1s: str or list The first band(s) for color curve(s) band2s: str or list The second band(s) for color curve(s) time_delays: :class:`dict` Dictionary with image names as keys and relative time delays as values (e.g. {'image_1':0,'image_2':20}). Guessed if None. referenceImage: str The image you want to be the reference (e.g. image_1, image_2, etc.) ignore_images: :class:`~list` List of images you do not want to include in the color curve. static: bool Make the color curve, don't shift the data model: :class:`~sncosmo.Model` If you want to use an sncosmo Model (and the guess_t0_amplitude method) to guess time delays minsnr: float Cut data that don't meet this threshold before making the color curve. Returns ------- self: :class:`sntd.curve_io.MISN` """ ignore_images = list(ignore_images) if not isinstance( ignore_images, (list, tuple)) else ignore_images names = ['time', 'image', 'zpsys'] dtype = [self.table.dtype[x] for x in names] names = np.append(names, np.append(np.array([[band1+'-'+band2, band1+'-'+band2+'_err'] for band1, band2 in zip(band1s, band2s)]).flatten(), np.unique([['flux_%s' % band1, 'fluxerr_%s' % band1, 'flux_%s' % band2, 'fluxerr_%s' % band2, 'zp_%s' % band1, 'zp_%s' % band2] for band1, band2 in zip(band1s, band2s)]).flatten())) dtype = np.append(dtype, [dtype[0]]*(len(names)-len(dtype))) self.color.table = Table(names=names, dtype=dtype) if time_delays is None: if model is not None: time_delays = {} model = sncosmo.Model(model) if isinstance( model, str) else model ref_t0, ref_amp = sncosmo.fitting.guess_t0_and_amplitude(sncosmo.photdata.photometric_data( self.images[referenceImage].table), model, minsnr) self.color.meta['reft0'] = ref_t0 time_delays[referenceImage] = 0 for k in self.images.keys(): if k == referenceImage: continue guess_t0, guess_amp = sncosmo.fitting.guess_t0_and_amplitude(sncosmo.photdata.photometric_data( self.images[k].table), model, minsnr) time_delays[k] = guess_t0-ref_t0 else: # TODO fix these guessing functions time_delays = guess_time_delays(self, referenceImage) self.color.meta['td'] = time_delays for im in [x for x in self.images.keys() if x not in ignore_images]: for band1, band2 in zip(band1s, band2s): to_add = {} temp2 = deepcopy( self.images[im].table[self.images[im].table['band'] == band2]) temp1 = deepcopy( self.images[im].table[self.images[im].table['band'] == band1]) temp1 = temp1[temp1['flux'] > 0] temp2 = temp2[temp2['flux'] > 0] temp1 = temp1[temp1['flux']/temp1['fluxerr'] > minsnr] temp2 = temp2[temp2['flux']/temp2['fluxerr'] > minsnr] if not static: temp1['time'] -= time_delays[im] temp2['time'] -= time_delays[im] temp2['mag'] = -2.5*np.log10(temp2['flux'])+temp2['zp'] temp2['magerr'] = 1.0857*temp2['fluxerr']/temp2['flux'] temp1['mag'] = -2.5*np.log10(temp1['flux'])+temp1['zp'] temp1['magerr'] = 1.0857*temp1['fluxerr']/temp1['flux'] temp1 = temp1[~np.isnan(temp1['mag'])] temp2 = temp2[~np.isnan(temp2['mag'])] temp1_remove = [i for i in range( len(temp1)) if temp1['time'][i] not in temp2['time']] temp1.remove_rows(temp1_remove) temp2_remove = [i for i in range( len(temp2)) if temp2['time'][i] not in temp1['time']] temp2.remove_rows(temp2_remove) temp1['magerr'] = np.sqrt( temp2['magerr']**2+temp1['magerr']**2) temp1['mag'] -= temp2['mag'] to_add['time'] = temp1['time'] to_add['image'] = [im]*len(temp1) to_add['zpsys'] = temp1['zpsys'] to_add[band1+'-'+band2] = temp1['mag'] to_add[band1+'-'+band2+'_err'] = temp1['magerr'] to_add['flux_%s' % band1] = temp1['flux'] to_add['fluxerr_%s' % band1] = temp1['fluxerr'] to_add['flux_%s' % band2] = temp2['flux'] to_add['fluxerr_%s' % band2] = temp2['fluxerr'] to_add['zp_%s' % band1] = temp1['zp'] to_add['zp_%s' % band2] = temp2['zp'] for col in [x for x in names if x not in to_add.keys()]: to_add[col] = [np.nan]*len(temp1) for i in range(len(temp1)): self.color.table.add_row( {k: to_add[k][i] for k in to_add.keys()}) self.color.table.meta = {} self.color.table.sort('time') return(self)
[docs] def clip_data(self, im, minsnr=-np.inf, mintime=-np.inf, maxtime=np.inf, peak=0, remove_bands=[], max_cadence=None, rm_NaN=True): """ Clips the data of an image based on various properties. Parameters ---------- im: str The image to clip minsnr: float Clip based on a minimum SNR mintime: float Clip based on a minimum time (observer frame relative to peak) maxtime: float Clip based on a maximum time (observer frame relative to peak) peak: float Used in conjunction with min/max time remove_bands: list List of bands to remove from the light curve max_cadence: float Clips data so that points are spread by at least max_cadence rm_NaN: bool If True, removed NaNs from the data. """ self.images[im].table = self.images[im].table[~np.isnan( self.images[im].table['flux'])] self.images[im].table = self.images[im].table[~np.isnan( self.images[im].table['fluxerr'])] self.images[im].table = self.images[im].table[~np.isnan( self.images[im].table['time'])] self.images[im].table = self.images[im].table[self.images[im].table['flux'] / self.images[im].table['fluxerr'] > minsnr] self.images[im].table = self.images[im].table[self.images[im].table['time'] > mintime+peak] self.images[im].table = self.images[im].table[self.images[im].table['time'] < maxtime+peak] for b in remove_bands: self.images[im].table = self.images[im].table[self.images[im].table['band'] != b] if max_cadence is not None and isinstance(max_cadence, (int, float)): to_remove = [] for b in np.unique(self.images[im].table['band']): binds = np.where(self.images[im].table['band'] == b)[0] t = self.images[im].table['time'][binds[0]] for i in range(1, len(binds)): if self.images[im].table[binds[i]]['time'] < t+max_cadence: to_remove.append(binds[i]) else: t = self.images[im].table[binds[i]]['time'] self.images[im].table.remove_rows(to_remove)
[docs] def quality_check(self, min_n_bands=1, min_n_points_per_band=1, clip=False, method='parallel'): """ Checks the images of a SN to make sure they pass minimum thresholds for fitting. Parameters ---------- min_n_bands: int The minimum number of bands needed to pass min_n_points_per_band: int The minimum number of bands in a given band to pass clip: bool If True, "bad" bands are clipped in place method: str Should be parallel, series, or color. Checks all images (parallel), or the series table (series), or the color table (color) Returns ------- self: :class:`sntd.curve_io.MISN` """ if method == 'parallel': good_bands = [] for im in self.images.keys(): ngood_bands = 0 for b in np.unique(self.images[im].table['band']): temp_n_for_b = len( self.images[im].table[self.images[im].table['band'] == b]) if temp_n_for_b < min_n_points_per_band: if clip: self.images[im].table = self.images[im].table[self.images[im].table['band'] != b] else: ngood_bands += 1 good_bands.append(b) if ngood_bands < min_n_bands: return False self.bands = np.unique(good_bands) elif method == 'series': ngood_bands = 0 for b in np.unique(self.series.table['band']): temp_n_for_b = len( self.series.table[self.series.table['band'] == b]) if temp_n_for_b < min_n_points_per_band: if clip: self.series.table = self.series.table[self.series.table['band'] != b] else: ngood_bands += 1 if ngood_bands < min_n_bands: return False elif method == 'color': if len(self.color.table) < min_n_points_per_band: return False else: print('method unknown for quality_check') sys.exit(1) return True
[docs] def plot_microlensing_fit(self,show_all_samples = False): """ Shows a plot of the best-fit microlensing curve from GPR. Parameters ---------- show_all_samples: bool If True, show all GPR samples. Returns ------- figure object: :class:`~matplotlib.pyplot.figure` """ if len(self.images['image_1'].microlensing) == 0: print('Have not yet run microlensing fit.') return fig=plt.figure(figsize = (12,12)) gs = gridspec.GridSpec(math.ceil(len(self.images.keys())/2),2) i = 0 j = 0 nimage = 1 axes = [] row_ax = [] for im in self.images.keys(): if j==2: i += 1 j = 0 axes.append(row_ax) row_ax = [] temp_ax = fig.add_subplot(gs[i,j]) if show_all_samples: for k in range(self.images[im].microlensing.samples_y.shape[1]): if k == 0: temp_ax.plot(self.images[im].microlensing.micro_x, self.images[im].microlensing.samples_y[:, k], alpha = .1, label = 'Posterior Samples', color = 'b') else: temp_ax.plot(self.images[im].microlensing.micro_x, self.images[im].microlensing.samples_y[:, k], alpha = .1, color = 'b') temp_ax.errorbar(self.images[im].microlensing.resid_x, self.images[im].microlensing.resid_y, self.images[im].microlensing.resid_err, fmt = 'r.', markersize = 10, label = 'Observation Residuals') temp_ax.plot(self.images[im].microlensing.micro_x, self.images[im].microlensing.micro_y - 3 * self.images[im].microlensing.sigma, '--g') temp_ax.plot(self.images[im].microlensing.micro_x, self.images[im].microlensing.micro_y + 3 * self.images[im].microlensing.sigma, '--g', label = r'$3\sigma$ Bounds') temp_ax.plot(self.images[im].microlensing.micro_x, self.images[im].microlensing.micro_y, 'k-.', label = "GPR Prediction") pred_range = np.array([np.min(self.images[im].microlensing.micro_y - 3 * \ self.images[im].microlensing.sigma), np.max(self.images[im].microlensing.micro_y + 3 * \ self.images[im].microlensing.sigma)]) temp_ax.set_ylim(pred_range) if 'simMeta' in self.images[im].keys(): z = self.images[im].simMeta['sourcez'] time_model = np.arange(self.images[im].table['time'].min(), self.images[im].table['time'].max(), 0.1) time_model -= self.images[im].fits.model.get('t0') true_micro = self.images[im].simMeta['microlensing_params']( time_model/(1+z)) true_range = np.array([np.min(true_micro),np.max(true_micro)]) temp_ax.plot(time_model, true_micro, 'k', label = r'True $\mu$-Lensing') temp_ax.set_ylim([np.min([pred_range[0],true_range[0]]), np.max([pred_range[1],true_range[1]])]) temp_ax.legend(fontsize = 10) if j == 0: temp_ax.set_ylabel('Magnification',fontsize=20) if i == math.ceil(len(self.images.keys())/2): temp_ax.set_xlabel('Observer Days (-%.1f)'%self.images[im].fits.model.get('t0'), fontsize=20) temp_ax.set_title('Image %i'%nimage,fontsize=15) row_ax.append(temp_ax) j += 1 nimage += 1 plt.tight_layout() return fig,axes
[docs] def plot_fit(self, method='parallel', par_image=None): """ Makes a corner plot based on one of the fitting methods Parameters ---------- method: str parallel, series, or color Returns ------- figure object: :class:`~matplotlib.pyplot.figure` """ if method == 'parallel': if par_image is None: par_image = self.parallel.fitOrder[0] res = self.images[par_image].fits.res samples = res.samples try: truths = [self.images[par_image].simMeta['model'].get( x) for x in res.vparam_names] except: truths = None elif method == 'series': res = self.series.fits.res samples = res.samples try: truths = [] for p in res.vparam_names: if p.startswith('mu_'): im = [x for x in self.images.keys() if x[-1] == p[-1]][0] truths.append(self.images[im].simMeta['model'].parameters[2] / self.images[self.series.refImage].simMeta['model'].parameters[2]) elif p.startswith('dt_'): im = [x for x in self.images.keys() if x[-1] == p[-1]][0] truths.append(self.images[im].simMeta['model'].get('t0') - self.images[self.series.refImage].simMeta['model'].get('t0')) else: im = self.series.refImage truths.append(self.images[im].simMeta['model'].get(p)) except: truths = None else: res = self.color.fits.res samples = res.samples try: truths = [] for p in res.vparam_names: if p.startswith('dt_'): im = [x for x in self.images.keys() if x[-1] == p[-1]][0] truths.append(self.images[im].simMeta['model'].get('t0') - self.images[self.color.refImage].simMeta['model'].get('t0')) else: im = list(self.images.keys())[0] truths.append( self.images[self.color.refImage].simMeta['model'].get(p)) except: truths = None fig = corner.corner( samples, weights=res.weights, labels=res.vparam_names, truths=truths, quantiles=(0.16, .5, 0.84), bins=30, color='k', show_titles=True, title_fmt='.2f', smooth1d=False, smooth=True, fill_contours=True, plot_contours=True, plot_density=True, use_mathtext=True, title_kwargs={"fontsize": 11}, label_kwargs={'fontsize': 16}) for ax in fig.get_axes(): ax.tick_params(axis='both', labelsize=14) return(fig)
[docs] def plot_object(self, bands='all', savefig=False, plot3D=False, filename='mySN', orientation='horizontal', method='separate', showModel=False, showFit=False, showMicro=False, plot_unresolved=False,**kwargs): """Plot the multiply-imaged SN light curves and show/save to a file. Each subplot shows a single-band light curve, for all images of the SN. Parameters ---------- bands : str or :class:`~list` of :class:`~str` 'all' = plot all bands; or provide a list of bands to plot savefig : bool boolean to save or not save plot plot3D : bool boolean to plot in 3D with plotly filename : str if savefig is True, this is the output filename orientation : str 'horizontal' = all subplots are in a single row 'vertical' = all subplots are in a single column method : str Plots the result of separate, series, or color curve method showModel : bool If true, the underlying model before microlensing is plotted as well showFit : bool If true and it exists, the best fit model from self.images['image'].fits.model is plotted showMicro : bool If true and it exists, the simulated microlensing is plotted as well plot_unresolved: bool If true the individual models of an unresolved fit will be plotted Returns ------- figure : `~matplotlib.pyplot.figure` """ colors = ['r', 'g', 'b', 'k', 'm'] colors3d = ['red', 'green', 'blue', 'black', 'purple'] i = 0 leg = [] if method == 'series': if isinstance(bands, str) and bands == 'all': bands = set(self.series.table['band']) nbands = len(bands) if orientation.startswith('v'): ncols = 1 nrows = nbands else: ncols = nbands nrows = 1 fig = None axlist = None if plot3D: try: import plotly.graph_objects as go from plotly.subplots import make_subplots fig = go.FigureWidget(make_subplots(rows=nrows, cols=ncols, subplot_titles=list(bands), specs=[[{'type': 'scatter3d'}]*ncols]*nrows)) n3dPlots = ncols*nrows axlist = [None]*len(bands) except RuntimeError: print( 'Asked for 3D plot but do not have plotly installed, switching to 2D...') fig = None plot3D = False if not plot3D or fig is None: fig, axlist = plt.subplots(nrows=nrows, ncols=ncols, sharex=True, sharey=False, figsize=(10, 10)) if nbands == 1: axlist = [axlist] for lc in np.sort([x for x in self.images.keys()]): temp = self.series.table[self.series.table['image'] == lc] if nrows == 1: ccol = 0 crow = 1 else: crow = 0 ccol = 1 try: delay = self.series.time_delays[lc] delayerr = self.series.time_delay_errors[lc] mag = self.series.magnifications[lc] magerr = self.series.magnification_errors[lc] except: delay = 0 delayerr = [0, 0] mag = 1 magerr = [0, 0] for b, ax in zip(bands, axlist): if nrows == 1: ccol += 1 else: crow += 1 mintime = np.min( self.series.table['time'][self.series.table['band'] == b]) maxtime = np.max( self.series.table['time'][self.series.table['band'] == b]) if b == list(bands)[0]: if plot3D: fig.add_trace(go.Scatter3d(x=temp['time'][temp['band'] == b]+delay, y=temp['time'][temp['band'] == b], z=temp['flux'][temp['band'] == b], error_y=dict(symmetric=False, width=4, array=[delayerr[0]]*len(temp['time'][temp['band'] == b]), arrayminus=[delayerr[1]]*len(temp['time'][temp['band'] == b])), error_z=dict(symmetric=False, width=4, array=temp['flux'][temp['band'] == b] * np.sqrt((temp['fluxerr'][temp['band'] == b]/temp['flux'][temp['band'] == b])**2 + (magerr[0]/mag)**2), arrayminus=temp['flux'][temp['band'] == b] * np.sqrt((temp['fluxerr'][temp['band'] == b]/temp['flux'][temp['band'] == b])**2 + (magerr[1]/mag)**2)), mode='markers', marker=dict(color=colors3d[i]), name='Image %s' % lc[-1], **kwargs), row=crow, col=ccol) else: leg.append( ax.errorbar(temp['time'][ temp['band'] == b], temp['flux'][ temp['band'] == b], yerr=temp['fluxerr'][ temp['band'] == b], markersize=4, fmt=colors[i]+'.')) else: if plot3D: fig.add_trace(go.Scatter3d(x=temp['time'][temp['band'] == b]+delay, y=temp['time'][temp['band'] == b], z=temp['flux'][temp['band'] == b], error_y=dict(symmetric=False, width=4, array=[delayerr[0]]*len(temp['time'][temp['band'] == b]), arrayminus=[delayerr[1]]*len(temp['time'][temp['band'] == b])), error_z=dict(symmetric=False, width=4, array=temp['flux'][temp['band'] == b] * np.sqrt((temp['fluxerr'][temp['band'] == b]/temp['flux'][temp['band'] == b])**2 + (magerr[0]/mag)**2), arrayminus=temp['flux'][temp['band'] == b] * np.sqrt((temp['fluxerr'][temp['band'] == b]/temp['flux'][temp['band'] == b])**2 + (magerr[1]/mag)**2)), mode='markers', marker=dict(color=colors3d[i]), showlegend=False, **kwargs), row=crow, col=ccol) else: ax.errorbar(temp['time'][ temp['band'] == b], temp['flux'][ temp['band'] == b], yerr=temp['fluxerr'][ temp['band'] == b], markersize=4, fmt=colors[i]+'.') if showFit: if plot3D: fig.add_trace(go.Scatter3d(x=np.arange(np.min(temp['time'][temp['band'] == b]), np.max(temp['time'][temp['band'] == b]), 1)+delay, y=np.arange(np.min(temp['time'][temp['band'] == b]), np.max( temp['time'][temp['band'] == b]), 1), z=self.series.fits.model.bandflux(b, np.arange(np.min(temp['time'][temp['band'] == b]), np.max(temp['time'][temp['band'] == b]), 1), zp=temp['zp'][temp['band'] == b][0], zpsys=temp['zpsys'][temp['band'] == b][0]), mode='lines', line=dict(color='yellow', width=8), showlegend=False, **kwargs), row=crow, col=ccol) else: ax.plot(np.arange(mintime, maxtime, .1), self.series.fits.model.bandflux(b, np.arange(mintime, maxtime, .1), zp=temp['zp'][temp['band'] == b][0], zpsys=temp['zpsys'][temp['band'] == b][0]), color='y') if not plot3D: ax.text(0.95, 0.95, b.upper(), fontsize='large', transform=ax.transAxes, ha='right', va='top') i += 1 elif method == 'color': n3dPlots = 1 if isinstance(bands, str) and bands == 'all': if len([x for x in self.color.table.colnames if '-' in x and '_' not in x]) != 1: print("Want to plot color curves but need 2 bands specified.") sys.exit(1) else: colname = [ x for x in self.color.table.colnames if '-' in x and '_' not in x][0] bands = [colname[:colname.find( '-')], colname[colname.find('-')+1:]] elif len(bands) != 2: print("Want to plot color curves but need 2 bands specified.") sys.exit(1) if plot3D: try: import plotly.graph_objects as go from plotly.subplots import make_subplots fig = go.FigureWidget() ax = None except RuntimeError: print( 'Asked for 3D plot but do not have plotly installed, switching to 2D...') fig = None plot3D = False if not plot3D or fig is None: fig = plt.figure(figsize=(10, 10)) ax = fig.gca() for lc in np.sort([x for x in self.images.keys()]): temp = self.color.table[self.color.table['image'] == lc] try: delay = self.color.time_delays[lc] delayerr = self.color.time_delay_errors[lc] except: delay = 0 delayerr = [0, 0] if plot3D: fig.add_trace(go.Scatter3d(x=temp['time']+delay, y=temp['time'], z=temp[bands[0]+'-'+bands[1]], error_y=dict(symmetric=False, width=4, array=[delayerr[0]]*len(temp['time']), arrayminus=[delayerr[1]]*len(temp['time'])), error_z=dict(symmetric=True, width=4, array=temp[bands[0]+'-'+bands[1]+'_err']), mode='markers', marker=dict(color=colors3d[i]), name='Image %s' % lc[-1], **kwargs)) else: leg.append(ax.errorbar(temp['time'], temp[bands[0]+'-'+bands[1] ], yerr=temp[bands[0]+'-'+bands[1]+'_err'], markersize=4, fmt=colors[i]+'.')) ax.text(0.95, 0.95, bands[0].upper()+'-'+bands[1].upper(), fontsize='large', transform=ax.transAxes, ha='right', va='top') i += 1 if showFit: mod_time = np.arange(np.min(self.color.table['time']), np.max( self.color.table['time']), 1) modCol = self.color.fits.model.color( bands[0], bands[1], self.color.table['zpsys'][0], mod_time) if plot3D: fig.add_trace(go.Scatter3d(x=mod_time+delay, y=mod_time, z=modCol, mode='lines', line=dict(color='yellow', width=8), showlegend=False, **kwargs)) else: ax.plot(mod_time, modCol, color='y') ax.invert_yaxis() else: if isinstance(bands, str): if bands == 'all': bands = self.bands else: bands = [bands] nbands = len(bands) if orientation.startswith('v'): ncols = 1 nrows = nbands else: ncols = nbands nrows = 1 n3dPlots = nrows*ncols fig = None axlist = None if plot3D: try: import plotly.graph_objects as go from plotly.subplots import make_subplots fig = go.FigureWidget(make_subplots(rows=nrows, cols=ncols, subplot_titles=list(bands), specs=[[{'is_3d': True}]*ncols]*nrows)) axlist = [None]*len(bands) except RuntimeError: print( 'Asked for 3D plot but do not have plotly installed, switching to 2D...') fig = None plot3D = False if not plot3D or fig is None: fig, axlist = plt.subplots(nrows=nrows, ncols=ncols, sharex=True, sharey=True, figsize=(10, 10)) if nbands == 1: axlist = [axlist] microAx = {b: False for b in bands} for lc in np.sort([x for x in self.images.keys()]): if nrows == 1: ccol = 0 crow = 1 else: crow = 0 ccol = 1 try: delay = self.parallel.time_delays[lc] delayerr = self.parallel.time_delay_errors[lc] if showFit: mag = self.parallel.magnifications[lc] magerr = self.parallel.magnification_errors[lc] else: mag = 1 magerr = [0, 0] except: delay = 0 delayerr = [0, 0] mag = 1 magerr = [0, 0] for b, ax in zip(bands, axlist): if nrows == 1: ccol += 1 else: crow += 1 if b == list(bands)[0]: if plot3D: temp = self.images[lc].table fig.add_trace(go.Scatter3d(x=temp['time'][temp['band'] == b], y=temp['time'][temp['band'] == b]-delay, z=temp['flux'][temp['band'] == b]/mag, error_y=dict(symmetric=False, width=4, array=[delayerr[0]]*len(temp['time'][temp['band'] == b]), arrayminus=[delayerr[1]]*len(temp['time'][temp['band'] == b])), error_z=dict(symmetric=False, width=4, array=temp['flux'][temp['band'] == b] * np.sqrt((temp['fluxerr'][temp['band'] == b]/temp['flux'][temp['band'] == b])**2 + (magerr[0]/mag)**2), arrayminus=temp['flux'][temp['band'] == b] * np.sqrt((temp['fluxerr'][temp['band'] == b]/temp['flux'][temp['band'] == b])**2 + (magerr[1]/mag)**2)), mode='markers', marker=dict(color=colors3d[i]), name='Image %s' % lc[-1], **kwargs), row=crow, col=ccol) else: leg.append( ax.errorbar(self.images[lc].table['time'][ self.images[lc].table['band'] == b], self.images[lc].table['flux'][ self.images[lc].table['band'] == b], yerr=self.images[lc].table['fluxerr'][ self.images[lc].table['band'] == b], markersize=4, fmt=colors[i]+'.')) if showMicro: ax.set_ylabel('Flux', fontsize='large') else: if plot3D: fig.add_trace(go.Scatter3d(x=temp['time'][temp['band'] == b], y=temp['time'][temp['band'] == b]-delay, z=temp['flux'][temp['band'] == b]/mag, error_y=dict(symmetric=False, width=4, array=[delayerr[0]]*len(temp['time'][temp['band'] == b]), arrayminus=[delayerr[1]]*len(temp['time'][temp['band'] == b])), error_z=dict(symmetric=False, width=4, array=temp['flux'][temp['band'] == b] * np.sqrt((temp['fluxerr'][temp['band'] == b]/temp['flux'][temp['band'] == b])**2 + (magerr[0]/mag)**2), arrayminus=temp['flux'][temp['band'] == b] * np.sqrt((temp['fluxerr'][temp['band'] == b]/temp['flux'][temp['band'] == b])**2 + (magerr[1]/mag)**2)), mode='markers', marker=dict(color=colors3d[i]), name='Image %s' % lc[-1], showlegend=False, **kwargs), row=crow, col=ccol) else: ax.errorbar(self.images[lc].table['time'][ self.images[lc].table['band'] == b], self.images[lc].table['flux'][ self.images[lc].table['band'] == b], yerr=self.images[lc].table['fluxerr'][ self.images[lc].table['band'] == b], markersize=4, fmt=colors[i]+'.') if showFit: time_model = np.arange(self.images[lc].table['time'].min(), self.images[lc].table['time'].max( ), 0.1) flux_model = self.images[lc].fits.model.bandflux(b, time_model, self.images[lc].table['zp'][self.images[lc].table['band'] == b][0], self.images[lc].table['zpsys'][self.images[lc].table['band'] == b][0]) if plot3D: fig.add_trace(go.Scatter3d(x=time_model, y=time_model-delay, z=flux_model/mag, mode='lines', line=dict(color='yellow', width=8), showlegend=False, **kwargs), row=crow, col=ccol) else: ax.plot(time_model, flux_model, linestyle='-', color=colors[i]) if plot_unresolved: for im_mod in self.images[lc].fits.model.model_list: im_flux_model = im_mod.bandflux(b, time_model, self.images[lc].table['zp'][self.images[lc].table['band'] == b][0], self.images[lc].table['zpsys'][self.images[lc].table['band'] == b][0]) ax.plot(time_model, im_flux_model, linestyle='-', color='cyan') if not plot3D: ax.text(0.95, 0.95, b.upper(), fontsize='large', transform=ax.transAxes, ha='right', va='top') if showMicro and not plot3D: time_model = np.arange(self.images[lc].table['time'].min(), self.images[lc].table['time'].max( ), 0.1) if microAx[b] is False: ax_divider = make_axes_locatable(ax) ax_ml = ax_divider.append_axes( "bottom", size="25%", pad=.4) if b == list(bands)[0]: ax_ml.set_ylabel( r'Microlensing ($\mu$)', fontsize='large') microAx[b] = ax_ml else: ax_ml = microAx[b] ax_ml.plot(time_model, self.images[lc].simMeta['microlensing_params']( (time_model-self.images[lc].simMeta['t0'])/\ (1+self.images[lc].simMeta['sourcez'])), color=colors[i], linewidth=3) if showModel: # Plot the underlying model, including dust and lensing # effects other than microlesning, as a black curve for # each simulated SN image time_model = np.arange(self.images[lc].table['time'].min(), self.images[lc].table['time'].max( ), 0.1) time_shifted = time_model - \ self.images[lc].simMeta['td'] flux_magnified = self.model.bandflux( b, time_shifted, self.images[lc].table['zp'][self.images[lc].table['band'] == b][0], self.images[lc].zpsys) * \ self.images[lc].simMeta['mu'] if plot3D: fig.add_trace(go.Scatter3d(x=time_shifted, y=time_model, z=flux_magnified, mode='lines', line=dict(color='orange', width=8), showlegend=False, **kwargs)) else: ax.plot(time_model, flux_magnified, 'r-') i += 1 if plot3D: zname = 'Flux' if method != 'color' else bands[0] + \ '-'+bands[1]+' Color' tempscene = dict(aspectmode='cube', camera=dict(projection=dict(type='orthographic'), eye=dict( x=0, y=10, z=0, )), xaxis=dict( gridcolor='rgb(255, 255, 255)', zerolinecolor='rgb(255, 255, 255)', showbackground=True, backgroundcolor='rgb(230, 230, 230)', title='Time (Observer Frame)', mirror=False, autorange='reversed' ), yaxis=dict( gridcolor='rgb(255, 255, 255)', zerolinecolor='rgb(255, 255, 255)', showbackground=True, backgroundcolor='rgb(230, 230, 230)', title='Corrected Time (Observer Frame)', mirror=False ), zaxis=dict( gridcolor='rgb(255, 255, 255)', zerolinecolor='rgb(255, 255, 255)', showbackground=True, backgroundcolor='rgb(230, 230, 230)', title=zname, mirror=False )) scenes = dict([]) for i in range(n3dPlots): if i > 0: key = 'scene'+str(i+1) scenes[key] = tempscene else: key = 'scene' scenes[key] = tempscene fig.update_layout( **scenes ) else: plt.figlegend(leg, np.sort(['$'+x+'$' for x in self.images.keys()]), frameon=False, loc='center right', fontsize='medium', numpoints=1) if not showMicro: if method.lower() != 'color': fig.text(0.02, .5, 'Flux', va='center', rotation='vertical', fontsize='large') else: fig.text(0.02, .5, bands[0].upper()+'-'+bands[1].upper()+' Color', va='center', rotation='vertical', fontsize='large') fig.text(0.5, 0.02, r'Observer-frame time (days)', ha='center', fontsize='large') plt.suptitle('Multiply-Imaged SN "'+self.object + '"--'+self.telescopename, fontsize=16) if savefig: plt.savefig(filename+'.pdf', format='pdf', overwrite=True) return fig
[docs]def table_factory(tables, telescopename="Unknown", object_name='Unknown'): """This function will create a new curve object using an astropy table or tables. Parameters ---------- tables : `~astropy.table.Table` or :class:`~list` Astropy table with all of your data from your data file, or a list of such tables. telescopename : str Name of telescope for labeling purposes inside curve object object_name : str Name of object for labeling purposes inside curve object (e.g. SN2006jf, etc.) Returns ------- curve : :class:`~sntd.curve` """ new_SN = MISN(telescopename, object_name) if not isinstance(tables, (list, tuple)): tables = [tables] for table in tables: newlc = image_lc() table = standardize_table_colnames(table) newlc.bands = {x for x in table[get_default_prop_name('band')]} zp_dict = dict([]) zpsys_dict = dict([]) for band in newlc.bands: zp = set(table[get_default_prop_name('zp')] [table[get_default_prop_name('band')] == band]) zpsys = set(table[get_default_prop_name('zpsys')] [table[get_default_prop_name('band')] == band]) zp_dict[band] = zp.pop() if len(zp) == 1 else np.array(zp) zpsys_dict[band] = zpsys.pop() if len( zpsys) == 1 else np.array(zpsys) newlc.table = table newlc.telescopename = telescopename newlc.object = object_name if 'image' in newlc.table.colnames: im_key = newlc.table['image'][0] new_SN.add_image_lc(newlc, key=im_key) else: new_SN.add_image_lc(newlc) return new_SN
def _switch(ext): switcher = { '.pkl': _read_pickle } return switcher.get(ext, _read_data)
[docs]def write_data(curves, filename=None, protocol=-1): """Used to write a MISN object to a pickle to be read later Parameters ---------- curves : `~sntd.MISN` filename : str Name of output file protocol : int Pickling protocol Returns ------- None """ if not filename: filename = curves.object with open(filename, 'wb') as handle: try: pickle.dump(curves, handle, protocol=protocol) except: cPickle.dump(curves, handle, protocol=protocol) return
[docs]def read_data(filename, **kwargs): """Used to read a light curve or curve object in pickle format. Either way, it'll come out as a curve object. Parameters ---------- filename : str Name of the file to be read (ascii or pickle) Returns ------- curve : :class:`~sntd.curve` or :class:`~sntd.MISN` """ return(_switch(os.path.splitext(filename)[1])(filename, **kwargs))
def _read_pickle(filename, telescopename="Unknown", object="Unknown", **kwargs): try: return (pickle.load(open(filename, 'rb'))) except: return (cPickle.load(open(filename, 'rb'))) def standardize_table_colnames(table): for col in table.colnames: if isinstance(table[col][0],str): table[col] = [x.lower() for x in table[col]] if col != get_default_prop_name(col.lower()): table.rename_column(col, get_default_prop_name(col.lower())) return table def _read_data(filename, **kwargs): table_done = True try: table = sncosmo.read_lc(filename, masked=True, **kwargs) for col in table.colnames: if col != get_default_prop_name(col.lower()): table.rename_column(col, get_default_prop_name(col.lower())) except: try: table = ascii.read(filename, masked=True, **kwargs) for col in table.colnames: if col != get_default_prop_name(col.lower()): table.rename_column( col, get_default_prop_name(col.lower())) except: table = Table(masked=True) table_done = False delim = kwargs.get('delim', None) myCurve = image_lc() with anyOpen(filename) as f: lines = f.readlines() # uses the most common line length as the correct length length = mode([len(l.split()) for l in lines])[0][0] for i, line in enumerate(lines): if np.any([x in line for x in _comment_char]): continue line = line.strip(delim) if len(line) == 0: continue if np.any([x in line for x in _meta__]): pos = line.find(' ') if (lines[i + 1][0] in _meta__ or lines[i + 1][0] in _comment_char or len( line) != length): # just making sure we're not about to put the col names into meta if (pos == -1 or not any([_isfloat(x) for x in line.split()])): if line[-1] not in string.punctuation: line = line+'.' else: myCurve.meta[line[1:pos]] = _cast_str(line[pos:]) continue line = line.split() if len(line) != length: raise(RuntimeError, "Make sure your data are in a square matrix.") if table.colnames: colnames = table.colnames else: colnames = odict.fromkeys( [get_default_prop_name(x.lower()) for x in line]) if len(colnames) != len(line): raise (RuntimeError, "Do you have duplicate column names?") colnames = odict(zip(colnames, range(len(colnames)))) startLine = i break f.close() if not table_done: lines = [x.strip().split() for x in lines[startLine + 1:] if not np.any([y in x for y in _comment_char])] for col in colnames: table[col] = np.asarray( [_cast_str(x[colnames[col]]) for x in lines]) colnames = colnames.keys() for col in [get_default_prop_name(x) for x in ['band', 'zp', 'zpsys']]: if col not in colnames: temp = kwargs.get(col, None) if temp is not None: table[col] = temp else: print( 'Column "%s" is not in your file and you did not define it in kwargs.' % col) sys.exit(1) table = standardize_table_colnames(table) bnds = {x for x in table[get_default_prop_name('band')]} table = _norm_flux_mag(table) for band in bnds: if _isfloat(band[0]): band = 'band_'+band try: if band[0:5] == 'band_': sncosmo.get_bandpass(band[5:]) else: sncosmo.get_bandpass(band) except: print('Skipping band %s, not in registry.' % band) table.mask[table[get_default_prop_name('band')] == band] = True continue myCurve.bands.append(band) myCurve.table = table myCurve.zpsys = table['zpsys'][0] return myCurve def _norm_flux_mag(table): if 'mag' not in table.colnames: if 'flux' not in table.colnames: print('No Mag or Flux data.') sys.exit(1) table = _flux_to_mag(table) else: if 'flux' not in table.colnames: table = _mag_to_flux(table) return table def _flux_to_mag(table): table[get_default_prop_name('mag')] = np.asarray(map(lambda x, y: -2.5 * np.log10(x) + y, table[get_default_prop_name('flux')], table[get_default_prop_name('zp')])) table[get_default_prop_name('magerr')] = np.asarray(map(lambda x, y: 2.5 * np.log10(np.e) * y / x, table[get_default_prop_name('flux')], table[get_default_prop_name('fluxerr')])) table[get_default_prop_name('magerr')][np.isnan( table[get_default_prop_name('mag')])] = np.nan return table def _mag_to_flux(table): table[get_default_prop_name('flux')] = np.asarray( map(lambda x, y: 10 ** (-.4 * (x - y)), table[get_default_prop_name('mag')], table[get_default_prop_name('zp')])) table[get_default_prop_name('fluxerr')] = np.asarray( map(lambda x, y: x * y / (2.5 * np.log10(np.e)), table[get_default_prop_name('magerr')], table[get_default_prop_name('flux')])) return table