
halocat.xi_AB_manager

xi_AB_manager

Manager for cumulative-threshold ξ_AB measurements + emulator assembly.

Two user-facing entry points:

  • :class:XiABLoader — load-or-measure ξ_AB for one realisation (one (gravity, redshift, imodel, ibox) tuple). Mirrors the :class:halocat.dataloader.HMFLoader / :class:XiHHLoader workflow but writes to its own xi_AB_thresh.hdf5 schema and invokes the threshold-grid pair-counting optimisation in :func:halocat.tpcf._measure_xi_AB_threshold_grid.

  • :func:assemble_emulator_training_set — stack across a sub-grid (one gravity, one redshift, many imodels × iboxes) into the emulator-shaped tensors. Optionally materialises a single suite-snapshot HDF5 file and/or memoises the assembled dict in process memory.

The manager calls into existing halocat infrastructure for halo catalogue access (halocat.io.read_halo_hdf5 + on-demand .DAT → halo.hdf5 reformatting via halocat.pipeline.run_single); cosmology parameters (halocat.cosmology.get_cosmology); and storage (halocat.xi_AB_io).

DEFAULT_THRESHOLDS module-attribute

DEFAULT_THRESHOLDS = round(arange(12.55, 15.45 + 1e-09, 0.05), 6)
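The default grid can be reproduced with numpy alone; the `+1e-9` guards against float under-shoot at the upper edge, and the rounding trims float noise from the grid values:

```python
import numpy as np

# Reproduce the module default: 12.55 -> 15.45 inclusive, 0.05 dex steps.
DEFAULT_THRESHOLDS = np.round(np.arange(12.55, 15.45 + 1e-9, 0.05), 6)

print(DEFAULT_THRESHOLDS.size)                        # 59
print(DEFAULT_THRESHOLDS[0], DEFAULT_THRESHOLDS[-1])  # 12.55 15.45
```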

SUITE_SCHEMA_VERSION module-attribute

SUITE_SCHEMA_VERSION = '1'

XiABLoader

XiABLoader(thresholds=None, r_edges=None, *, mass_definition: str = 'Mtot', overwrite: bool = False, write_halo_hdf5: bool = False, logger=None)

Bases: _BaseHaloStatLoader

Load-or-measure dataloader for the cumulative-threshold ξ_AB cube.

Each call resolves to a single (gravity, redshift, imodel, ibox) realisation, loads the cached xi_AB_thresh.hdf5 if its cache_key matches the current spec, and otherwise invokes the threshold-grid pair-counting optimisation (:func:halocat.tpcf._measure_xi_AB_threshold_grid) and writes a fresh file atomically.
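The key comparison itself lives in halocat.xi_AB_io; the load-or-measure pattern can be sketched generically (the spec fields and hashing below are illustrative assumptions, not the real xi_AB_cache_key implementation):

```python
import hashlib
import json

def cache_key(spec: dict) -> str:
    # Deterministic digest of a measurement spec (illustrative only; the
    # real key is built by halocat.xi_AB_io.xi_AB_cache_key).
    blob = json.dumps(spec, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()

spec = {"thresholds": [12.55, 12.60], "r_edges": [0.5, 1.0, 2.0],
        "mass_definition": "Mtot", "code_version": "1.0"}

# Any change to the spec yields a different key, so a file written under
# an older spec fails validation and triggers a fresh measurement.
assert cache_key(spec) != cache_key({**spec, "code_version": "1.1"})
```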

Parameters:

thresholds : array_like, optional
    Shared mass-threshold grid in log10(M / [Msun/h]). Defaults to
    np.arange(12.55, 15.45 + 1e-9, 0.05) (59 thresholds,
    Δ log M = 0.05 dex). Validated on construction; out-of-range values
    raise ValueError.
r_edges : array_like, optional
    Separation bin edges (Mpc/h). Defaults to :data:halocat.config.R_EDGES.
    Stored in the cache key.
mass_definition : str, default "Mtot"
    Halo mass column name. Currently only "Mtot" is supported.
overwrite : bool, default False
    If True, ignore the existing cache and re-measure.
write_halo_hdf5 : bool, default False
    Force re-reformatting halo.hdf5 from its source CatshortV.*.DAT even
    if the HDF5 file exists.
logger : logging.Logger, optional
    Defaults to the module logger.

Examples:

>>> loader = XiABLoader()
>>> rec = loader.get("LCDM", 0.25, imodel=0, ibox=1)
>>> rec.xi_AB.shape
(60, 59, 59)
Source code in halocat/xi_AB_manager.py
def __init__(self,
             thresholds=None,
             r_edges=None,
             *,
             mass_definition: str = "Mtot",
             overwrite: bool = False,
             write_halo_hdf5: bool = False,
             logger=None):
    super().__init__(overwrite=overwrite,
                     write_halo_hdf5=write_halo_hdf5,
                     logger=logger or _log)
    if thresholds is None:
        thresholds = DEFAULT_THRESHOLDS
    self.thresholds = np.asarray(thresholds, dtype=np.float64)
    self.r_edges = (np.asarray(r_edges, dtype=np.float64)
                    if r_edges is not None
                    else np.asarray(C.R_EDGES, dtype=np.float64))
    self.mass_definition = str(mass_definition)
    if self.mass_definition != "Mtot":
        raise ValueError(
            f"mass_definition={mass_definition!r} not supported; "
            f"only 'Mtot' for now"
        )
    self._validate_thresholds()

output_filename class-attribute instance-attribute

output_filename = 'xi_AB_thresh.hdf5'

thresholds instance-attribute

thresholds = asarray(thresholds, dtype=float64)

r_edges instance-attribute

r_edges = asarray(r_edges, dtype=float64) if r_edges is not None else asarray(R_EDGES, dtype=float64)

mass_definition instance-attribute

mass_definition = str(mass_definition)

overwrite instance-attribute

overwrite = overwrite

write_halo_hdf5 instance-attribute

write_halo_hdf5 = write_halo_hdf5

log instance-attribute

log = logger or get_logger('halocat.dataloader')

get

get(gravity: str, redshift: float, imodel: int, ibox: int) -> XiABRecord

Return the ξ_AB record, measuring on demand if missing/stale.

Source code in halocat/xi_AB_manager.py
def get(self, gravity: str, redshift: float,
        imodel: int, ibox: int) -> XiABRecord:
    """Return the ξ_AB record, measuring on demand if missing/stale."""
    path = self.path(gravity, redshift, imodel, ibox)
    expected = self._expected_cache_key(gravity, redshift, imodel, ibox)
    if not self.overwrite and is_cache_valid(path, expected):
        return self._read(path)
    self.log.info(
        "XiABLoader: measuring %s z=%.2f m%d b%d",
        gravity, redshift, imodel, ibox,
    )
    return self.measure(gravity, redshift, imodel, ibox,
                        expected_cache_key=expected)

measure

measure(gravity: str, redshift: float, imodel: int, ibox: int, *, expected_cache_key: str | None = None) -> XiABRecord

Force one ξ_AB measurement and write the file atomically.

Source code in halocat/xi_AB_manager.py
def measure(self, gravity: str, redshift: float,
            imodel: int, ibox: int,
            *, expected_cache_key: str | None = None) -> XiABRecord:
    """Force one ξ_AB measurement and write the file atomically."""
    out_dir = C.get_output_dir(gravity, redshift, imodel, ibox)
    halo_path = os.path.join(out_dir, "halo.hdf5")
    if not os.path.exists(halo_path) or self.write_halo_hdf5:
        status = run_single(
            gravity, redshift, imodel, ibox,
            overwrite=self.write_halo_hdf5, logger=self.log,
            do_halo=True, do_hmf=False, do_tpcf=False, do_vel=False,
        )
        if not status["ok"]:
            raise RuntimeError(
                f"halo.hdf5 reformat failed for {gravity} "
                f"z={redshift:.2f} m{imodel} b{ibox}: {status['error']}"
            )

    data = read_halo_hdf5(halo_path)
    positions = np.array([data["x"], data["y"], data["z"]])     # (3, N)
    masses = np.asarray(data[self.mass_definition], dtype=np.float64)

    with h5py.File(halo_path, "r") as f:
        sd = f.attrs.get("source_dat", b"")
        source_dat = sd.decode() if isinstance(sd, bytes) else str(sd)
        snapnum = int(f.attrs.get("snapnum", -1))

    xi, n_h, n_pairs, r = _measure_xi_AB_threshold_grid(
        positions, masses, self.thresholds, self.r_edges,
        box_size=float(C.BOX_SIZE), logger=self.log,
    )

    if expected_cache_key is None:
        expected_cache_key = xi_AB_cache_key(
            thresholds=self.thresholds, r_edges=self.r_edges,
            mass_definition=self.mass_definition,
            code_version=__version__, source_dat=source_dat,
        )

    attrs = make_attrs(
        gravity=gravity, redshift=float(redshift),
        imodel=int(imodel), ibox=int(ibox), snapnum=snapnum,
        cache_key=expected_cache_key,
        code_version=__version__, source_dat=source_dat,
        halo_file=halo_path, mass_definition=self.mass_definition,
        pycorr_version=_safe_version("pycorr"),
        corrfunc_version=_safe_version("Corrfunc"),
    )

    record = XiABRecord(
        r_edges=self.r_edges, r=r, thresholds=self.thresholds,
        xi_AB=xi, n_h_cum=n_h, n_pairs=n_pairs,
        box_size=float(C.BOX_SIZE), attrs=attrs,
    )
    write_xi_AB(record, self.path(gravity, redshift, imodel, ibox),
                atomic=True)
    return record

get_grid

get_grid(gravities: list[str] | None = None, redshifts: list[float] | None = None, imodels: list[int] | None = None, iboxes: list[int] | None = None, *, skip_missing: bool = False) -> dict

Stack ξ_AB across a sub-grid into (G, Z, M, B, N_r, N_t, N_t).

Returns a dict with axis labels, the shared thresholds / r_edges / r, and stacked xi_AB, n_h_cum, plus a bool present mask of shape (G, Z, M, B). Mirrors :meth:HMFLoader.get_grid / :meth:XiHHLoader.get_grid.
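The stacking contract can be exercised on mock shapes (numpy only; the sizes below are illustrative, not the suite's real N_r and N_t): missing realisations stay NaN in xi_AB, sentinel -1 in n_h_cum, and False in present.

```python
import numpy as np

G, Z, M, B, N_r, N_t = 1, 2, 3, 2, 6, 5
xi = np.full((G, Z, M, B, N_r, N_t, N_t), np.nan)
n_h = np.full((G, Z, M, B, N_t), -1, dtype=np.int64)
present = np.zeros((G, Z, M, B), dtype=bool)

# Simulate one cache hit, as the loader loop does per realisation.
xi[0, 1, 2, 0] = 0.0
n_h[0, 1, 2, 0] = 1000
present[0, 1, 2, 0] = True

# Downstream code filters on the mask rather than probing for NaN.
assert xi[present].shape == (1, N_r, N_t, N_t)
assert (n_h[~present] == -1).all()
```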

Source code in halocat/xi_AB_manager.py
def get_grid(self,
             gravities: list[str] | None = None,
             redshifts: list[float] | None = None,
             imodels: list[int] | None = None,
             iboxes: list[int] | None = None,
             *,
             skip_missing: bool = False) -> dict:
    """Stack ξ_AB across a sub-grid into ``(G, Z, M, B, N_r, N_t, N_t)``.

    Returns a dict with axis labels, the shared
    ``thresholds`` / ``r_edges`` / ``r``, and stacked ``xi_AB``,
    ``n_h_cum``, plus a bool ``present`` mask of shape
    ``(G, Z, M, B)``. Mirrors :meth:`HMFLoader.get_grid` /
    :meth:`XiHHLoader.get_grid`.
    """
    gravities = list(gravities) if gravities else list(C.GRAVITIES)
    redshifts = list(redshifts) if redshifts else list(C.REDSHIFTS)
    imodels = list(imodels) if imodels else list(C.IMODELS)
    iboxes = list(iboxes) if iboxes else list(C.IBOXES)

    G, Z, M, B = (len(gravities), len(redshifts),
                  len(imodels), len(iboxes))
    N_r = self.r_edges.size - 1
    N_t = self.thresholds.size

    xi = np.full((G, Z, M, B, N_r, N_t, N_t), np.nan, dtype=np.float64)
    n_h = np.full((G, Z, M, B, N_t), -1, dtype=np.int64)
    present = np.zeros((G, Z, M, B), dtype=bool)

    for ig, gravity in enumerate(gravities):
        for iz, z in enumerate(redshifts):
            for im, imodel in enumerate(imodels):
                for ib, ibox in enumerate(iboxes):
                    path = self.path(gravity, z, imodel, ibox)
                    if skip_missing and not os.path.exists(path):
                        continue
                    rec = self.get(gravity, z, imodel, ibox)
                    xi[ig, iz, im, ib] = rec.xi_AB
                    n_h[ig, iz, im, ib] = rec.n_h_cum
                    present[ig, iz, im, ib] = True

    return {
        "gravities": np.array(gravities),
        "redshifts": np.array(redshifts),
        "imodels": np.array(imodels),
        "iboxes": np.array(iboxes),
        "thresholds": self.thresholds,
        "r_edges": self.r_edges,
        "r": 0.5 * (self.r_edges[:-1] + self.r_edges[1:]),
        "xi_AB": xi,
        "n_h_cum": n_h,
        "present": present,
    }

path classmethod

path(gravity: str, redshift: float, imodel: int, ibox: int) -> str
Source code in halocat/dataloader.py
@classmethod
def path(cls, gravity: str, redshift: float, imodel: int, ibox: int) -> str:
    return os.path.join(
        C.get_output_dir(gravity, redshift, imodel, ibox),
        cls.output_filename,
    )

exists classmethod

exists(gravity: str, redshift: float, imodel: int, ibox: int) -> bool
Source code in halocat/dataloader.py
@classmethod
def exists(cls, gravity: str, redshift: float,
           imodel: int, ibox: int) -> bool:
    return os.path.exists(cls.path(gravity, redshift, imodel, ibox))

clear_in_process_cache

clear_in_process_cache() -> None

Drop the module-level memo cache used by :func:assemble_emulator_training_set. Call this if the underlying per-realisation caches were re-measured outside the manager.

Source code in halocat/xi_AB_manager.py
def clear_in_process_cache() -> None:
    """Drop the module-level memo cache used by
    :func:`assemble_emulator_training_set`. Call this if the underlying
    per-realisation caches were re-measured outside the manager."""
    _ASSEMBLY_CACHE.clear()

assemble_emulator_training_set

assemble_emulator_training_set(gravity: str, redshift: float, imodels: Sequence[int], iboxes: Sequence[int], *, thresholds=None, r_edges=None, mass_definition: str = 'Mtot', avg_iboxes: bool = True, materialise_snapshot: bool = False, snapshot_path: str | None = None, in_process_cache: bool = True, overwrite: bool = False, write_halo_hdf5: bool = False, logger=None) -> dict

Build emulator-shaped tensors for one gravity × one redshift.

Calls :meth:XiABLoader.get for every (gravity, redshift, imodel, ibox) in the requested sub-grid (measuring on demand for cache misses), then stacks into the (N_cosmo, N_r, N_t, N_t) cube the emulator training pipeline expects, plus the cosmology parameter matrix :func:halocat.cosmology.theta_keys -ordered.

Parameters:

gravity : {"LCDM", "fRn1"}
    The θ schema is gravity-conditional (LCDM has 4 parameters, fRn1
    has 5), so this function always returns one gravity per call.
redshift : float
imodels : sequence of int
iboxes : sequence of int
thresholds, r_edges, mass_definition : see :class:XiABLoader.
avg_iboxes : bool, default True
    If True, average ξ_AB and n_h_cum across iboxes (and report the
    across-ibox variance separately). If False, the leading axis stacks
    every (imodel, ibox) pair as an independent sample.
materialise_snapshot : bool, default False
    If True, write the assembled dict to a single suite-level HDF5 file
    at snapshot_path (or under OUTPUT_ROOT/snapshots/ if snapshot_path
    is None). Subsequent calls with a matching suite-config hash skip
    per-realisation IO entirely.
snapshot_path : str, optional
in_process_cache : bool, default True
    Memoise the assembled dict in module memory keyed by
    (gravity, redshift, imodels, iboxes, suite_hash). Cached results
    are validated against max(mtime) of the per-realisation files; a
    re-measurement invalidates the entry.
overwrite : bool, default False
    Forwarded to :class:XiABLoader.
write_halo_hdf5 : bool, default False
    Forwarded to :class:XiABLoader.
logger : logging.Logger, optional

Returns:

out : dict
    Keys (with shapes for avg_iboxes=True):

  • 'xi_AB' : (N_cosmo, N_r, N_t, N_t)
  • 'xi_AB_var' : (N_cosmo, N_r, N_t, N_t) — across-ibox variance; NaN if len(iboxes) <= 1
  • 'theta' : (N_cosmo, n_params)
  • 'theta_keys' : tuple of str
  • 'n_h_cum' : (N_cosmo, N_t) halo counts (mean over iboxes)
  • 'r' : (N_r,)
  • 'r_edges' : (N_r+1,)
  • 'thresholds' : (N_t,)
  • 'imodels' : (N_cosmo,)
  • 'iboxes' : (N_iboxes,)
  • 'gravity' : str
  • 'redshift' : float
  • 'cache_key' : str (suite-config hash)
  • 'present' : (N_cosmo, N_iboxes) bool

For avg_iboxes=False the leading axis instead has length N_cosmo * N_iboxes (with imodels, iboxes per-row), and xi_AB_var is omitted.
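The two stacking modes can be reproduced on a mock (M, B, N_r, N_t, N_t) cube (numpy only; sizes illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
M, B, N_r, N_t = 2, 3, 4, 5
xi_grid = rng.normal(size=(M, B, N_r, N_t, N_t))

# avg_iboxes=True: NaN-aware mean and sample variance across iboxes
# (the source substitutes an all-NaN xi_AB_var when len(iboxes) <= 1).
xi_mean = np.nanmean(xi_grid, axis=1)          # (M, N_r, N_t, N_t)
xi_var = np.nanvar(xi_grid, axis=1, ddof=1)    # (M, N_r, N_t, N_t)

# avg_iboxes=False: one row per (imodel, ibox), imodel-major order,
# matching the np.repeat / np.tile pairing in the source.
flat_xi = xi_grid.reshape(-1, N_r, N_t, N_t)   # (M * B, N_r, N_t, N_t)
imodels_flat = np.repeat(np.arange(M), B)      # [0 0 0 1 1 1]
iboxes_flat = np.tile(np.arange(B), M)         # [0 1 2 0 1 2]

# Row m*B + b of the flat stack is realisation (imodel m, ibox b).
assert np.allclose(flat_xi[1 * B + 1], xi_grid[1, 1])
```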

Source code in halocat/xi_AB_manager.py
def assemble_emulator_training_set(
    gravity: str,
    redshift: float,
    imodels: Sequence[int],
    iboxes: Sequence[int],
    *,
    thresholds=None,
    r_edges=None,
    mass_definition: str = "Mtot",
    avg_iboxes: bool = True,
    materialise_snapshot: bool = False,
    snapshot_path: str | None = None,
    in_process_cache: bool = True,
    overwrite: bool = False,
    write_halo_hdf5: bool = False,
    logger=None,
) -> dict:
    """Build emulator-shaped tensors for one ``gravity`` × one redshift.

    Calls :meth:`XiABLoader.get` for every
    ``(gravity, redshift, imodel, ibox)`` in the requested sub-grid
    (measuring on demand for cache misses), then stacks into the
    ``(N_cosmo, N_r, N_t, N_t)`` cube the emulator training pipeline
    expects, plus the cosmology parameter matrix
    :func:`halocat.cosmology.theta_keys` -ordered.

    Parameters
    ----------
    gravity : {"LCDM", "fRn1"}
        The θ schema is gravity-conditional — LCDM has 4 parameters,
        fRn1 has 5 — so this function always returns one gravity per
        call.
    redshift : float
    imodels : sequence of int
    iboxes : sequence of int
    thresholds, r_edges, mass_definition : see :class:`XiABLoader`.
    avg_iboxes : bool, default True
        If True, average ξ_AB and ``n_h_cum`` across iboxes (and report
        the across-ibox variance separately). If False, the leading
        axis stacks every ``(imodel, ibox)`` pair as an independent
        sample.
    materialise_snapshot : bool, default False
        If True, write the assembled dict to a single suite-level HDF5
        file at ``snapshot_path`` (or under ``OUTPUT_ROOT/snapshots/``
        if ``snapshot_path`` is None). Subsequent calls with a matching
        suite-config hash skip per-realisation IO entirely.
    snapshot_path : str, optional
    in_process_cache : bool, default True
        Memoise the assembled dict in module memory keyed by
        ``(gravity, redshift, imodels, iboxes, suite_hash)``. Cached
        results are validated against ``max(mtime)`` of the
        per-realisation files; a re-measurement invalidates the entry.
    overwrite : bool, default False
        Forwarded to :class:`XiABLoader`.
    write_halo_hdf5 : bool, default False
        Forwarded to :class:`XiABLoader`.
    logger : logging.Logger, optional

    Returns
    -------
    out : dict
        Keys (with shapes for ``avg_iboxes=True``):

        - ``'xi_AB'``       : ``(N_cosmo, N_r, N_t, N_t)``
        - ``'xi_AB_var'``   : ``(N_cosmo, N_r, N_t, N_t)`` — across-ibox
          variance; NaN if ``len(iboxes) <= 1``
        - ``'theta'``       : ``(N_cosmo, n_params)``
        - ``'theta_keys'``  : tuple of str
        - ``'n_h_cum'``     : ``(N_cosmo, N_t)`` halo counts (mean over
          iboxes)
        - ``'r'``           : ``(N_r,)``
        - ``'r_edges'``     : ``(N_r+1,)``
        - ``'thresholds'``  : ``(N_t,)``
        - ``'imodels'``     : ``(N_cosmo,)``
        - ``'iboxes'``      : ``(N_iboxes,)``
        - ``'gravity'``     : str
        - ``'redshift'``    : float
        - ``'cache_key'``   : str  (suite-config hash)
        - ``'present'``     : ``(N_cosmo, N_iboxes)`` bool

        For ``avg_iboxes=False`` the leading axis instead has length
        ``N_cosmo * N_iboxes`` (with ``imodels``, ``iboxes`` per-row),
        and ``xi_AB_var`` is omitted.
    """
    log = logger or _log
    imodels = list(imodels)
    iboxes = list(iboxes)
    loader = XiABLoader(
        thresholds=thresholds, r_edges=r_edges,
        mass_definition=mass_definition,
        overwrite=overwrite, write_halo_hdf5=write_halo_hdf5,
        logger=log,
    )

    suite_key = _suite_cache_key(
        gravity=gravity, redshift=redshift,
        imodels=imodels, iboxes=iboxes,
        thresholds=loader.thresholds, r_edges=loader.r_edges,
        mass_definition=loader.mass_definition,
        avg_iboxes=avg_iboxes,
    )

    # Snapshot fast path
    if materialise_snapshot and snapshot_path is None:
        snapshot_path = _default_snapshot_path(
            gravity, redshift, suite_key,
        )
    if (materialise_snapshot
            and snapshot_path is not None
            and os.path.exists(snapshot_path)
            and not overwrite):
        try:
            cached = _read_snapshot(snapshot_path)
            if cached.get("cache_key", "") == suite_key:
                log.info("assemble: snapshot hit %s", snapshot_path)
                return cached
        except (OSError, ValueError) as exc:
            log.warning("assemble: snapshot read failed (%s); rebuilding", exc)

    # Per-realisation paths (used for in-process-cache invalidation)
    real_paths = [loader.path(gravity, redshift, m, b)
                  for m in imodels for b in iboxes]

    if in_process_cache and suite_key in _ASSEMBLY_CACHE:
        entry = _ASSEMBLY_CACHE[suite_key]
        if _max_mtime(real_paths) <= entry["mtime"]:
            return entry["result"]

    # Stack via get_grid then drop the gravity / redshift axes.
    grid = loader.get_grid(
        gravities=[gravity], redshifts=[redshift],
        imodels=imodels, iboxes=iboxes, skip_missing=False,
    )
    # grid['xi_AB']     : (1, 1, M, B, N_r, N_t, N_t)
    # grid['n_h_cum']   : (1, 1, M, B, N_t)
    # grid['present']   : (1, 1, M, B)
    xi_grid = grid["xi_AB"][0, 0]              # (M, B, N_r, N_t, N_t)
    n_h_grid = grid["n_h_cum"][0, 0]            # (M, B, N_t)
    present = grid["present"][0, 0]             # (M, B)

    keys = theta_keys(gravity)
    theta_rows = np.array(
        [[get_cosmology(gravity, m)[k] for k in keys] for m in imodels],
        dtype=np.float64,
    )

    result: dict = {
        "gravity": gravity,
        "redshift": float(redshift),
        "imodels": np.array(imodels),
        "iboxes": np.array(iboxes),
        "thresholds": loader.thresholds,
        "r_edges": loader.r_edges,
        "r": grid["r"],
        "theta_keys": keys,
        "cache_key": suite_key,
        "present": present,
    }

    if avg_iboxes:
        # All-NaN slices at high-mass thresholds (n_h(>=t) hits 0 for
        # every ibox in the cosmology) trigger nanmean / nanvar
        # warnings. They produce NaN in the output, which is the
        # correct signal — silence the noise.
        import warnings
        with np.errstate(invalid="ignore"), warnings.catch_warnings():
            warnings.filterwarnings("ignore",
                                     message="Mean of empty slice",
                                     category=RuntimeWarning)
            warnings.filterwarnings("ignore",
                                     message="Degrees of freedom <= 0",
                                     category=RuntimeWarning)
            xi_mean = np.nanmean(xi_grid, axis=1)        # (M, N_r, N_t, N_t)
            if len(iboxes) >= 2:
                xi_var = np.nanvar(xi_grid, axis=1, ddof=1)
            else:
                xi_var = np.full_like(xi_mean, np.nan)
            n_h_mean = (np.nanmean(n_h_grid.astype(np.float64), axis=1)
                        if n_h_grid.size else np.full(
                            (len(imodels), loader.thresholds.size), np.nan))
        result["xi_AB"] = xi_mean
        result["xi_AB_var"] = xi_var
        result["n_h_cum"] = n_h_mean
        result["theta"] = theta_rows
    else:
        # Stack as flat samples; one row per (imodel, ibox)
        N_r = grid["r"].size
        N_t = loader.thresholds.size
        flat_xi = xi_grid.reshape(-1, N_r, N_t, N_t)
        flat_nh = n_h_grid.reshape(-1, N_t).astype(np.float64)
        flat_imodels = np.repeat(np.array(imodels), len(iboxes))
        flat_iboxes = np.tile(np.array(iboxes), len(imodels))
        flat_theta = np.repeat(theta_rows, len(iboxes), axis=0)
        result["xi_AB"] = flat_xi
        result["n_h_cum"] = flat_nh
        result["theta"] = flat_theta
        result["imodels_flat"] = flat_imodels
        result["iboxes_flat"] = flat_iboxes

    if in_process_cache:
        _ASSEMBLY_CACHE[suite_key] = {
            "result": result,
            "mtime": _max_mtime(real_paths),
        }

    if materialise_snapshot and snapshot_path is not None:
        _write_snapshot(snapshot_path, result)

    return result