
halocat.xi_AB_io

xi_AB_io

On-disk storage for the cumulative-threshold ξ_AB cube.

Each per-realisation file holds one (gravity, redshift, imodel, ibox) measurement of

xi_AB[N_r, N_t, N_t]  =  xi_hh(r | n_h(>=t_i), n_h(>=t_j))

on a fixed mass-threshold grid plus the halo counts n_h_cum[N_t] above each threshold. The schema is dense and symmetric in the last two axes — emulator inputs read the cube directly without any pair-sparse densification.

Atomic writes via tempfile + os.replace guarantee that the destination file is never left corrupt on POSIX-rename-atomic filesystems (Lustre, GPFS). Cache invalidation hashes a spec tuple (not raw array bytes) over the parameters that actually affect the measurement.

SCHEMA_VERSION module-attribute

SCHEMA_VERSION = '1'

XiABRecord dataclass

XiABRecord(r_edges: ndarray, r: ndarray, thresholds: ndarray, xi_AB: ndarray, n_h_cum: ndarray, n_pairs: ndarray | None, box_size: float, attrs: dict = dict())

In-memory view of one xi_AB_thresh.hdf5 file.

Attributes:

r_edges : (N_r+1,) float64
    Separation bin edges (Mpc/h).

r : (N_r,) float64
    Arithmetic mean of r_edges — never pycorr.sepavg. Filled in by :func:read_xi_AB from r_edges on read; the constructor will fill it on demand if the user passes r=None.

thresholds : (N_t,) float64
    Sorted log10(M / [Msun/h]) thresholds.

xi_AB : (N_r, N_t, N_t) float64
    Bin-averaged correlation cube. Symmetric in the last two axes.

n_h_cum : (N_t,) int64
    Halo counts above each threshold (raw counts in the simulation box, not densities). Convert to number density via n_h_cum / box_size**3.

n_pairs : (N_t, N_t) int64 or None
    Optional diagnostic: pair counts per threshold pair, for shot-noise estimates. None if not stored.

box_size : float
    Periodic box side length (Mpc/h).

attrs : dict
    Top-level HDF5 attributes — see :func:write_xi_AB for the canonical keys.
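As a quick orientation, a minimal sketch with synthetic arrays (shapes and units as documented above; all values are made up for illustration):

```python
import numpy as np

# Synthetic stand-ins for the on-disk arrays (illustrative values only).
r_edges = np.linspace(1.0, 51.0, 11)      # (N_r+1,) separation bin edges, Mpc/h
n_h_cum = np.array([5000, 1200, 150])     # (N_t,) raw halo counts above each threshold
box_size = 500.0                          # Mpc/h

# r is always the arithmetic mean of adjacent edges, never read from disk.
r = 0.5 * (r_edges[:-1] + r_edges[1:])

# n_h_cum stores raw counts; divide by the box volume for number density.
n_density = n_h_cum / box_size**3

assert r.shape == (10,) and np.isclose(r[0], 3.5)
assert np.isclose(n_density[0], 4e-5)
```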

r_edges instance-attribute

r_edges: ndarray

r instance-attribute

r: ndarray

thresholds instance-attribute

thresholds: ndarray

xi_AB instance-attribute

xi_AB: ndarray

n_h_cum instance-attribute

n_h_cum: ndarray

n_pairs instance-attribute

n_pairs: ndarray | None

box_size instance-attribute

box_size: float

attrs class-attribute instance-attribute

attrs: dict = field(default_factory=dict)

gravity property

gravity: str

redshift property

redshift: float

imodel property

imodel: int

ibox property

ibox: int

cache_key property

cache_key: str

xi_AB_path

xi_AB_path(gravity: str, redshift: float, imodel: int, ibox: int, *, root: str | None = None) -> str

Path to the per-realisation xi_AB_thresh.hdf5.

Mirrors the layout of xi_hh.hdf5 / hmf.hdf5: <OUTPUT_ROOT>/gravity_<g>/z_<z:.2f>/imodel_<m>/box_<b>/xi_AB_thresh.hdf5.

Parameters:

root : str, optional (default None)
    Override :data:halocat.config.OUTPUT_ROOT.
Source code in halocat/xi_AB_io.py
def xi_AB_path(gravity: str, redshift: float, imodel: int, ibox: int,
               *, root: str | None = None) -> str:
    """Path to the per-realisation ``xi_AB_thresh.hdf5``.

    Mirrors the layout of ``xi_hh.hdf5`` / ``hmf.hdf5`` —
    ``<OUTPUT_ROOT>/gravity_<g>/z_<z:.2f>/imodel_<m>/box_<b>/xi_AB_thresh.hdf5``.

    Parameters
    ----------
    root : str, optional
        Override :data:`halocat.config.OUTPUT_ROOT`.
    """
    if root is None:
        d = C.get_output_dir(gravity, redshift, imodel, ibox)
    else:
        d = os.path.join(root, f"gravity_{gravity}",
                         f"z_{redshift:.2f}", f"imodel_{imodel}",
                         f"box_{ibox}")
    return os.path.join(d, "xi_AB_thresh.hdf5")
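For illustration, a re-implementation of the layout string alone; the real xi_AB_path consults halocat.config when root is None, and sketch_xi_AB_path plus the "GR" gravity label below are hypothetical:

```python
import os

def sketch_xi_AB_path(root, gravity, redshift, imodel, ibox):
    # Illustrative re-implementation of the layout string only; the real
    # function delegates to halocat.config.get_output_dir when root is None.
    return os.path.join(root, f"gravity_{gravity}", f"z_{redshift:.2f}",
                        f"imodel_{imodel}", f"box_{ibox}", "xi_AB_thresh.hdf5")

print(sketch_xi_AB_path("/scratch/out", "GR", 0.5, 3, 12))
# → /scratch/out/gravity_GR/z_0.50/imodel_3/box_12/xi_AB_thresh.hdf5  (on POSIX)
```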

xi_AB_cache_key

xi_AB_cache_key(*, thresholds, r_edges, mass_definition: str, code_version: str, source_dat: str) -> str

blake2b-128 hex digest of the canonicalised measurement spec.

Encodes everything that changes the measurement:

- r-binning summary (_r_spec);
- thresholds (sorted, rounded to 6 dp);
- mass column ("Mtot");
- halocat code version (cache invalidates on releases that change the measurement code path);
- the source_dat path from halo.hdf5 (the upstream catalogue identity — invalidates if the user re-runs against a different DAT under the same realisation slot).

Source code in halocat/xi_AB_io.py
def xi_AB_cache_key(*,
                    thresholds,
                    r_edges,
                    mass_definition: str,
                    code_version: str,
                    source_dat: str) -> str:
    """blake2b-128 hex digest of the canonicalised measurement spec.

    Encodes everything that *changes the measurement*:
      - r-binning summary (``_r_spec``);
      - thresholds (sorted, rounded to 6 dp);
      - mass column (``"Mtot"``);
      - halocat code version (cache invalidates on releases that
        change the measurement code path);
      - the ``source_dat`` path from ``halo.hdf5`` (the upstream
        catalogue identity — invalidates if the user re-runs against
        a different DAT under the same realisation slot).
    """
    spec = (
        "xi_AB_v1",
        _r_spec(r_edges),
        tuple(round(float(t), 6) for t in sorted(np.asarray(thresholds,
                                                             dtype=np.float64))),
        str(mass_definition),
        str(code_version),
        str(source_dat),
    )
    payload = json.dumps(spec, sort_keys=True, default=float).encode("utf-8")
    return hashlib.blake2b(payload, digest_size=16).hexdigest()
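The spec-hashing idea can be exercised self-contained; spec_digest and the spec tuples below are illustrative stand-ins, not the library API:

```python
import hashlib
import json

def spec_digest(spec):
    # Hash a canonicalised spec tuple, not raw array bytes: two runs with
    # identical parameters yield identical keys regardless of array identity.
    payload = json.dumps(spec, sort_keys=True, default=float).encode("utf-8")
    return hashlib.blake2b(payload, digest_size=16).hexdigest()

spec_a = ("xi_AB_v1", (1.0, 51.0, 10), (12.5, 13.0), "Mtot", "0.3.0", "/data/a.dat")
spec_b = ("xi_AB_v1", (1.0, 51.0, 10), (12.5, 13.0), "Mtot", "0.3.0", "/data/a.dat")
spec_c = ("xi_AB_v1", (1.0, 51.0, 10), (12.5, 13.0), "Mtot", "0.4.0", "/data/a.dat")

assert spec_digest(spec_a) == spec_digest(spec_b)  # same spec, same key
assert spec_digest(spec_a) != spec_digest(spec_c)  # version bump invalidates
assert len(spec_digest(spec_a)) == 32              # blake2b-128 → 32 hex chars
```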

write_xi_AB

write_xi_AB(record: XiABRecord, path: str, *, atomic: bool = True) -> None

Write record to path.

With atomic=True (default), the writer first dumps to <path>.tmp.<host>.<pid> then os.replace(...) — a POSIX-atomic rename on Lustre / GPFS, so concurrent writers cannot corrupt the destination. A soft warning is logged if sibling tmp files already exist (likely concurrent writer).

Parameters:

record : XiABRecord (required)
path : str (required)
atomic : bool, optional (default True)
    Set False for in-place writes (faster, but not crash-safe).
Source code in halocat/xi_AB_io.py
def write_xi_AB(record: XiABRecord, path: str, *, atomic: bool = True) -> None:
    """Write ``record`` to ``path``.

    With ``atomic=True`` (default), the writer first dumps to
    ``<path>.tmp.<host>.<pid>`` then ``os.replace(...)`` — a
    POSIX-atomic rename on Lustre / GPFS, so concurrent writers
    cannot corrupt the destination. A soft warning is logged if
    sibling tmp files already exist (likely concurrent writer).

    Parameters
    ----------
    record : XiABRecord
    path : str
    atomic : bool, optional
        Default True. Set False for in-place writes (faster, but not
        crash-safe).
    """
    if not atomic:
        _h5_write_into(path, record)
        return

    siblings = glob.glob(f"{path}.tmp.*")
    if siblings:
        _log.warning(
            "atomic write: %d existing tmp file(s) detected next to %s; "
            "possible concurrent writer (this is a soft check, not a lock)",
            len(siblings), path,
        )
    tmp = f"{path}.tmp.{socket.gethostname()}.{os.getpid()}"
    try:
        _h5_write_into(tmp, record)
        os.replace(tmp, path)
    except BaseException:
        try:
            os.unlink(tmp)
        except FileNotFoundError:
            pass
        raise

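The tmp-then-rename pattern generalises beyond HDF5; a self-contained sketch on a plain text file (atomic_write_text is a hypothetical helper mirroring the logic above):

```python
import os
import socket
import tempfile

def atomic_write_text(path, text):
    # Same tmp-then-os.replace pattern as write_xi_AB, shown on a plain
    # text file so the sketch needs no h5py. Host and pid in the tmp name
    # keep concurrent writers from sharing a tmp file.
    tmp = f"{path}.tmp.{socket.gethostname()}.{os.getpid()}"
    try:
        with open(tmp, "w") as fh:
            fh.write(text)
        os.replace(tmp, path)  # atomic rename: readers see old or new, never partial
    except BaseException:
        try:
            os.unlink(tmp)
        except FileNotFoundError:
            pass
        raise

with tempfile.TemporaryDirectory() as d:
    target = os.path.join(d, "xi_AB_thresh.hdf5")
    atomic_write_text(target, "payload")
    with open(target) as fh:
        assert fh.read() == "payload"
    assert os.listdir(d) == ["xi_AB_thresh.hdf5"]  # no stray tmp files
```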
read_xi_AB

read_xi_AB(path: str) -> XiABRecord

Read one xi_AB_thresh.hdf5 file into an :class:XiABRecord.

r is recomputed as the arithmetic mean of r_edges — never read from disk, mirroring the project-wide rule.

Raises:

ValueError
    If schema_version is missing or unknown.

Source code in halocat/xi_AB_io.py
def read_xi_AB(path: str) -> XiABRecord:
    """Read one ``xi_AB_thresh.hdf5`` file into an :class:`XiABRecord`.

    ``r`` is recomputed as the arithmetic mean of ``r_edges`` — never
    read from disk, mirroring the project-wide rule.

    Raises
    ------
    ValueError
        If ``schema_version`` is missing or unknown.
    """
    with h5py.File(path, "r") as f:
        attrs = {k: _decode(v) for k, v in f.attrs.items()}
        sv = str(attrs.get("schema_version", ""))
        if sv != SCHEMA_VERSION:
            raise ValueError(
                f"{path}: schema_version={sv!r}; this halocat expects "
                f"{SCHEMA_VERSION!r}. Re-measure with overwrite=True."
            )
        r_edges = f["r_edges"][...]
        thresholds = f["thresholds"][...]
        xi_AB = f["xi_AB"][...]
        n_h_cum = f["n_h_cum"][...]
        n_pairs = f["n_pairs"][...] if "n_pairs" in f else None
        box_size = float(attrs.get("box_size", np.nan))

    r = 0.5 * (r_edges[:-1] + r_edges[1:])
    return XiABRecord(
        r_edges=np.asarray(r_edges, dtype=np.float64),
        r=np.asarray(r, dtype=np.float64),
        thresholds=np.asarray(thresholds, dtype=np.float64),
        xi_AB=np.asarray(xi_AB, dtype=np.float64),
        n_h_cum=np.asarray(n_h_cum, dtype=np.int64),
        n_pairs=(np.asarray(n_pairs, dtype=np.int64)
                 if n_pairs is not None else None),
        box_size=box_size,
        attrs=attrs,
    )
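Two cheap post-read sanity checks follow directly from the schema; a sketch on a toy cube (synthetic data, no library calls):

```python
import numpy as np

# Toy symmetric cube shaped like the one read_xi_AB returns (synthetic values).
rng = np.random.default_rng(0)
N_r, N_t = 4, 3
half = rng.normal(size=(N_r, N_t, N_t))
xi_AB = 0.5 * (half + half.transpose(0, 2, 1))  # symmetrise the threshold axes

# Check 1: the cube is symmetric in the last two axes.
assert np.allclose(xi_AB, xi_AB.transpose(0, 2, 1))

# Check 2: r equals the bin-edge midpoints (the project-wide rule).
r_edges = np.linspace(1.0, 51.0, N_r + 1)
r = 0.5 * (r_edges[:-1] + r_edges[1:])
assert r.shape == (N_r,)
```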

is_cache_valid

is_cache_valid(path: str, expected_cache_key: str) -> bool

True iff path exists, schema matches, and cache_key agrees.

Reads only attrs (no datasets) — ~ms per file even on cold Lustre.

Source code in halocat/xi_AB_io.py
def is_cache_valid(path: str, expected_cache_key: str) -> bool:
    """True iff ``path`` exists, schema matches, and ``cache_key`` agrees.

    Reads only attrs (no datasets) — ~ms per file even on cold Lustre.
    """
    if not os.path.exists(path):
        return False
    try:
        with h5py.File(path, "r") as f:
            sv = str(_decode(f.attrs.get("schema_version", "")))
            ck = str(_decode(f.attrs.get("cache_key", "")))
    except (OSError, KeyError):
        return False
    return sv == SCHEMA_VERSION and ck == expected_cache_key
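The validity logic can be exercised without h5py by standing in a JSON sidecar for the HDF5 attrs; sketch_is_valid below is illustrative only, not the library function:

```python
import json
import os
import tempfile

SCHEMA_VERSION = "1"

def sketch_is_valid(path, expected_key):
    # Mirrors is_cache_valid on a JSON stand-in for the HDF5 attrs:
    # exists, schema matches, cache_key agrees; any read error → invalid.
    if not os.path.exists(path):
        return False
    try:
        with open(path) as fh:
            attrs = json.load(fh)
    except (OSError, ValueError):
        return False
    return (str(attrs.get("schema_version", "")) == SCHEMA_VERSION
            and str(attrs.get("cache_key", "")) == expected_key)

with tempfile.TemporaryDirectory() as d:
    p = os.path.join(d, "attrs.json")
    assert not sketch_is_valid(p, "abc")          # missing file
    with open(p, "w") as fh:
        json.dump({"schema_version": "1", "cache_key": "abc"}, fh)
    assert sketch_is_valid(p, "abc")              # fresh cache: reuse
    assert not sketch_is_valid(p, "def")          # spec changed: re-measure
```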

to_xi_hh_record

to_xi_hh_record(record: XiABRecord)

Convert an :class:XiABRecord to a threshold-style XiHHRecord.

The dense (N_r, N_t, N_t) cube is repacked into the pair-sparse upper-triangular layout that :func:halocat.xi_hh_rebin.rebin_xi_hh_record expects, with mass_bins[:, 0] = thresholds and mass_bins[:, 1] = +inf. Halo counts n1 / n2 are filled per pair from n_h_cum.

Returns:

XiHHRecord
    A new in-memory record; not written to disk.

Source code in halocat/xi_AB_io.py
def to_xi_hh_record(record: XiABRecord):
    """Convert an :class:`XiABRecord` to a threshold-style ``XiHHRecord``.

    The dense (N_r, N_t, N_t) cube is repacked into the pair-sparse
    upper-triangular layout that
    :func:`halocat.xi_hh_rebin.rebin_xi_hh_record` expects, with
    ``mass_bins[:, 0] = thresholds`` and ``mass_bins[:, 1] = +inf``.
    Halo counts ``n1`` / ``n2`` are filled per pair from
    ``n_h_cum``.

    Returns
    -------
    XiHHRecord
        A new in-memory record; not written to disk.
    """
    # Late import: dataloader is only needed for this conversion, and
    # importing it lazily avoids a circular import (dataloader imports
    # config, not the other way around).
    from .dataloader import XiHHRecord

    N_t = int(record.thresholds.size)
    N_r = int(record.r.size)

    mass_bins = np.column_stack([
        record.thresholds.astype(np.float64),
        np.full(N_t, np.inf, dtype=np.float64),
    ])

    pair_keys: list[str] = []
    pair_indices: list[tuple[int, int]] = []
    rows_xi = []
    rows_log10M1, rows_log10M2 = [], []
    rows_n1, rows_n2 = [], []

    for i in range(N_t):
        for j in range(i, N_t):
            pair_keys.append(f"M{i}_M{j}")
            pair_indices.append((i, j))
            rows_xi.append(record.xi_AB[:, i, j])
            rows_log10M1.append(mass_bins[i])
            rows_log10M2.append(mass_bins[j])
            rows_n1.append(int(record.n_h_cum[i]))
            rows_n2.append(int(record.n_h_cum[j]))

    P = len(pair_keys)
    r2 = np.broadcast_to(record.r[None, :], (P, N_r)).copy()
    xi2 = np.stack(rows_xi, axis=0)

    return XiHHRecord(
        r_edges=record.r_edges.copy(),
        mass_bins=mass_bins,
        pairs=pair_keys,
        pair_indices=np.asarray(pair_indices, dtype=int),
        r=r2,
        xi=xi2,
        n1=np.asarray(rows_n1, dtype=int),
        n2=np.asarray(rows_n2, dtype=int),
        log10M1=np.asarray(rows_log10M1, dtype=np.float64),
        log10M2=np.asarray(rows_log10M2, dtype=np.float64),
        attrs=dict(record.attrs),
    )
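The explicit i <= j loop above has a vectorised equivalent via np.triu_indices; a sketch on a toy cube (illustrative, not part of the module):

```python
import numpy as np

N_r, N_t = 4, 3
rng = np.random.default_rng(1)
half = rng.normal(size=(N_r, N_t, N_t))
cube = 0.5 * (half + half.transpose(0, 2, 1))  # dense symmetric (N_r, N_t, N_t)

# One row per unique threshold pair, in the same order as the i <= j loop.
ii, jj = np.triu_indices(N_t)
xi_pairs = cube[:, ii, jj].T                   # (P, N_r) with P = N_t*(N_t+1)//2

assert xi_pairs.shape == (N_t * (N_t + 1) // 2, N_r)
assert np.allclose(xi_pairs[0], cube[:, 0, 0])  # row k is cube[:, ii[k], jj[k]]
```

Because the cube is symmetric, the upper triangle carries all the information, so the repacking is lossless.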

make_attrs

make_attrs(*, gravity: str, redshift: float, imodel: int, ibox: int, snapnum: int, cache_key: str, code_version: str, source_dat: str, halo_file: str, mass_definition: str = C.MASS_COLUMN, estimator: str = 'natural', pycorr_version: str = '', corrfunc_version: str = '') -> dict

Build the canonical attrs dict for a fresh :class:XiABRecord.

The writer adds schema_version and box_size itself; everything else lives here.

Source code in halocat/xi_AB_io.py
def make_attrs(*, gravity: str, redshift: float, imodel: int, ibox: int,
               snapnum: int, cache_key: str,
               code_version: str, source_dat: str,
               halo_file: str,
               mass_definition: str = C.MASS_COLUMN,
               estimator: str = "natural",
               pycorr_version: str = "",
               corrfunc_version: str = "") -> dict:
    """Build the canonical attrs dict for a fresh :class:`XiABRecord`.

    The writer adds ``schema_version`` and ``box_size`` itself; everything
    else lives here.
    """
    return {
        "gravity": str(gravity),
        "redshift": float(redshift),
        "imodel": int(imodel),
        "ibox": int(ibox),
        "snapnum": int(snapnum),
        "mass_definition": str(mass_definition),
        "mass_units": "Msun/h",
        "cache_key": str(cache_key),
        "code_version": str(code_version),
        "source_dat": str(source_dat),
        "halo_file": str(halo_file),
        "measured_at": _dt.datetime.now(_dt.timezone.utc)
                          .strftime("%Y-%m-%dT%H:%M:%SZ"),
        "estimator": str(estimator),
        "pycorr_version": str(pycorr_version),
        "corrfunc_version": str(corrfunc_version),
    }
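Two small checks on the conventions baked into these attrs; the gravity and cache_key values below are made up for illustration:

```python
import datetime as dt

# measured_at is a UTC, second-resolution, ISO-8601 stamp with a Z suffix,
# matching the strftime pattern in make_attrs above.
stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
assert len(stamp) == 20 and stamp.endswith("Z")

# The writer adds schema_version and box_size on top of these attrs; a
# record's attrs dict is the union of both (illustrative values only).
attrs = {"gravity": "GR", "cache_key": "deadbeef"}
attrs.update({"schema_version": "1", "box_size": 500.0})
assert {"schema_version", "box_size", "cache_key"} <= set(attrs)
```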