"""Scoring and status check functions for validation.
This module provides functions to compute scores and check statuses
for validation metrics.
"""
from __future__ import annotations
import math
from validation.types import StabilityResult, Status
# =============================================================================
# Fail Escalation Constants
# =============================================================================
# Weight-based escalation: clamp(INTERCEPT - SLOPE * weight, FLOOR, CEILING)
# Higher weight → lower multiplier (stricter, fails more easily)
# Lower weight → higher multiplier (more lenient, harder to fail)
# Unclamped multiplier at weight 0; the line starts here and falls with weight.
_FAIL_INTERCEPT = 5.0
# Amount the unclamped multiplier drops per unit of metric weight.
_FAIL_SLOPE = 2.0
# Lower clamp. With the values above it is reached once weight >= 2.25.
_FAIL_FLOOR = 0.5 # Strictest: high-weight metrics fail faster than normal
# Upper clamp. With the values above it is reached once weight <= 0.
_FAIL_CEILING = 5.0 # Most lenient: low-weight metrics need extreme deviation
[docs]
def fail_escalation_multiplier(weight: float) -> float:
    """Translate a metric weight into a WARN→FAIL escalation multiplier.

    The multiplier is a decreasing linear function of ``weight``
    (``_FAIL_INTERCEPT - _FAIL_SLOPE * weight``) limited to the interval
    ``[_FAIL_FLOOR, _FAIL_CEILING]``. The status check functions multiply
    their WARN→FAIL boundary by this value, so a result below 1 narrows the
    WARN zone (stricter) and a result above 1 widens it (more lenient).

    With the default constants::

        weight 3.0 -> 0.5   (clamped at the floor)
        weight 2.0 -> 1.0   (normal behaviour)
        weight 1.5 -> 2.0
        weight 1.0 -> 3.0
        weight 0.5 -> 4.0

    Parameters
    ----------
    weight : float
        Metric weight (typically 0.5–3.0).

    Returns
    -------
    float
        Escalation multiplier within ``[_FAIL_FLOOR, _FAIL_CEILING]``.
    """
    unclamped = _FAIL_INTERCEPT - _FAIL_SLOPE * weight
    # Apply the upper bound first, then the lower bound.
    capped = min(_FAIL_CEILING, unclamped)
    return max(_FAIL_FLOOR, capped)
# =============================================================================
# Scoring Functions
# =============================================================================
[docs]
def score_mean_tolerance(actual: float, target: float, tolerance: float) -> float:
    """Score from 0-1 based on distance from target.

    The score decays linearly with the distance between ``actual`` and
    ``target``: 1.0 exactly on target, 0.5 at a distance of ``tolerance``,
    and 0.0 at a distance of ``2 * tolerance`` or more.

    Parameters
    ----------
    actual : float
        Observed value.
    target : float
        Desired value.
    tolerance : float
        Distance from ``target`` at which the score drops to 0.5. A
        non-positive tolerance leaves no room for deviation and is scored
        as exact-match-only.

    Returns
    -------
    float
        Score in [0.0, 1.0].
    """
    deviation = abs(actual - target)
    if tolerance <= 0:
        # Degenerate tolerance: avoid dividing by zero (or by a negative
        # value, which would push the score above 1.0). Only an exact hit
        # earns credit, mirroring the zero-width guard in score_range.
        return 1.0 if deviation == 0 else 0.0
    distance = deviation / tolerance
    return max(0.0, 1.0 - distance / 2.0)
[docs]
def score_range(actual: float, min_val: float, max_val: float) -> float:
    """Score a value by where it sits relative to ``[min_val, max_val]``.

    Inside the range the score runs from 1.0 at the midpoint down to 0.75
    at either edge. Outside the range it starts at 0.75 and loses one point
    per range-width of overshoot, bottoming out at 0.0. A zero-width range
    scores 1.0 for an exact match and 0.0 otherwise.

    Parameters
    ----------
    actual : float
        Observed value.
    min_val : float
        Lower edge of the accepted range.
    max_val : float
        Upper edge of the accepted range.

    Returns
    -------
    float
        Score as described above.
    """
    span = max_val - min_val
    if span == 0:
        # Zero-width range: nothing to interpolate over.
        return 1.0 if actual == min_val else 0.0

    if not (min_val <= actual <= max_val):
        # Outside: measure how far past the nearer edge, in range widths.
        if actual < min_val:
            overshoot = (min_val - actual) / span
        else:
            overshoot = (actual - max_val) / span
        return max(0.0, 0.75 - overshoot)

    # Inside: 0 at the midpoint, 1 at either edge.
    midpoint = (min_val + max_val) / 2
    half_span = span / 2
    offset = abs(actual - midpoint) / half_span
    return 1.0 - 0.25 * offset
[docs]
def score_pct_within_target(
actual_pct: float, target_pct: float, min_pct: float
) -> float:
"""Score 0-1 for percentage meeting target.
Returns 1.0 if actual >= target, scores proportionally if >= min,
and penalizes below min.
"""
if actual_pct >= target_pct:
return 1.0
elif actual_pct >= min_pct:
progress = (actual_pct - min_pct) / (target_pct - min_pct)
return 0.75 + 0.25 * progress
else:
if min_pct > 0:
shortfall = (min_pct - actual_pct) / min_pct
return max(0.0, 0.75 * (1 - shortfall))
return 0.0
[docs]
def score_outlier_penalty(
    outlier_pct: float, max_outlier_pct: float, penalty_weight: float = 2.0
) -> float:
    """Score an outlier percentage, decaying exponentially past the limit.

    At or below ``max_outlier_pct`` the score is 1.0. Above it the score is
    ``exp(-penalty_weight * excess / max_outlier_pct)``, where ``excess`` is
    the amount by which the limit is exceeded. When the limit is exactly
    zero, any outliers at all score 0.0.

    Parameters
    ----------
    outlier_pct : float
        Observed outlier percentage.
    max_outlier_pct : float
        Largest outlier percentage that still earns a full score.
    penalty_weight : float
        Steepness of the exponential decay.

    Returns
    -------
    float
        1.0 within the limit; otherwise the decayed score.
    """
    within_limit = outlier_pct <= max_outlier_pct
    if within_limit:
        return 1.0
    if max_outlier_pct == 0.0:
        # Relative excess is undefined against a zero limit.
        return 0.0

    overage = outlier_pct - max_outlier_pct
    exponent = -penalty_weight * overage / max_outlier_pct
    return max(0.0, math.exp(exponent))
# =============================================================================
# Status Check Functions
# =============================================================================
[docs]
def check_mean_tolerance(
    actual: float,
    target: float,
    tolerance: float,
    warn_multiplier: float = 2.0,
    escalation: float = 1.0,
) -> Status:
    """Classify how far ``actual`` is from ``target``.

    Parameters
    ----------
    actual : float
        Observed value.
    target : float
        Desired value.
    tolerance : float
        Largest absolute difference that still counts as PASS.
    warn_multiplier : float
        Factor applied to ``tolerance`` to get the WARN boundary.
    escalation : float
        Extra factor applied to the WARN boundary.

    Returns
    -------
    Status
        PASS if the difference is within ``tolerance``.
        WARN if it is within ``tolerance * warn_multiplier * escalation``.
        FAIL otherwise.
    """
    deviation = abs(actual - target)
    if deviation <= tolerance:
        return "PASS"

    warn_limit = tolerance * warn_multiplier * escalation
    return "WARN" if deviation <= warn_limit else "FAIL"
[docs]
def check_range(
    actual: float,
    min_val: float,
    max_val: float,
    warn_buffer: float = 0.5,
    escalation: float = 1.0,
) -> Status:
    """Classify ``actual`` against the range ``[min_val, max_val]``.

    The WARN zone extends the range on both sides by
    ``warn_buffer * escalation`` range-widths.

    Parameters
    ----------
    actual : float
        Observed value.
    min_val : float
        Lower edge of the PASS range.
    max_val : float
        Upper edge of the PASS range.
    warn_buffer : float
        Width of the WARN margin, as a fraction of the range width.
    escalation : float
        Extra factor applied to ``warn_buffer``.

    Returns
    -------
    Status
        PASS inside the range, WARN inside the extended range, FAIL
        otherwise.
    """
    if min_val <= actual <= max_val:
        return "PASS"

    # Absolute margin is the scaled buffer times the range width.
    width = max_val - min_val
    scaled_buffer = warn_buffer * escalation
    lower_warn = min_val - scaled_buffer * width
    upper_warn = max_val + scaled_buffer * width
    if lower_warn <= actual <= upper_warn:
        return "WARN"
    return "FAIL"
[docs]
def check_pct_within_target(
    actual_pct: float,
    target_pct: float,
    min_pct: float,
    escalation: float = 1.0,
) -> Status:
    """Classify a percentage against a target and a minimum level.

    The FAIL boundary is ``min_pct`` moved down by
    ``(target_pct - min_pct) * (escalation - 1.0)``, that is by a multiple
    of the original WARN-zone width, and never below 0.0. An escalation
    of 1.0 leaves the boundary at ``min_pct``.

    Parameters
    ----------
    actual_pct : float
        Observed percentage.
    target_pct : float
        Percentage required for PASS.
    min_pct : float
        Unescalated lower edge of the WARN zone.
    escalation : float
        Scales how far the WARN zone reaches below ``min_pct``.

    Returns
    -------
    Status
        PASS if ``actual_pct >= target_pct``.
        WARN if ``actual_pct`` is at or above the effective minimum.
        FAIL otherwise.
    """
    if actual_pct >= target_pct:
        return "PASS"

    warn_width = target_pct - min_pct
    fail_boundary = max(0.0, min_pct - warn_width * (escalation - 1.0))
    return "WARN" if actual_pct >= fail_boundary else "FAIL"
[docs]
def check_outlier_penalty(
    outlier_pct: float,
    max_outlier_pct: float,
    severe_multiplier: float = 2.0,
    escalation: float = 1.0,
) -> Status:
    """Classify an outlier percentage against its allowed maximum.

    Parameters
    ----------
    outlier_pct : float
        Observed outlier percentage.
    max_outlier_pct : float
        Largest outlier percentage that still counts as PASS.
    severe_multiplier : float
        Factor applied to ``max_outlier_pct`` to get the WARN boundary.
    escalation : float
        Extra factor applied to the WARN boundary.

    Returns
    -------
    Status
        PASS if ``outlier_pct <= max_outlier_pct``.
        WARN if ``outlier_pct`` is within
        ``max_outlier_pct * severe_multiplier * escalation``.
        FAIL otherwise.
    """
    if outlier_pct <= max_outlier_pct:
        return "PASS"

    severe_limit = max_outlier_pct * severe_multiplier * escalation
    return "WARN" if outlier_pct <= severe_limit else "FAIL"
# =============================================================================
# Improvement Scoring (buffer-stock vs growth_plus)
# =============================================================================
[docs]
def check_improvement(
    delta: float,
    weight: float,
    max_degradation_base: float = 0.10,
) -> Status:
    """Classify a score delta as acceptable, borderline, or a regression.

    The amount of degradation tolerated shrinks as the metric weight grows,
    so heavily weighted metrics are allowed to regress less.

    Parameters
    ----------
    delta : float
        Score delta (buffer_stock_score - growth_plus_score).
        Positive means improvement, negative means degradation.
    weight : float
        Metric weight (from the Growth+ metric spec). Values below 0.1 are
        treated as 0.1 when computing the threshold.
    max_degradation_base : float
        Base degradation threshold. The effective threshold is
        ``max_degradation_base / max(weight, 0.1)``.

    Returns
    -------
    Status
        PASS if ``delta >= 0`` (improved or unchanged).
        WARN if the degradation is within the threshold.
        FAIL if the degradation exceeds the threshold.
    """
    if delta >= 0:
        return "PASS"

    # Floor the weight so tiny or zero weights cannot blow up the threshold.
    effective_weight = max(weight, 0.1)
    allowed_drop = max_degradation_base / effective_weight
    return "WARN" if abs(delta) <= allowed_drop else "FAIL"
[docs]
def score_improvement(delta: float) -> float:
    """Convert an improvement delta into a score in [0, 1].

    Computes ``1 + delta`` and clips it to ``[0.0, 1.0]``. Any non-negative
    delta therefore scores exactly 1.0, a degradation lowers the score one
    for one, and a delta of -1.0 or worse scores 0.0.

    Parameters
    ----------
    delta : float
        Score delta (buffer_stock_score - growth_plus_score).

    Returns
    -------
    float
        Score in [0.0, 1.0].
    """
    shifted = 1.0 + delta
    capped = min(1.0, shifted)
    return max(0.0, capped)
# =============================================================================
# Aggregate Score Functions
# =============================================================================
[docs]
def compute_combined_score(stability: StabilityResult) -> float:
    """Fold accuracy and stability into one figure of merit.

    The result is ``mean_score * pass_rate * (1 - std_score)``: it rises
    with the mean score and the pass rate, and falls as the score standard
    deviation grows.

    Parameters
    ----------
    stability : StabilityResult
        Result from run_stability_test(). Only its ``mean_score``,
        ``pass_rate`` and ``std_score`` attributes are read.

    Returns
    -------
    float
        Combined score (higher is better).
    """
    accuracy = stability.mean_score * stability.pass_rate
    steadiness = 1 - stability.std_score
    return accuracy * steadiness
# =============================================================================
# Status Helpers
# =============================================================================
# Colour name for each status value. The strings look like standard named
# colours; presumably they are consumed by report or plot rendering
# elsewhere (TODO confirm against callers).
STATUS_COLORS: dict[Status, str] = {
    "PASS": "lightgreen",
    "WARN": "lightyellow",
    "FAIL": "lightcoral",
}
[docs]
def worst_status(*statuses: Status) -> Status:
    """Return the most severe status among ``statuses``.

    Severity runs FAIL > WARN > PASS. If neither FAIL nor WARN is present,
    including when no statuses are given, the result is PASS.
    """
    # Walk the severities from worst to best and stop at the first hit.
    for severity in ("FAIL", "WARN"):
        if severity in statuses:
            return severity
    return "PASS"