# Source code for calibration.stability

"""Multi-seed stability testing with tiered evaluation and ranking strategies.

This module handles the stability testing phase of calibration: evaluating
top candidates from screening across multiple seeds with configurable
ranking strategies and tiered pruning.
"""

from __future__ import annotations

import statistics
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any

from calibration.analysis import CalibrationResult, format_eta
from calibration.grid import count_combinations, generate_combinations
from calibration.screening import delete_checkpoint, run_screening, save_checkpoint
from validation import (
    StabilityResult,
    compute_combined_score,
    get_validation_func,
    get_validation_funcs,
)


def _evaluate_single_seed(
    params: dict[str, Any],
    scenario: str,
    seed: int,
    n_periods: int,
) -> tuple[dict[str, Any], int, float, int]:
    """Score one parameter configuration under a single random seed.

    The validation callable for ``scenario`` is looked up and invoked with
    ``seed``, ``n_periods`` and every entry of ``params`` as keyword
    arguments.

    Returns
    -------
    tuple
        ``(params_copy, seed, total_score, n_fail)``, where ``params_copy``
        is a shallow copy of ``params`` and the last two values are read
        from the validation result.
    """
    outcome = get_validation_func(scenario)(
        seed=seed, n_periods=n_periods, **params
    )
    params_copy = dict(params)
    return params_copy, seed, outcome.total_score, outcome.n_fail


def evaluate_stability(
    params: dict[str, Any],
    scenario: str,
    seeds: list[int],
    n_periods: int,
) -> CalibrationResult:
    """Run the scenario's multi-seed stability test for one configuration.

    Parameters
    ----------
    params : dict
        Parameter configuration, forwarded as keyword arguments.
    scenario : str
        Scenario name used to look up the validation functions.
    seeds : list[int]
        Random seeds to test.
    n_periods : int
        Number of simulation periods.

    Returns
    -------
    CalibrationResult
        The "single" fields (``single_score``, ``n_pass``, ``n_warn``,
        ``n_fail``) are taken from the first seed result; the aggregate
        fields come from the stability result, and ``combined_score`` from
        ``compute_combined_score``.
    """
    # The stability runner is the second of the four scenario functions.
    _, stability_runner, _, _ = get_validation_funcs(scenario)
    stability: StabilityResult = stability_runner(
        seeds=seeds, n_periods=n_periods, **params
    )
    combined = compute_combined_score(stability)

    per_seed = stability.seed_results
    first_seed = per_seed[0]
    return CalibrationResult(
        params=params,
        single_score=first_seed.total_score,
        n_pass=first_seed.n_pass,
        n_warn=first_seed.n_warn,
        n_fail=first_seed.n_fail,
        mean_score=stability.mean_score,
        std_score=stability.std_score,
        pass_rate=stability.pass_rate,
        combined_score=combined,
        stability_result=stability,
        seed_scores=[seed_result.total_score for seed_result in per_seed],
    )
def parse_stability_tiers(tiers_str: str) -> list[tuple[int, int]]:
    """Parse stability tiers from a CLI string.

    Parameters
    ----------
    tiers_str : str
        Comma-separated ``configs:seeds`` pairs, e.g.
        ``"100:10,50:20,10:100"`` meaning (top 100 x 10 seeds,
        top 50 x 20 seeds, top 10 x 100 seeds). Whitespace around the
        separators is ignored, and empty segments (such as the one left by
        a trailing comma) are skipped, so an empty string yields ``[]``.

    Returns
    -------
    list[tuple[int, int]]
        List of (n_configs, total_seeds) tuples, in input order.

    Raises
    ------
    ValueError
        If a segment is not exactly ``configs:seeds``, if either field is
        not an integer, or if either value is negative.
    """
    tiers: list[tuple[int, int]] = []
    for raw_segment in tiers_str.split(","):
        segment = raw_segment.strip()
        if not segment:
            # Tolerate "100:10," and similar instead of failing on unpack.
            continue

        configs_text, colon, seeds_text = segment.partition(":")
        if not colon or ":" in seeds_text:
            raise ValueError(
                f"Invalid stability tier {segment!r}: expected 'configs:seeds'"
            )
        try:
            n_configs = int(configs_text)
            total_seeds = int(seeds_text)
        except ValueError:
            raise ValueError(
                f"Invalid stability tier {segment!r}: "
                "configs and seeds must be integers"
            ) from None
        if n_configs < 0 or total_seeds < 0:
            # A negative count is meaningless and would be misread as a
            # from-the-end index if later used as a slice bound.
            raise ValueError(
                f"Invalid stability tier {segment!r}: "
                "configs and seeds must be non-negative"
            )
        tiers.append((n_configs, total_seeds))
    return tiers
def _rank_candidates( candidates: list[CalibrationResult], rank_by: str = "combined", k_factor: float = 1.0, ) -> list[CalibrationResult]: """Rank candidates by the specified strategy. Parameters ---------- candidates : list[CalibrationResult] Candidates to rank. rank_by : str Ranking strategy: "combined", "stability", or "mean". k_factor : float k in mean - k*std formula for "combined" ranking. Returns ------- list[CalibrationResult] Sorted candidates (best first). """ if rank_by == "stability": # Sort by (pass_rate DESC, n_fail ASC, combined_score DESC) candidates.sort( key=lambda r: ( r.pass_rate or 0.0, -(r.n_fail or 0), r.combined_score or 0.0, ), reverse=True, ) elif rank_by == "mean": candidates.sort(key=lambda r: r.mean_score or 0.0, reverse=True) else: # "combined": mean * pass_rate * (1 - k * std) for c in candidates: if c.mean_score is not None and c.std_score is not None: pr = c.pass_rate if c.pass_rate is not None else 1.0 c.combined_score = c.mean_score * pr * (1.0 - k_factor * c.std_score) candidates.sort(key=lambda r: r.combined_score or 0.0, reverse=True) return candidates
def _report_seed_progress(
    tier_num: int,
    done: int,
    total: int,
    avg_time_per_run: float,
    n_workers: int,
) -> None:
    """Print one progress line after ``done`` of ``total`` new seeds finished."""
    remaining = total - done
    eta = format_eta(remaining, avg_time_per_run, n_workers)
    print(
        f" Tier {tier_num}: "
        f"Testing {done}/{total} "
        f"({100 * done / total:.0f}%) "
        f"| {remaining} remaining | ETA: {eta}"
    )


def _run_new_seeds(
    executor: ProcessPoolExecutor | None,
    params: dict[str, Any],
    scenario: str,
    seed_ids: list[int],
    n_periods: int,
    tier_num: int,
    avg_time_per_run: float,
    n_workers: int,
) -> tuple[list[float], list[int]]:
    """Evaluate ``seed_ids`` for one config; return (scores, fails) in seed order."""
    total = len(seed_ids)
    outcomes: dict[int, tuple[float, int]] = {}

    if executor is not None:
        seed_of = {
            executor.submit(
                _evaluate_single_seed, params, scenario, seed, n_periods
            ): seed
            for seed in seed_ids
        }
        # Futures complete in arbitrary order; key results by seed so the
        # stored lists do not depend on scheduling.
        for done, future in enumerate(as_completed(seed_of), start=1):
            _, _, score, n_fail = future.result()
            outcomes[seed_of[future]] = (score, n_fail)
            _report_seed_progress(
                tier_num, done, total, avg_time_per_run, n_workers
            )
    else:
        for done, seed in enumerate(seed_ids, start=1):
            _, _, score, n_fail = _evaluate_single_seed(
                params, scenario, seed, n_periods
            )
            outcomes[seed] = (score, n_fail)
            _report_seed_progress(
                tier_num, done, total, avg_time_per_run, n_workers
            )

    scores = [outcomes[seed][0] for seed in seed_ids]
    fails = [outcomes[seed][1] for seed in seed_ids]
    return scores, fails


def _refresh_aggregates(candidate: CalibrationResult, k_factor: float) -> None:
    """Recompute mean/std/pass_rate/combined_score from accumulated seed data."""
    scores = candidate.seed_scores or []
    if not scores:
        return
    candidate.mean_score = statistics.mean(scores)
    # stdev needs at least two samples; a single seed has no spread.
    candidate.std_score = statistics.stdev(scores) if len(scores) > 1 else 0.0
    fails = candidate.seed_fails or []
    n_passed = sum(1 for nf in fails if nf == 0)
    candidate.pass_rate = n_passed / len(fails) if fails else None
    rate = candidate.pass_rate if candidate.pass_rate is not None else 1.0
    candidate.combined_score = (
        candidate.mean_score * rate * (1.0 - k_factor * candidate.std_score)
    )


def run_tiered_stability(
    candidates: list[CalibrationResult],
    scenario: str,
    tiers: list[tuple[int, int]],
    n_workers: int = 10,
    n_periods: int = 1000,
    avg_time_per_run: float = 0.0,
    rank_by: str = "combined",
    k_factor: float = 1.0,
) -> list[CalibrationResult]:
    """Run incremental tiered stability testing.

    Each tier runs only NEW seeds (not previously tested ones) and
    accumulates all seed scores for ranking. Candidates are updated in
    place. A checkpoint is saved after every tier and deleted once all
    tiers have completed.

    Parameters
    ----------
    candidates : list[CalibrationResult]
        Screening results to stability-test. Candidates without
        ``seed_scores``/``seed_fails`` are seeded from their screening
        ``single_score``/``n_fail``.
    scenario : str
        Scenario name.
    tiers : list[tuple[int, int]]
        List of (n_configs, total_seeds) -- each tier tests the top
        n_configs of the previous tier's ranking, using enough new seeds
        to reach total_seeds cumulative.
    n_workers : int
        Parallel workers. Values above 1 use one process pool shared by
        the whole call; otherwise seeds run sequentially in-process.
    n_periods : int
        Simulation periods.
    avg_time_per_run : float
        Estimated time per run for ETA.
    rank_by : str
        Ranking strategy: "combined", "stability" (pass_rate/n_fail
        priority), or "mean" (mean_score only).
    k_factor : float
        k in the combined score ``mean * pass_rate * (1 - k * std)``.

    Returns
    -------
    list[CalibrationResult]
        Survivors of the last tier sorted by ranking strategy (best
        first); ``candidates`` itself if ``tiers`` is empty.
    """
    # Initialize seed_scores/seed_fails from screening (seed 0)
    for c in candidates:
        if c.seed_scores is None:
            c.seed_scores = [c.single_score]
        if c.seed_fails is None:
            c.seed_fails = [c.n_fail]

    current = candidates

    # One pool for the whole run: starting and joining worker processes
    # for every single candidate is pure overhead.
    executor = ProcessPoolExecutor(max_workers=n_workers) if n_workers > 1 else None
    try:
        for tier_num, (n_configs, total_seeds) in enumerate(tiers, start=1):
            # Take top n_configs (or all if fewer available)
            top = current[:n_configs]
            print(f"\n Tier {tier_num}: {len(top)} configs x {total_seeds} total seeds")

            for c in top:
                existing_n = len(c.seed_scores or [])
                new_seeds_needed = max(0, total_seeds - existing_n)

                if new_seeds_needed > 0:
                    # Seed ids continue where the previous tier stopped.
                    new_seed_ids = list(range(existing_n, total_seeds))
                    print(
                        f" Testing {c.single_score:.3f} config: "
                        f"+{new_seeds_needed} seeds ({existing_n}->{total_seeds})"
                    )
                    new_scores, new_fails = _run_new_seeds(
                        executor,
                        c.params,
                        scenario,
                        new_seed_ids,
                        n_periods,
                        tier_num,
                        avg_time_per_run,
                        n_workers,
                    )
                    c.seed_scores = (c.seed_scores or []) + new_scores
                    c.seed_fails = (c.seed_fails or [c.n_fail]) + new_fails

                # Refresh even when no new seeds ran, so every config in
                # the tier has the metrics the ranking and report read.
                _refresh_aggregates(c, k_factor)

            # Rank by chosen strategy
            top = _rank_candidates(top, rank_by=rank_by, k_factor=k_factor)
            current = top

            # Print tier results
            print(f"\n Tier {tier_num} results (top 5):")
            for i, r in enumerate(current[:5]):
                print(
                    f" #{i + 1}: combined={r.combined_score:.4f} "
                    f"mean={r.mean_score:.3f} +/- {r.std_score:.3f} "
                    f"({len(r.seed_scores or [])} seeds)"
                )

            # Checkpoint after each tier
            save_checkpoint(current, scenario, "stability")
    finally:
        if executor is not None:
            executor.shutdown()

    delete_checkpoint(scenario, "stability")
    return current
# =============================================================================
# Focused calibration (orchestrates screening + stability)
# =============================================================================
def run_focused_calibration(
    grid: dict[str, list[Any]],
    fixed_params: dict[str, Any],
    scenario: str = "baseline",
    n_workers: int = 10,
    n_periods: int = 1000,
    stability_tiers: list[tuple[int, int]] | None = None,
    avg_time_per_run: float = 0.0,
    resume: bool = False,
    rank_by: str = "combined",
    k_factor: float = 1.0,
) -> list[CalibrationResult]:
    """Screen a focused grid, then stability-test the screening results.

    Parameters
    ----------
    grid : dict
        Parameter grid to search.
    fixed_params : dict
        Fixed parameter values merged into every combination.
    scenario : str
        Scenario name.
    n_workers : int
        Number of parallel workers, used by both phases.
    n_periods : int
        Number of simulation periods, used by both phases.
    stability_tiers : list[tuple[int, int]], optional
        Tiered stability config. Defaults to
        ``[(100, 10), (50, 20), (10, 100)]`` when ``None``.
    avg_time_per_run : float
        Average time per simulation run, forwarded to both phases.
    resume : bool
        Forwarded to the screening phase.
    rank_by : str
        Ranking strategy, forwarded to the stability phase.
    k_factor : float
        Ranking weight, forwarded to the stability phase.

    Returns
    -------
    list[CalibrationResult]
        Whatever ``run_tiered_stability`` returns for the screening results.
    """
    tiers = (
        [(100, 10), (50, 20), (10, 100)]
        if stability_tiers is None
        else stability_tiers
    )

    total = count_combinations(grid)
    print(f"\n[{scenario}] Focused Grid Search: {total} combinations")
    print(f"Fixed params: {fixed_params}")

    # Materialize every grid point with the fixed params merged in.
    combinations = list(generate_combinations(grid, fixed=fixed_params))

    # Settings shared by the screening and stability phases.
    shared_settings = {
        "n_workers": n_workers,
        "n_periods": n_periods,
        "avg_time_per_run": avg_time_per_run,
    }

    print(f"\nScreening {total} combinations with {n_workers} workers...")
    screening_results = run_screening(
        combinations,
        scenario,
        resume=resume,
        **shared_settings,
    )

    print("\nTiered stability testing...")
    return run_tiered_stability(
        screening_results,
        scenario,
        tiers=tiers,
        rank_by=rank_by,
        k_factor=k_factor,
        **shared_settings,
    )