Source code for validation.robustness.stats

"""Statistical tools for robustness analysis.

Pure functions for time series analysis: Hodrick-Prescott filtering,
lead-lag cross-correlations, autoregressive model fitting, and
impulse-response function computation. No simulation dependencies.
"""

from __future__ import annotations

import numpy as np
from numpy.typing import NDArray
from scipy import sparse
from scipy.sparse.linalg import spsolve


[docs] def hp_filter( y: NDArray[np.floating], lamb: float = 1600.0, ) -> tuple[NDArray[np.floating], NDArray[np.floating]]: """Hodrick-Prescott filter decomposition into trend and cycle. Solves the penalized least-squares problem: min_τ Σ(y_t - τ_t)² + λ Σ(τ_{t+1} - 2τ_t + τ_{t-1})² via the sparse linear system ``(I + λ K'K) τ = y``. Parameters ---------- y : NDArray Time series of length T. lamb : float Smoothing parameter. Standard values: 1600 for quarterly data, 6.25 for annual, 129600 for monthly. Returns ------- trend : NDArray Trend component (τ). cycle : NDArray Cyclical component (y - τ). """ t = len(y) if t < 3: return y.copy(), np.zeros_like(y) # Build second-difference matrix K of shape (T-2, T) # K[i, :] = [0, ..., 0, 1, -2, 1, 0, ..., 0] with 1 at position i diags = np.array([np.ones(t - 2), -2 * np.ones(t - 2), np.ones(t - 2)]) offsets = [0, 1, 2] k_mat = sparse.diags(diags, offsets, shape=(t - 2, t), format="csc") # Solve (I + λ K'K) τ = y identity = sparse.eye(t, format="csc") a_mat = identity + lamb * (k_mat.T @ k_mat) trend = spsolve(a_mat, y) cycle = y - trend return trend, cycle
[docs] def cross_correlation( x: NDArray[np.floating], y: NDArray[np.floating], max_lag: int = 4, ) -> NDArray[np.floating]: """Cross-correlation of x and y at integer leads and lags. Computes ``corr(x_t, y_{t+k})`` for ``k = -max_lag, ..., 0, ..., +max_lag``. At lag k=0 this is the contemporaneous correlation. Positive k means y *leads* x (y is shifted forward relative to x). Parameters ---------- x, y : NDArray Time series of equal length T. max_lag : int Maximum lead/lag to compute. Returns ------- NDArray Array of shape ``(2 * max_lag + 1,)`` with correlations. Index ``max_lag + k`` holds ``corr(x_t, y_{t+k})``. """ n = len(x) if n != len(y): raise ValueError(f"Series must have equal length, got {n} and {len(y)}") result = np.zeros(2 * max_lag + 1) for k in range(-max_lag, max_lag + 1): if k >= 0: x_seg = x[: n - k] y_seg = y[k:] else: x_seg = x[-k:] y_seg = y[: n + k] if len(x_seg) < 3: result[max_lag + k] = np.nan else: result[max_lag + k] = np.corrcoef(x_seg, y_seg)[0, 1] return result
[docs] def fit_ar( y: NDArray[np.floating], order: int = 2, ) -> tuple[NDArray[np.floating], float]: """Fit an AR(p) model via ordinary least squares. Estimates the model ``y_t = c + φ_1 y_{t-1} + ... + φ_p y_{t-p} + ε_t``. Parameters ---------- y : NDArray Time series of length T. order : int AR order (p). Must be >= 1. Returns ------- coeffs : NDArray Estimated coefficients ``[c, φ_1, ..., φ_p]`` of length ``order + 1``. r_squared : float Coefficient of determination (R²). """ t = len(y) if t <= order + 1: raise ValueError( f"Series length {t} too short for AR({order}), need > {order + 1}" ) # Build design matrix: [1, y_{t-1}, y_{t-2}, ..., y_{t-p}] x_mat = np.ones((t - order, order + 1)) for lag in range(1, order + 1): x_mat[:, lag] = y[order - lag : t - lag] y_dep = y[order:] # Solve via least squares coeffs, _residuals, _, _ = np.linalg.lstsq(x_mat, y_dep, rcond=None) # Compute R² y_pred = x_mat @ coeffs ss_res = np.sum((y_dep - y_pred) ** 2) ss_tot = np.sum((y_dep - np.mean(y_dep)) ** 2) r_squared = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0 return coeffs, float(r_squared)
[docs] def impulse_response( ar_coeffs: NDArray[np.floating], n_periods: int = 20, ) -> NDArray[np.floating]: """Compute impulse-response function from AR coefficients. Simulates the response to a unit shock at t=0 through the AR recursion. Parameters ---------- ar_coeffs : NDArray AR coefficients ``[c, φ_1, ..., φ_p]`` from :func:`fit_ar`. n_periods : int Number of periods to simulate. Returns ------- NDArray Impulse-response values of length ``n_periods``. """ order = len(ar_coeffs) - 1 # Exclude constant phi = ar_coeffs[1:] # AR coefficients only (no constant) irf = np.zeros(n_periods) irf[0] = 1.0 # Unit shock for t in range(1, n_periods): for lag in range(min(t, order)): irf[t] += phi[lag] * irf[t - lag - 1] return irf