"""
Event Pipeline with explicit execution order.
The Pipeline manages event execution in a fixed, user-defined order.
Unlike traditional ECS systems with dependency-based topological sorting,
BAM Engine uses explicit ordering to ensure deterministic, reproducible
simulation behavior.
Design Philosophy
-----------------
Explicit ordering over dependency resolution:
- Users specify exact execution sequence via YAML
- No automatic reordering or optimization
- Guarantees pipeline matches legacy implementation
- Makes execution trace obvious for debugging
Pipeline YAML Format
--------------------
events:
- event_name # Single execution
- event_name x N # Repeat N times
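Parameter substitution (each ``{name}`` is replaced by the keyword argument ``name``
before the line is parsed):
- event_{i} # Substitute {i} with parameter value
- event_name x {max_M} # A placeholder may also supply the repeat count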
Examples
--------
Load and execute the default pipeline:
>>> import bamengine as be
>>> sim = be.Simulation.init(n_firms=100, seed=42)
>>> pipeline = create_default_pipeline(max_M=5, max_H=3, max_Z=2)
>>> pipeline.execute(sim)
Load a custom pipeline from YAML:
>>> from bamengine.core import Pipeline
>>> pipeline = Pipeline.from_yaml("my_custom_pipeline.yml", max_M=5, max_H=3, max_Z=2)
Modify an existing pipeline:
>>> # Insert custom event after standard event
>>> pipeline.insert_after("firms_adjust_price", "my_custom_pricing")
>>>
>>> # Remove an event
>>> pipeline.remove("labor_market_round")
>>>
>>> # Replace an event with custom implementation
>>> pipeline.replace("firms_decide_desired_production", "my_production_rule")
See Also
--------
:class:`~bamengine.core.event.Event` : Base class for all events
:func:`create_default_pipeline` : Factory for canonical BAM pipeline
:meth:`ConfigValidator.validate_pipeline_yaml() <bamengine.config.validator.ConfigValidator.validate_pipeline_yaml>` : Pipeline validation
"""
from __future__ import annotations
import re
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass, field
from importlib import resources
from pathlib import Path
from typing import TYPE_CHECKING, cast
import yaml
from bamengine.core.event import Event
from bamengine.core.registry import get_event
if TYPE_CHECKING: # pragma: no cover
from bamengine.simulation import Simulation
[docs]
@dataclass(slots=True)
class RepeatedEvent:
"""
Wrapper for events that execute multiple times per period.
Used for market rounds where agents interact over multiple rounds
(e.g., job applications, loan applications, shopping rounds).
Attributes
----------
event : Event
The event to repeat.
n_repeats : int
Number of times to execute the event.
Examples
--------
Wrap an event to repeat it multiple times:
>>> from bamengine.core.registry import get_event
>>> sim = Simulation.init(n_firms=100, seed=42)
>>> event_cls = get_event("labor_market_round")
>>> event = event_cls()
>>> repeated = RepeatedEvent(event, n_repeats=5)
>>> repeated.execute(sim) # Executes 5 times
See Also
--------
Pipeline.from_event_list : Build pipeline with repeated events
"""
event: Event
n_repeats: int
[docs]
def execute(self, sim: Simulation) -> None:
"""
Execute the event n_repeats times.
Parameters
----------
sim : Simulation
Simulation instance to operate on.
Returns
-------
None
All mutations are in-place.
"""
for _ in range(self.n_repeats):
self.event.execute(sim)
@property
def name(self) -> str:
"""
Return the name of the underlying event.
Returns
-------
str
Event name in snake_case.
"""
return self.event.name
[docs]
@dataclass(slots=True)
class Pipeline:
"""
Event execution pipeline with explicit ordering.
The Pipeline manages event execution in a fixed, user-defined order.
Events are executed sequentially in the exact order specified, with
no automatic dependency resolution or reordering.
Attributes
----------
events : list[Event]
Ordered list of event instances to execute.
_event_map : dict[str, Event]
Internal mapping from event names to instances for quick lookup.
Examples
--------
Create pipeline from event list:
>>> from bamengine.core import Pipeline
>>> sim = Simulation.init(n_firms=100, seed=42)
>>> pipeline = Pipeline.from_event_list(
... [
... "firms_decide_desired_production",
... "firms_adjust_price",
... "labor_market_round_0",
... ]
... )
>>> pipeline.execute(sim)
Load pipeline from YAML:
>>> pipeline = Pipeline.from_yaml("custom_pipeline.yml", max_M=5, max_H=3, max_Z=2)
Modify pipeline after creation:
>>> pipeline.insert_after("firms_adjust_price", "my_custom_event")
>>> pipeline.remove("labor_market_round")
>>> pipeline.replace("firms_decide_desired_production", "my_production_rule")
Notes
-----
The order of events is critical for correct simulation behavior.
Users are responsible for ensuring the order is logically correct.
See Also
--------
Pipeline.from_event_list : Build pipeline from event name list
Pipeline.from_yaml : Build pipeline from YAML configuration
create_default_pipeline : Factory for canonical BAM pipeline
"""
events: list[Event] = field(default_factory=list)
_event_map: dict[str, Event] = field(default_factory=dict, init=False, repr=False)
_after_event_callbacks: dict[str, list[Callable[[Simulation], None]]] = field(
default_factory=lambda: defaultdict(list), init=False, repr=False
)
[docs]
def __post_init__(self) -> None:
"""Build internal event mapping and initialize callbacks."""
self._event_map = {event.name: event for event in self.events}
# Ensure _after_event_callbacks is a defaultdict (in case of copy/pickle)
if not isinstance(self._after_event_callbacks, defaultdict): # pragma: no cover
self._after_event_callbacks = defaultdict(list, self._after_event_callbacks)
[docs]
@classmethod
def from_event_list(
cls,
event_names: list[str],
*,
repeats: dict[str, int] | None = None,
) -> Pipeline:
"""
Build pipeline from ordered list of event names.
Events are executed in the exact order provided. Users are
responsible for ensuring the order is logically correct.
Parameters
----------
event_names : list[str]
Event names in desired execution order.
repeats : dict[str, int], optional
Events that should repeat multiple times.
Format: {event_name: n_repeats}
Returns
-------
Pipeline
Pipeline with events in the order specified.
Raises
------
ValueError
If event name not found in registry.
Notes
-----
The order of events is critical for correct simulation behavior.
Use the default pipeline as a reference for the required ordering.
To apply extension hooks, call ``pipeline.apply_hooks(*event_classes)``
or ``sim.use_events(*event_classes)`` after pipeline creation.
"""
repeats = repeats or {}
# Instantiate events (wrap repeated ones)
event_instances = []
for name in event_names:
event_cls = get_event(name)
event = event_cls()
# Wrap in RepeatedEvent if specified
if name in repeats:
event = cast(
Event, cast(object, RepeatedEvent(event, n_repeats=repeats[name]))
)
event_instances.append(event)
return cls(events=event_instances)
[docs]
@classmethod
def from_yaml(
cls,
yaml_path: str | Path,
**params: int,
) -> Pipeline:
"""
Build pipeline from YAML configuration file.
The YAML file should have an 'events' key with a list of event
specifications. Supports special syntax:
- 'event_name' - single event
- 'event_name x N' - repeat event N times
Parameters can be substituted using {param_name} syntax.
Parameters
----------
yaml_path : str | Path
Path to YAML configuration file.
**params : int
Parameters to substitute in the YAML (e.g., max_M=5, max_H=3, max_Z=2).
Returns
-------
Pipeline
Pipeline with events parsed from YAML.
Raises
------
ValueError
If YAML format is invalid or event not found in registry.
Examples
--------
>>> pipeline = Pipeline.from_yaml("my_pipeline.yml", max_M=5, max_H=3, max_Z=2)
"""
# Read YAML file
yaml_path = Path(yaml_path)
with open(yaml_path) as f:
config = yaml.safe_load(f)
if "events" not in config:
raise ValueError(f"YAML file must have 'events' key: {yaml_path}")
event_specs = config["events"]
event_names = []
# Parse each event specification
for spec in event_specs:
# Substitute parameters
for param_name, param_value in params.items():
spec = spec.replace(f"{{{param_name}}}", str(param_value))
# Parse the spec
event_names.extend(cls._parse_event_spec(spec))
return cls.from_event_list(event_names)
@staticmethod
def _parse_event_spec(spec: str) -> list[str]:
"""
Parse event specification string into list of event names.
Supports:
- 'event_name' -> ['event_name']
- 'event_name x 3' -> ['event_name', 'event_name', 'event_name']
"""
spec = spec.strip()
# Pattern 1: Repeated event (event_name x N)
repeated_pattern = r"^(.+?)\s+x\s+(\d+)$"
match = re.match(repeated_pattern, spec)
if match:
event_name = match.group(1).strip()
count = int(match.group(2))
return [event_name] * count
# Pattern 2: Single event (event_name)
return [spec]
[docs]
def execute(self, sim: Simulation) -> None:
"""
Execute all events in pipeline order, firing callbacks after each event.
Parameters
----------
sim : Simulation
Simulation instance to operate on.
Returns
-------
None
All mutations are in-place.
Notes
-----
Callbacks registered via ``register_after_event()`` are fired after
each event completes. This is used for data capture timing in
SimulationResults.
"""
for event in self.events:
event.execute(sim)
# Fire registered callbacks for this event (for data capture timing)
for callback in self._after_event_callbacks.get(event.name, []):
callback(sim)
[docs]
def register_after_event(
self, event_name: str, callback: Callable[[Simulation], None]
) -> None:
"""
Register a callback to run after a specific event.
This is used by SimulationResults for configurable data capture timing.
Callbacks are fired in the order they were registered.
Parameters
----------
event_name : str
Name of the event after which to run the callback.
callback : Callable[[Simulation], None]
Function to call after the event executes.
Receives the Simulation instance as its only argument.
Notes
-----
This callback system is separate from the event hook system
(``@event(after=..., before=..., replace=...)``). Event hooks are
for inserting new events into the pipeline at initialization time.
This callback system is for running arbitrary code (like data capture)
after events during execution.
Examples
--------
>>> def capture_production(sim):
... print(f"Production: {sim.prod.production.sum()}")
>>> pipeline.register_after_event("firms_run_production", capture_production)
>>> pipeline.execute(sim) # Prints production after firms_run_production
"""
self._after_event_callbacks[event_name].append(callback)
[docs]
def clear_callbacks(self) -> None:
"""
Clear all registered after-event callbacks.
This should be called after a simulation run to ensure the pipeline
can be reused without accumulating callbacks from previous runs.
Examples
--------
>>> pipeline.register_after_event("firms_run_production", my_callback)
>>> pipeline.execute(sim)
>>> pipeline.clear_callbacks() # Remove all callbacks for reuse
"""
self._after_event_callbacks.clear()
[docs]
def insert_after(self, after: str, events: Event | str | list[Event | str]) -> None:
"""
Insert event(s) after specified event.
Parameters
----------
after : str
Event name to insert after.
events : Event | str | list[Event | str]
Event instance, event name, or list of events to insert.
If a list is provided, events are inserted in order.
Raises
------
ValueError
If 'after' event not found in pipeline.
Examples
--------
Insert a single event:
>>> pipeline.insert_after("firms_pay_dividends", "custom_event")
Insert multiple events:
>>> pipeline.insert_after(
... "firms_pay_dividends",
... [
... "event_a",
... "event_b",
... "event_c",
... ],
... )
"""
if after not in self._event_map:
raise ValueError(f"Event '{after}' not found in pipeline")
# Convert single event to list
event_list = events if isinstance(events, list) else [events]
# Find insertion point
idx = self.events.index(self._event_map[after])
# Insert events in reverse order to maintain list order
for event in reversed(event_list):
# Instantiate if name provided
if isinstance(event, str):
event_cls = get_event(event)
event = event_cls()
self.events.insert(idx + 1, event)
self._event_map[event.name] = event
[docs]
def insert_before(
self, before: str, events: Event | str | list[Event | str]
) -> None:
"""
Insert event(s) before specified event.
Parameters
----------
before : str
Event name to insert before.
events : Event | str | list[Event | str]
Event instance, event name, or list of events to insert.
If a list is provided, events are inserted in order.
Raises
------
ValueError
If 'before' event not found in pipeline.
Examples
--------
Insert a single event:
>>> pipeline.insert_before("firms_adjust_price", "pre_pricing_check")
Insert multiple events:
>>> pipeline.insert_before(
... "firms_adjust_price",
... [
... "event_a",
... "event_b",
... "event_c",
... ],
... )
"""
if before not in self._event_map:
raise ValueError(f"Event '{before}' not found in pipeline")
# Convert single event to list
event_list = events if isinstance(events, list) else [events]
# Find insertion point
idx = self.events.index(self._event_map[before])
# Insert events in reverse order to maintain list order
for event in reversed(event_list):
# Instantiate if name provided
if isinstance(event, str):
event_cls = get_event(event)
event = event_cls()
self.events.insert(idx, event)
self._event_map[event.name] = event
[docs]
def remove(self, event_name: str) -> None:
"""
Remove event from pipeline.
Parameters
----------
event_name : str
Name of event to remove.
Raises
------
ValueError
If event not found in pipeline.
"""
if event_name not in self._event_map:
raise ValueError(f"Event '{event_name}' not found in pipeline")
event = self._event_map[event_name]
self.events.remove(event)
del self._event_map[event_name]
[docs]
def replace(self, old_name: str, new_event: Event | str) -> None:
"""
Replace event with another event.
Parameters
----------
old_name : str
Name of event to replace.
new_event : Event | str
New event instance or event name.
Raises
------
ValueError
If old event not found in pipeline.
"""
if old_name not in self._event_map:
raise ValueError(f"Event '{old_name}' not found in pipeline")
# Instantiate if name provided
if isinstance(new_event, str):
event_cls = get_event(new_event)
new_event = event_cls()
# Replace in list
idx = self.events.index(self._event_map[old_name])
self.events[idx] = new_event
# Update mapping
del self._event_map[old_name]
self._event_map[new_event.name] = new_event
[docs]
def apply_hooks(self, *event_classes: type[Event]) -> None:
"""
Apply pipeline hooks from event classes.
Reads hook metadata (``_hook_replace``, ``_hook_after``, ``_hook_before``)
set by the ``@event(replace=..., after=..., before=...)`` decorator.
Classes without hook metadata are silently skipped.
Replace hooks are applied first, then after/before hooks.
If target event not in pipeline, hook is silently skipped.
If event already in pipeline, hook is silently skipped.
Parameters
----------
*event_classes : type
Event classes decorated with ``@event(after=..., before=..., or replace=...)``.
Examples
--------
>>> from extensions.rnd import RND_EVENTS
>>> pipeline.apply_hooks(*RND_EVENTS)
"""
# Apply replace hooks first
for cls in event_classes:
target = getattr(cls, "_hook_replace", None)
if target and target in self._event_map and cls.name not in self._event_map:
self.replace(target, cls.name)
# Apply after/before hooks with insertion point tracking
after_points: dict[str, str] = {}
before_points: dict[str, str] = {}
for cls in event_classes:
if cls.name in self._event_map:
continue
after_target = getattr(cls, "_hook_after", None)
if after_target and after_target in self._event_map:
insertion_point = after_points.get(after_target, after_target)
self.insert_after(insertion_point, cls.name)
after_points[after_target] = cls.name
before_target = getattr(cls, "_hook_before", None)
if before_target and before_target in self._event_map:
insertion_point = before_points.get(before_target, before_target)
self.insert_before(insertion_point, cls.name)
before_points[before_target] = cls.name
[docs]
def __len__(self) -> int:
"""Return number of events in pipeline."""
return len(self.events)
[docs]
def __repr__(self) -> str:
"""Provide informative repr."""
return f"Pipeline(n_events={len(self.events)})"
def create_default_pipeline(
    max_M: int,
    max_H: int,
    max_Z: int,
) -> Pipeline:
    """
    Create default BAM simulation event pipeline.

    Loads the pipeline from config/default_pipeline.yml and substitutes
    market round parameters (max_M, max_H, max_Z).

    The pipeline uses batch matching for labor and credit markets:
    ``labor_market_round`` repeated ``max_M`` times and
    ``credit_market_round`` repeated ``max_H`` times. The goods market
    uses ``goods_market_round`` which handles all visits internally.

    Parameters
    ----------
    max_M : int
        Number of labor market matching rounds.
    max_H : int
        Number of credit market matching rounds.
    max_Z : int
        Number of goods market shopping visits (passed to goods_market_round).

    Returns
    -------
    Pipeline
        Default BAM pipeline with all events in correct order.

    Notes
    -----
    This function creates the "canonical" BAM pipeline. Users can modify
    it using insert_after(), remove(), replace() methods, or create their
    own pipeline from a custom YAML file using Pipeline.from_yaml().
    """
    params = {"max_M": max_M, "max_H": max_H, "max_Z": max_Z}

    # Only the resources.files() lookup is guarded: an AttributeError raised
    # while parsing the YAML or building events must propagate instead of
    # being mistaken for a missing importlib API and retried via the fallback.
    try:
        traversable = resources.files("bamengine") / "config" / "default_pipeline.yml"
    except AttributeError:  # pragma: no cover - Python 3.8 fallback
        # Fallback for interpreters where resources.files() is unavailable.
        import bamengine

        yaml_path = Path(bamengine.__file__).parent / "config" / "default_pipeline.yml"
        return Pipeline.from_yaml(yaml_path, **params)

    # as_file() yields a real filesystem path for the packaged resource.
    with resources.as_file(traversable) as yaml_fs_path:
        return Pipeline.from_yaml(Path(yaml_fs_path), **params)