Skip to content

StreamingModel

The driver that wires the physics components to the numerical integration and produces redshift-space observables.

liulu.model.StreamingModel

Streaming model of redshift-space distortions.

Parameters:

Name Type Description Default
xi_real RealSpaceCorrelation

Real-space correlation function xi(r).

required
velocity_pdf VelocityPDF

LOS pairwise velocity PDF, already constructed from moments.

required
aH float

a * H(z) in km/s/(Mpc/h).

required
y_max float

Integration limit for y (real-space parallel distance), Mpc/h.

150.0
n_y int

Number of quadrature points per half-interval for y integration.

600
n_mu int

Number of mu points for multipole projection.

100
Source code in liulu/model.py
class StreamingModel:
    """Streaming model of redshift-space distortions.

    Parameters
    ----------
    xi_real : RealSpaceCorrelation
        Real-space correlation function xi(r).
    velocity_pdf : VelocityPDF
        LOS pairwise velocity PDF, already constructed from moments.
    aH : float
        a * H(z) in km/s/(Mpc/h).
    y_max : float
        Integration limit for y (real-space parallel distance), Mpc/h.
    n_y : int
        Number of quadrature points per half-interval for y integration.
    n_mu : int
        Number of mu points for multipole projection.
    """

    def __init__(
        self,
        xi_real: RealSpaceCorrelation,
        velocity_pdf: VelocityPDF,
        aH: float,
        y_max: float = 150.0,
        n_y: int = 600,
        n_mu: int = 100,
    ):
        self.xi = xi_real
        self.pdf = velocity_pdf
        self.aH = aH
        self.y_max = y_max
        self.n_y = n_y
        self.n_mu = n_mu

    # ── core integral ─────────────────────────────────────────────────

    def xi_s_smu(self, s, mu):
        """Compute xi^s on a (s, mu) grid.

        Parameters
        ----------
        s : array_like
            Redshift-space pair separations, Mpc/h.
        mu : array_like
            Cosine of the angle to the line of sight.

        Returns
        -------
        ndarray, shape (n_s, n_mu)
            Redshift-space correlation function.
        """
        s = np.atleast_1d(np.asarray(s, dtype=np.float64))
        mu = np.atleast_1d(np.asarray(mu, dtype=np.float64))

        S = s.reshape(-1, 1)
        MU = mu.reshape(1, -1)
        s_par = S * MU
        s_perp = S * np.sqrt(1.0 - MU ** 2)

        # Build symmetric y-grid, avoiding y=0
        epsilon = 1e-3
        y_pos = np.linspace(epsilon, self.y_max, self.n_y)
        y_full = np.concatenate([-y_pos[::-1], y_pos])

        # Broadcast: (n_s, n_mu, n_y)
        Y = y_full[np.newaxis, np.newaxis, :]
        PERP = s_perp[:, :, np.newaxis]
        PARA = s_par[:, :, np.newaxis]

        r = np.sqrt(PERP ** 2 + Y ** 2)
        v_los = self.aH * (PARA - Y) * np.sign(Y)

        xi_r = self.xi(r)
        pdf_val = np.nan_to_num(
            self.pdf(v_los, PERP, np.abs(Y)), copy=False
        )
        integrand = self.aH * (1.0 + xi_r) * pdf_val

        one_plus_xi_s = simpson(integrand, x=y_full, axis=-1)
        return one_plus_xi_s - 1.0

    # ── convenience wrappers ──────────────────────────────────────────

    def xi_s_2d(self, s_perp, s_par):
        """Evaluate xi^s at given (s_perp, s_par) points.

        Parameters
        ----------
        s_perp : array_like
            Transverse separation(s), Mpc/h.
        s_par : array_like
            Line-of-sight separation(s), Mpc/h.

        Returns
        -------
        ndarray
            xi^s values, same shape as input.
        """
        s_perp = np.atleast_1d(np.asarray(s_perp, dtype=np.float64))
        s_par = np.atleast_1d(np.asarray(s_par, dtype=np.float64))

        s = np.sqrt(s_perp ** 2 + s_par ** 2)
        mu = np.where(s > 0, s_par / s, 0.0)

        # Evaluate one by one (each point may have unique s,mu)
        shape = s.shape
        s_flat = s.ravel()
        mu_flat = mu.ravel()

        result = np.empty(s_flat.size)
        for i in range(s_flat.size):
            xi = self.xi_s_smu(np.array([s_flat[i]]), np.array([mu_flat[i]]))
            result[i] = xi[0, 0]
        return result.reshape(shape)

    def multipoles(self, s_array, ell_max=4):
        """Compute multipoles xi_0(s), xi_2(s), xi_4(s).

        Uses Gauss-Legendre quadrature over mu in [0, 1] (even-ell symmetry).

        Parameters
        ----------
        s_array : array_like
            Separation values, Mpc/h.
        ell_max : int
            Maximum multipole order (must be even).

        Returns
        -------
        dict
            {0: xi_0(s), 2: xi_2(s), 4: xi_4(s), ...}
        """
        s_array = np.atleast_1d(np.asarray(s_array, dtype=np.float64))

        # Gauss-Legendre nodes on [0, 1]
        nodes, weights = np.polynomial.legendre.leggauss(self.n_mu)
        mu = 0.5 * (nodes + 1.0)
        w = 0.5 * weights

        # Vectorised: compute xi^s for all (s, mu) at once
        xi_s = self.xi_s_smu(s_array, mu)  # shape (n_s, n_mu)

        ell_list = list(range(0, ell_max + 1, 2))
        result = {}
        for ell in ell_list:
            L = legendre(ell)(mu)
            result[ell] = (2 * ell + 1) * np.sum(w * xi_s * L, axis=1)
        return result

    def wedges(self, s_array, mu_edges, n_mu_per_wedge=50):
        """Compute wedge-averaged xi(s, Delta_mu).

        Parameters
        ----------
        s_array : array_like
            Separation values, Mpc/h.
        mu_edges : list of tuples
            List of (mu_min, mu_max) for each wedge.
        n_mu_per_wedge : int
            Number of GL nodes per wedge.

        Returns
        -------
        list of ndarray
            Wedge-averaged xi(s) for each wedge.
        """
        s_array = np.atleast_1d(np.asarray(s_array, dtype=np.float64))
        result = []
        for mu_min, mu_max in mu_edges:
            nodes, weights = np.polynomial.legendre.leggauss(n_mu_per_wedge)
            mu = 0.5 * (mu_max + mu_min) + 0.5 * (mu_max - mu_min) * nodes
            w = 0.5 * (mu_max - mu_min) * weights

            xi_s = self.xi_s_smu(s_array, mu)  # (n_s, n_mu)
            xi_wedge = np.sum(w * xi_s, axis=1) / (mu_max - mu_min)
            result.append(xi_wedge)
        return result

xi_s_smu

xi_s_smu(s, mu)

Compute xi^s on a (s, mu) grid.

Parameters:

Name Type Description Default
s array_like

Redshift-space pair separations, Mpc/h.

required
mu array_like

Cosine of the angle to the line of sight.

required

Returns:

Type Description
(ndarray, shape(n_s, n_mu))

Redshift-space correlation function.

Source code in liulu/model.py
def xi_s_smu(self, s, mu):
    """Compute xi^s on a (s, mu) grid.

    Parameters
    ----------
    s : array_like
        Redshift-space pair separations, Mpc/h.
    mu : array_like
        Cosine of the angle to the line of sight.

    Returns
    -------
    ndarray, shape (n_s, n_mu)
        Redshift-space correlation function.
    """
    s = np.atleast_1d(np.asarray(s, dtype=np.float64))
    mu = np.atleast_1d(np.asarray(mu, dtype=np.float64))

    S = s.reshape(-1, 1)
    MU = mu.reshape(1, -1)
    s_par = S * MU
    s_perp = S * np.sqrt(1.0 - MU ** 2)

    # Build symmetric y-grid, avoiding y=0
    epsilon = 1e-3
    y_pos = np.linspace(epsilon, self.y_max, self.n_y)
    y_full = np.concatenate([-y_pos[::-1], y_pos])

    # Broadcast: (n_s, n_mu, n_y)
    Y = y_full[np.newaxis, np.newaxis, :]
    PERP = s_perp[:, :, np.newaxis]
    PARA = s_par[:, :, np.newaxis]

    r = np.sqrt(PERP ** 2 + Y ** 2)
    v_los = self.aH * (PARA - Y) * np.sign(Y)

    xi_r = self.xi(r)
    pdf_val = np.nan_to_num(
        self.pdf(v_los, PERP, np.abs(Y)), copy=False
    )
    integrand = self.aH * (1.0 + xi_r) * pdf_val

    one_plus_xi_s = simpson(integrand, x=y_full, axis=-1)
    return one_plus_xi_s - 1.0

xi_s_2d

xi_s_2d(s_perp, s_par)

Evaluate xi^s at given (s_perp, s_par) points.

Parameters:

Name Type Description Default
s_perp array_like

Transverse separation(s), Mpc/h.

required
s_par array_like

Line-of-sight separation(s), Mpc/h.

required

Returns:

Type Description
ndarray

xi^s values, same shape as input.

Source code in liulu/model.py
def xi_s_2d(self, s_perp, s_par):
    """Evaluate xi^s at given (s_perp, s_par) points.

    Parameters
    ----------
    s_perp : array_like
        Transverse separation(s), Mpc/h.
    s_par : array_like
        Line-of-sight separation(s), Mpc/h.

    Returns
    -------
    ndarray
        xi^s values, same shape as input.
    """
    s_perp = np.atleast_1d(np.asarray(s_perp, dtype=np.float64))
    s_par = np.atleast_1d(np.asarray(s_par, dtype=np.float64))

    s = np.sqrt(s_perp ** 2 + s_par ** 2)
    mu = np.where(s > 0, s_par / s, 0.0)

    # Evaluate one by one (each point may have unique s,mu)
    shape = s.shape
    s_flat = s.ravel()
    mu_flat = mu.ravel()

    result = np.empty(s_flat.size)
    for i in range(s_flat.size):
        xi = self.xi_s_smu(np.array([s_flat[i]]), np.array([mu_flat[i]]))
        result[i] = xi[0, 0]
    return result.reshape(shape)

multipoles

multipoles(s_array, ell_max=4)

Compute multipoles xi_0(s), xi_2(s), xi_4(s).

Uses Gauss-Legendre quadrature over mu in [0, 1] (even-ell symmetry).

Parameters:

Name Type Description Default
s_array array_like

Separation values, Mpc/h.

required
ell_max int

Maximum multipole order (must be even).

4

Returns:

Type Description
dict

{0: xi_0(s), 2: xi_2(s), 4: xi_4(s), ...}

Source code in liulu/model.py
def multipoles(self, s_array, ell_max=4):
    """Compute multipoles xi_0(s), xi_2(s), xi_4(s).

    Uses Gauss-Legendre quadrature over mu in [0, 1] (even-ell symmetry).

    Parameters
    ----------
    s_array : array_like
        Separation values, Mpc/h.
    ell_max : int
        Maximum multipole order (must be even).

    Returns
    -------
    dict
        {0: xi_0(s), 2: xi_2(s), 4: xi_4(s), ...}
    """
    s_array = np.atleast_1d(np.asarray(s_array, dtype=np.float64))

    # Gauss-Legendre nodes on [0, 1]
    nodes, weights = np.polynomial.legendre.leggauss(self.n_mu)
    mu = 0.5 * (nodes + 1.0)
    w = 0.5 * weights

    # Vectorised: compute xi^s for all (s, mu) at once
    xi_s = self.xi_s_smu(s_array, mu)  # shape (n_s, n_mu)

    ell_list = list(range(0, ell_max + 1, 2))
    result = {}
    for ell in ell_list:
        L = legendre(ell)(mu)
        result[ell] = (2 * ell + 1) * np.sum(w * xi_s * L, axis=1)
    return result

wedges

wedges(s_array, mu_edges, n_mu_per_wedge=50)

Compute wedge-averaged xi(s, Delta_mu).

Parameters:

Name Type Description Default
s_array array_like

Separation values, Mpc/h.

required
mu_edges list of tuples

List of (mu_min, mu_max) for each wedge.

required
n_mu_per_wedge int

Number of GL nodes per wedge.

50

Returns:

Type Description
list of ndarray

Wedge-averaged xi(s) for each wedge.

Source code in liulu/model.py
def wedges(self, s_array, mu_edges, n_mu_per_wedge=50):
    """Compute wedge-averaged xi(s, Delta_mu).

    Parameters
    ----------
    s_array : array_like
        Separation values, Mpc/h.
    mu_edges : list of tuples
        List of (mu_min, mu_max) for each wedge.
    n_mu_per_wedge : int
        Number of GL nodes per wedge.

    Returns
    -------
    list of ndarray
        Wedge-averaged xi(s) for each wedge.
    """
    s_array = np.atleast_1d(np.asarray(s_array, dtype=np.float64))
    result = []
    for mu_min, mu_max in mu_edges:
        nodes, weights = np.polynomial.legendre.leggauss(n_mu_per_wedge)
        mu = 0.5 * (mu_max + mu_min) + 0.5 * (mu_max - mu_min) * nodes
        w = 0.5 * (mu_max - mu_min) * weights

        xi_s = self.xi_s_smu(s_array, mu)  # (n_s, n_mu)
        xi_wedge = np.sum(w * xi_s, axis=1) / (mu_max - mu_min)
        result.append(xi_wedge)
    return result