"""Read and write HDF5 files with pytables preserving metadata (tables, https://www.pytables.org/)
.. important::
This module relies on `pytables <https://www.pytables.org/>`_
"""
from os import PathLike
from os.path import basename
from typing import Literal, Optional, Union
import h5py
import numpy as np
import pandas as pd
from .header import HeaderInfo
def _decode_string_ifneeded(s: str) -> str:
"""Silently decode a string if it is bytes"""
if isinstance(s, bytes):
return s.decode("utf-8")
return s
def from_hdf5(
    filename: str,
    tablename: Optional[str] = None,
    *,
    silent: bool = True,
    **kwargs,
) -> tuple[pd.DataFrame, HeaderInfo]:
    """Read a table and its metadata from an HDF5 file.

    Header keywords, column units/descriptions (``FIELD_n_UNIT`` /
    ``FIELD_n_DESC``) and aliases (``ALIASn``) are collected from the
    dataset attributes into a :class:`HeaderInfo`.

    Parameters
    ----------
    filename : str
        File to read from.
    tablename : str, optional
        Path of the node containing the table. Defaults to the root group.
    silent : bool
        If True (default), skip verbose messages.
    **kwargs
        Additional keyword arguments forwarded to :class:`h5py.File`.

    Returns
    -------
    df : pd.DataFrame
        The table content.
    hdr : HeaderInfo
        Associated metadata (header keywords, aliases, units, comments).

    Raises
    ------
    ValueError
        If the requested node does not exist in the file.
    TypeError
        If the node exists but is not an :class:`h5py.Dataset`.
    """
    with h5py.File(filename, **kwargs) as source:
        tablename = tablename or "/"
        if not tablename.startswith("/"):
            tablename = "/" + tablename
        # h5py raises KeyError on a missing node; surface it as a ValueError
        # that names both the node and the file.
        try:
            node = source[tablename]
        except KeyError:
            raise ValueError(
                f"Table '{tablename}' not found in file '{filename}'"
            ) from None
        if not isinstance(node, h5py.Dataset):
            raise TypeError(
                f"Node '{tablename}' is not a dataset (found {type(node)})"
            )
        attrs = node.attrs
        if not silent:
            print(f"\tLoading table: {tablename}")
        header = {}
        aliases = {}
        # read header keywords, skipping pytables bookkeeping attributes
        # and per-column FIELD_* attributes (handled below)
        exclude = ["NROWS", "VERSION", "CLASS", "EXTNAME", "TITLE"]
        for k, v in attrs.items():
            if k in exclude:
                continue
            if k.startswith("ALIAS"):
                # aliases are stored as "alias=column" strings
                c0, c1 = _decode_string_ifneeded(v).split("=")
                aliases[c0] = c1
            elif not k.startswith("FIELD"):
                header[k] = _decode_string_ifneeded(v)
        title = attrs.get("TITLE", "")
        if title not in ["", "None", "Noname", None]:
            header["NAME"] = _decode_string_ifneeded(title)
        else:
            # no meaningful title stored: fall back to file/node path
            header["NAME"] = f"{filename:s}/{node.name:s}"
        # read column metadata: positional (FIELD_n_*) keys first,
        # overridden by per-name (<column>_*) keys when present
        units = {}
        desc = {}
        colnames = node.dtype.names
        for k, colname in enumerate(colnames):
            _u = attrs.get(f"FIELD_{k:d}_UNIT", None)
            _u = attrs.get(f"{colname:s}_UNIT", _u)
            _d = attrs.get(f"FIELD_{k:d}_DESC", None)
            _d = attrs.get(f"{colname:s}_DESC", _d)
            if _u is not None:
                units[colname] = _decode_string_ifneeded(_u)
            if _d is not None:
                desc[colname] = _decode_string_ifneeded(_d)
        data = node[:]
        hdr = HeaderInfo(
            header=header,
            alias=aliases,
            units=units,
            comments=desc,
        )
        return pd.DataFrame(data), hdr
def to_hdf5(
    df: pd.DataFrame,
    filename: Union[str, h5py.File, PathLike],
    *,
    tablename: Optional[str] = None,
    header_info: Optional[HeaderInfo] = None,
    mode: "Literal['r', 'w', 'a', 'r+', 'w-', 'x']" = "w",
    append: bool = False,
    **kwargs,
) -> None:
    """
    Write a pandas DataFrame to an HDF5 file.

    Header keywords, column units/descriptions and aliases are stored as
    dataset attributes following the pytables table conventions
    (``TITLE``, ``FIELD_n_UNIT``, ``FIELD_n_DESC``, ``ALIASn``).

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to write.
    filename : str or h5py.File or PathLike
        The filename or open HDF5 file to write to.
    tablename : str, optional
        The name of the table to write to.
    header_info : HeaderInfo, optional
        The header information to write. Default is to use from df.attrs
    mode : {'r', 'w', 'a', 'r+', 'w-', 'x'}, default 'w'
        The mode to open the file in. Forced to 'a' when `append` is True.
    append : bool, default False
        Whether to append data to an existing table.
    **kwargs
        Additional keyword arguments to pass to `h5py.File.create_dataset`.

    Raises
    ------
    Exception
        If the HDF backend does not implement stream.
    RuntimeError
        If the provided open file is in an incompatible mode.
    ValueError
        If the resolved table name is empty.
    """
    if hasattr(filename, "read"):
        raise Exception("HDF backend does not implement stream")
    # ensure mode is valid for appending data
    mode = "a" if append is True else mode
    # open output file, or if given an open h5py.File, check its mode
    if isinstance(filename, h5py.File):
        if (filename.mode != mode) and (mode != "r"):
            raise RuntimeError(
                f"The file {basename(filename.filename)} is already opened in a different mode (mode {filename.mode})"
            )
        hd5 = filename
    else:
        hd5 = h5py.File(str(filename), mode=mode)
    if header_info is None:
        # attempt to get it from the dataframe attributes
        header_info = HeaderInfo(
            header={
                k: v
                for k, v in df.attrs.items()
                if k not in ["aliases", "units", "comments"]
            },
            alias=df.attrs.get("aliases", {}),
            units=df.attrs.get("units", {}),
            comments=df.attrs.get("comments", {}),
        )
    # resolve the table path: explicit argument, then (optional) df.name,
    # then header keywords. NOTE: pd.DataFrame has no `name` attribute by
    # default, hence getattr with a fallback.
    tablename_path = (
        tablename
        or getattr(df, "name", None)
        or header_info.header.get("name", None)
        or header_info.header.get("NAME", None)
    )
    if tablename_path in ("", None, "Noname", "None"):
        tablename_path = "/data"
    if not tablename_path.startswith("/"):
        tablename_path = "/" + tablename_path
    if append:
        try:
            tab = hd5[tablename_path]
            if not isinstance(tab, h5py.Dataset):
                raise TypeError(f"Node is not a table (got {type(tab)})")
            if tab.dtype is None:
                raise ValueError("Table dtype description is missing")
            dtypes = tab.dtype  # pyright: ignore / it is there
            data = df.to_records(index=False, column_dtypes=dtypes)
            data_length = data.shape[0]
            # try to resize the table first
            # this requires the table to be resizable (chunked dataset)
            try:
                tab.resize(tab.shape[0] + data_length, axis=0)
                # write the data to the table
                tab[-data_length:] = data
            except TypeError:
                # Only chunked datasets can be resized.
                # We need to make a new data array with all data and replace the node,
                # preserving the previous attributes.
                attrs = {k: v for k, v in tab.attrs.items()}
                new_data = np.hstack([tab[:], data])
                del hd5[tablename_path]
                tab = hd5.create_dataset(
                    tablename_path, data=new_data, dtype=dtypes
                )
                tab.attrs.update(attrs)
            tab.flush()
        except KeyError:
            print(
                f"Warning: Table {tablename_path} does not exist. A new table will be created."
            )
            append = False
    if not append:
        # we need the path and the name separately
        w_ = tablename_path.split("/")
        where = "/".join(w_[:-1])
        tablename = str(w_[-1])
        if tablename in (None, ""):
            raise ValueError(
                "Table name cannot be empty. Did you leave a trailing slash?"
            )
        if where in ("", None):
            where = "/"
        data = df.to_records(index=False)
        tab = hd5.create_dataset(f"{where}/{tablename}", data=data, **kwargs)
        # update hdr attrs with header_info.header
        for k, v in header_info.header.items():
            tab.attrs[k] = v
        if "TITLE" not in header_info.header:
            tab.attrs["TITLE"] = tablename
        # add column descriptions and units under the positional keys
        # that from_hdf5 reads back (FIELD_<index>_UNIT / _DESC)
        for e, colname in enumerate(df.columns):
            _u = header_info.units.get(colname, None)
            _d = header_info.comments.get(colname, None)
            if _u is not None:
                tab.attrs[f"FIELD_{e:d}_UNIT"] = _u
            if _d is not None:
                tab.attrs[f"FIELD_{e:d}_DESC"] = _d
        # add aliases as "alias=column" strings
        for i, (k, v) in enumerate(header_info.alias.items()):
            tab.attrs[f"ALIAS{i:d}"] = f"{k:s}={v:s}"
        tab.flush()
    if not isinstance(filename, h5py.File):
        # only close files we opened ourselves
        hd5.flush()
        hd5.close()