Pipeline
- class bamengine.core.pipeline.Pipeline(events=<factory>)
Bases: object
Event execution pipeline with explicit ordering.
The Pipeline manages event execution in a fixed, user-defined order. Events are executed sequentially in the exact order specified, with no automatic dependency resolution or reordering.
- Variables:
events (list[Event]) – Ordered list of event instances to execute.
_event_map (dict[str, Event]) – Internal mapping from event names to instances for quick lookup.
Examples
Create pipeline from event list:
>>> from bamengine import Simulation
>>> from bamengine.core import Pipeline
>>> sim = Simulation.init(n_firms=100, seed=42)
>>> pipeline = Pipeline.from_event_list(
...     [
...         "firms_decide_desired_production",
...         "firms_adjust_price",
...         "labor_market_round_0",
...     ]
... )
>>> pipeline.execute(sim)
Load pipeline from YAML:
>>> pipeline = Pipeline.from_yaml("custom_pipeline.yml", max_M=5, max_H=3, max_Z=2)
Modify pipeline after creation:
>>> pipeline.insert_after("firms_adjust_price", "my_custom_event")
>>> pipeline.remove("labor_market_round")
>>> pipeline.replace("firms_decide_desired_production", "my_production_rule")
Notes
The order of events is critical for correct simulation behavior. Users are responsible for ensuring the order is logically correct.
See also
Pipeline.from_event_list – Build pipeline from event name list
Pipeline.from_yaml – Build pipeline from YAML configuration
create_default_pipeline – Factory for canonical BAM pipeline
- events
- classmethod from_event_list(event_names, *, repeats=None)
Build pipeline from ordered list of event names.
Events are executed in the exact order provided. Users are responsible for ensuring the order is logically correct.
- Parameters:
event_names (list[str]) – Event names in desired execution order.
repeats (dict[str, int], optional) – Events that should repeat multiple times. Format: {event_name: n_repeats}
- Returns:
Pipeline with events in the order specified.
- Return type:
Pipeline
- Raises:
ValueError – If event name not found in registry.
Notes
The order of events is critical for correct simulation behavior. Use the default pipeline as a reference for the required ordering.
To apply extension hooks, call pipeline.apply_hooks(*event_classes) or sim.use_events(*event_classes) after pipeline creation.
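As a rough illustration of how a repeats mapping could turn into a flat execution order, the sketch below expands each named event into consecutive copies at its position in the list. The helper `expand_repeats` is hypothetical, and the in-place, consecutive expansion is an assumption made for illustration; how `Pipeline.from_event_list` actually names and places repeated events may differ.

```python
def expand_repeats(event_names, repeats=None):
    """Hypothetical helper: expand {event_name: n_repeats} into a flat ordered list."""
    repeats = repeats or {}
    ordered = []
    for name in event_names:
        # Events without an entry in `repeats` run exactly once.
        ordered.extend([name] * repeats.get(name, 1))
    return ordered


order = expand_repeats(
    ["firms_adjust_price", "labor_market_round", "firms_run_production"],
    repeats={"labor_market_round": 3},
)
print(order)
```

The relative order of the input list is preserved; only the repeated event is duplicated.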
- classmethod from_yaml(yaml_path, **params)
Build pipeline from YAML configuration file.
The YAML file should have an ‘events’ key with a list of event specifications. The following special syntax is supported:
- ‘event_name’: single event
- ‘event_name x N’: repeat event N times
Parameters can be substituted using {param_name} syntax.
- Parameters:
yaml_path (str | Path) – Path to YAML configuration file.
**params (int) – Parameters to substitute in the YAML (e.g., max_M=5, max_H=3, max_Z=2).
- Returns:
Pipeline with events parsed from YAML.
- Return type:
Pipeline
- Raises:
ValueError – If YAML format is invalid or event not found in registry.
Examples
>>> pipeline = Pipeline.from_yaml("my_pipeline.yml", max_M=5, max_H=3, max_Z=2)
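The spec syntax described above can be sketched in plain Python. The example assumes the ‘events’ list has already been loaded from YAML into a list of strings, and the helper `expand_event_specs` is hypothetical: it only models {param_name} substitution and the ‘event_name x N’ form, not registry lookup or how the library names repeated event instances.

```python
import re


def expand_event_specs(specs, **params):
    """Hypothetical helper: expand 'name' and 'name x N' specs into a flat list."""
    expanded = []
    for spec in specs:
        # Substitute {param_name} placeholders, e.g. "{max_M}" -> "3".
        resolved = spec.format(**params)
        match = re.fullmatch(r"\s*(\w+)\s+x\s+(\d+)\s*", resolved)
        if match:
            name, count = match.group(1), int(match.group(2))
            expanded.extend([name] * count)
        else:
            expanded.append(resolved.strip())
    return expanded


specs = ["firms_adjust_price", "labor_market_round x {max_M}"]
events = expand_event_specs(specs, max_M=3)
print(events)
```

In this sketch a placeholder with no matching keyword argument raises `KeyError` from `str.format`; the documented method reports invalid input as `ValueError`.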
- execute(sim)
Execute all events in pipeline order, firing callbacks after each event.
- Parameters:
sim (Simulation) – Simulation instance to operate on.
- Returns:
All mutations are in-place.
- Return type:
None
Notes
Callbacks registered via register_after_event() are fired after each event completes. This is used for data capture timing in SimulationResults.
- register_after_event(event_name, callback)
Register a callback to run after a specific event.
This is used by SimulationResults for configurable data capture timing. Callbacks are fired in the order they were registered.
- Parameters:
event_name (str) – Name of the event after which to run the callback.
callback (Callable[[Simulation], None]) – Function to call after the event executes. Receives the Simulation instance as its only argument.
Notes
This callback system is separate from the event hook system (@event(after=..., before=..., replace=...)). Event hooks are for inserting new events into the pipeline at initialization time. This callback system is for running arbitrary code (like data capture) after events during execution.
Examples
>>> def capture_production(sim):
...     print(f"Production: {sim.prod.production.sum()}")
...
>>> pipeline.register_after_event("firms_run_production", capture_production)
>>> pipeline.execute(sim)  # Prints production after firms_run_production
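To make the callback timing concrete without depending on bamengine, here is a minimal sketch of an after-event callback registry. `MiniPipeline` is a toy stand-in written for illustration and is not the library's implementation; events are modeled as (name, function) pairs and the simulation as a plain dict, both of which are assumptions.

```python
from collections import defaultdict


class MiniPipeline:
    """Toy stand-in for illustration only; not bamengine's Pipeline."""

    def __init__(self, events):
        # Each event is a (name, function) pair executed in list order.
        self.events = list(events)
        self._callbacks = defaultdict(list)

    def register_after_event(self, event_name, callback):
        self._callbacks[event_name].append(callback)

    def clear_callbacks(self):
        self._callbacks.clear()

    def execute(self, sim):
        for name, run in self.events:
            run(sim)
            # Fire callbacks for this event in registration order.
            for callback in self._callbacks.get(name, []):
                callback(sim)


sim = {"production": 0, "log": []}
pipeline = MiniPipeline(
    [
        ("firms_run_production", lambda s: s.update(production=s["production"] + 10)),
        ("firms_adjust_price", lambda s: None),
    ]
)
pipeline.register_after_event(
    "firms_run_production", lambda s: s["log"].append(s["production"])
)
pipeline.execute(sim)
pipeline.clear_callbacks()
pipeline.execute(sim)  # no callbacks fire on the second run
print(sim["log"])
```

The callback sees the state right after its event has run, and clearing callbacks between runs keeps a reused pipeline from capturing data twice.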
- clear_callbacks()
Clear all registered after-event callbacks.
This should be called after a simulation run to ensure the pipeline can be reused without accumulating callbacks from previous runs.
Examples
>>> pipeline.register_after_event("firms_run_production", my_callback)
>>> pipeline.execute(sim)
>>> pipeline.clear_callbacks()  # Remove all callbacks for reuse
- insert_after(after, events)
Insert event(s) after specified event.
- Parameters:
after (str) – Event name to insert after.
events (Event | str | list[Event | str]) – Event instance, event name, or list of events to insert. If a list is provided, events are inserted in order.
- Raises:
ValueError – If ‘after’ event not found in pipeline.
Examples
Insert a single event:
>>> pipeline.insert_after("firms_pay_dividends", "custom_event")
Insert multiple events:
>>> pipeline.insert_after(
...     "firms_pay_dividends",
...     [
...         "event_a",
...         "event_b",
...         "event_c",
...     ],
... )
- insert_before(before, events)
Insert event(s) before specified event.
- Parameters:
before (str) – Event name to insert before.
events (Event | str | list[Event | str]) – Event instance, event name, or list of events to insert. If a list is provided, events are inserted in order.
- Raises:
ValueError – If ‘before’ event not found in pipeline.
Examples
Insert a single event:
>>> pipeline.insert_before("firms_adjust_price", "pre_pricing_check")
Insert multiple events:
>>> pipeline.insert_before(
...     "firms_adjust_price",
...     [
...         "event_a",
...         "event_b",
...         "event_c",
...     ],
... )
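The ordering guarantee of insert_after and insert_before (a list of events keeps its given order on either side of the anchor) can be modeled on a plain list of names. The helper `insert_relative` is hypothetical and operates on strings only; it is a sketch of the documented behavior, not the library's code.

```python
def insert_relative(names, anchor, new_names, *, before=False):
    """Hypothetical helper mirroring insert_after / insert_before on a name list."""
    if isinstance(new_names, str):
        new_names = [new_names]
    if anchor not in names:
        raise ValueError(f"Event '{anchor}' not found in pipeline")
    index = names.index(anchor) + (0 if before else 1)
    # Slice assignment keeps the inserted events in the order given.
    names[index:index] = new_names


names = ["firms_adjust_price", "firms_pay_dividends"]
insert_relative(names, "firms_pay_dividends", ["event_a", "event_b"])
insert_relative(names, "firms_adjust_price", "pre_pricing_check", before=True)
print(names)
```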
- remove(event_name)
Remove event from pipeline.
- Parameters:
event_name (str) – Name of event to remove.
- Raises:
ValueError – If event not found in pipeline.
- replace(old_name, new_event)
Replace event with another event.
- Parameters:
old_name (str) – Name of event to replace.
new_event (Event | str) – New event instance or event name.
- Raises:
ValueError – If old event not found in pipeline.
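The error behavior of remove and replace can be illustrated on a plain list of event names. Both helpers below are hypothetical stand-ins that only model the documented contract: a missing event raises ValueError, and a replacement keeps the position of the event it replaces.

```python
def remove_event(names, event_name):
    """Hypothetical helper: drop an event by name, failing loudly if absent."""
    if event_name not in names:
        raise ValueError(f"Event '{event_name}' not found in pipeline")
    names.remove(event_name)


def replace_event(names, old_name, new_name):
    """Hypothetical helper: swap an event in place, keeping its position."""
    if old_name not in names:
        raise ValueError(f"Event '{old_name}' not found in pipeline")
    names[names.index(old_name)] = new_name


names = ["firms_decide_desired_production", "firms_adjust_price", "labor_market_round"]
replace_event(names, "firms_decide_desired_production", "my_production_rule")
remove_event(names, "labor_market_round")

try:
    remove_event(names, "labor_market_round")  # already removed
    error = None
except ValueError as exc:
    error = str(exc)

print(names)
print(error)
```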
- apply_hooks(*event_classes)
Apply pipeline hooks from event classes.
Reads hook metadata (_hook_replace, _hook_after, _hook_before) set by the @event(replace=..., after=..., before=...) decorator. Classes without hook metadata are silently skipped.
Replace hooks are applied first, then after/before hooks. If the target event is not in the pipeline, the hook is silently skipped. If the event is already in the pipeline, the hook is silently skipped.
- Parameters:
*event_classes (type) – Event classes decorated with @event(after=..., before=..., or replace=...).
Examples
>>> from extensions.rnd import RND_EVENTS
>>> pipeline.apply_hooks(*RND_EVENTS)
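The two-pass order described above (replace hooks first, then after/before hooks, with silent skips) can be sketched on a list of event names. This is a toy model under stated assumptions: the `_hook_*` attribute names come from the text above, while the `name` class attribute, the example classes, and the function itself are hypothetical and not the library's implementation.

```python
HOOK_ATTRS = ("_hook_replace", "_hook_after", "_hook_before")


def apply_hooks(names, *event_classes):
    """Toy model of the documented hook order; `cls.name` is an assumed attribute."""
    # Classes without any hook metadata are silently skipped.
    hooked = [
        cls for cls in event_classes
        if any(getattr(cls, attr, None) for attr in HOOK_ATTRS)
    ]
    # Pass 1: replace hooks.
    for cls in hooked:
        target = getattr(cls, "_hook_replace", None)
        if target and target in names and cls.name not in names:
            names[names.index(target)] = cls.name
    # Pass 2: after/before hooks.
    for cls in hooked:
        if cls.name in names:
            continue  # already in the pipeline: skip silently
        after = getattr(cls, "_hook_after", None)
        before = getattr(cls, "_hook_before", None)
        if after and after in names:
            names.insert(names.index(after) + 1, cls.name)
        elif before and before in names:
            names.insert(names.index(before), cls.name)


class MyPricing:
    name = "my_pricing"
    _hook_replace = "firms_adjust_price"


class AuditPrices:
    name = "audit_prices"
    _hook_after = "my_pricing"


class Orphan:
    name = "orphan"
    _hook_before = "missing_event"  # target absent: skipped silently


class PlainEvent:
    name = "plain"  # no hook metadata: skipped silently


names = ["firms_decide_desired_production", "firms_adjust_price"]
apply_hooks(names, AuditPrices, MyPricing, Orphan, PlainEvent)
print(names)
```

Because replace hooks run first, `AuditPrices` can attach after `my_pricing` even though it is passed before `MyPricing` in the argument list.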
- __init__(events=<factory>)