Source code for pyphot.libraries

"""Collections of Filters"""

from typing import Optional, List, Sequence, Union, Literal
import os
import pathlib
import numpy as np
import numpy.typing as npt
import tables

from . import config
from .helpers import progress_enumerate
from .phot import Filter, QuantityType


[docs] class Library: """Common grounds for filter libraries""" source: Optional[str] """Source of the library"""
[docs] def __init__( self, source: Optional[str] = None, *args, **kwargs, ): """Construct the library""" self.source = source or str(config.__default_passband_lib__)
def __repr__(self) -> str: msg = "Filter Library: {0}\n{1:s}" return msg.format(self.source, object.__repr__(self)) def __enter__(self): """Enter context""" return self def __exit__(self, *exc_info): """end context""" return False def __len__(self) -> int: """Size of the library""" return len(self.content)
[docs] def to_csv(self, directory="./", progress=True, **kwargs): """Export each filter into a csv file with its own name Parameters ---------- directory: str directory to write into progress: bool show progress if set """ os.makedirs(directory, exist_ok=True) def export_filter(f: Filter): if f.wavelength_unit in (None, ""): f.wavelength_unit = "AA" f.write_to( f"{directory:s}/{f.name:s}.csv".lower(), fmt="%.6f", **kwargs, ) with self as s: for _, k in progress_enumerate( s.content, desc="export", show_progress=progress ): f = s[k] # s[k] can return a list if isinstance(f, Filter): export_filter(f) else: for fk in f: export_filter(fk)
[docs] def to_hdf(self, fname="filters.hd5", progress=True, **kwargs): """Export each filter into a csv file with its own name Parameters ---------- directory: str directory to write into progress: bool show progress if set """ def export_filter(f): if f.wavelength_unit in (None, ""): f.wavelength_unit = "AA" f.write_to( f"{fname:s}", tablename=f"/filters/{f.name}", createparents=True, append=True, silent=True, **kwargs, ) with self as s: for _, k in progress_enumerate( s.content, desc="export", show_progress=progress ): f = s[k] if isinstance(f, Filter): export_filter(f) else: for fk in f: export_filter(fk)
[docs] @classmethod def from_hd5(cls, filename, **kwargs) -> "HDF_Library": return HDF_Library(filename, **kwargs)
[docs] @classmethod def from_ascii(cls, filename, **kwargs) -> "Ascii_Library": """Read in an ASCII library""" return Ascii_Library(filename, **kwargs)
@property def content(self) -> List[str]: """Get the content list""" return self.get_library_content() def __getitem__( self, name: Union[str, Sequence[str]] ) -> Union[Filter, List[Filter]]: """Make this object like a dictionary and load one or multiple filters""" with self as s: try: f = s._load_filter(name) except TypeError: f = [s._load_filter(k) for k in name] return f def _load_filter(self, *args, **kwargs) -> Filter: """Load a given filter from the library""" raise NotImplementedError
[docs] def get_library_content(self) -> List[str]: """get the content of the library""" raise NotImplementedError
[docs] def load_all_filters( self, *, interp: bool = True, lamb: Optional[Union[npt.NDArray[np.floating], QuantityType]] = None, ) -> List[Filter]: """load all filters from the library Parameters ---------- interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter Returns ------- filters: list[filter] list of filter objects """ raise NotImplementedError
[docs] def add_filter(self, f: Filter): """add a filter to the library""" raise NotImplementedError
[docs] def find(self, name: str, case_sensitive=True) -> List[str]: """Search for a filter in the library""" r = [] if case_sensitive: _n = name.lower() for k in self.get_library_content(): if _n in k.lower(): r.append(k) else: for k in self.content: if name in k: r.append(k) return r
[docs] class Ascii_Library(Library): """Interface one or multiple directory or many files as a filter :class:`Library` >>> lib = Ascii_Library(['ground', 'hst', 'myfilter.csv']) """ def _load_filter( self, fname: str, interp: bool = True, lamb: Union[None, npt.NDArray[np.floating], QuantityType] = None, *args, **kwargs, ): """Load a given filter from the library Parameters ---------- fname : str Name of the filter to load. interp : bool, optional Whether to interpolate the filter to a given wavelength grid. lamb : array_like, optional Wavelength grid to interpolate the filter to. args : tuple, optional Additional arguments to pass to the filter constructor. kwargs : dict, optional Additional keyword arguments to pass to `Filter.from_ascii`. Returns ------- Filter The loaded filter. """ try: # attempt to load filter from ascii file fil = Filter.from_ascii(fname, *args, **kwargs) except Exception: content = self.content r = [k for k in content if fname in k] # if nothing matched, try all lowercase for names if len(r) <= 0: r = [k for k in content if fname.lower() in k] if len(r) > 1: raise ValueError( "Auto correction found multiple choices." "Refine name to one of {}".format(r) ) elif len(r) <= 0: raise ValueError(f"Cannot find filter {fname}") else: fil = Filter.from_ascii(r[0], *args, **kwargs) if (interp is True) and (lamb is not None): return fil.reinterp(lamb) else: return fil
[docs] def get_library_content(self) -> List[str]: """get the content of the library""" from glob import glob if self.source is None: raise ValueError("Library source not set") # Assume source is either a directory or a pattern or a single file try: os.path.isdir(self.source) lst = glob(self.source + "/*") except TypeError: lst = [self.source] # expand directories dircheck = True while dircheck is True: dircheck = False newlst = [] for entry in lst: if os.path.isdir(entry): newlst.extend(glob(entry + "/*")) dircheck = True else: newlst.append(entry) lst = newlst return lst
[docs] def load_all_filters( self, *, interp: bool = True, lamb: Optional[Union[npt.NDArray[np.floating], QuantityType]] = None, ) -> List[Filter]: """load all filters from the library Parameters ---------- interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter Returns ------- filters: list[filter] list of filter objects """ return self.load_filters(self.content, interp=interp, lamb=lamb)
[docs] def load_filters( self, names: List[str], *, interp: bool = True, lamb: Optional[Union[npt.NDArray, QuantityType]] = None, ) -> List[Filter]: """load a limited set of filters Parameters ---------- names: list[str] normalized names according to filtersLib interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter filterLib: path path to the filter library hd5 file Returns ------- filters: list[filter] list of filter objects """ filters = [ self._load_filter(fname, interp=interp, lamb=lamb) for fname in names ] return filters
[docs] def add_filters( self, filter_object: Filter, fmt="%.6f", **kwargs, ): """Add a filter to the library permanently Parameters ---------- filter_object: Filter object filter to add """ if not isinstance(filter_object, Filter): msg = "Argument of type Filter expected. Got type {0}" raise TypeError(msg.format(type(filter_object))) if filter_object.wavelength_unit is None: msg = "Filter wavelength must have units for storage." raise AttributeError(msg) fname = f"{self.source:s}/{filter_object.name:s}.csv" filter_object.write_to(fname.lower(), fmt=fmt, **kwargs)
[docs] class HDF_Library(Library): """:class:`Library` for storage based on HDF files""" hdf: Optional[tables.File] """Source file stream of the library""" mode: "Literal['r', 'w', 'a', 'r+']" = "r" """Mode of the library (file). It can be one of the following: * *'r'*: Read-only; no data can be modified. * *'w'*: Write; a new file is created (an existing file with the same name would be deleted). * *'a'*: Append; an existing file is opened for reading and writing, and if the file does not exist it is created. * *'r+'*: It is similar to 'a', but the file must already exist. """ _in_context: int """Number of times the library is in context (potentially nested)"""
[docs] def __init__( self, source: Optional[str] = None, mode: "Literal['r', 'w', 'a', 'r+']" = "r", ): super().__init__(source) self.hdf = None self.mode = mode self._in_context = 0
def __enter__(self): """Enter context""" if self.source is None: raise ValueError("Source must be provided") if self.hdf is None: self.hdf = tables.open_file(self.source, self.mode) self._in_context += 1 return self def __exit__(self, *exc_info): """end context""" if (self.hdf is not None) and (self._in_context < 2): self.hdf.close() self.hdf = None self._in_context -= 1 return False def _load_filter( self, fname: str, interp: bool = True, lamb: Union[None, npt.NDArray[np.floating], QuantityType] = None, ) -> Filter: """Load a given filter from the library Parameters ---------- fname: str normalized names according to filtersLib interp: bool, optional reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter integrationFilter: bool, optional set True for specail integraion filter such as Qion or E_uv if set, lamb should be given Returns ------- filter: Filter instance filter object """ with self as s: ftab = s.hdf if ftab is None: raise ValueError("Library not initialized") if hasattr(fname, "decode"): fnode = ftab.get_node("/filters/" + fname.decode("utf8")) # type: ignore else: fnode = ftab.get_node("/filters/" + fname) flamb = fnode[:]["WAVELENGTH"] transmit = fnode[:]["THROUGHPUT"] dtype = "photon" unit = None attrs = fnode.attrs if "DETECTOR" in attrs: dtype = attrs["DETECTOR"] if "WAVELENGTH_UNIT" in attrs: unit = attrs["WAVELENGTH_UNIT"] fil = Filter( flamb, transmit, name=fnode.name, dtype=dtype, unit=unit, ) if (lamb is not None) and interp: fil = fil.reinterp(lamb) return fil
[docs] def get_library_content(self) -> List[str]: """get the content of the library""" with self as s: if s.hdf is None: raise ValueError("Library not initialized") try: filters = s.hdf.root.content.cols.TABLENAME[:] except Exception: filters = list(s.hdf.root.filters._v_children.keys()) if hasattr(filters[0], "decode"): filters = [k.decode("utf8") for k in filters] return filters
[docs] def load_all_filters( self, *, interp: bool = True, lamb: Optional[Union[npt.NDArray[np.floating], QuantityType]] = None, ) -> List[Filter]: """load all filters from the library Parameters ---------- interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter Returns ------- filters: list[filter] list of filter objects """ return self.load_filters(self.content, interp=interp, lamb=lamb)
[docs] def load_filters( self, names: List[str], *, interp: bool = True, lamb: Optional[Union[npt.NDArray, QuantityType]] = None, ) -> List[Filter]: """load a limited set of filters Parameters ---------- names: list[str] normalized names according to filtersLib interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter filterLib: path path to the filter library hd5 file Returns ------- filters: list[filter] list of filter objects """ with self as s: filters = [ s._load_filter(fname, interp=interp, lamb=lamb) for fname in names ] return filters
[docs] def add_filter(self, f: Filter, **kwargs): """Add a filter to the library permanently Parameters ---------- f: Filter object filter to add """ if not isinstance(f, Filter): msg = "Argument of type Filter expected. Got type {0}" raise TypeError(msg.format(type(f))) if f.wavelength_unit is None: msg = "Filter wavelength must have units for storage." raise AttributeError(msg) append = kwargs.pop("append", True) f.write_to( f"{self.source:s}", tablename=f"/filters/{f.name}", createparents=True, append=append, **kwargs, )
[docs] def get_library(fname: Optional[str] = None, **kwargs): """Finds the appropriate class to load the library""" fname = fname or str(config.__default_passband_lib__) library_path = pathlib.Path(fname) if (library_path.suffix in (".hdf5", ".hd5", ".h5")) and (library_path.is_file()): return HDF_Library(fname, **kwargs) else: return Ascii_Library(fname, **kwargs)