Source code for pyphot.libraries

"""Library defintions as Collections of Filters"""

import os
import pathlib
from typing import List, Literal, Optional, Sequence, Union

import numpy as np
import numpy.typing as npt
import tables

from . import config
from .helpers import progress_enumerate
from .phot import Filter, QuantityType


[docs] class Library: """Common grounds for filter libraries""" source: Optional[str] """Source of the library"""
[docs] def __init__( self, source: Optional[str] = None, *args, **kwargs, ): """Construct the library""" self.source = source or str(config.__default_passband_lib__)
def __repr__(self) -> str: msg = "Filter Library: {0}\n{1:s}" return msg.format(self.source, object.__repr__(self)) def __enter__(self): """Enter context""" return self def __exit__(self, *exc_info): """end context""" return False def __len__(self) -> int: """Size of the library""" return len(self.content)
[docs] def to_csv(self, directory="./", progress=True, **kwargs): """Export each filter into a csv file with its own name Parameters ---------- directory: str directory to write into progress: bool show progress if set """ os.makedirs(directory, exist_ok=True) def export_filter(f: Filter): if f.wavelength_unit in (None, ""): f.wavelength_unit = "AA" f.write_to( f"{directory:s}/{f.name:s}.csv".lower(), fmt="%.6f", **kwargs, ) with self as s: for _, k in progress_enumerate( s.content, desc="export", show_progress=progress ): f = s[k] # s[k] can return a list if isinstance(f, Filter): export_filter(f) else: for fk in f: export_filter(fk)
[docs] def to_hdf(self, fname="filters.hd5", progress=True, **kwargs): """Export each filter into a csv file with its own name Parameters ---------- directory: str directory to write into progress: bool show progress if set """ def export_filter(f): if f.wavelength_unit in (None, ""): f.wavelength_unit = "AA" f.write_to( f"{fname:s}", tablename=f"/filters/{f.name}", createparents=True, append=True, silent=True, **kwargs, ) with self as s: for _, k in progress_enumerate( s.content, desc="export", show_progress=progress ): f = s[k] if isinstance(f, Filter): export_filter(f) else: for fk in f: export_filter(fk)
[docs] @classmethod def from_hd5(cls, filename, **kwargs) -> "HDF_Library": """Read in an HDF5 library""" return HDF_Library(filename, **kwargs)
[docs] @classmethod def from_ascii(cls, filename, **kwargs) -> "Ascii_Library": """Read in an ASCII library""" return Ascii_Library(filename, **kwargs)
@property def content(self) -> List[str]: """Get the content list""" return self.get_library_content() def __getitem__( self, name: Union[str, Sequence[str]] ) -> Union[Filter, List[Filter]]: """Make this object like a dictionary and load one or multiple filters""" with self as s: try: f = s._load_filter(name) except TypeError: f = [s._load_filter(k) for k in name] return f def _load_filter(self, *args, **kwargs) -> Filter: """Load a given filter from the library""" raise NotImplementedError
[docs] def get_library_content(self) -> List[str]: """get the content of the library""" raise NotImplementedError
[docs] def load_all_filters( self, *, interp: bool = True, lamb: Optional[Union[npt.NDArray[np.floating], QuantityType]] = None, ) -> List[Filter]: """load all filters from the library Parameters ---------- interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter Returns ------- filters: list[filter] list of filter objects """ raise NotImplementedError
[docs] def add_filter(self, f: Filter): """add a filter to the library""" raise NotImplementedError
[docs] def find(self, name: str, case_sensitive=True) -> List[str]: """Search for a filter in the library""" r = [] if case_sensitive: _n = name.lower() for k in self.get_library_content(): if _n in k.lower(): r.append(k) else: for k in self.content: if name in k: r.append(k) return r
[docs] class Ascii_Library(Library): """Interface one or multiple directory or many files as a filter :class:`Library` >>> lib = Ascii_Library(["ground", "hst", "myfilter.csv"]) """
[docs] def __init__( self, source: Optional[str] = None, glob_pattern: str = "*", *args, **kwargs, ): """Construct the library Parameters ---------- source: Optional[str] directory source of the library glob_pattern: str glob filter to apply in that directory, default "*" """ super().__init__(source, *args, **kwargs) self._glob_pattern = glob_pattern
def _load_filter( self, fname: str, interp: bool = True, lamb: Union[None, npt.NDArray[np.floating], QuantityType] = None, *args, **kwargs, ): """Load a given filter from the library Parameters ---------- fname : str Name of the filter to load. interp : bool, optional Whether to interpolate the filter to a given wavelength grid. lamb : array_like, optional Wavelength grid to interpolate the filter to. args : tuple, optional Additional arguments to pass to the filter constructor. kwargs : dict, optional Additional keyword arguments to pass to `Filter.from_ascii`. Returns ------- Filter The loaded filter. """ try: # attempt to load filter from ascii file fil = Filter.from_ascii(fname, *args, **kwargs) except Exception: content = self.content r = [k for k in content if fname in k] # if nothing matched, try all lowercase for names if len(r) <= 0: r = [k for k in content if fname.lower() in k] if len(r) > 1: raise ValueError( "Auto correction found multiple choices." "Refine name to one of {}".format(r) ) elif len(r) <= 0: raise ValueError(f"Cannot find filter {fname}") else: fil = Filter.from_ascii(r[0], *args, **kwargs) if (interp is True) and (lamb is not None): return fil.reinterp(lamb) else: return fil
[docs] def get_library_content(self) -> List[str]: """get the content of the library""" from glob import glob if self.source is None: raise ValueError("Library source not set") # Assume source is either a directory or a pattern or a single file try: os.path.isdir(self.source) lst = glob(os.path.join(self.source, self._glob_pattern)) except TypeError: lst = [self.source] # expand directories dircheck = True while dircheck is True: dircheck = False newlst = [] for entry in lst: if os.path.isdir(entry): newlst.extend(glob(os.path.join(entry, self._glob_pattern))) dircheck = True else: newlst.append(entry) lst = newlst return lst
[docs] def load_all_filters( self, *, interp: bool = True, lamb: Optional[Union[npt.NDArray[np.floating], QuantityType]] = None, ) -> List[Filter]: """load all filters from the library Parameters ---------- interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter Returns ------- filters: list[filter] list of filter objects """ return self.load_filters(self.content, interp=interp, lamb=lamb)
[docs] def load_filters( self, names: List[str], *, interp: bool = True, lamb: Optional[Union[npt.NDArray, QuantityType]] = None, ) -> List[Filter]: """load a limited set of filters Parameters ---------- names: list[str] normalized names according to filtersLib interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter filterLib: path path to the filter library hd5 file Returns ------- filters: list[filter] list of filter objects """ filters = [ self._load_filter(fname, interp=interp, lamb=lamb) for fname in names ] return filters
[docs] def add_filters( self, filter_object: Filter, fmt="%.6f", **kwargs, ): """Add a filter to the library permanently Parameters ---------- filter_object: Filter object filter to add """ if not isinstance(filter_object, Filter): msg = "Argument of type Filter expected. Got type {0}" raise TypeError(msg.format(type(filter_object))) if filter_object.wavelength_unit is None: msg = "Filter wavelength must have units for storage." raise AttributeError(msg) fname = f"{self.source:s}/{filter_object.name:s}.csv" filter_object.write_to(fname.lower(), fmt=fmt, **kwargs)
[docs] class HDF_Library(Library): """:class:`Library` for storage based on HDF files""" hdf: Optional[tables.File] """Source file stream of the library""" mode: "Literal['r', 'w', 'a', 'r+']" = "r" """Mode of the library (file). It can be one of the following: * *'r'*: Read-only; no data can be modified. * *'w'*: Write; a new file is created (an existing file with the same name would be deleted). * *'a'*: Append; an existing file is opened for reading and writing, and if the file does not exist it is created. * *'r+'*: It is similar to 'a', but the file must already exist. """ _in_context: int """Number of times the library is in context (potentially nested)"""
[docs] def __init__( self, source: Optional[str] = None, mode: "Literal['r', 'w', 'a', 'r+']" = "r", ): super().__init__(source) self.hdf = None self.mode = mode self._in_context = 0
def __enter__(self): """Enter context""" if self.source is None: raise ValueError("Source must be provided") if self.hdf is None: self.hdf = tables.open_file(self.source, self.mode) self._in_context += 1 return self def __exit__(self, *exc_info): """end context""" if (self.hdf is not None) and (self._in_context < 2): self.hdf.close() self.hdf = None self._in_context -= 1 return False def _load_filter( self, fname: str, interp: bool = True, lamb: Union[None, npt.NDArray[np.floating], QuantityType] = None, ) -> Filter: """Load a given filter from the library Parameters ---------- fname: str normalized names according to filtersLib interp: bool, optional reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter integrationFilter: bool, optional set True for specail integraion filter such as Qion or E_uv if set, lamb should be given Returns ------- filter: Filter instance filter object """ with self as s: ftab = s.hdf if ftab is None: raise ValueError("Library not initialized") if hasattr(fname, "decode"): fnode = ftab.get_node("/filters/" + fname.decode("utf8")) # type: ignore else: fnode = ftab.get_node("/filters/" + fname) flamb = fnode[:]["WAVELENGTH"] transmit = fnode[:]["THROUGHPUT"] dtype = "photon" unit = None attrs = fnode.attrs if "DETECTOR" in attrs: dtype = attrs["DETECTOR"] if "WAVELENGTH_UNIT" in attrs: unit = attrs["WAVELENGTH_UNIT"] fil = Filter( flamb, transmit, name=fnode.name, dtype=dtype, unit=unit, ) if (lamb is not None) and interp: fil = fil.reinterp(lamb) return fil
[docs] def get_library_content(self) -> List[str]: """get the content of the library""" with self as s: if s.hdf is None: raise ValueError("Library not initialized") try: filters = s.hdf.root.content.cols.TABLENAME[:] except Exception: filters = list(s.hdf.root.filters._v_children.keys()) if hasattr(filters[0], "decode"): filters = [k.decode("utf8") for k in filters] return filters
[docs] def load_all_filters( self, *, interp: bool = True, lamb: Optional[Union[npt.NDArray[np.floating], QuantityType]] = None, ) -> List[Filter]: """load all filters from the library Parameters ---------- interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter Returns ------- filters: list[filter] list of filter objects """ return self.load_filters(self.content, interp=interp, lamb=lamb)
[docs] def load_filters( self, names: List[str], *, interp: bool = True, lamb: Optional[Union[npt.NDArray, QuantityType]] = None, ) -> List[Filter]: """load a limited set of filters Parameters ---------- names: list[str] normalized names according to filtersLib interp: bool reinterpolate the filters over given lambda points lamb: ndarray[float, ndim=1] desired wavelength definition of the filter filterLib: path path to the filter library hd5 file Returns ------- filters: list[filter] list of filter objects """ with self as s: filters = [ s._load_filter(fname, interp=interp, lamb=lamb) for fname in names ] return filters
[docs] def add_filter(self, f: Filter, **kwargs): """Add a filter to the library permanently Parameters ---------- f: Filter object filter to add """ if not isinstance(f, Filter): msg = "Argument of type Filter expected. Got type {0}" raise TypeError(msg.format(type(f))) if f.wavelength_unit is None: msg = "Filter wavelength must have units for storage." raise AttributeError(msg) append = kwargs.pop("append", True) f.write_to( f"{self.source:s}", tablename=f"/filters/{f.name}", createparents=True, append=append, **kwargs, )
[docs] def get_library(fname: Optional[str] = None, **kwargs): """Finds the appropriate class to load the library""" fname = fname or str(config.__default_passband_lib__) library_path = pathlib.Path(fname) if (library_path.suffix in (".hdf5", ".hd5", ".h5")) and ( library_path.is_file() ): return HDF_Library(fname, **kwargs) else: return Ascii_Library(fname, **kwargs)