def run_single(gravity: str, redshift: float, imodel: int, ibox: int,
               overwrite: bool = False, logger=None,
               do_halo: bool = True, do_hmf: bool = True,
               do_tpcf: bool = True, do_vel: bool = True) -> dict:
    """Process a single simulation realisation and return a status dict.

    Halo data are always loaded from the reformatted ``halo.hdf5`` in the
    realisation's output directory.  ``_ensure_halo_hdf5`` is called first
    so that, if the file is missing, the source ``CatshortV.*.DAT`` is
    reformatted on demand.  The ``.DAT`` file itself is only ever read.

    ``overwrite=True`` forces re-measurement of every requested downstream
    statistic.  A rewrite of ``halo.hdf5`` is requested only when
    ``do_halo`` is set as well (``rewrite=do_halo and overwrite``).

    Parameters
    ----------
    gravity, redshift, imodel, ibox
        Identify the realisation.  They select the output directory via
        ``C.get_output_dir``, are forwarded to ``_ensure_halo_hdf5`` and
        are recorded in both the status dict and the output attributes.
    overwrite
        Passed to ``_need`` for each output file; see above for its effect
        on ``halo.hdf5``.
    logger
        Logger to use; ``get_logger()`` is used when this is falsy.
    do_halo, do_hmf, do_tpcf, do_vel
        Enable the halo, halo-mass-function, ``xi_hh`` and velocity-moment
        stages respectively.

    Returns
    -------
    dict
        Keys ``gravity``, ``redshift``, ``imodel``, ``ibox``, ``snapnum``,
        ``ok`` (bool), ``skipped`` (list of stage names that were not run)
        and ``error`` (``None``, or ``"<ExceptionType>: <message>"``).

    Notes
    -----
    Exceptions raised inside the processing stages are caught, logged and
    reported through ``status["error"]``.  The snapshot lookup and the
    output-directory creation happen before the ``try`` block, so errors
    from those calls propagate to the caller.
    """
    log = logger or get_logger()
    snapnum = C.snapnum_for_redshift(redshift, imodel)
    out_dir = ensure_dir(C.get_output_dir(gravity, redshift, imodel, ibox))

    halo_path = os.path.join(out_dir, "halo.hdf5")
    hmf_path = os.path.join(out_dir, "hmf.hdf5")
    xi_path = os.path.join(out_dir, "xi_hh.hdf5")
    vel_path = os.path.join(out_dir, "velocity_moments.hdf5")

    status = {
        "gravity": gravity, "redshift": redshift,
        "imodel": imodel, "ibox": ibox, "snapnum": snapnum,
        "ok": False, "skipped": [], "error": None,
    }
    label = f"{gravity} z={redshift:.2f} m{imodel} b{ibox}"

    # Metadata attached to every output file written below.
    common_attrs = dict(
        gravity=gravity, redshift=float(redshift),
        imodel=int(imodel), ibox=int(ibox),
        snapnum=int(snapnum), box_size=float(C.BOX_SIZE),
    )

    try:
        # Decide once per stage whether it has to run.  Evaluating each
        # decision a single time avoids repeating the `_need` check and
        # guarantees that the "load the data" and "run the stage"
        # decisions can never disagree.
        run_hmf = do_hmf and _need(hmf_path, overwrite)
        run_tpcf = do_tpcf and _need(xi_path, overwrite)
        run_vel = do_vel and _need(vel_path, overwrite)

        # NOTE(review): do_halo defaults to True, so with default arguments
        # this early exit never triggers and halo.hdf5 is read even when
        # every downstream output already exists -- confirm that this is
        # intended.
        need_halo_data = do_halo or run_hmf or run_tpcf or run_vel
        if not need_halo_data:
            log.info(f"{label}: all outputs exist, skipping")
            status["ok"] = True
            status["skipped"] = ["halo", "hmf", "xi_hh", "velocity_moments"]
            return status

        # Always read halo data from the reformatted halo.hdf5.  If it is
        # missing (or do_halo was requested with overwrite), reformat the
        # source .DAT first.
        _ensure_halo_hdf5(
            gravity, redshift, imodel, ibox,
            halo_path=halo_path, snapnum=snapnum, label=label, log=log,
            rewrite=(do_halo and overwrite),
        )
        with timer(f"{label} read halo.hdf5", log):
            data = read_halo_hdf5(halo_path)
        log.info(f"{label}: {len(data[C.MASS_COLUMN])} haloes")

        if not do_halo:
            status["skipped"].append("halo")

        if run_hmf:
            with timer(f"{label} HMF", log):
                hmf = measure_hmf(
                    data[C.MASS_COLUMN], C.BOX_SIZE,
                    C.LOG10M_MIN, C.LOG10M_MAX, C.NBIN_HMF,
                )
                write_hmf(hmf, hmf_path, attrs=common_attrs)
        else:
            status["skipped"].append("hmf")

        if run_tpcf:
            with timer(f"{label} xi_hh", log):
                xi = measure_xi_hh(
                    data, C.BOX_SIZE, C.MASS_BINS, C.R_EDGES,
                    mass_key=C.MASS_COLUMN,
                )
                write_xi_hh(xi, xi_path, attrs=common_attrs)
        else:
            status["skipped"].append("xi_hh")

        if run_vel:
            with timer(f"{label} velocity moments", log):
                vel = measure_velocity_moments(
                    data, C.BOX_SIZE, C.VEL_MASS_BINS, C.VEL_R_EDGES,
                    mass_key=C.MASS_COLUMN,
                )
                write_velocity_moments(vel, vel_path, attrs=common_attrs)
        else:
            status["skipped"].append("velocity_moments")

        status["ok"] = True
        log.info(f"{label}: done")
    except Exception as e:
        # Per-realisation boundary: report the failure through the status
        # dict instead of raising; the full traceback goes to the debug log.
        status["error"] = f"{type(e).__name__}: {e}"
        log.error(f"{label}: FAILED — {status['error']}")
        log.debug(traceback.format_exc())
    return status