Pipeline

class bamengine.core.pipeline.Pipeline(events=<factory>)

Bases: object

Event execution pipeline with explicit ordering.

The Pipeline manages event execution in a fixed, user-defined order. Events are executed sequentially in the exact order specified, with no automatic dependency resolution or reordering.

Variables:
  • events (list[Event]) – Ordered list of event instances to execute.

  • _event_map (dict[str, Event]) – Internal mapping from event names to instances for quick lookup.

Examples

Create pipeline from event list:

>>> from bamengine import Simulation
>>> from bamengine.core import Pipeline
>>> sim = Simulation.init(n_firms=100, seed=42)
>>> pipeline = Pipeline.from_event_list(
...     [
...         "firms_decide_desired_production",
...         "firms_adjust_price",
...         "labor_market_round_0",
...     ]
... )
>>> pipeline.execute(sim)

Load pipeline from YAML:

>>> pipeline = Pipeline.from_yaml("custom_pipeline.yml", max_M=5, max_H=3, max_Z=2)

Modify pipeline after creation:

>>> pipeline.insert_after("firms_adjust_price", "my_custom_event")
>>> pipeline.remove("labor_market_round_0")
>>> pipeline.replace("firms_decide_desired_production", "my_production_rule")

Notes

The order of events is critical for correct simulation behavior. Users are responsible for ensuring the order is logically correct.

See also

Pipeline.from_event_list

Build pipeline from event name list

Pipeline.from_yaml

Build pipeline from YAML configuration

create_default_pipeline

Factory for canonical BAM pipeline

events

__post_init__()

Build internal event mapping and initialize callbacks.

classmethod from_event_list(event_names, *, repeats=None)

Build pipeline from ordered list of event names.

Events are executed in the exact order provided. Users are responsible for ensuring the order is logically correct.

Parameters:
  • event_names (list[str]) – Event names in desired execution order.

  • repeats (dict[str, int], optional) – Events that should repeat multiple times. Format: {event_name: n_repeats}

Returns:

Pipeline with events in the order specified.

Return type:

Pipeline

Raises:

ValueError – If event name not found in registry.

Notes

The order of events is critical for correct simulation behavior. Use the default pipeline as a reference for the required ordering.

To apply extension hooks, call pipeline.apply_hooks(*event_classes) or sim.use_events(*event_classes) after pipeline creation.

classmethod from_yaml(yaml_path, **params)

Build pipeline from YAML configuration file.

The YAML file should have an ‘events’ key with a list of event specifications. Two forms are supported:
  • ‘event_name’ – a single event

  • ‘event_name x N’ – repeat the event N times

Parameters can be substituted using {param_name} syntax.

Parameters:
  • yaml_path (str | Path) – Path to YAML configuration file.

  • **params (int) – Parameters to substitute in the YAML (e.g., max_M=5, max_H=3, max_Z=2).

Returns:

Pipeline with events parsed from YAML.

Return type:

Pipeline

Raises:

ValueError – If YAML format is invalid or event not found in registry.

Examples

>>> pipeline = Pipeline.from_yaml("my_pipeline.yml", max_M=5, max_H=3, max_Z=2)
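
The spec syntax described above can be pictured with a small standalone sketch. It is not bamengine's parser: `expand_event_specs` is a hypothetical helper, the event names are placeholders, and it assumes that `{param_name}` placeholders are filled in before the `x N` suffix is read.

```python
import re

# A YAML file matching the description might look like this (illustrative only):
#
#   events:
#     - firms_adjust_price
#     - labor_market_round x {max_M}


def expand_event_specs(specs, **params):
    """Hypothetical helper: turn spec strings into a flat list of event names."""
    names = []
    for spec in specs:
        # Substitute {param_name} placeholders, e.g. "{max_M}" -> "3".
        spec = spec.format(**params).strip()
        # "event_name x N" repeats the event N times; a bare name runs once.
        match = re.fullmatch(r"(\w+)\s+x\s+(\d+)", spec)
        if match:
            names.extend([match.group(1)] * int(match.group(2)))
        else:
            names.append(spec)
    return names


specs = ["firms_adjust_price", "labor_market_round x {max_M}"]
expanded = expand_event_specs(specs, max_M=3)
print(expanded)
```

In this sketch the repeated event appears three times in a row after `firms_adjust_price`; whether the real loader renames or numbers the repeated events is not covered here.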

execute(sim)

Execute all events in pipeline order, firing callbacks after each event.

Parameters:

sim (Simulation) – Simulation instance to operate on.

Returns:

None. All mutations are applied to sim in place.

Return type:

None

Notes

Callbacks registered via register_after_event() are fired after each event completes. This is used for data capture timing in SimulationResults.

register_after_event(event_name, callback)

Register a callback to run after a specific event.

This is used by SimulationResults for configurable data capture timing. Callbacks are fired in the order they were registered.

Parameters:
  • event_name (str) – Name of the event after which to run the callback.

  • callback (Callable[[Simulation], None]) – Function to call after the event executes. Receives the Simulation instance as its only argument.

Notes

This callback system is separate from the event hook system (@event(after=..., before=..., replace=...)). Event hooks are for inserting new events into the pipeline at initialization time. This callback system is for running arbitrary code (like data capture) after events during execution.

Examples

>>> def capture_production(sim):
...     print(f"Production: {sim.prod.production.sum()}")
>>> pipeline.register_after_event("firms_run_production", capture_production)
>>> pipeline.execute(sim)  # Prints production after firms_run_production
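
The callback ordering described above can be illustrated without bamengine installed. The following is a toy model, not the library's implementation: `MiniPipeline` is a hypothetical stand-in, events are reduced to names, and a plain list plays the role of the Simulation instance.

```python
from collections import defaultdict


class MiniPipeline:
    """Toy stand-in for Pipeline (hypothetical, for illustration only)."""

    def __init__(self, event_names):
        self.event_names = list(event_names)
        self._callbacks = defaultdict(list)

    def register_after_event(self, event_name, callback):
        # Callbacks are stored per event name, in registration order.
        self._callbacks[event_name].append(callback)

    def clear_callbacks(self):
        self._callbacks.clear()

    def execute(self, sim):
        for name in self.event_names:
            sim.append(f"run:{name}")  # stand-in for the event mutating sim
            for callback in self._callbacks[name]:
                callback(sim)  # fired right after the event, in order


log = []  # a list stands in for the Simulation instance
pipeline = MiniPipeline(["event_a", "event_b"])
pipeline.register_after_event("event_a", lambda sim: sim.append("first"))
pipeline.register_after_event("event_a", lambda sim: sim.append("second"))
pipeline.execute(log)
print(log)

# After clearing, a second run fires no callbacks.
pipeline.clear_callbacks()
second_log = []
pipeline.execute(second_log)
print(second_log)
```

Both callbacks run after `event_a` and before `event_b`, in the order they were registered, and nothing fires once the callbacks are cleared.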

clear_callbacks()

Clear all registered after-event callbacks.

This should be called after a simulation run to ensure the pipeline can be reused without accumulating callbacks from previous runs.

Examples

>>> pipeline.register_after_event("firms_run_production", my_callback)
>>> pipeline.execute(sim)
>>> pipeline.clear_callbacks()  # Remove all callbacks for reuse

insert_after(after, events)

Insert event(s) after specified event.

Parameters:
  • after (str) – Event name to insert after.

  • events (Event | str | list[Event | str]) – Event instance, event name, or list of events to insert. If a list is provided, events are inserted in order.

Raises:

ValueError – If ‘after’ event not found in pipeline.

Examples

Insert a single event:

>>> pipeline.insert_after("firms_pay_dividends", "custom_event")

Insert multiple events:

>>> pipeline.insert_after(
...     "firms_pay_dividends",
...     [
...         "event_a",
...         "event_b",
...         "event_c",
...     ],
... )
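
To make the "inserted in order" rule concrete, here is a minimal list-based sketch. It models the pipeline as a list of event names only; the function below is a hypothetical illustration rather than bamengine's code, and `next_event` is a placeholder name.

```python
def insert_after(order, after, new_events):
    """Hypothetical list-based model of insert_after on event names."""
    if after not in order:
        raise ValueError(f"Event '{after}' not found in pipeline")
    if isinstance(new_events, str):
        new_events = [new_events]  # accept a single name as well as a list
    idx = order.index(after) + 1
    # Slice assignment keeps the inserted events in the order given.
    order[idx:idx] = new_events
    return order


order = ["firms_adjust_price", "firms_pay_dividends", "next_event"]
insert_after(order, "firms_pay_dividends", ["event_a", "event_b", "event_c"])
print(order)
```

The three new events end up directly after `firms_pay_dividends`, in the same order as in the list, and ahead of whatever previously followed it.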

insert_before(before, events)

Insert event(s) before specified event.

Parameters:
  • before (str) – Event name to insert before.

  • events (Event | str | list[Event | str]) – Event instance, event name, or list of events to insert. If a list is provided, events are inserted in order.

Raises:

ValueError – If ‘before’ event not found in pipeline.

Examples

Insert a single event:

>>> pipeline.insert_before("firms_adjust_price", "pre_pricing_check")

Insert multiple events:

>>> pipeline.insert_before(
...     "firms_adjust_price",
...     [
...         "event_a",
...         "event_b",
...         "event_c",
...     ],
... )

remove(event_name)

Remove event from pipeline.

Parameters:

event_name (str) – Name of event to remove.

Raises:

ValueError – If event not found in pipeline.

replace(old_name, new_event)

Replace event with another event.

Parameters:
  • old_name (str) – Name of event to replace.

  • new_event (Event | str) – New event instance or event name.

Raises:

ValueError – If old event not found in pipeline.

apply_hooks(*event_classes)

Apply pipeline hooks from event classes.

Reads hook metadata (_hook_replace, _hook_after, _hook_before) set by the @event(replace=..., after=..., before=...) decorator. Classes without hook metadata are silently skipped.

Replace hooks are applied first, then after/before hooks. A hook is silently skipped if its target event is not in the pipeline, or if the event it would add is already in the pipeline.

Parameters:

*event_classes (type) – Event classes decorated with @event(...) using one of after=..., before=..., or replace=....

Examples

>>> from extensions.rnd import RND_EVENTS
>>> pipeline.apply_hooks(*RND_EVENTS)
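
The skip rules and the two-pass ordering can be sketched on a plain list of event names. This is a simplified model, not the library's implementation: the `name` class attribute and the `firms_do_research` and `orphan_event` names are assumptions made for the example, and the hook attributes are set by hand instead of by the @event decorator.

```python
def apply_hooks(order, *event_classes):
    """Toy model of hook application on a list of event names."""
    # Pass 1: replace hooks.
    for cls in event_classes:
        target = getattr(cls, "_hook_replace", None)
        if target is None or target not in order or cls.name in order:
            continue  # no replace hook, missing target, or already present
        order[order.index(target)] = cls.name
    # Pass 2: after/before hooks.
    for cls in event_classes:
        if cls.name in order:
            continue  # event already in pipeline: skip silently
        after = getattr(cls, "_hook_after", None)
        before = getattr(cls, "_hook_before", None)
        if after in order:
            order.insert(order.index(after) + 1, cls.name)
        elif before in order:
            order.insert(order.index(before), cls.name)
        # Otherwise the target is missing or there is no hook: skip silently.
    return order


class Research:
    name = "firms_do_research"  # hypothetical event name
    _hook_after = "firms_adjust_price"


class NewProduction:
    name = "my_production_rule"
    _hook_replace = "firms_decide_desired_production"


class Orphan:
    name = "orphan_event"  # hypothetical; its target is not in the pipeline
    _hook_before = "not_in_pipeline"


class Plain:
    name = "plain_event"  # no hook metadata at all


order = ["firms_decide_desired_production", "firms_adjust_price"]
apply_hooks(order, Research, NewProduction, Orphan, Plain)
print(order)
```

In this model `Orphan` and `Plain` leave the pipeline untouched, and calling `apply_hooks` a second time with the same classes changes nothing because every event is already present.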

__len__()

Return number of events in pipeline.

__repr__()

Provide informative repr.

__init__(events=<factory>)