Source code for sntd.ml

import os,sys,math,subprocess,sncosmo,abc
from textwrap import dedent

import numpy as np
from astropy.io import fits

from astropy import units as u
from astropy import constants as const
from astropy.cosmology import WMAP9 as cosmo
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.patches import Circle
import matplotlib.colors as colors
from mpl_toolkits.axes_grid1.axes_divider import make_axes_locatable
import matplotlib.mlab as mlab
from scipy.interpolate import interp1d,interp2d
from sncosmo.models import _ModelBase
import extinction

from .util import __dir__,__current_dir__
from .mldata import MicrolensingData

__all__=['_mlProp','_mlFlux','realizeMicro','microcaustic_field_to_curve',
         'AchromaticMicrolensing',
         'ChromaticFilterMicrolensing',
         '_CCM89Dust','_OD94Dust','_F99Dust']
#def identifyML(lc):
[docs]def realizeMicro(arand=.25,debug=0,kappas=.75,kappac=.15,gamma=.76,eps=.6,nray=300,minmass=10,maxmass=10,power=-2.35, pixmax=5,pixminx=0,pixminy=0,pixdif=10,fracpixd=.3,iwrite=0,verbose=False): """ Creates a microcaustic realization based on Wambsganss 1990 microlens code. All parameters are optional as they have defaults, see Wambsganss documentation for details on parameters. """ types=['%.3f','%i','%.2f','%.2f','%.2f','%.3f','%i','%.6f','%.6f','%.3f','%.3f','%.3f','%.3f','%.3f','%.3f','%i'] inData=[arand,debug,kappas,kappac,gamma,eps,nray,minmass,maxmass,power,pixmax,pixminx,pixminy,pixdif,fracpixd,iwrite] inputFile=np.loadtxt(os.path.join(__dir__,'microlens','default_input'),dtype='str',delimiter='tab') outFile=[] for i in range(len(inputFile)-1): dat=str(inData[i]) outFile.append(dat) thefile=open(os.path.join(__dir__,'microlens','input'),'w') for i in range(len(outFile)-1): thefile.write((types[i]+'\n')%(float(outFile[i]))) thefile.write(outFile[-1]) thefile.close() num=np.loadtxt(os.path.join(__dir__,'microlens','jobnum'),dtype='str') try: os.remove(os.path.join(__dir__,'microlens','IRIS'+str(num))) except: pass try: os.remove(os.path.join(__dir__,'microlens','IRIS'+str(num)+'.fits')) except: pass os.chdir(os.path.join(__dir__,'microlens')) if verbose: subprocess.call(r'./microlens') else: with open(os.devnull,'w') as f: subprocess.call(r'./microlens',stdout=f) np.savetxt(os.path.join(__dir__,'microlens','jobnum'),[num],fmt='%s') #print(num) #sys.exit() try: fitsFile=fits.open(os.path.join(__dir__,'microlens','IRIS'+str(num)+'.fits')) lensPlane=fitsFile[0].data fitsFile.close() except RuntimeError: print('There was an error with the inputs of your microcaustic.') sys.exit() os.chdir(__current_dir__) return(lensPlane)
[docs]def microcaustic_field_to_curve(field,time,zl,zs,velocity=(10**4)*(u.kilometer/u.s),M=(1*u.solMass).to(u.kg), loc='Random',plot=False): """ Convolves an expanding photosphere (achromatic disc) with a microcaustic to generate a magnification curve. Parameters ---------- field: :class:`numpy.ndarray` An opened fits file of a microcaustic, can be generated by realizeMicro time: :class:`numpy.array` Time array you want for microlensing magnification curve, explosion time is 0 zl: float redshift of the lens zs: float redshift of the source velocity: float* :class:`astropy.units.Unit` The average velocity of the expanding photosphere M: float* :class:`~astropy.units.Unit` The mass of the deflector loc: str or tuple Random is defualt for location of the supernova, or pixel (x,y) coordiante can be specified plot: bool If true, plots the expanding photosphere on the microcaustic Returns ------- time: :class:`numpy.array` The time array for the magnification curve dmag: :class:`numpy.array` The magnification curve. """ D=cosmo.angular_diameter_distance_z1z2(zl,zs)*cosmo.angular_diameter_distance(zs)/cosmo.angular_diameter_distance(zl) D=D.to(u.m) try: M.to(u.kg) except: print('Assuming mass is in kg.') einsteinRadius=np.sqrt(4*const.G*M*D/const.c**2) einsteinRadius=einsteinRadius.to(u.kilometer) try: velocity.to(u.kilometer/u.s) except: print('Assuming velocity is in km/s.') velocity*=(u.kilometer/u.s) h,w=field.shape height=10*einsteinRadius.value width=10*einsteinRadius.value pixwidth=width/w pixheight=height/h if pixwidth!=pixheight: print('Hmm, you are not using squares...') sys.exit() maxRadius=((np.max(time)*u.d).to(u.s))*velocity maxRadius=maxRadius.value maxx=int(math.floor(maxRadius/pixwidth)) maxy=int(math.floor(maxRadius/pixheight)) mlimage=field[maxx:-maxx][maxy:-maxy] if loc=='Random' or not isinstance(loc,(list,tuple)): loc=(int(np.random.uniform(maxx,w-maxx)),int(np.random.uniform(maxy,h-maxy))) tempTime=np.array([((x*u.d).to(u.s)).value for x in time]) snSize=velocity.value*tempTime/pixwidth dmag=mu_from_image(field,loc,snSize,'disk',plot,time) return(time,dmag)
def createCircularMask(h, w, center=None, radius=None): if center is None: # use the middle of the image center = [int(w/2), int(h/2)] if radius is None: # use the smallest distance between the center and image walls radius = min(center[0], center[1], w-center[0], h-center[1]) Y, X = np.ogrid[:h, :w] dist_from_center = np.sqrt((X - center[0])**2 + (Y-center[1])**2) mask = dist_from_center <= radius return mask def createGaussMask(h,w,center=None,radius=None): if center is None: # use the middle of the image center = [int(w/2), int(h/2)] if radius is None: # use the smallest distance between the center and image walls radius = min(center[0], center[1], w-center[0], h-center[1]) #Set up the 2D Gaussian: delta = 0.025 x = np.arange(-3.0, 3.0, delta) y = np.arange(-3.0, 3.0, delta) X, Y = np.meshgrid(x, y) sigma = 1.0 Z = mlab.bivariate_normal(X, Y, sigma, sigma, 0.0, 0.0) #Get Z values for contours 1, 2, and 3 sigma away from peak: z1 = mlab.bivariate_normal(0, 1 * sigma, sigma, sigma, 0.0, 0.0) z2 = mlab.bivariate_normal(0, 2 * sigma, sigma, sigma, 0.0, 0.0) z3 = mlab.bivariate_normal(0, 3 * sigma, sigma, sigma, 0.0, 0.0) #plt.figure() #plot Gaussian: #im = plt.imshow(Z, interpolation='bilinear', origin='lower', #extent=(-50,50,-50,50),cmap=cm.gray) #Plot contours at whatever z values we want: #CS = plt.contour(Z, [z1, z2, z3], origin='lower', extent=(-50,50,-50,50),colors='red') #plt.show() class MidpointNormalize(colors.Normalize): """ Normalise the colorbar so that diverging bars work there way either side from a prescribed midpoint value) e.g. im=ax1.imshow(array, norm=MidpointNormalize(midpoint=0.,vmin=-100, vmax=100)) """ def __init__(self, vmin=None, vmax=None, midpoint=None, clip=False): self.midpoint = midpoint colors.Normalize.__init__(self, vmin, vmax, clip) def __call__(self, value, clip=None): # I'm ignoring masked values and all kinds of edge cases to make a # simple example... x, y = [self.vmin, self.midpoint, self.vmax], [0, 0.5, 1] return np.ma.masked_array(np.interp(value, x, y), np.isnan(value)) def mu_from_image(image, center,sizes,brightness,plot,time): h, w = image.shape mu = [] if plot: fig=plt.figure(figsize=(10,10)) ax=fig.gca() plt.imshow(-(image-1024)/256., aspect='equal', interpolation='nearest', cmap=cm.bwr, norm=MidpointNormalize(vmin=-2,vmax=2,midpoint=0), vmin=-2, vmax=2, origin='lower') ax.set_xticklabels([0,0,2,4,6,8,10],fontsize=14) ax.set_yticklabels([0,0,2,4,6,8,10],fontsize=14) ax.set_xlabel('$R_E$',fontsize=18,labelpad=0) ax.set_ylabel('$R_E$',fontsize=18) image=10**(.4*(image-1024)/256.) i=0 alphas=[1,.5,.7] for r in sizes: if r in [sizes[int(len(sizes)/5)],sizes[int(len(sizes)/2)],sizes[int(len(sizes)-1)]]: circle = Circle(center, r, color='#004949', alpha=alphas[i]) i+=1 if plot: ax.add_patch(circle) if brightness=='disk': mask = createCircularMask(h,w,center=center,radius=r) try: totalMag=float((image[mask]).sum())/float(mask.sum()) except: totalMag=0 if totalMag==0: mu.append(1024) else: mu.append(totalMag) else: mask1,mask2,mask3=createGaussMask(h,w,center=center,radius=r/3) scale=np.array([.68,.27,.05]) totalMags=[] for mask in [mask1,mask2,mask3]: try: if mask.sum()==0: totalMags.append(0) continue tempMag=float(image[mask].sum())/float(mask.sum()) except RuntimeError: tempMag=0 totalMags.append(tempMag) if np.max(totalMags==0): mu.append(1024) else: mu.append(np.dot(np.array(totalMags),scale)) mu = np.array(mu) mu/=np.mean(mu) dmag=-2.5*np.log10(mu) if plot: cbaxes = fig.add_axes([.82, 0.33, 0.04, 0.55]) cb = plt.colorbar(cax = cbaxes) cb.ax.set_ylabel('Magnification (Magnitudes)',fontsize=18,rotation=270,labelpad=25) cb.ax.invert_yaxis() cb.ax.tick_params(labelsize=14) ax_divider = make_axes_locatable(ax) ax_ml = ax_divider.append_axes("bottom", size="25%", pad=.7) for tick in ax_ml.xaxis.get_major_ticks(): tick.label.set_fontsize(14) for tick in ax_ml.yaxis.get_major_ticks(): tick.label.set_fontsize(14) ax_ml.plot(time,dmag,ls='-',marker=' ', color='#004949') ax_ml.set_ylabel(r'$\Delta m$ (mag)',fontsize=18) ax_ml.set_xlabel('Time from Explosion (days)',fontsize=18) ax_ml.invert_yaxis() #ax.plot(sizes[10:-10],dmag[10:-10]) #plt.savefig('sntd_microlensing.pdf',format='pdf',overwrite=True) #plt.show() #plt.clf() #plt.close() return(mu)
[docs]class AchromaticMicrolensing(sncosmo.PropagationEffect): """ An achromatic microlensing object, defined filter to filter. """ _param_names = [] param_names_latex = [] _minwave = 0. _maxwave = 10.**6 #def __init__(self, mlfilename, magformat='multiply', **kwargs): def __init__(self, time,dmag,magformat='multiply', **kwargs): """ Parameters ---------- time: :class:`~list` or :class:`~numpy.array` A time array for your microlensing dmag: :class:`~list` or :class:`~numpy.array` microlensing magnification magformat : str Format of the magnification column. May be ``multiply`` or ``add,`` where ``multiply`` means the magnification column provides a multiplicative magnification factor, mu, so the effect is applied to the source as flux * mu, and ``add`` means the magnification column provides an additive magnitude, DeltaM=-2.5*log10(mu). """ self._magformat=magformat self._parameters = np.array([]) mldata=MicrolensingData(data={'phase':time,'magnification':dmag},magformat=magformat) self.mu = mldata.magnification_interpolator() #Now always a multiplicative mu
[docs] def propagate(self,phase, wave, flux): """ Propagate the magnification onto the model's flux output. """ mu = np.expand_dims(self.mu(phase), 1) return flux * mu
def getBandNorm(band): band=sncosmo.get_bandpass(band) wave, dwave = sncosmo.utils.integration_grid(band.minwave(), band.maxwave(), sncosmo.constants.MODEL_BANDFLUX_SPACING) trans = band(wave) f = np.ones((1,len(wave))) return np.sum(wave * trans * f, axis=1) * dwave / sncosmo.constants.HC_ERG_AA
[docs]class ChromaticFilterMicrolensing(sncosmo.PropagationEffect): """ A chromatic microlensing object, defined filter to filter. """ _param_names = [] param_names_latex = [] _minwave = 0. _maxwave = 10.**6 def __init__(self, times,dmags,bands, magformat='multiply', **kwargs): """ Parameters ---------- times: 1D or 2D :class:`~list` or :class:`~numpy.array` A list of the time arrays for your microlensing, with nrows=len(bands), ncols=len(dmags) dmags: 1D or 2D :class:`~list` or :class:`~numpy.array` same as times, but for microlensing magnification bands: :class:`~list` or :class:`~numpy.array` list of bands defining microlensing magformat : str Format of the magnification column. May be ``multiply`` or ``add,`` where ``multiply`` means the magnification column provides a multiplicative magnification factor, mu, so the effect is applied to the source as flux * mu, and ``add`` means the magnification column provides an additive magnitude, DeltaM=-2.5*log10(mu). """ if not isinstance(bands,(list,tuple,np.ndarray)): bands=[bands] if len(bands)!=len(times): times=[times]*len(bands) if len(bands)!=len(dmags): dmags=[dmags]*len(bands) self.bandNorms=[getBandNorm(b) for b in bands] self._magformat=magformat self._parameters = np.array([]) ml_list=[MicrolensingData(data={'phase':times[i],'magnification':dmags[i]},magformat=magformat).magnification_interpolator() for i in\ range(len(bands))] self.bandwaves=[[sncosmo.get_bandpass(band).wave[0], sncosmo.get_bandpass(band).wave[-1]] for band in bands] self.bandtimes=[[t[0],t[-1]] for t in times] self.wave=np.arange(np.max([self._minwave,np.min(self.bandwaves)]),np.min([self._maxwave,np.max(self.bandwaves)])+.01, sncosmo.constants.MODEL_BANDFLUX_SPACING) self.phase=np.arange(np.min(times),np.max(times)+.01,.1) all_mu=np.ones((len(self.phase),len(self.wave))) for i in range(len(bands)): indsp=np.where(np.logical_and(self.phase>=self.bandtimes[i][0],self.phase<=self.bandtimes[i][1]))[0] indsw=np.where(np.logical_and(self.wave>=self.bandwaves[i][0],self.wave<=self.bandwaves[i][1]))[0] for ind in indsp: all_mu[ind,indsw]=ml_list[i](self.phase[ind])*np.ones(len(indsw))#/self.bandNorms[i] self.mu=interp2d(self.phase,self.wave,np.array(all_mu).T,fill_value=1,bounds_error=True)
[docs] def propagate(self,phase, wave, flux): """ Propagate the magnification onto the model's flux output. """ return flux * self.mu(phase,wave).T
def _mlFlux(self,time, wave): """Replacement for sncosmo Array flux function.""" a = 1. / (1. + self._parameters[0]) phase = (time - self._parameters[1]) * a minphase = (self.mintime() - self._parameters[1]) * a maxphase = (self.maxtime() - self._parameters[1]) * a restwave = wave * a # Note that below we multiply by the scale factor to conserve # bolometric luminosity. f = a * self._source._flux(np.round(phase,1).astype(np.double), np.round(restwave,1).astype(np.double)) # Pass the flux through the PropagationEffects. for effect, frame, zindex in zip(self._effects, self._effect_frames, self._effect_zindicies): if frame == 'obs': effect_wave = wave effect_phase=phase*(1./a) elif frame == 'rest': effect_wave = restwave effect_phase=phase else: # frame == 'free' effect_a = 1. / (1. + self._parameters[zindex]) effect_wave = wave * effect_a effect_phase=phase/a*(1.+self._parameters[zindex]) f = effect.propagate(effect_phase,effect_wave, f) return f def _mlProp(_ModelBase): """Abstract base class for propagation effects. Derived classes must define _minwave (float), _maxwave (float). """ __metaclass__ = abc.ABCMeta def minwave(self): return self._minwave def maxwave(self): return self._maxwave @abc.abstractmethod def propagate(self, wave, flux, phase=None): pass def _headsummary(self): summary = """\ class : {0} wavelength range: [{1:.6g}, {2:.6g}] Angstroms""" \ .format(self.__class__.__name__, self._minwave, self._maxwave) return dedent(summary) class _CCM89Dust(sncosmo.PropagationEffect): """Cardelli, Clayton, Mathis (1989) extinction model dust.""" _param_names = ['ebv', 'r_v'] param_names_latex = ['E(B-V)', 'R_V'] _minwave = 1000. _maxwave = 33333.33 def __init__(self): self._parameters = np.array([0., 3.1]) def propagate(self, phase,wave, flux): """Propagate the flux.""" ebv, r_v = self._parameters return extinction.apply(extinction.ccm89(wave, ebv * r_v, r_v), flux) class _OD94Dust(sncosmo.PropagationEffect): """O'Donnell (1994) extinction model dust.""" _param_names = ['ebv', 'r_v'] param_names_latex = ['E(B-V)', 'R_V'] _minwave = 909.09 _maxwave = 33333.33 def __init__(self): self._parameters = np.array([0., 3.1]) def propagate(self, phase,wave, flux): """Propagate the flux.""" ebv, r_v = self._parameters return extinction.apply(extinction.odonnell94(wave, ebv * r_v, r_v), flux) class _F99Dust(sncosmo.PropagationEffect): """Fitzpatrick (1999) extinction model dust with fixed R_V.""" _minwave = 909.09 _maxwave = 60000. def __init__(self, r_v=3.1): self._param_names = ['ebv'] self.param_names_latex = ['E(B-V)'] self._parameters = np.array([0.]) self._r_v = r_v self._f = extinction.Fitzpatrick99(r_v=r_v) def propagate(self, phase,wave, flux): """Propagate the flux.""" ebv = self._parameters[0] return extinction.apply(self._f(wave, ebv * self._r_v), flux)