halocat.pipeline

pipeline

Orchestration: run a single (gravity, z, imodel, ibox) combination or the full grid.

run_single

run_single(gravity: str, redshift: float, imodel: int, ibox: int, overwrite: bool = False, logger=None, do_halo: bool = True, do_hmf: bool = True, do_tpcf: bool = True, do_vel: bool = True) -> dict

Process a single simulation realisation. Returns a status dict.

Halo data are always loaded from the reformatted halo.hdf5; if it is missing, the source CatshortV.*.DAT is reformatted on demand. The .DAT file is read-only; overwrite=True re-measures the downstream statistics, and halo.hdf5 is only regenerated from the .DAT when do_halo is also requested.
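
A minimal usage sketch; the gravity label, redshift, and model/box indices below are placeholder values, not ones documented on this page:

from halocat.pipeline import run_single

# Placeholder parameters; pick a combination that exists in your grid.
status = run_single("GR", 0.5, imodel=1, ibox=1)

if status["ok"]:
    print("skipped stages:", status["skipped"])
else:
    print("failed:", status["error"])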

Source code in halocat/pipeline.py
def run_single(gravity: str, redshift: float, imodel: int, ibox: int,
               overwrite: bool = False, logger=None,
               do_halo: bool = True, do_hmf: bool = True,
               do_tpcf: bool = True, do_vel: bool = True) -> dict:
    """Process a single simulation realisation. Returns a status dict.

    Halo data are always loaded from the reformatted ``halo.hdf5``; if it
    is missing, the source ``CatshortV.*.DAT`` is reformatted on demand.
    The ``.DAT`` file is read-only; ``overwrite=True`` re-measures the
    downstream statistics, and ``halo.hdf5`` is only regenerated from the
    ``.DAT`` when ``do_halo`` is also requested.
    """
    log = logger or get_logger()
    snapnum = C.snapnum_for_redshift(redshift, imodel)
    out_dir = ensure_dir(C.get_output_dir(gravity, redshift, imodel, ibox))

    halo_path = os.path.join(out_dir, "halo.hdf5")
    hmf_path = os.path.join(out_dir, "hmf.hdf5")
    xi_path = os.path.join(out_dir, "xi_hh.hdf5")
    vel_path = os.path.join(out_dir, "velocity_moments.hdf5")

    status = {
        "gravity": gravity, "redshift": redshift,
        "imodel": imodel, "ibox": ibox, "snapnum": snapnum,
        "ok": False, "skipped": [], "error": None,
    }

    label = f"{gravity} z={redshift:.2f} m{imodel} b{ibox}"

    common_attrs = dict(
        gravity=gravity, redshift=float(redshift),
        imodel=int(imodel), ibox=int(ibox),
        snapnum=int(snapnum), box_size=float(C.BOX_SIZE),
    )

    try:
        # Decide whether any stage needs the halo data at all.
        need_halo_data = (
            do_halo
            or (do_hmf and _need(hmf_path, overwrite))
            or (do_tpcf and _need(xi_path, overwrite))
            or (do_vel and _need(vel_path, overwrite))
        )
        if not need_halo_data:
            log.info(f"{label}: all outputs exist, skipping")
            status["ok"] = True
            status["skipped"] = ["halo", "hmf", "xi_hh", "velocity_moments"]
            return status

        # Always read halo data from the reformatted halo.hdf5. If it is
        # missing (or do_halo was requested with overwrite), reformat the
        # source .DAT first.
        _ensure_halo_hdf5(
            gravity, redshift, imodel, ibox,
            halo_path=halo_path, snapnum=snapnum, label=label, log=log,
            rewrite=(do_halo and overwrite),
        )
        with timer(f"{label} read halo.hdf5", log):
            data = read_halo_hdf5(halo_path)
        log.info(f"{label}: {len(data[C.MASS_COLUMN])} haloes")

        if not do_halo:
            status["skipped"].append("halo")

        if do_hmf and _need(hmf_path, overwrite):
            with timer(f"{label} HMF", log):
                hmf = measure_hmf(
                    data[C.MASS_COLUMN], C.BOX_SIZE,
                    C.LOG10M_MIN, C.LOG10M_MAX, C.NBIN_HMF,
                )
                write_hmf(hmf, hmf_path, attrs=common_attrs)
        else:
            status["skipped"].append("hmf")

        if do_tpcf and _need(xi_path, overwrite):
            with timer(f"{label} xi_hh", log):
                xi = measure_xi_hh(
                    data, C.BOX_SIZE, C.MASS_BINS, C.R_EDGES,
                    mass_key=C.MASS_COLUMN,
                )
                write_xi_hh(xi, xi_path, attrs=common_attrs)
        else:
            status["skipped"].append("xi_hh")

        if do_vel and _need(vel_path, overwrite):
            with timer(f"{label} velocity moments", log):
                vel = measure_velocity_moments(
                    data, C.BOX_SIZE, C.VEL_MASS_BINS, C.VEL_R_EDGES,
                    mass_key=C.MASS_COLUMN,
                )
                write_velocity_moments(vel, vel_path, attrs=common_attrs)
        else:
            status["skipped"].append("velocity_moments")

        status["ok"] = True
        log.info(f"{label}: done")
    except Exception as e:
        status["error"] = f"{type(e).__name__}: {e}"
        log.error(f"{label}: FAILED — {status['error']}")
        log.debug(traceback.format_exc())

    return status
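
run_single leans on two private helpers, _need and _ensure_halo_hdf5, that are not documented on this page. The sketch below only illustrates the behaviour the calls above appear to expect from them; the names and signatures are taken from those calls, but the bodies are assumptions, not the actual implementation:

import os

def _need(path: str, overwrite: bool) -> bool:
    # A stage has to run if its output file is absent or overwrite was requested.
    return overwrite or not os.path.exists(path)

def _ensure_halo_hdf5(gravity, redshift, imodel, ibox, *, halo_path,
                      snapnum, label, log, rewrite: bool = False) -> None:
    # Regenerate halo.hdf5 from the source CatshortV.*.DAT when the HDF5
    # file is missing, or when rewrite=True (do_halo combined with overwrite).
    # The .DAT source is only ever read.
    if not rewrite and os.path.exists(halo_path):
        return
    log.info(f"{label}: reformatting source catalogue -> {halo_path}")
    # The actual .DAT -> HDF5 conversion call lives elsewhere in halocat
    # and is omitted from this sketch.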

run_all

run_all(gravities=None, redshifts=None, imodels=None, iboxes=None, overwrite: bool = False, logger=None, **kwargs) -> list[dict]

Loop sequentially over the parameter grid.

Source code in halocat/pipeline.py
def run_all(gravities=None, redshifts=None, imodels=None, iboxes=None,
            overwrite: bool = False, logger=None, **kwargs) -> list[dict]:
    """Loop sequentially over the parameter grid."""
    log = logger or get_logger()
    gravities = gravities or C.GRAVITIES
    redshifts = redshifts or C.REDSHIFTS
    imodels = imodels or C.IMODELS
    iboxes = iboxes or C.IBOXES

    results: list[dict] = []
    for gravity, redshift, imodel, ibox in product(
            gravities, redshifts, imodels, iboxes):
        results.append(run_single(
            gravity, redshift, imodel, ibox,
            overwrite=overwrite, logger=log, **kwargs,
        ))

    n_ok = sum(1 for r in results if r["ok"])
    n_fail = len(results) - n_ok
    log.info(f"run_all: {n_ok}/{len(results)} ok, {n_fail} failed")
    if n_fail:
        for r in results:
            if not r["ok"]:
                log.warning(
                    f"  FAIL {r['gravity']} z={r['redshift']:.2f} "
                    f"m{r['imodel']} b{r['ibox']}: {r['error']}"
                )
    return results
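
A sketch of a restricted grid run; the gravity label and redshift below are placeholders for whatever C.GRAVITIES and C.REDSHIFTS actually contain:

from halocat.pipeline import run_all

# Any argument left as None falls back to the full grid defined in the
# constants module (C.GRAVITIES, C.REDSHIFTS, C.IMODELS, C.IBOXES).
results = run_all(gravities=["GR"], redshifts=[0.5], do_vel=False)

# Extra keyword arguments (here do_vel=False) are forwarded to run_single.
for r in results:
    if not r["ok"]:
        print(r["gravity"], r["redshift"], r["imodel"], r["ibox"], r["error"])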