Skip to content

Numerics

Pure numerical routines — no cosmology knowledge; they operate on callables and arrays. See Architecture.

Interpolation

liulu.numerics.interpolation.loglog_interp

loglog_interp(x, y, fill_value=0.0, name='table', warn=True)

Power-law interpolation: linear in (log x, log y). Near-exact for the steeply-falling, ~power-law xi(r) of the streaming weight, where ordinary log-linear interpolation overestimates the convex curve between bins and biases the model monopole high.

Robust to non-positive y: points with y <= 0 are excluded from the log-log fit, and queries beyond the positive support fall back to the configured fill (fill_value follows the same scalar / (below, above) / "edge" convention as :func:log_linear_interp). If fewer than two positive points exist it delegates to :func:log_linear_interp.

Source code in liulu/numerics/interpolation.py
def loglog_interp(x, y, fill_value=0.0, name="table", warn=True):
    """Power-law interpolation: linear in (log x, log y). Near-exact for the
    steeply-falling, ~power-law xi(r) of the streaming weight, where ordinary
    log-linear interpolation overestimates the convex curve between bins and
    biases the model monopole high.

    Robust to non-positive y: points with y <= 0 are excluded from the log-log
    fit, and queries beyond the positive support fall back to the configured
    fill (`fill_value` follows the same scalar / (below, above) / "edge"
    convention as :func:`log_linear_interp`). If fewer than two positive points
    exist it delegates to :func:`log_linear_interp`.
    """
    x = np.asarray(x, dtype=np.float64)
    y = np.asarray(y, dtype=np.float64)
    pos = y > 0
    if pos.sum() < 2:
        return log_linear_interp(x, y, fill_value=fill_value, name=name, warn=warn)

    xp, yp = x[pos], y[pos]
    below, above = _resolve_fill(fill_value, y)
    x_min, x_max = float(x[0]), float(x[-1])
    xp_lo, xp_hi = float(xp[0]), float(xp[-1])
    _interp = interp1d(np.log(xp), np.log(yp), kind='linear',
                       bounds_error=False, fill_value=np.nan)

    def interpolator(x_new):
        x_new = np.atleast_1d(np.asarray(x_new, dtype=np.float64))
        if warn:
            _warn_out_of_range(x_new, x_min, x_max, name)
        with np.errstate(divide='ignore', invalid='ignore'):
            out = np.exp(_interp(np.log(x_new)))
        # below / above the positive log-log support -> configured fills
        out = np.where(x_new < xp_lo, below, out)
        out = np.where(x_new > xp_hi, above, out)
        out = np.where(np.isfinite(out), out, 0.0)
        return out

    return interpolator

liulu.numerics.interpolation.log_linear_interp

log_linear_interp(x, y, fill_value=0.0, name='table', warn=True)

Log-linear interpolation: linear in log(x), linear in y.

Handles negative y values (e.g. pairwise velocities) by interpolating y directly (not log(y)).

Parameters:

Name Type Description Default
x ndarray

Abscissae (must be positive, ascending).

required
y ndarray

Ordinates.

required
fill_value float or tuple

Out-of-range policy; see the module docstring. Defaults to 0.0.

0.0
name str

Label used in the out-of-range warning.

'table'
warn bool

Whether to warn on out-of-range queries.

True

Returns:

Type Description
callable

Interpolator f(x_new) -> y_new.

Source code in liulu/numerics/interpolation.py
def log_linear_interp(x, y, fill_value=0.0, name="table", warn=True):
    """Log-linear interpolation: linear in log(x), linear in y.

    Handles negative y values (e.g. pairwise velocities) by interpolating
    y directly (not log(y)).

    Parameters
    ----------
    x : ndarray
        Abscissae (must be positive, ascending).
    y : ndarray
        Ordinates.
    fill_value : float or tuple
        Out-of-range policy; see the module docstring. Defaults to ``0.0``.
    name : str
        Label used in the out-of-range warning.
    warn : bool
        Whether to warn on out-of-range queries.

    Returns
    -------
    callable
        Interpolator f(x_new) -> y_new.
    """
    x = np.asarray(x, dtype=np.float64)
    y = np.asarray(y, dtype=np.float64)
    below, above = _resolve_fill(fill_value, y)
    x_min, x_max = float(x[0]), float(x[-1])

    _interp = interp1d(np.log(x), y, kind='linear',
                       bounds_error=False, fill_value=(below, above))

    def interpolator(x_new):
        x_new = np.atleast_1d(np.asarray(x_new, dtype=np.float64))
        if warn:
            _warn_out_of_range(x_new, x_min, x_max, name)
        with np.errstate(divide='ignore', invalid='ignore'):
            result = _interp(np.log(x_new))
        # Non-positive x_new -> log is nan/-inf; treat as below-range.
        return np.where(np.isnan(result), below, result)

    return interpolator

liulu.numerics.interpolation.log_spline_interp

log_spline_interp(x, y, fill_value=0.0, name='table', warn=True)

Cubic spline interpolation in log(x) space.

Parameters:

Name Type Description Default
x ndarray

Abscissae (must be positive, ascending).

required
y ndarray

Ordinates.

required
fill_value float or tuple

Out-of-range policy; see the module docstring. Defaults to 0.0.

0.0
name str

Label used in the out-of-range warning.

'table'
warn bool

Whether to warn on out-of-range queries.

True

Returns:

Type Description
callable

Interpolator f(x_new) -> y_new.

Source code in liulu/numerics/interpolation.py
def log_spline_interp(x, y, fill_value=0.0, name="table", warn=True):
    """Cubic spline interpolation in log(x) space.

    Parameters
    ----------
    x : ndarray
        Abscissae (must be positive, ascending).
    y : ndarray
        Ordinates.
    fill_value : float or tuple
        Out-of-range policy; see the module docstring. Defaults to ``0.0``.
    name : str
        Label used in the out-of-range warning.
    warn : bool
        Whether to warn on out-of-range queries.

    Returns
    -------
    callable
        Interpolator f(x_new) -> y_new.
    """
    x = np.asarray(x, dtype=np.float64)
    y = np.asarray(y, dtype=np.float64)
    below, above = _resolve_fill(fill_value, y)
    x_min, x_max = float(x[0]), float(x[-1])

    _spline = CubicSpline(np.log(x), y, extrapolate=False)

    def interpolator(x_new):
        x_new = np.atleast_1d(np.asarray(x_new, dtype=np.float64))
        if warn:
            _warn_out_of_range(x_new, x_min, x_max, name)
        with np.errstate(divide='ignore', invalid='ignore'):
            result = _spline(np.log(x_new))
        # CubicSpline(extrapolate=False) yields nan outside the range (and for
        # non-positive x_new); map below/above range to the configured fills.
        result = np.where(x_new > x_max, above, result)
        result = np.where(np.isnan(result), below, result)
        return result

    return interpolator

Hankel transforms

liulu.numerics.hankel.pk_to_xi

pk_to_xi(k, pk, r)

Transform P(k) to xi(r) via direct quadrature.

Parameters:

Name Type Description Default
k ndarray

Wavenumbers, h/Mpc.

required
pk ndarray

Power spectrum, (Mpc/h)^3.

required
r array_like

Separations at which to evaluate xi, Mpc/h.

required

Returns:

Type Description
ndarray

Correlation function xi(r).

Source code in liulu/numerics/hankel.py
def pk_to_xi(k, pk, r):
    """Transform P(k) to xi(r) via direct quadrature.

    Parameters
    ----------
    k : ndarray
        Wavenumbers, h/Mpc.
    pk : ndarray
        Power spectrum, (Mpc/h)^3.
    r : array_like
        Separations at which to evaluate xi, Mpc/h.

    Returns
    -------
    ndarray
        Correlation function xi(r).
    """
    r = np.atleast_1d(np.asarray(r, dtype=np.float64))
    xi = np.empty_like(r)

    for i, ri in enumerate(r):
        kr = k * ri
        # j_0(x) = sin(x)/x, handle x->0 limit
        j0 = np.where(kr > 1e-10, np.sin(kr) / kr, 1.0)
        integrand = pk * k**2 * j0
        xi[i] = np.trapezoid(integrand, k) / (2.0 * np.pi**2)

    return xi

liulu.numerics.hankel.xi_to_pk

xi_to_pk(r, xi, k)

Transform xi(r) to P(k) via direct quadrature (inverse Hankel).

Parameters:

Name Type Description Default
r ndarray

Separations, Mpc/h.

required
xi ndarray

Correlation function.

required
k array_like

Wavenumbers at which to evaluate P(k), h/Mpc.

required

Returns:

Type Description
ndarray

Power spectrum P(k), (Mpc/h)^3.

Source code in liulu/numerics/hankel.py
def xi_to_pk(r, xi, k):
    """Transform xi(r) to P(k) via direct quadrature (inverse Hankel).

    Parameters
    ----------
    r : ndarray
        Separations, Mpc/h.
    xi : ndarray
        Correlation function.
    k : array_like
        Wavenumbers at which to evaluate P(k), h/Mpc.

    Returns
    -------
    ndarray
        Power spectrum P(k), (Mpc/h)^3.
    """
    k = np.atleast_1d(np.asarray(k, dtype=np.float64))
    pk = np.empty_like(k)

    for i, ki in enumerate(k):
        kr = ki * r
        j0 = np.where(kr > 1e-10, np.sin(kr) / kr, 1.0)
        integrand = xi * r**2 * j0
        pk[i] = 4.0 * np.pi * np.trapezoid(integrand, r)

    return pk