Skip to content

Physics

Astrophysical content only — concrete models inherit the ABCs in liulu.physics.base. See Architecture.

Base classes

Real-space correlation

liulu.physics.base.RealSpaceCorrelation

Bases: ABC

Real-space two-point correlation function xi(r).

Source code in liulu/physics/base.py
class RealSpaceCorrelation(ABC):
    """Real-space two-point correlation function xi(r)."""

    @abstractmethod
    def __call__(self, r: ndarray) -> ndarray:
        ...

Pairwise velocity moments

liulu.physics.base.PairwiseVelocityMoments

Bases: ABC

Radial-tangential pairwise velocity moments as functions of r.

Naming convention
  • m_ij(r): moment about the origin, i-th radial order, j-th tangential order.
  • c_ij(r): central moment, i-th radial order, j-th tangential order.

Odd tangential orders vanish by isotropy and are handled automatically.

Required for Gaussian PDF: m_10, c_20, c_02 Required for skew-t PDF: additionally c_30, c_12, c_40, c_04, c_22

Source code in liulu/physics/base.py
class PairwiseVelocityMoments(ABC):
    """Radial-tangential pairwise velocity moments as functions of r.

    Naming convention
    -----------------
    - ``m_ij(r)``: moment about the origin, i-th radial order, j-th tangential order.
    - ``c_ij(r)``: central moment, i-th radial order, j-th tangential order.

    Odd tangential orders vanish by isotropy and are handled automatically.

    Required for Gaussian PDF: m_10, c_20, c_02
    Required for skew-t PDF:   additionally c_30, c_12, c_40, c_04, c_22
    """

    # ── required (Gaussian-level) ─────────────────────────────────────

    @abstractmethod
    def m_10(self, r: ndarray) -> ndarray:
        """Mean radial pairwise velocity (km/s). Negative = infall."""
        ...

    @abstractmethod
    def c_20(self, r: ndarray) -> ndarray:
        """Radial velocity variance sigma_r^2 (km/s)^2."""
        ...

    @abstractmethod
    def c_02(self, r: ndarray) -> ndarray:
        """Tangential velocity variance per component sigma_t^2 (km/s)^2."""
        ...

    # ── optional (skew-t level, defaults assume Gaussian + independent) ──

    def c_30(self, r: ndarray) -> ndarray:
        """Third radial central moment <(v_r - <v_r>)^3>."""
        return np.zeros_like(np.atleast_1d(np.asarray(r, dtype=np.float64)))

    def c_12(self, r: ndarray) -> ndarray:
        """Cross central moment <(v_r - <v_r>) v_t^2>."""
        return np.zeros_like(np.atleast_1d(np.asarray(r, dtype=np.float64)))

    def c_40(self, r: ndarray) -> ndarray:
        """Fourth radial central moment."""
        return 3.0 * self.c_20(r) ** 2

    def c_04(self, r: ndarray) -> ndarray:
        """Fourth tangential central moment."""
        return 3.0 * self.c_02(r) ** 2

    def c_22(self, r: ndarray) -> ndarray:
        """Cross central moment <(v_r - <v_r>)^2 v_t^2>."""
        return self.c_20(r) * self.c_02(r)

    # ── dispatcher ────────────────────────────────────────────────────

    def get_moment(self, r: ndarray, r_order: int, t_order: int,
                   mode: str) -> ndarray:
        """Retrieve a moment by radial/tangential order.

        Parameters
        ----------
        r : ndarray
            Pair separation.
        r_order, t_order : int
            Radial and tangential orders.
        mode : str
            ``'m'`` for moments about the origin, ``'c'`` for central moments.
        """
        r = np.atleast_1d(np.asarray(r, dtype=np.float64))
        if t_order % 2 != 0:
            return np.zeros_like(r)
        if r_order == 0 and t_order == 0:
            return np.ones_like(r)
        if mode == "c" and r_order + t_order == 1:
            return np.zeros_like(r)
        return getattr(self, f"{mode}_{r_order}{t_order}")(r)

m_10 abstractmethod

m_10(r: ndarray) -> ndarray

Mean radial pairwise velocity (km/s). Negative = infall.

Source code in liulu/physics/base.py
@abstractmethod
def m_10(self, r: ndarray) -> ndarray:
    """Mean radial pairwise velocity (km/s). Negative = infall."""
    ...

c_20 abstractmethod

c_20(r: ndarray) -> ndarray

Radial velocity variance sigma_r^2 (km/s)^2.

Source code in liulu/physics/base.py
@abstractmethod
def c_20(self, r: ndarray) -> ndarray:
    """Radial velocity variance sigma_r^2 (km/s)^2."""
    ...

c_02 abstractmethod

c_02(r: ndarray) -> ndarray

Tangential velocity variance per component sigma_t^2 (km/s)^2.

Source code in liulu/physics/base.py
@abstractmethod
def c_02(self, r: ndarray) -> ndarray:
    """Tangential velocity variance per component sigma_t^2 (km/s)^2."""
    ...

c_30

c_30(r: ndarray) -> ndarray

Third radial central moment <(v_r - )^3>.

Source code in liulu/physics/base.py
def c_30(self, r: ndarray) -> ndarray:
    """Third radial central moment <(v_r - <v_r>)^3>."""
    return np.zeros_like(np.atleast_1d(np.asarray(r, dtype=np.float64)))

c_12

c_12(r: ndarray) -> ndarray

Cross central moment <(v_r - ) v_t^2>.

Source code in liulu/physics/base.py
def c_12(self, r: ndarray) -> ndarray:
    """Cross central moment <(v_r - <v_r>) v_t^2>."""
    return np.zeros_like(np.atleast_1d(np.asarray(r, dtype=np.float64)))

c_40

c_40(r: ndarray) -> ndarray

Fourth radial central moment.

Source code in liulu/physics/base.py
def c_40(self, r: ndarray) -> ndarray:
    """Fourth radial central moment."""
    return 3.0 * self.c_20(r) ** 2

c_04

c_04(r: ndarray) -> ndarray

Fourth tangential central moment.

Source code in liulu/physics/base.py
def c_04(self, r: ndarray) -> ndarray:
    """Fourth tangential central moment."""
    return 3.0 * self.c_02(r) ** 2

c_22

c_22(r: ndarray) -> ndarray

Cross central moment <(v_r - )^2 v_t^2>.

Source code in liulu/physics/base.py
def c_22(self, r: ndarray) -> ndarray:
    """Cross central moment <(v_r - <v_r>)^2 v_t^2>."""
    return self.c_20(r) * self.c_02(r)

get_moment

get_moment(r: ndarray, r_order: int, t_order: int, mode: str) -> ndarray

Retrieve a moment by radial/tangential order.

Parameters:

Name Type Description Default
r ndarray

Pair separation.

required
r_order int

Radial and tangential orders.

required
t_order int

Radial and tangential orders.

required
mode str

'm' for moments about the origin, 'c' for central moments.

required
Source code in liulu/physics/base.py
def get_moment(self, r: ndarray, r_order: int, t_order: int,
               mode: str) -> ndarray:
    """Retrieve a moment by radial/tangential order.

    Parameters
    ----------
    r : ndarray
        Pair separation.
    r_order, t_order : int
        Radial and tangential orders.
    mode : str
        ``'m'`` for moments about the origin, ``'c'`` for central moments.
    """
    r = np.atleast_1d(np.asarray(r, dtype=np.float64))
    if t_order % 2 != 0:
        return np.zeros_like(r)
    if r_order == 0 and t_order == 0:
        return np.ones_like(r)
    if mode == "c" and r_order + t_order == 1:
        return np.zeros_like(r)
    return getattr(self, f"{mode}_{r_order}{t_order}")(r)

Velocity PDF

liulu.physics.base.VelocityPDF

Bases: ABC

Line-of-sight pairwise velocity PDF P(v_los | r_perp, r_par).

Source code in liulu/physics/base.py
class VelocityPDF(ABC):
    """Line-of-sight pairwise velocity PDF P(v_los | r_perp, r_par)."""

    @abstractmethod
    def __call__(self, v_los: ndarray, r_perp: ndarray,
                 r_par: ndarray) -> ndarray:
        """Evaluate the PDF.

        Parameters
        ----------
        v_los : ndarray
            Line-of-sight pairwise velocity (km/s).
        r_perp : ndarray
            Transverse pair separation (Mpc/h).
        r_par : ndarray
            Parallel pair separation (Mpc/h), non-negative.

        Returns
        -------
        ndarray
            PDF values, (km/s)^{-1}.
        """
        ...

__call__ abstractmethod

__call__(v_los: ndarray, r_perp: ndarray, r_par: ndarray) -> ndarray

Evaluate the PDF.

Parameters:

Name Type Description Default
v_los ndarray

Line-of-sight pairwise velocity (km/s).

required
r_perp ndarray

Transverse pair separation (Mpc/h).

required
r_par ndarray

Parallel pair separation (Mpc/h), non-negative.

required

Returns:

Type Description
ndarray

PDF values, (km/s)^{-1}.

Source code in liulu/physics/base.py
@abstractmethod
def __call__(self, v_los: ndarray, r_perp: ndarray,
             r_par: ndarray) -> ndarray:
    """Evaluate the PDF.

    Parameters
    ----------
    v_los : ndarray
        Line-of-sight pairwise velocity (km/s).
    r_perp : ndarray
        Transverse pair separation (Mpc/h).
    r_par : ndarray
        Parallel pair separation (Mpc/h), non-negative.

    Returns
    -------
    ndarray
        PDF values, (km/s)^{-1}.
    """
    ...

Real-space correlation providers

TabulatedXi

liulu.physics.real_space.TabulatedXi

Bases: RealSpaceCorrelation

Interpolate xi(r) from tabulated data.

Parameters:

Name Type Description Default
r_table array_like

Separation values, Mpc/h.

required
xi_table array_like

Correlation function values.

required
interpolator callable

Interpolation factory with signature (r, xi) -> callable. Defaults to log-linear scipy interpolation.

None
Source code in liulu/physics/real_space.py
class TabulatedXi(RealSpaceCorrelation):
    """Interpolate xi(r) from tabulated data.

    Parameters
    ----------
    r_table : array_like
        Separation values, Mpc/h.
    xi_table : array_like
        Correlation function values.
    interpolator : callable, optional
        Interpolation factory with signature (r, xi) -> callable.
        Defaults to log-linear scipy interpolation.
    """

    def __init__(self, r_table, xi_table, interpolator=None):
        self._r = np.asarray(r_table, dtype=np.float64)
        self._xi = np.asarray(xi_table, dtype=np.float64)

        if interpolator is not None:
            self._interp = interpolator(self._r, self._xi)
        else:
            from ..numerics.interpolation import loglog_interp
            # Power-law (log-log) interpolation: xi(r) is steeply falling and
            # ~power-law, so log-log is near-exact and avoids the convex
            # overestimate of log-linear (which biased the model monopole ~1.4%
            # high). Constant (edge) extrapolation below r_min keeps clustering
            # on small scales; xi -> 0 above r_max. log-log falls back to
            # log-linear where xi is non-positive.
            self._interp = loglog_interp(
                self._r, self._xi, fill_value=("edge", 0.0),
                name="TabulatedXi xi(r)")

    def __call__(self, r):
        return self._interp(r)

Linear-theory / Halofit

liulu.physics.real_space.LinearTheoryXi

Bases: RealSpaceCorrelation

xi(r) from linear theory power spectrum via Hankel transform.

Parameters:

Name Type Description Default
k array_like

Wavenumbers, h/Mpc.

required
pk array_like

Linear power spectrum P(k), (Mpc/h)^3.

required
hankel_transform callable

Transform function with signature (k, pk, r) -> xi. Defaults to numerics.hankel.pk_to_xi.

None
Source code in liulu/physics/real_space.py
class LinearTheoryXi(RealSpaceCorrelation):
    """xi(r) from linear theory power spectrum via Hankel transform.

    Parameters
    ----------
    k : array_like
        Wavenumbers, h/Mpc.
    pk : array_like
        Linear power spectrum P(k), (Mpc/h)^3.
    hankel_transform : callable, optional
        Transform function with signature (k, pk, r) -> xi.
        Defaults to numerics.hankel.pk_to_xi.
    """

    def __init__(self, k, pk, hankel_transform=None):
        self._k = np.asarray(k, dtype=np.float64)
        self._pk = np.asarray(pk, dtype=np.float64)

        if hankel_transform is not None:
            self._transform = hankel_transform
        else:
            from ..numerics.hankel import pk_to_xi
            self._transform = pk_to_xi

    def __call__(self, r):
        return self._transform(self._k, self._pk, r)

liulu.physics.real_space.HalofitXi

Bases: RealSpaceCorrelation

xi(r) from Halofit/HMCode nonlinear P(k) via Hankel transform.

Same interface as LinearTheoryXi but expects nonlinear P(k).

Source code in liulu/physics/real_space.py
class HalofitXi(RealSpaceCorrelation):
    """xi(r) from Halofit/HMCode nonlinear P(k) via Hankel transform.

    Same interface as LinearTheoryXi but expects nonlinear P(k).
    """

    def __init__(self, k, pk_nl, hankel_transform=None):
        self._k = np.asarray(k, dtype=np.float64)
        self._pk = np.asarray(pk_nl, dtype=np.float64)

        if hankel_transform is not None:
            self._transform = hankel_transform
        else:
            from ..numerics.hankel import pk_to_xi
            self._transform = pk_to_xi

    def __call__(self, r):
        return self._transform(self._k, self._pk, r)

Pairwise velocity moments

TabulatedMoments

liulu.physics.pairwise_velocity.TabulatedMoments

Bases: PairwiseVelocityMoments

Interpolate pairwise velocity moments from tabulated data.

Parameters:

Name Type Description Default
r_table array_like

Separation values, Mpc/h.

required
v_r array_like

Mean radial pairwise velocity, km/s (negative = infall).

required
sigma_r array_like

Radial pairwise velocity dispersion, km/s.

required
sigma_t array_like

Tangential pairwise velocity dispersion (per component), km/s.

required
gamma_r array_like

Radial standardised skewness. Defaults to zero.

None
kappa_r array_like

Radial excess kurtosis. Defaults to zero (Gaussian).

None
kappa_t array_like

Tangential excess kurtosis. Defaults to zero (Gaussian).

None
c_12_table array_like

Cross central moment <(v_r - ) v_t^2>. Defaults to zero.

None
c_22_table array_like

Cross central moment <(v_r - )^2 v_t^2>. If None, defaults to sigma_r^2 * sigma_t^2 (independence assumption).

None
interpolator callable

Interpolation factory with signature (r, y) -> callable.

None
Source code in liulu/physics/pairwise_velocity.py
class TabulatedMoments(PairwiseVelocityMoments):
    """Interpolate pairwise velocity moments from tabulated data.

    Parameters
    ----------
    r_table : array_like
        Separation values, Mpc/h.
    v_r : array_like
        Mean radial pairwise velocity, km/s (negative = infall).
    sigma_r : array_like
        Radial pairwise velocity dispersion, km/s.
    sigma_t : array_like
        Tangential pairwise velocity dispersion (per component), km/s.
    gamma_r : array_like, optional
        Radial standardised skewness. Defaults to zero.
    kappa_r : array_like, optional
        Radial excess kurtosis. Defaults to zero (Gaussian).
    kappa_t : array_like, optional
        Tangential excess kurtosis. Defaults to zero (Gaussian).
    c_12_table : array_like, optional
        Cross central moment <(v_r - <v_r>) v_t^2>. Defaults to zero.
    c_22_table : array_like, optional
        Cross central moment <(v_r - <v_r>)^2 v_t^2>. If None, defaults
        to sigma_r^2 * sigma_t^2 (independence assumption).
    interpolator : callable, optional
        Interpolation factory with signature (r, y) -> callable.
    """

    def __init__(self, r_table, v_r, sigma_r, sigma_t,
                 gamma_r=None, kappa_r=None, kappa_t=None,
                 c_12_table=None, c_22_table=None,
                 interpolator=None):
        from ..numerics.interpolation import log_linear_interp

        r = np.asarray(r_table, dtype=np.float64)
        if interpolator is not None:
            _interp = interpolator
        else:
            # Constant (edge) extrapolation: outside the table the dispersions
            # plateau to their boundary value rather than collapsing to 0
            # (which the PDF would read as "no pairs" and drop from the integral).
            def _interp(r_, y_):
                return log_linear_interp(r_, y_, fill_value="edge",
                                         name="TabulatedMoments")

        sigma_r = np.asarray(sigma_r, dtype=np.float64)
        sigma_t = np.asarray(sigma_t, dtype=np.float64)

        self._v_r = _interp(r, np.asarray(v_r, dtype=np.float64))
        self._sigma_r = _interp(r, sigma_r)
        self._sigma_t = _interp(r, sigma_t)

        if gamma_r is not None:
            _gamma_r = _interp(r, np.asarray(gamma_r, dtype=np.float64))
            self._c_30 = lambda r_: _gamma_r(r_) * self._sigma_r(r_) ** 3
        else:
            self._c_30 = None

        if kappa_r is not None:
            _kappa_r = _interp(r, np.asarray(kappa_r, dtype=np.float64))
            self._c_40 = lambda r_: (_kappa_r(r_) + 3.0) * self._sigma_r(r_) ** 4
        else:
            self._c_40 = None

        if kappa_t is not None:
            _kappa_t = _interp(r, np.asarray(kappa_t, dtype=np.float64))
            self._c_04 = lambda r_: (_kappa_t(r_) + 3.0) * self._sigma_t(r_) ** 4
        else:
            self._c_04 = None

        if c_12_table is not None:
            self._c_12 = _interp(r, np.asarray(c_12_table, dtype=np.float64))
        else:
            self._c_12 = None

        if c_22_table is not None:
            self._c_22 = _interp(r, np.asarray(c_22_table, dtype=np.float64))
        else:
            self._c_22 = None

    def m_10(self, r):
        return self._v_r(r)

    def c_20(self, r):
        return self._sigma_r(r) ** 2

    def c_02(self, r):
        return self._sigma_t(r) ** 2

    def c_30(self, r):
        if self._c_30 is not None:
            return self._c_30(r)
        return super().c_30(r)

    def c_12(self, r):
        if self._c_12 is not None:
            return self._c_12(r)
        return super().c_12(r)

    def c_40(self, r):
        if self._c_40 is not None:
            return self._c_40(r)
        return super().c_40(r)

    def c_04(self, r):
        if self._c_04 is not None:
            return self._c_04(r)
        return super().c_04(r)

    def c_22(self, r):
        if self._c_22 is not None:
            return self._c_22(r)
        return super().c_22(r)

Velocity PDFs

Gaussian

liulu.physics.velocity_pdf.GaussianPDF

Bases: VelocityPDF

Gaussian LOS pairwise velocity PDF.

Parameters:

Name Type Description Default
moments PairwiseVelocityMoments

Radial-tangential moment functions.

required
Source code in liulu/physics/velocity_pdf.py
class GaussianPDF(VelocityPDF):
    """Gaussian LOS pairwise velocity PDF.

    Parameters
    ----------
    moments : PairwiseVelocityMoments
        Radial-tangential moment functions.
    """

    def __init__(self, moments: PairwiseVelocityMoments):
        self._mean_los = project_to_los(moments, n=1, mode="m")
        self._c2_los = project_to_los(moments, n=2, mode="c")

    def __call__(self, v_los, r_perp, r_par):
        mean = self._mean_los(r_perp, r_par)
        var = self._c2_los(r_perp, r_par)
        safe_var = np.where(var > 0, var, 1.0)
        inv_sigma = 1.0 / np.sqrt(safe_var)
        z = (v_los - mean) * inv_sigma
        pdf = np.exp(-0.5 * z * z) * inv_sigma / np.sqrt(2.0 * np.pi)
        return np.where(var > 0, pdf, 0.0)

Skew-t

liulu.physics.velocity_pdf.SkewTPDF

Bases: VelocityPDF

Skew-t LOS pairwise velocity PDF (Azzalini & Capitanio 2003).

Constructed from PairwiseVelocityMoments. Projects radial-tangential moments to the four LOS moments (mean, variance, skewness, kurtosis), then converts to skew-t parameters.

Parameters:

Name Type Description Default
moments PairwiseVelocityMoments

Radial-tangential moment functions.

required
tabulate bool

If True (default), pre-compute (alpha, nu) on a (gamma1, gamma2) grid for fast vectorised evaluation. If False, use per-point fsolve (slow but exact).

True
table_kw dict

Keyword arguments forwarded to _SkewTParamTable (e.g. gamma1_range, gamma2_range, n_g1, n_g2).

{}
Source code in liulu/physics/velocity_pdf.py
class SkewTPDF(VelocityPDF):
    """Skew-t LOS pairwise velocity PDF (Azzalini & Capitanio 2003).

    Constructed from PairwiseVelocityMoments. Projects radial-tangential
    moments to the four LOS moments (mean, variance, skewness, kurtosis),
    then converts to skew-t parameters.

    Parameters
    ----------
    moments : PairwiseVelocityMoments
        Radial-tangential moment functions.
    tabulate : bool
        If True (default), pre-compute (alpha, nu) on a (gamma1, gamma2)
        grid for fast vectorised evaluation.  If False, use per-point
        ``fsolve`` (slow but exact).
    table_kw : dict
        Keyword arguments forwarded to ``_SkewTParamTable`` (e.g.
        ``gamma1_range``, ``gamma2_range``, ``n_g1``, ``n_g2``).
    """

    def __init__(self, moments: PairwiseVelocityMoments, *,
                 tabulate: bool = True, **table_kw):
        self._mean_los = project_to_los(moments, n=1, mode="m")
        self._c2_los = project_to_los(moments, n=2, mode="c")
        self._c3_los = project_to_los(moments, n=3, mode="c")
        self._c4_los = project_to_los(moments, n=4, mode="c")

        self._table = _SkewTParamTable(**table_kw) if tabulate else None

    def _los_moments(self, r_perp, r_par):
        """Compute the four LOS moments at (r_perp, r_par)."""
        mean = self._mean_los(r_perp, r_par)
        c2 = self._c2_los(r_perp, r_par)
        c3 = self._c3_los(r_perp, r_par)
        c4 = self._c4_los(r_perp, r_par)

        std = np.sqrt(np.maximum(c2, 0.0))
        safe_c2 = np.where(c2 > 0, c2, 1.0)
        gamma1 = np.where(c2 > 0, c3 / safe_c2 ** 1.5, 0.0)
        gamma2 = np.where(c2 > 0, c4 / safe_c2 ** 2 - 3.0, 0.0)
        return mean, std, gamma1, gamma2

    def __call__(self, v_los, r_perp, r_par):
        mean, std, gamma1, gamma2 = self._los_moments(r_perp, r_par)
        if self._table is not None:
            return self._eval_tabulated(v_los, mean, std, gamma1, gamma2)
        return self._eval_direct(v_los, mean, std, gamma1, gamma2)

    # ── fast path: tabulated ──────────────────────────────────────────

    def _eval_tabulated(self, v_los, mean, std, gamma1, gamma2):
        """Fully vectorised evaluation via pre-tabulated skew-t params."""
        v_los = np.asarray(v_los, dtype=np.float64)
        mean = np.atleast_1d(np.asarray(mean, dtype=np.float64))
        std = np.atleast_1d(np.asarray(std, dtype=np.float64))
        gamma1 = np.atleast_1d(np.asarray(gamma1, dtype=np.float64))
        gamma2 = np.atleast_1d(np.asarray(gamma2, dtype=np.float64))

        out_shape = np.broadcast_shapes(v_los.shape, mean.shape)

        alpha, nu = self._table(gamma1, gamma2)

        with np.errstate(invalid='ignore', divide='ignore', over='ignore'):
            delta = alpha / np.sqrt(1.0 + alpha ** 2)
            b = (np.sqrt(nu / np.pi)
                 * special.gamma((nu - 1.0) / 2.0)
                 / special.gamma(nu / 2.0))
            w = std / np.sqrt(nu / (nu - 2.0) - delta ** 2 * b ** 2)
            v_c = mean - w * delta * b
            result = _skewt_pdf(v_los, w, v_c, alpha, nu)

        # Gaussian fallback where the table had no valid solution
        bad = ~np.isfinite(result)
        if np.any(bad):
            safe_std = np.where(std > 0, std, 1.0)
            inv_s = 1.0 / safe_std
            z = (v_los - mean) * inv_s
            gauss = np.exp(-0.5 * z * z) * inv_s / np.sqrt(2.0 * np.pi)
            result = np.where(bad, gauss, result)

        return np.where(np.broadcast_to(std, out_shape) > 0, result, 0.0)

    # ── slow path: direct fsolve per point ────────────────────────────

    def _eval_direct(self, v_los, mean, std, gamma1, gamma2):
        """Per-point evaluation using fsolve (exact, no interpolation)."""
        mean = np.atleast_1d(np.asarray(mean, dtype=np.float64))
        std = np.atleast_1d(np.asarray(std, dtype=np.float64))
        gamma1 = np.atleast_1d(np.asarray(gamma1, dtype=np.float64))
        gamma2 = np.atleast_1d(np.asarray(gamma2, dtype=np.float64))
        v_los = np.asarray(v_los, dtype=np.float64)

        if mean.size == 1:
            if std.ravel()[0] <= 0:
                return np.zeros_like(v_los)
            g1, g2 = gamma1.ravel()[0], gamma2.ravel()[0]
            if np.abs(g1) < 1e-8 and np.abs(g2) < 1e-4:
                inv_s = 1.0 / std.ravel()[0]
                z = (v_los - mean.ravel()[0]) * inv_s
                return np.exp(-0.5 * z * z) * inv_s / np.sqrt(2.0 * np.pi)
            w, v_c, alpha, nu = _moments_to_st_params(
                mean.ravel()[0], std.ravel()[0], g1, g2)
            return _skewt_pdf(v_los, w, v_c, alpha, nu)

        out_shape = np.broadcast_shapes(v_los.shape, mean.shape)
        v_flat = np.broadcast_to(v_los, out_shape).ravel()
        mean_flat = np.broadcast_to(mean, out_shape).ravel()
        std_flat = np.broadcast_to(std, out_shape).ravel()
        g1_flat = np.broadcast_to(gamma1, out_shape).ravel()
        g2_flat = np.broadcast_to(gamma2, out_shape).ravel()

        # Group by unique (mean, std, g1, g2) to amortise fsolve calls
        params = np.column_stack([mean_flat, std_flat, g1_flat, g2_flat])
        _, inverse, counts = np.unique(
            params, axis=0, return_inverse=True, return_counts=True,
        )
        result = np.empty_like(v_flat)
        for uid in range(len(counts)):
            mask = inverse == uid
            m, s, g1, g2 = params[mask][0]
            v_sub = v_flat[mask]
            if s <= 0:
                result[mask] = 0.0
            elif np.abs(g1) < 1e-8 and np.abs(g2) < 1e-4:
                inv_s = 1.0 / s
                z = (v_sub - m) * inv_s
                result[mask] = np.exp(-0.5 * z * z) * inv_s / np.sqrt(2.0 * np.pi)
            else:
                w, v_c, alpha, nu = _moments_to_st_params(m, s, g1, g2)
                result[mask] = _skewt_pdf(v_sub, w, v_c, alpha, nu)
        return result.reshape(out_shape)

Generalized hyperbolic

liulu.physics.velocity_pdf.GeneralizedHyperbolicPDF

Bases: VelocityPDF

Generalised hyperbolic LOS pairwise-velocity PDF (Kuruvilla & Porciani 2018). A normal variance-mean (Gaussian) mixture; strictly generalises the skew-t.

The four projected LOS moments (mean, variance, skewness, excess kurtosis) are matched to the GH parameters at class index lam:

  • lam == -0.5 (NIG) uses the analytic, fully vectorised inversion (fast; the recommended default -- see :class:NIGPDF);
  • any other scalar lam uses a vectorised damped-Newton shape solve;
  • lam may also be a callable lam(r_perp, r_par) returning a per-separation class index -- this realises the scale-dependent GHD of Kuruvilla & Porciani (2018), in which the fifth (shape) parameter drifts with pair separation. Calibrate such a lam(r) from a measured velocity PDF (e.g. via :func:calibrate_gh_lambda in scripts/calibrate_gh_lambda.py).

Points whose (skewness, excess kurtosis) fall outside the attainable region fall back to a Gaussian with the same first two moments.

Parameters:

Name Type Description Default
moments PairwiseVelocityMoments
required
lam float or callable

GH class index (default -0.5 = NIG), or lam(r_perp, r_par) -> ndarray for a scale-dependent shape.

-0.5
Source code in liulu/physics/velocity_pdf.py
class GeneralizedHyperbolicPDF(VelocityPDF):
    """Generalised hyperbolic LOS pairwise-velocity PDF (Kuruvilla & Porciani
    2018).  A normal variance-mean (Gaussian) mixture; strictly generalises the
    skew-t.

    The four projected LOS moments (mean, variance, skewness, excess kurtosis)
    are matched to the GH parameters at class index ``lam``:

    - ``lam == -0.5`` (NIG) uses the analytic, fully vectorised inversion
      (fast; the recommended default -- see :class:`NIGPDF`);
    - any other scalar ``lam`` uses a vectorised damped-Newton shape solve;
    - ``lam`` may also be a **callable** ``lam(r_perp, r_par)`` returning a
      per-separation class index -- this realises the *scale-dependent* GHD of
      Kuruvilla & Porciani (2018), in which the fifth (shape) parameter drifts
      with pair separation. Calibrate such a ``lam(r)`` from a measured velocity
      PDF (e.g. via :func:`calibrate_gh_lambda` in
      ``scripts/calibrate_gh_lambda.py``).

    Points whose (skewness, excess kurtosis) fall outside the attainable region
    fall back to a Gaussian with the same first two moments.

    Parameters
    ----------
    moments : PairwiseVelocityMoments
    lam : float or callable
        GH class index (default -0.5 = NIG), or ``lam(r_perp, r_par) -> ndarray``
        for a scale-dependent shape.
    """

    def __init__(self, moments: PairwiseVelocityMoments, *, lam=-0.5):
        self.lam = lam if callable(lam) else float(lam)
        self._mean_los = project_to_los(moments, n=1, mode="m")
        self._c2_los = project_to_los(moments, n=2, mode="c")
        self._c3_los = project_to_los(moments, n=3, mode="c")
        self._c4_los = project_to_los(moments, n=4, mode="c")

    def _los_moments(self, r_perp, r_par):
        mean = self._mean_los(r_perp, r_par)
        c2 = self._c2_los(r_perp, r_par)
        c3 = self._c3_los(r_perp, r_par)
        c4 = self._c4_los(r_perp, r_par)
        safe = np.where(c2 > 0, c2, 1.0)
        g1 = np.where(c2 > 0, c3 / safe ** 1.5, 0.0)
        g2 = np.where(c2 > 0, c4 / safe ** 2 - 3.0, 0.0)
        return mean, c2, g1, g2

    def __call__(self, v_los, r_perp, r_par):
        v_los = np.asarray(v_los, dtype=np.float64)
        mean, c2, g1, g2 = self._los_moments(r_perp, r_par)
        mean = np.atleast_1d(np.asarray(mean, dtype=np.float64))
        c2 = np.atleast_1d(np.asarray(c2, dtype=np.float64))
        g1 = np.atleast_1d(np.asarray(g1, dtype=np.float64))
        g2 = np.atleast_1d(np.asarray(g2, dtype=np.float64))
        std = np.sqrt(np.maximum(c2, 0.0))
        out_shape = np.broadcast_shapes(v_los.shape, mean.shape)

        if callable(self.lam):
            lam = np.broadcast_to(np.atleast_1d(np.asarray(self.lam(r_perp, r_par),
                                                           dtype=np.float64)), mean.shape)
        else:
            lam = self.lam

        if np.isscalar(lam) and abs(lam + 0.5) < 1e-12:
            alpha, beta, delta, mu, valid = _nig_params_from_moments(mean, c2, g1, g2)
        else:
            alpha, beta, delta, mu, valid = _gh_params_vectorized(lam, mean, c2, g1, g2)

        with np.errstate(invalid="ignore", divide="ignore",
                         over="ignore", under="ignore"):
            p = _gh_density(v_los, alpha, beta, delta, lam, mu)

        bad = (~valid) | ~np.isfinite(p) | (p < 0)
        if np.any(bad):
            safe_std = np.where(std > 0, std, 1.0)
            z = (v_los - mean) / safe_std
            gauss = np.exp(-0.5 * z * z) / (safe_std * np.sqrt(2.0 * np.pi))
            p = np.where(bad, gauss, p)
        return np.where(np.broadcast_to(std, out_shape) > 0, p, 0.0)

NIG

liulu.physics.velocity_pdf.NIGPDF

Bases: GeneralizedHyperbolicPDF

Normal-inverse-Gaussian LOS pairwise-velocity PDF (GH with lambda=-1/2).

Analytic, vectorised four-moment matching with Gaussian fallback; the recommended fast default among the generalised-hyperbolic family.

Source code in liulu/physics/velocity_pdf.py
class NIGPDF(GeneralizedHyperbolicPDF):
    """Normal-inverse-Gaussian LOS pairwise-velocity PDF (GH with lambda=-1/2).

    Analytic, vectorised four-moment matching with Gaussian fallback; the
    recommended fast default among the generalised-hyperbolic family.
    """

    def __init__(self, moments: PairwiseVelocityMoments):
        super().__init__(moments, lam=-0.5)

Tabulated GH

liulu.physics.velocity_pdf.TabulatedGHPDF

Bases: VelocityPDF

Generalised-hyperbolic LOS velocity PDF with scale-dependent shape, its parameters tabulated over pair separation (Kuruvilla & Porciani 2018).

The expensive GH shape solve is done once on a coarse (r_perp, r_par) grid (the GH invariant shape (zeta, rho) is smooth in separation) and interpolated thereafter; the first two moments (location, scale) are taken from the exact local projected moments, so only the higher-order shape is interpolated. The class index lam may vary with separation -- pass a callable lam(r_perp, r_par) (e.g. the calibrated lam(r) of :func:calibrate_gh_lambda) to realise the scale-dependent GHD.

Outside the tabulation grid, or where the shape solve fails, it falls back to the analytic NIG (and then Gaussian), so it is always well defined.

Parameters:

Name Type Description Default
moments PairwiseVelocityMoments
required
lam float or callable

Class index, scalar or lam(r_perp, r_par) -> ndarray.

-0.5
rp_max float

Extent of the tabulation grid in r_perp, r_par (Mpc/h).

60.0
rpar_max float

Extent of the tabulation grid in r_perp, r_par (Mpc/h).

60.0
n_rp int

Grid resolution.

48
n_rpar int

Grid resolution.

48
Source code in liulu/physics/velocity_pdf.py
class TabulatedGHPDF(VelocityPDF):
    """Generalised-hyperbolic LOS velocity PDF with **scale-dependent** shape,
    its parameters tabulated over pair separation (Kuruvilla & Porciani 2018).

    The expensive GH shape solve is done once on a coarse ``(r_perp, r_par)``
    grid (the GH invariant shape ``(zeta, rho)`` is smooth in separation) and
    interpolated thereafter; the first two moments (location, scale) are taken
    from the *exact* local projected moments, so only the higher-order shape is
    interpolated. The class index ``lam`` may vary with separation -- pass a
    callable ``lam(r_perp, r_par)`` (e.g. the calibrated ``lam(r)`` of
    :func:`calibrate_gh_lambda`) to realise the scale-dependent GHD.

    Outside the tabulation grid, or where the shape solve fails, it falls back
    to the analytic NIG (and then Gaussian), so it is always well defined.

    Parameters
    ----------
    moments : PairwiseVelocityMoments
    lam : float or callable
        Class index, scalar or ``lam(r_perp, r_par) -> ndarray``.
    rp_max, rpar_max : float
        Extent of the tabulation grid in r_perp, r_par (Mpc/h).
    n_rp, n_rpar : int
        Grid resolution.
    """

    def __init__(self, moments: PairwiseVelocityMoments, *, lam=-0.5,
                 rp_max=60.0, rpar_max=150.0, n_rp=48, n_rpar=80):
        self.lam = lam
        self._mean_los = project_to_los(moments, n=1, mode="m")
        self._c2_los = project_to_los(moments, n=2, mode="c")
        self._c3_los = project_to_los(moments, n=3, mode="c")
        self._c4_los = project_to_los(moments, n=4, mode="c")
        self._nig = NIGPDF(moments)

        from scipy.interpolate import RegularGridInterpolator
        rp_g = np.concatenate([[1e-3], np.geomspace(0.05, rp_max, n_rp - 1)])
        rpar_g = np.concatenate([[1e-3], np.geomspace(0.05, rpar_max, n_rpar - 1)])
        RP, RPAR = np.meshgrid(rp_g, rpar_g, indexing="ij")
        mean, c2, g1, g2 = self._los_moments(RP, RPAR)
        lam_g = (np.asarray(lam(RP, RPAR), float) if callable(lam)
                 else np.full(RP.shape, float(lam)))
        zeta = np.full(RP.shape, np.nan)
        rho = np.full(RP.shape, np.nan)
        gg1, gg2, ll = g1.ravel(), g2.ravel(), lam_g.ravel()
        c2r = c2.ravel()
        zf, rf = zeta.ravel(), rho.ravel()
        for i in range(gg1.size):
            if c2r[i] > 0 and gg2[i] > (5.0 / 3.0) * gg1[i] ** 2 + 1e-6:
                zf[i], rf[i] = _gh_shape_solve_scalar(ll[i], gg1[i], gg2[i])
        zeta = zf.reshape(RP.shape); rho = rf.reshape(RP.shape)
        valid = np.isfinite(zeta) & np.isfinite(rho)
        # interpolate the (smooth) shape; NaN -> 0 so fill marks invalid
        kw = dict(bounds_error=False, fill_value=np.nan)
        self._zeta_i = RegularGridInterpolator((rp_g, rpar_g), np.where(valid, zeta, np.nan), **kw)
        self._rho_i = RegularGridInterpolator((rp_g, rpar_g), np.where(valid, rho, np.nan), **kw)
        self._lam_i = RegularGridInterpolator((rp_g, rpar_g), lam_g, **kw)
        self._coverage = valid.mean()

    def _los_moments(self, r_perp, r_par):
        mean = self._mean_los(r_perp, r_par)
        c2 = self._c2_los(r_perp, r_par)
        c3 = self._c3_los(r_perp, r_par)
        c4 = self._c4_los(r_perp, r_par)
        safe = np.where(c2 > 0, c2, 1.0)
        g1 = np.where(c2 > 0, c3 / safe ** 1.5, 0.0)
        g2 = np.where(c2 > 0, c4 / safe ** 2 - 3.0, 0.0)
        return mean, c2, g1, g2

    def __call__(self, v_los, r_perp, r_par):
        v_los = np.asarray(v_los, dtype=np.float64)
        rp = np.atleast_1d(np.asarray(r_perp, dtype=np.float64))
        rpar = np.atleast_1d(np.asarray(r_par, dtype=np.float64))
        mean, c2, g1, g2 = self._los_moments(rp, rpar)
        mean, c2 = np.atleast_1d(mean), np.atleast_1d(c2)
        std = np.sqrt(np.maximum(c2, 0.0))
        out_shape = np.broadcast_shapes(v_los.shape, mean.shape)

        pts = np.stack([np.broadcast_arrays(rp, rpar)[0].ravel(),
                        np.broadcast_arrays(rp, rpar)[1].ravel()], axis=-1)
        zeta = self._zeta_i(pts).reshape(mean.shape)
        rho = self._rho_i(pts).reshape(mean.shape)
        lam = self._lam_i(pts).reshape(mean.shape)
        good = np.isfinite(zeta) & np.isfinite(rho) & (c2 > 0)

        with np.errstate(all="ignore"):
            _, _, var0 = _gh_shape_skew_kurt(lam, zeta, rho)
            b = np.sqrt(c2 / var0)
            alpha = 1.0 / b
            beta = rho / b
            delta = zeta / np.sqrt(1.0 - rho ** 2) * b
            gam = np.sqrt(1.0 - rho ** 2) / b
            R1 = special.kv(lam + 1, zeta) / special.kv(lam, zeta)
            mu = mean - beta * (delta / gam) * R1
            p = _gh_density(v_los, alpha, beta, delta, lam, mu)
        good = good & np.isfinite(p) & (p >= 0)

        # fallback to analytic NIG where the GH table is invalid/out of range
        if not np.all(good):
            p_nig = self._nig(v_los, rp, rpar)
            p = np.where(good, p, p_nig)
        return np.where(np.broadcast_to(std, out_shape) > 0, p, 0.0)

Scale-dependent

liulu.physics.velocity_pdf.scale_dependent_lambda

scale_dependent_lambda(rp_grid, rpar_grid, lam_grid, fill=-0.5)

Build a callable lam(r_perp, r_par) interpolating a calibrated lambda(separation) table, for :class:TabulatedGHPDF. Outside the grid it returns fill (default -0.5 = NIG), so the model degrades gracefully to NIG on scales where no calibration exists.

Source code in liulu/physics/velocity_pdf.py
def scale_dependent_lambda(rp_grid, rpar_grid, lam_grid, fill=-0.5):
    """Build a callable ``lam(r_perp, r_par)`` interpolating a calibrated
    lambda(separation) table, for :class:`TabulatedGHPDF`. Outside the grid it
    returns ``fill`` (default -0.5 = NIG), so the model degrades gracefully to
    NIG on scales where no calibration exists.
    """
    from scipy.interpolate import RegularGridInterpolator
    rgi = RegularGridInterpolator(
        (np.asarray(rp_grid, dtype=np.float64), np.asarray(rpar_grid, dtype=np.float64)),
        np.asarray(lam_grid, dtype=np.float64),
        bounds_error=False, fill_value=float(fill))

    def lam(r_perp, r_par):
        rp, rpar = np.broadcast_arrays(np.atleast_1d(np.asarray(r_perp, dtype=np.float64)),
                                       np.atleast_1d(np.asarray(r_par, dtype=np.float64)))
        out = rgi(np.stack([rp.ravel(), rpar.ravel()], axis=-1)).reshape(rp.shape)
        return np.where(np.isfinite(out), out, fill)

    return lam

LOS projection

liulu.physics.los_projection.project_to_los

project_to_los(moments: PairwiseVelocityMoments, n: int, mode: str = 'c')

Project the n-th order r-t moment onto the line of sight.

Parameters:

Name Type Description Default
moments PairwiseVelocityMoments

Object providing get_moment(r, r_order, t_order, mode).

required
n int

Order of the LOS moment.

required
mode str

'c' for central moments, 'm' for moments about the origin.

'c'

Returns:

Type Description
callable

los_moment(r_perp, r_par) -> ndarray returning the projected n-th LOS moment.

Source code in liulu/physics/los_projection.py
def project_to_los(moments: PairwiseVelocityMoments, n: int,
                   mode: str = "c"):
    """Project the n-th order r-t moment onto the line of sight.

    Parameters
    ----------
    moments : PairwiseVelocityMoments
        Object providing ``get_moment(r, r_order, t_order, mode)``.
    n : int
        Order of the LOS moment.
    mode : str
        ``'c'`` for central moments, ``'m'`` for moments about the origin.

    Returns
    -------
    callable
        ``los_moment(r_perp, r_par) -> ndarray`` returning the projected
        n-th LOS moment.
    """

    def los_moment(r_perp, r_par):
        r_perp = np.atleast_1d(np.asarray(r_perp, dtype=np.float64))
        r_par = np.atleast_1d(np.asarray(r_par, dtype=np.float64))

        r = np.sqrt(r_par ** 2 + r_perp ** 2)
        mu = np.where(r > 0, r_par / r, 0.0)

        result = np.zeros_like(r)
        for k in range(n + 1):
            t_order = n - k
            coeff = comb(n, k, exact=True)
            m = moments.get_moment(r, r_order=k, t_order=t_order, mode=mode)
            result = result + coeff * mu ** k * (1.0 - mu ** 2) ** (t_order / 2.0) * m

        return result

    return los_moment

Cosmology

liulu.physics.cosmology.Cosmology

Bases: FlatLambdaCDM

Background cosmology based on :class:astropy.cosmology.FlatLambdaCDM.

Parameters:

Name Type Description Default
Om0 float

Present-day matter density parameter.

required
H0 float

Hubble constant in km/s/Mpc.

required
z float

Redshift of evaluation.

required
Source code in liulu/physics/cosmology.py
class Cosmology(FlatLambdaCDM):
    """Background cosmology based on :class:`astropy.cosmology.FlatLambdaCDM`.

    Parameters
    ----------
    Om0 : float
        Present-day matter density parameter.
    H0 : float
        Hubble constant in km/s/Mpc.
    z : float
        Redshift of evaluation.
    """

    def __init__(self, Om0: float, H0: float, z: float, **kwargs):
        super().__init__(H0=H0, Om0=Om0, **kwargs)
        self.z = z

    @property
    def a(self) -> float:
        """Scale factor."""
        return 1.0 / (1.0 + self.z)

    def Hz(self) -> float:
        """Hubble parameter H(z) in km/s/Mpc."""
        return self.H(self.z).value

    def aH(self) -> float:
        """a * H(z) in km/s/(Mpc/h).

        For distances in Mpc/h:
            v[km/s] = a*H(z)[km/s/Mpc] * delta_r[Mpc/h] * (1/h)[Mpc/(Mpc/h)]

        So aH in km/s/(Mpc/h) = a * H(z) / h.
        """
        return self.a * self.Hz() / self.h

    def f(self) -> float:
        """Linear growth rate f = d ln D / d ln a.

        Uses the approximation f ~ Omega_m(z)^0.55 (Linder 2005).
        """
        Omz = self.Om0 * (1 + self.z)**3 / self.efunc(self.z)**2
        return Omz**0.55

    def D(self) -> float:
        """Linear growth factor D(z), normalised to D(0) = 1.

        Uses the Carroll, Press & Turner (1992) fitting formula.
        """
        def _growth_unnorm(Om, Ode):
            return (5.0 / 2.0) * Om / (
                Om**(4.0 / 7.0) - Ode
                + (1.0 + Om / 2.0) * (1.0 + Ode / 70.0)
            )

        Ez2 = self.efunc(self.z)**2
        Omz = self.Om0 * (1 + self.z)**3 / Ez2
        Odez = self.Ode0 / Ez2

        Dz = _growth_unnorm(Omz, Odez) / (1.0 + self.z)
        D0 = _growth_unnorm(self.Om0, self.Ode0)
        return Dz / D0

a property

a: float

Scale factor.

Hz

Hz() -> float

Hubble parameter H(z) in km/s/Mpc.

Source code in liulu/physics/cosmology.py
def Hz(self) -> float:
    """Hubble parameter H(z) in km/s/Mpc."""
    return self.H(self.z).value

aH

aH() -> float

a * H(z) in km/s/(Mpc/h).

For distances in Mpc/h: v[km/s] = a*H(z)[km/s/Mpc] * delta_r[Mpc/h] * (1/h)[Mpc/(Mpc/h)]

So aH in km/s/(Mpc/h) = a * H(z) / h.

Source code in liulu/physics/cosmology.py
def aH(self) -> float:
    """a * H(z) in km/s/(Mpc/h).

    For distances in Mpc/h:
        v[km/s] = a*H(z)[km/s/Mpc] * delta_r[Mpc/h] * (1/h)[Mpc/(Mpc/h)]

    So aH in km/s/(Mpc/h) = a * H(z) / h.
    """
    return self.a * self.Hz() / self.h

f

f() -> float

Linear growth rate f = d ln D / d ln a.

Uses the approximation f ~ Omega_m(z)^0.55 (Linder 2005).

Source code in liulu/physics/cosmology.py
def f(self) -> float:
    """Linear growth rate f = d ln D / d ln a.

    Uses the approximation f ~ Omega_m(z)^0.55 (Linder 2005).
    """
    Omz = self.Om0 * (1 + self.z)**3 / self.efunc(self.z)**2
    return Omz**0.55

D

D() -> float

Linear growth factor D(z), normalised to D(0) = 1.

Uses the Carroll, Press & Turner (1992) fitting formula.

Source code in liulu/physics/cosmology.py
def D(self) -> float:
    """Linear growth factor D(z), normalised to D(0) = 1.

    Uses the Carroll, Press & Turner (1992) fitting formula.
    """
    def _growth_unnorm(Om, Ode):
        return (5.0 / 2.0) * Om / (
            Om**(4.0 / 7.0) - Ode
            + (1.0 + Om / 2.0) * (1.0 + Ode / 70.0)
        )

    Ez2 = self.efunc(self.z)**2
    Omz = self.Om0 * (1 + self.z)**3 / Ez2
    Odez = self.Ode0 / Ez2

    Dz = _growth_unnorm(Omz, Odez) / (1.0 + self.z)
    D0 = _growth_unnorm(self.Om0, self.Ode0)
    return Dz / D0