"""
Stellar library class
Intent to implement a generic module to manage stellar library from various
sources.
The interpolation is implemented from the pegase.2 fortran converted algorithm.
(this may not be super pythonic though)
.. note::
a cython version is available for speed up and should be used transparently when available
(run make once)
"""
import numpy as np
from scipy.interpolate import interp1d
from itertools import groupby
from .ezunits import unit, hasUnit
from .helpers import nbytes, isNestedInstance
from .future import Path
from .interpolator import NDLinearInterpolator, find_interpolator
lsun = 3.839e+26 # in W (Watts)
sig_stefan = 5.67037321 * 1e-8 # W * m**-2 * K**-4
rsun = 6.955e8 # in meters
_default_interpolator = find_interpolator('lejeune')
def _drop_units(q):
""" Drop the unit definition silently """
try:
return q.magnitude
except:
return q
[docs]class Stellib(object):
""" Basic stellar library class
Attributes
----------
interpolator: interpolator.BaseInterpolator
interpolator to use, default LeujeuneInterpolator
"""
def __init__(self, *args, **kwargs):
""" Contructor """
self.interpolator = find_interpolator(
kwargs.pop('interpolator', None),
osl=self)
if self.interpolator is None:
self.interpolator = _default_interpolator(self)
if not hasattr(self, 'wavelength_unit'):
self.wavelength_unit = None
self._dlogT = 0.5
self._dlogg = 0.5
[docs] def get_interpolation_data(self):
""" Default interpolation """
return np.array([self.logT, self.logg, self.logZ]).T
@property
def wavelength(self):
l = np.copy(self._wavelength)
if self.wavelength_unit is not None:
return l * unit[self.wavelength_unit]
else:
return l
@property
def flux_units(self):
if self.wavelength_unit is not None:
return unit['erg/s/' + self.wavelength_unit]
else:
return 1.
def _load_(self):
""" Load the library """
raise NotImplementedError
@property
def nbytes(self):
""" return the number of bytes of the object """
return nbytes(self)
[docs] def plot_boundary(self, ax=None, dlogT=0., dlogg=0., **kwargs):
"""
Parameters
----------
dlogT: float
margin in logT (see get_boundaries)
dlogg: float
margin in logg (see get_boundaries)
.. see also::
:func:`matplotlib.plot`
For additional kwargs
"""
import matplotlib.patches as patches
from pylab import gca
if ax is None:
ax = gca()
p = self.get_boundaries(dlogT=dlogT, dlogg=dlogg)
ax.add_patch(patches.PathPatch(p, **kwargs))
return p
def __add__(self, other):
if not isNestedInstance(other, Stellib):
raise ValueError('expecting a Stellib object, got {0}'.format(type(other)))
return CompositeStellib([self, other])
def __repr__(self):
return "{0:s}, ({1:s})\n{2:s}".format(self.name, nbytes(self, pprint=True),
object.__repr__(self))
[docs] def get_weights(self, logT, logg, logL, weights=None):
""" Returns the proper weights for the interpolation
in spectra libraries the default is to have Lbol=1 normalization
Parameters
----------
logT: float or ndarray
log-temperatures log(T/K)
logg: float or ndarray
log-gravity log(g)
logL: float or ndarray
bolometric luminosity (log (L/Lsun))
"""
# weights to apply during the interpolation (note that radii must be in cm)
# Stellar library models are given in cm^-2 ( 4 pi R)
# Compute radii of each point using log(T) and log(L)
L = 10 ** logL * unit['lsun'].to("ergs/s").magnitude
if weights is not None:
weights *= L
else:
weights = L
return weights
[docs] def generate_stellar_spectrum(self, logT, logg, logL, Z,
raise_extrapolation=True, **kwargs):
""" Generates individual spectrum for the given stars APs and the
stellar library
Returns NaN spectra if the boundary conditions are not met (no extrapolation)
Parameters
----------
logT: float
temperature
logg: float
log-gravity
logL: float
log-luminosity
Z: float
metallicity
raise_extrapolation: bool
if set throw error on extrapolation
null: value
value of the flux when extrapolation and raise_extrapolation is not set
returns
-------
s0: ndarray, shape=(len(stars), len(l0))
array of spectra, one per input star
Spectrum in ergs/s/AA or ergs/s/AA/Lsun
"""
null_value = kwargs.pop('null', np.nan)
# weights to apply during the interpolation (note that radii must be in cm)
weights = self.get_weights(logT, logg, logL)
logZ = np.log10(Z)
l0 = self.wavelength
# check boundary conditions, keep the data but do not compute the sed
# if outside
if not self.points_inside(np.atleast_2d([logT, logg]))[0]:
if raise_extrapolation:
raise RuntimeError('Outside library interpolation range')
else:
return l0, np.full(len(self.wavelength), null_value)
aps = logT, logg, logZ
spec = self.interpolator.interp(aps) * weights
return spec
[docs] def generate_individual_values(self, stars, values, **kwargs):
""" Generates individual spectra for the given stars and stellar library
Returns NaN spectra if the boundary conditions are not met (no extrapolation)
Parameters
----------
stars: Table
contains at least (logT, logg, logL, Z) of the considered stars
values: sequence or attribute name
value to interpolate
dlogT: float
margin in logT
dlogg: float
margin in logg
returns
-------
values: sequence
value to interpolate
"""
_values = np.atleast_1d(getattr(self, values, values))
null_value = kwargs.pop('null', np.nan)
dlogT = kwargs.pop('dlogT', self._dlogT)
dlogg = kwargs.pop('dlogg', self._dlogg)
ndata = len(stars)
logT, logg, logL, Z = stars['logT'], stars['logg'], stars['logL'], stars['Z']
# weights to apply during the interpolation (note that radii must be in cm)
weights = self.get_weights(logT, logg, logL)
# check boundary conditions, keep the data but do not compute the sed
# if outside
bound = self.points_inside(np.array([logT, logg]).T,
dlogT=dlogT, dlogg=dlogg)
if np.ndim(_values) == 1:
specs = np.empty(ndata, dtype=float)
else:
specs = np.empty((ndata, _values.shape[1]), dtype=float)
specs[~bound] = null_value
logZ = np.log10(Z)
aps = np.array([logT, logg, logZ]).T
s = self.interpolator.interp_other(aps[bound], _values) * weights[bound]
specs[bound] = np.squeeze(s)
return specs
[docs] def generate_individual_spectra(self, stars, **kwargs):
""" Generates individual spectra for the given stars and stellar library
Returns NaN spectra if the boundary conditions are not met (no extrapolation)
Parameters
----------
stars: Table
contains at least (logT, logg, logL, Z) of the considered stars
dlogT: float
margin in logT
dlogg: float
margin in logg
returns
-------
l0: ndarray, ndim=1
wavelength definition of the spectra
wavelength in AA
s0: ndarray, shape=(len(stars), len(l0))
array of spectra, one per input star
Spectrum in ergs/s/AA or lsun/AA
"""
null_value = kwargs.pop('null', np.nan)
dlogT = kwargs.pop('dlogT', self._dlogT)
dlogg = kwargs.pop('dlogg', self._dlogg)
ndata = len(stars)
logT, logg, logL, Z = stars['logT'], stars['logg'], stars['logL'], stars['Z']
# weights to apply during the interpolation (note that radii must be in cm)
weights = self.get_weights(logT, logg, logL)
# check boundary conditions, keep the data but do not compute the sed
# if outside
bound = self.points_inside(np.array([logT, logg]).T,
dlogT=dlogT, dlogg=dlogg)
specs = np.empty((ndata, len(self._wavelength)), dtype=float)
specs[~bound] = np.full(len(self.wavelength), null_value)
logZ = np.log10(Z)
aps = np.array([logT, logg, logZ]).T
s = self.interpolator.interp(aps[bound]) * weights[bound, None]
specs[bound] = s
l0 = self.wavelength
specs = specs * self.flux_units
return l0, specs
[docs] def points_inside(self, xypoints, dlogT=0.1, dlogg=0.5):
"""
Returns if a point is inside the polygon defined by the boundary of the library
Parameters
----------
xypoints: sequence
a sequence of N logg, logT pairs.
dlogT: float
margin in logT
dlogg: float
margin in logg
returns
-------
r: ndarray(dtype=bool)
a boolean ndarray, True for points inside the polygon.
A point on the boundary may be treated as inside or outside.
"""
p = self.get_boundaries(dlogT=dlogT, dlogg=dlogg)
return p.contains_points(xypoints)
[docs] def get_radius(self, logl, logt):
""" Returns the radius of a star given its luminosity and temperature
Assuming a black body, it comes:
.. math::
R ^ 2 = L / ( 4 \pi \sigma T ^ 4 ),
with:
* L, luminosity in W,
* pi, 3.141592...
* sig, Stefan constant in W * m**-2 * K**-4
* T, temperature in K
Parameters
----------
logl: ndarray[float, ndim=1]
log luminosities from the isochrones, in Lsun
logt: ndarray[float, ndim=1]
log temperatures from the isochrones, in K
returns
-------
radii: ndarray[float, ndim=1]
array of radii in m (SI units)
"""
return np.sqrt( (10 ** logl) * lsun / (4.0 * np.pi * sig_stefan * ((10 ** logt) ** 4)) )
[docs] def get_boundaries(self, dlogT=0.1, dlogg=0.3, **kwargs):
""" Returns the closed boundary polygon around the stellar library with
given margins
Parameters
----------
s: Stellib
Stellar library object
dlogT: float
margin in logT
dlogg: float
margin in logg
returns
-------
b: ndarray[float, ndim=2]
closed boundary edge points: [logT, logg]
.. note::
as computing the boundary could take time, it is saved in the object
and only recomputed when parameters are updated
"""
# if bbox is defined then assumes it is more precise and use it instead.
if dlogT is None:
dlogT = 0.1
if dlogg is None:
dlogg = 0.3
if hasattr(self, 'bbox'):
return Path(self.bbox(dlogT, dlogg))
if getattr(self, '_bound', None) is not None:
# check if recomputing is needed
if ((self._bound[1] - dlogT) < 1e-3) and (abs(self._bound[2] - dlogg) < 1e-3):
return self._bound[0]
leftb = [(np.max(self.logT[self.logg == k]) + dlogT, k ) for k in np.unique(self.logg)]
leftb += [(leftb[-1][1], leftb[-1][0] + dlogg)]
leftb = [(leftb[0][1], leftb[0][0] - dlogg)] + leftb
rightb = [(np.min(self.logT[self.logg == k]) - dlogT, k) for k in np.unique(self.logg)[::-1]]
rightb += [(rightb[-1][1], rightb[-1][0] - dlogg)]
rightb = [(rightb[0][1], rightb[0][0] + dlogg)] + rightb
b = leftb + rightb
b += [b[0]]
self._bound = (Path(np.array(b)), dlogT, dlogg)
return self._bound[0]
[docs]class AtmosphereLib(Stellib):
"""
Almost identical to a spectral library. The difference lies into the units
of the input libraries.
"""
[docs] def get_weights(self, logT, logg, logL, weights=None):
""" Returns the proper weights for the interpolation
Stellar atmospheres are normalized to Radius = 1
Parameters
----------
logT: float or ndarray
log-temperatures log(T/K)
logg: float or ndarray
log-gravity log(g)
logL: float or ndarray
bolometric luminosity (log (L/Lsun))
"""
# weights to apply during the interpolation (note that radii must be in cm)
# Stellar library models are given in cm^-2 ( 4 pi R)
# Compute radii of each point using log(T) and log(L)
radii = self.get_radius(logL, logT)
if weights is not None:
weights *= 4. * np.pi * (radii * 1e2) ** 2
else:
weights = 4. * np.pi * (radii * 1e2) ** 2
return weights
[docs]class CompositeStellib(Stellib):
""" Generates an object from the union of multiple individual libraries """
def __init__(self, osllist, *args, **kwargs):
self._olist = osllist
self._dlogT = 0.5
self._dlogg = 0.5
@property
def name(self):
return ' + '.join([sl.name for sl in self._olist])
def __add__(self, other):
""" Adding a library after """
if not isNestedInstance(other, Stellib):
raise ValueError('expecting a Stellib object, got {0}'.format(type(other)))
lst = [k for k in self._olist] + [other]
return CompositeStellib(lst)
def __radd__(self, other):
""" Adding a library before """
if not isNestedInstance(other, Stellib):
raise ValueError('expecting a Stellib object, got {0}'.format(type(other)))
lst = [other] + [k for k in self._olist]
return CompositeStellib(lst)
@property
def wavelength(self):
""" return a common wavelength sampling to all libraries. This can be
used to reinterpolate any spectrum onto a common definition """
# check units
has_units = [hasUnit(osl.wavelength) for osl in self._olist]
test = sum(has_units)
if (test == 0):
return np.unique(np.asarray([ osl._wavelength for osl in self._olist ]))
# which library sets the units
common_unit = self._olist[0].wavelength_unit
libset_unit = self._olist[0].name
# if some libraries do not have units... Should not happen often!
if (test < len(self._olist)):
for k, osl in enumerate(self._olist):
common_unit = osl.wavelength_unit
libset_unit = osl.name
if common_unit is not None:
break
print("Warning: Some libraries do not have units. Assuming consistency with {0:s}".format(libset_unit))
wave = []
for osl in self._olist:
wave.append(osl.wavelength.to(common_unit).magnitude)
return np.unique(np.array(wave)) * unit[common_unit]
@property
def _wavelength(self):
return _drop_units(self.wavelength)
@property
def source(self):
return ' + '.join([k.name for k in self._olist])
@property
def logT(self):
return np.hstack([osl.logT for osl in self._olist])
@property
def logg(self):
return np.hstack([osl.logg for osl in self._olist])
@property
def Teff(self):
return np.hstack([osl.Teff for osl in self._olist])
@property
def Z(self):
return np.hstack([osl.Z for osl in self._olist])
@property
def logZ(self):
return np.hstack([osl.logZ for osl in self._olist])
@property
def flux_units(self):
return self._olist[0].flux_units
[docs] def which_osl(self, xypoints, **kwargs):
"""
Returns the library indice that contains each point in xypoints
The decision is made from a two step search:
* first, each point is checked against the strict boundary of each
library (i.e., dlogT = 0, dlogg = 0).
* second, if points are not found in strict mode, the boundary is
relaxed and a new search is made.
Each point is associated to the first library matching the above conditions.
Parameters
----------
xypoints: sequence
a sequence of N logg, logT pairs.
dlogT: float
margin in logT
dlogg: float
margin in logg
returns
-------
res: ndarray(dtype=int)
a ndarray, 0 meaning no library covers the point, and 1, ... n, for the n-th library
"""
dlogT = kwargs.pop('dlogT', self._dlogT)
dlogg = kwargs.pop('dlogg', self._dlogg)
xy = np.atleast_2d(xypoints)
# check that all points are in the full boundary area
# MF: testing why points_inside does not agree on all computers...
# as we do not keep individual results, no need to store then all
# first, collapse directly
# res_temp = np.zeros((len(xy),len(self._olist)))
# for ek,ok in enumerate(self._olist):
# res_temp[:, ek] = ok.points_inside(xy, dlogT=dlogT, dlogg=dlogg).astype(int)
res_temp = np.zeros(len(xy), dtype=int)
for ek, ok in enumerate(self._olist):
res_temp += ok.points_inside(xy, dlogT=dlogT, dlogg=dlogg).astype(int)
ind = res_temp > 0
res = np.zeros(len(xy), dtype=int)
res[ind] = 1
res = res - 1
# res = self.points_inside(xy, dlogT=dlogT, dlogg=dlogg).astype(int) - 1
# if res == -1: invalid point, res == 0: proceed
if max(res) < 0:
# DEBUG: should generate an exeception in further functions
# TODO: get rid and replace
return res
# return res
# Strict mode
# ===========
# Not extrapolation allowed >> dlogT = 0, dlogg = 0
# 0 is used to flag points without a matching library yet
# libraries are then indexed from 1 to n
# -1 means point outside the compound library
for ek, ok in enumerate(self._olist):
if 0 in res:
ind = np.atleast_1d(np.squeeze(np.where(res == 0)))
r = ok.points_inside(xy[ind], dlogT=0., dlogg=0.)
res[ind[r]] = ek + 1
# Relaxed mode
# ============
# In this case we accept some flexibility in the boundary limits,
# which allows limited extrapolation ranges.
# this only affects points not already matched
if 0 in res:
for ek, ok in enumerate(self._olist):
if 0 in res:
ind = np.atleast_1d(np.squeeze(np.where(res == 0)))
r = ok.points_inside(xy[ind], dlogT=dlogT, dlogg=dlogg)
res[ind[r]] = ek + 1
return res
def __repr__(self):
return "CompositeStellib, {0}\n{1}".format(object.__repr__(self), '\n'.join([k.name for k in self._olist]))
[docs] def get_boundaries(self, **kwargs):
""" Returns the closed boundary polygon around the stellar library with
given margins
Parameters
----------
s: Stellib
Stellar library object
dlogT: float
margin in logT
dlogg: float
margin in logg
returns
-------
b: ndarray[float, ndim=2]
(closed) boundary points: [logg, Teff] (or [Teff, logg] is swap is True)
.. note::
as computing the boundary could take time, it is saved in the object
and only recomputed when parameters are updated
"""
dlogT = kwargs.pop('dlogT', self._dlogT)
dlogg = kwargs.pop('dlogg', self._dlogg)
if getattr(self, '_bound', None) is not None:
if ((self._bound[1] - dlogT) < 1e-3) and (abs(self._bound[2] - dlogg) < 1e-3):
return self._bound[0]
b = [osl.get_boundaries(dlogT=dlogT, dlogg=dlogg, **kwargs) for osl in self._olist]
self._bound = (Path.make_compound_path(*b), dlogT, dlogg)
return self._bound[0]
[docs] def generate_stellar_spectrum(self, logT, logg, logL, Z,
raise_extrapolation=True, **kwargs):
""" Generates individual spectrum for the given stars APs and the
stellar library
Returns NaN spectra if the boundary conditions are not met (no extrapolation)
Parameters
----------
logT: float
temperature
logg: float
log-gravity
logL: float
log-luminosity
Z: float
metallicity
raise_extrapolation: bool
if set throw error on extrapolation
null: value
value of the flux when extrapolation and raise_extrapolation is not set
returns
-------
s0: ndarray, shape=(len(stars), len(l0))
array of spectra, one per input star
Spectrum in ergs/s/AA or ergs/s/AA/Lsun
"""
try:
bounds = kwargs.pop('bounds', None)
dlogT = bounds.get('dlogT', self._dlogT)
dlogg = bounds.get('dlogg', self._dlogg)
except:
dlogT = None
dlogg = None
osl_index = self.which_osl(np.atleast_2d([logT, logg]), dlogT=dlogT, dlogg=dlogg)[0]
osl = self._olist[osl_index - 1]
specs = osl.generate_stellar_spectrum(logT, logg, logL, Z,
raise_extrapolation,
**kwargs)
specs = self.reinterpolate_spectra(osl.wavelength, specs, fill_value=0., bounds_error=False)
return specs
[docs] def reinterpolate_spectra(self, l0, specs, **kwargs):
""" One-dimensional linear interpolation onto the common wavelength.
Returns the one-dimensional interpolated spectrum
Parameters
----------
l0 : 1-D sequence of floats (with units or not)
wavelength of the spectrum to interpolate
specs : 1-D sequence of floats
spectrum to reinterpolate
left : float, optional
Value to return for `x < xp[0]`, default is `fp[0]`.
right : float, optional
Value to return for `x > xp[-1]`, default is `fp[-1]`.
period : None or float, optional
A period for the x-coordinates. This parameter allows the proper
interpolation of angular x-coordinates. Parameters `left` and `right`
are ignored if `period` is specified.
Returns
-------
spec : ndarray
The interpolated values
"""
# TODO: proper reinterpolation that conserves energy... but makes a new
# resolution
wave = self.wavelength
try:
wave = wave.to(l0.unit)
except:
wave = _drop_units(wave)
func = interp1d(_drop_units(l0), _drop_units(specs), **kwargs)
f = func(_drop_units(wave))
return f
[docs] def generate_individual_spectra(self, stars, **kwargs):
""" Generates individual spectra for the given stars and stellar library
Returns NaN spectra if the boundary conditions are not met (no extrapolation)
Parameters
----------
stars: Table
contains at least (logT, logg, logL, Z) of the considered stars
returns
-------
l0: ndarray, ndim=1
wavelength definition of the spectra
wavelength in AA
s0: ndarray, shape=(len(stars), len(l0))
array of spectra, one per input star
Spectrum in ergs/s/AA or ergs/s/AA/Lsun
"""
null_value = kwargs.pop('null', np.nan)
try:
bounds = kwargs.pop('bounds', None)
dlogT = bounds.get('dlogT', self._dlogT)
dlogg = bounds.get('dlogg', self._dlogg)
except:
dlogT = None
dlogg = None
ndata = len(stars)
logT, logg, logL, Z = stars['logT'], stars['logg'], stars['logL'], stars['Z']
# find which library per star
osl_index = self.which_osl(list(zip(logT, logg)), dlogT=dlogT, dlogg=dlogg)
# group calculations per library
groups = [(osl_i, [k[0] for k in grp])
for osl_i, grp in groupby(enumerate(osl_index), lambda x:x[1])]
# Do the actual interpolation, avoiding exptrapolations
spectra = np.empty( (ndata, len(self.wavelength)), dtype=float )
for osl_i, idx_group in groups:
ind = np.array(idx_group)
if osl_i > 0:
_osl = self._olist[osl_i - 1]
lambdas, specs = _osl.generate_individual_spectra(stars[ind])
spectra[ind] = self.reinterpolate_spectra(
_drop_units(lambdas),
_drop_units(specs),
bounds_error=False,
fill_value=0.,
**kwargs
)
else:
spectra[ind] = null_value
l0 = self.wavelength
spectra = spectra * self.flux_units
return l0, spectra, osl_index
[docs] def generate_individual_values(self, stars, values, **kwargs):
""" Generates individual spectra for the given stars and stellar library
Returns NaN spectra if the boundary conditions are not met (no extrapolation)
Parameters
----------
stars: Table
contains at least (logT, logg, logL, Z) of the considered stars
values: sequence or attribute name
value to interpolate
returns
-------
values: sequence
value to interpolate
"""
null_value = kwargs.pop('null', np.nan)
try:
bounds = kwargs.pop('bounds', None)
dlogT = bounds.get('dlogT', self._dlogT)
dlogg = bounds.get('dlogg', self._dlogg)
except:
dlogT = None
dlogg = None
ndata = len(stars)
logT, logg, logL, Z = stars['logT'], stars['logg'], stars['logL'], stars['Z']
# find which library per star
osl_index = self.which_osl(list(zip(logT, logg)), dlogT=dlogT, dlogg=dlogg)
# group calculations per library
groups = [(osl_i, [k[0] for k in grp])
for osl_i, grp in groupby(enumerate(osl_index), lambda x:x[1])]
# Do the actual interpolation, avoiding exptrapolations
spectra = np.empty(ndata, dtype=float )
for osl_i, idx_group in groups:
ind = np.array(idx_group)
if osl_i > 0:
_osl = self._olist[osl_i - 1]
spectra[ind] = _osl.generate_individual_values(stars[ind], values, **kwargs)
else:
spectra[ind] = null_value
return spectra, osl_index