.. DO NOT EDIT. .. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY. .. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE: .. "auto_examples/advanced/example_custom_pipeline.py" .. LINE NUMBERS ARE GIVEN BELOW. .. only:: html .. note:: :class: sphx-glr-download-link-note :ref:`Go to the end ` to download the full example code. .. rst-class:: sphx-glr-example-title .. _sphx_glr_auto_examples_advanced_example_custom_pipeline.py: =============== Custom Pipeline =============== This example demonstrates how to customize the event execution pipeline using YAML configuration. Custom pipelines let you reorder events, add custom events, or remove built-in events. You'll learn to: - Understand the default pipeline structure - Create custom pipeline YAML files - Use special syntax (repetition) - Add custom events to the pipeline - Load and execute custom pipelines .. GENERATED FROM PYTHON SOURCE LINES 20-34 What is the Pipeline? --------------------- The pipeline defines which events execute each period and in what order. BAM Engine's default pipeline has 8 phases: 1. **Planning**: Production targets, pricing 2. **Labor Market**: Job search and hiring 3. **Credit Market**: Loan applications and provision 4. **Production**: Wage payments, production 5. **Goods Market**: Shopping and consumption 6. **Revenue**: Sales revenue, debt repayment 7. **Bankruptcy**: Insolvency detection and exit 8. **Entry**: New firms/banks spawning .. GENERATED FROM PYTHON SOURCE LINES 34-48 .. code-block:: Python import tempfile from pathlib import Path import bamengine as bam # Initialize simulation sim = bam.Simulation.init(n_firms=50, n_households=250, seed=42) print("Default pipeline has events like:") print(" - firms_decide_desired_production") print(" - labor_market_round (repeated max_M times)") print(" - goods_market_round") .. rst-class:: sphx-glr-script-out .. code-block:: none Default pipeline has events like: - firms_decide_desired_production - labor_market_round (repeated max_M times) - goods_market_round .. GENERATED FROM PYTHON SOURCE LINES 49-61 Pipeline YAML Syntax -------------------- Pipelines are defined in YAML with special syntax: **Simple event**: ``- event_name`` **Repeated event**: ``- event_name x N`` Executes event N times **Parameter substitution**: ``{max_M}``, ``{max_H}`` Replaced with config values at load time .. GENERATED FROM PYTHON SOURCE LINES 61-80 .. code-block:: Python # Example pipeline YAML structure example_yaml = """ events: # Simple events - firms_decide_desired_production - firms_plan_breakeven_price - firms_plan_price # Batch shopping (handles all Z visits internally) - goods_market_round # Repeated batch matching (max_M rounds of labor market) - labor_market_round x {max_M} """ print("\nExample pipeline YAML:") print(example_yaml) .. rst-class:: sphx-glr-script-out .. code-block:: none Example pipeline YAML: events: # Simple events - firms_decide_desired_production - firms_plan_breakeven_price - firms_plan_price # Batch shopping (handles all Z visits internally) - goods_market_round # Repeated batch matching (max_M rounds of labor market) - labor_market_round x {max_M} .. GENERATED FROM PYTHON SOURCE LINES 81-85 Creating a Custom Pipeline -------------------------- Create a minimal pipeline for testing that skips unnecessary events. .. GENERATED FROM PYTHON SOURCE LINES 85-123 .. code-block:: Python # Minimal pipeline: just planning, production, and stats minimal_pipeline = """ # Minimal pipeline for testing # Skips labor market, credit market, goods market events: # Planning phase - firms_decide_desired_production - firms_plan_breakeven_price - firms_plan_price - update_avg_mkt_price # Skip labor/credit/goods markets for speed # Production (simplified) - firms_run_production """ # Write to temp file config_dir = Path(tempfile.mkdtemp()) pipeline_path = config_dir / "minimal_pipeline.yml" pipeline_path.write_text(minimal_pipeline) print(f"Created minimal pipeline at: {pipeline_path}") # Load and run with custom pipeline sim_minimal = bam.Simulation.init( n_firms=50, n_households=250, seed=42, pipeline_path=str(pipeline_path), ) print("\nRunning with minimal pipeline...") sim_minimal.run(n_periods=10) print("Completed 10 periods") .. rst-class:: sphx-glr-script-out .. code-block:: none Created minimal pipeline at: /tmp/tmpe_x4hq7r/minimal_pipeline.yml Running with minimal pipeline... Completed 10 periods .. GENERATED FROM PYTHON SOURCE LINES 124-131 Removing Events --------------- Create a pipeline without dividends (all profits retained). When removing events, you may need to add a replacement that handles any required state updates. Here we remove ``firms_pay_dividends`` and add a custom event to set retained_profit = net_profit. .. GENERATED FROM PYTHON SOURCE LINES 131-254 .. code-block:: Python from bamengine import event, ops @event class RetainAllProfits: """Custom event that retains all profits (no dividends). This replaces firms_pay_dividends - it sets retained_profit to net_profit so that firms_update_net_worth can add it to net worth. """ def execute(self, sim): bor = sim.get_role("Borrower") # All net profit is retained (no dividends paid) bor.retained_profit[:] = bor.net_profit no_dividends_pipeline = """ # Pipeline without dividend payments # Firms retain all profits via custom RetainAllProfits event events: # Planning - firms_decide_desired_production - firms_plan_breakeven_price - firms_plan_price - firms_decide_desired_labor - firms_decide_vacancies - firms_fire_excess_workers # Labor market - calc_inflation_rate - adjust_minimum_wage - firms_decide_wage_offer - workers_decide_firms_to_apply - labor_market_round x {max_M} - firms_calc_wage_bill # Credit market - banks_decide_credit_supply - banks_decide_interest_rate - firms_decide_credit_demand - firms_calc_financial_fragility - firms_prepare_loan_applications - credit_market_round x {max_H} - firms_fire_workers # Production - firms_pay_wages - workers_receive_wage - firms_run_production - update_avg_mkt_price - workers_update_contracts # Goods market - consumers_calc_propensity - consumers_decide_income_to_spend - consumers_decide_firms_to_visit - goods_market_round - consumers_finalize_purchases # Revenue (custom event replaces dividends) - firms_collect_revenue - firms_validate_debt_commitments - retain_all_profits # CUSTOM: replaces firms_pay_dividends # Bankruptcy - firms_update_net_worth - mark_bankrupt_firms - mark_bankrupt_banks # Entry - spawn_replacement_firms - spawn_replacement_banks """ # Write and test no_div_path = config_dir / "no_dividends_pipeline.yml" no_div_path.write_text(no_dividends_pipeline) print("Created pipeline without dividends") # Compare with and without dividends import matplotlib.pyplot as plt # Default pipeline (with dividends) sim_with_div = bam.Simulation.init(n_firms=100, n_households=500, seed=42) borr_with_div = sim_with_div.get_role("Borrower") nw_with_div = [] for _ in range(50): sim_with_div.step() nw_with_div.append(bam.ops.mean(borr_with_div.net_worth)) # Custom pipeline (no dividends) sim_no_div = bam.Simulation.init( n_firms=100, n_households=500, seed=42, pipeline_path=str(no_div_path), ) borr_no_div = sim_no_div.get_role("Borrower") nw_no_div = [] for _ in range(50): sim_no_div.step() nw_no_div.append(bam.ops.mean(borr_no_div.net_worth)) # Plot comparison fig, ax = plt.subplots(figsize=(10, 6)) ax.plot(nw_with_div, label="With Dividends (default)", linewidth=2) ax.plot(nw_no_div, label="No Dividends (custom pipeline)", linewidth=2) ax.set_xlabel("Period") ax.set_ylabel("Mean Firm Net Worth") ax.set_title("Effect of Dividend Policy on Firm Net Worth") ax.legend() ax.grid(True, alpha=0.3) plt.tight_layout() plt.show() print("\nFinal mean net worth:") print(f" With dividends: {nw_with_div[-1]:.2f}") print(f" No dividends: {nw_no_div[-1]:.2f}") .. image-sg:: /auto_examples/advanced/images/sphx_glr_example_custom_pipeline_001.png :alt: Effect of Dividend Policy on Firm Net Worth :srcset: /auto_examples/advanced/images/sphx_glr_example_custom_pipeline_001.png :class: sphx-glr-single-img .. rst-class:: sphx-glr-script-out .. code-block:: none Created pipeline without dividends Final mean net worth: With dividends: 11.28 No dividends: 11.36 .. GENERATED FROM PYTHON SOURCE LINES 255-259 Adding Custom Events -------------------- Define custom events and add them to the pipeline. .. GENERATED FROM PYTHON SOURCE LINES 259-386 .. code-block:: Python from bamengine import event @event class CollectSalesTax: """Collect sales tax from firm revenue.""" def execute(self, sim): prod = sim.get_role("Producer") borr = sim.get_role("Borrower") # 5% sales tax on revenue tax_rate = 0.05 # Approximate revenue as production * price revenue = ops.multiply(prod.production, prod.price) tax = ops.multiply(revenue, tax_rate) # Reduce net worth by tax borr.net_worth[:] = borr.net_worth - tax @event class PayUnemploymentBenefits: """Pay unemployment benefits to jobless workers.""" def execute(self, sim): wrk = sim.get_role("Worker") cons = sim.get_role("Consumer") # Benefit = 40% of minimum wage for unemployed benefit = sim.ec.min_wage * 0.4 unemployed = wrk.employer < 0 # Add to consumer income cons.income[:] = cons.income + ops.where(unemployed, benefit, 0.0) print("\nCustom events defined:") print(" - collect_sales_tax") print(" - pay_unemployment_benefits") # Pipeline with custom events custom_events_pipeline = """ # Pipeline with custom tax and benefits events: # Planning - firms_decide_desired_production - firms_plan_breakeven_price - firms_plan_price - firms_decide_desired_labor - firms_decide_vacancies - firms_fire_excess_workers # Labor market - calc_inflation_rate - adjust_minimum_wage - firms_decide_wage_offer - workers_decide_firms_to_apply - labor_market_round x {max_M} - firms_calc_wage_bill # Credit market - banks_decide_credit_supply - banks_decide_interest_rate - firms_decide_credit_demand - firms_calc_financial_fragility - firms_prepare_loan_applications - credit_market_round x {max_H} - firms_fire_workers # Production - firms_pay_wages - workers_receive_wage - firms_run_production - update_avg_mkt_price - workers_update_contracts # CUSTOM: Unemployment benefits (after wage receipt) - pay_unemployment_benefits # Goods market - consumers_calc_propensity - consumers_decide_income_to_spend - consumers_decide_firms_to_visit - goods_market_round - consumers_finalize_purchases # Revenue - firms_collect_revenue # CUSTOM: Sales tax (after revenue collection) - collect_sales_tax - firms_validate_debt_commitments - firms_pay_dividends # Bankruptcy - firms_update_net_worth - mark_bankrupt_firms - mark_bankrupt_banks # Entry - spawn_replacement_firms - spawn_replacement_banks """ custom_path = config_dir / "custom_events_pipeline.yml" custom_path.write_text(custom_events_pipeline) # Run with custom events sim_custom = bam.Simulation.init( n_firms=100, n_households=500, seed=42, pipeline_path=str(custom_path), ) print("\nRunning with custom events pipeline...") sim_custom.run(n_periods=30) print("Completed 30 periods with tax and benefits") # Calculate unemployment from Worker.employed (calc_unemployment_rate deprecated) wrk_custom = sim_custom.get_role("Worker") unemployment_custom = 1 - bam.ops.mean(wrk_custom.employed.astype(float)) print(f"Final unemployment: {unemployment_custom:.2%}") .. rst-class:: sphx-glr-script-out .. code-block:: none Custom events defined: - collect_sales_tax - pay_unemployment_benefits Running with custom events pipeline... Completed 30 periods with tax and benefits Final unemployment: 0.00% .. GENERATED FROM PYTHON SOURCE LINES 387-393 Batch Market Events ------------------- Market matching uses batch events that handle both sides of the market (applications and matching) in a single vectorized step per round. The ``x N`` repetition syntax runs multiple rounds. .. GENERATED FROM PYTHON SOURCE LINES 393-410 .. code-block:: Python batch_explanation = """ Repetition syntax: event_name x N For N=4, the event executes 4 times: 1. event_name (round 0) 2. event_name (round 1) 3. event_name (round 2) 4. event_name (round 3) Used in BAM for: - Labor market: labor_market_round x {max_M} - Credit market: credit_market_round x {max_H} - Goods market: goods_market_round (single event, handles all Z visits internally) """ print(batch_explanation) .. rst-class:: sphx-glr-script-out .. code-block:: none Repetition syntax: event_name x N For N=4, the event executes 4 times: 1. event_name (round 0) 2. event_name (round 1) 3. event_name (round 2) 4. event_name (round 3) Used in BAM for: - Labor market: labor_market_round x {max_M} - Credit market: credit_market_round x {max_H} - Goods market: goods_market_round (single event, handles all Z visits internally) .. GENERATED FROM PYTHON SOURCE LINES 411-415 Parameter Substitution ---------------------- Use ``{param_name}`` in pipeline YAML to substitute config values. .. GENERATED FROM PYTHON SOURCE LINES 415-497 .. code-block:: Python param_example = """ # Available parameters: # {max_M} - Labor market matching rounds per period # {max_H} - Credit market matching rounds per period events: # 4 rounds of labor market matching - labor_market_round x {max_M} # 2 rounds of credit market matching - credit_market_round x {max_H} # Batch-sequential shopping (handles all Z visits internally) - goods_market_round # The actual values come from: # - defaults.yml (max_M: 4, max_H: 2, max_Z: 2) # - Or your custom config """ print("Parameter substitution in pipelines:") print(param_example) # Test with different friction values # Uses default pipeline - the key difference is in max_M, max_H, max_Z config parameters # Low friction (many search rounds) sim_low_friction = bam.Simulation.init( n_firms=100, n_households=500, max_M=6, max_H=4, max_Z=4, # More search rounds seed=42, ) # High friction (few search rounds) sim_high_friction = bam.Simulation.init( n_firms=100, n_households=500, max_M=2, max_H=1, max_Z=1, # Fewer rounds seed=42, ) # Run both n_periods = 50 unemp_low = [] unemp_high = [] # Get worker roles for unemployment calculation wrk_low = sim_low_friction.get_role("Worker") wrk_high = sim_high_friction.get_role("Worker") for _ in range(n_periods): sim_low_friction.step() sim_high_friction.step() # Calculate unemployment from Worker.employed (calc_unemployment_rate deprecated) unemp_low.append(1 - bam.ops.mean(wrk_low.employed.astype(float))) unemp_high.append(1 - bam.ops.mean(wrk_high.employed.astype(float))) # Plot fig, ax = plt.subplots(figsize=(10, 6)) ax.plot( bam.ops.multiply(bam.ops.asarray(unemp_low), 100), label="Low Friction (M=6, H=4, Z=4)", linewidth=2, ) ax.plot( bam.ops.multiply(bam.ops.asarray(unemp_high), 100), label="High Friction (M=2, H=1, Z=1)", linewidth=2, ) ax.set_xlabel("Period") ax.set_ylabel("Unemployment Rate (%)") ax.set_title("Effect of Search Frictions on Unemployment") ax.legend() ax.grid(True, alpha=0.3) plt.tight_layout() plt.show() .. image-sg:: /auto_examples/advanced/images/sphx_glr_example_custom_pipeline_002.png :alt: Effect of Search Frictions on Unemployment :srcset: /auto_examples/advanced/images/sphx_glr_example_custom_pipeline_002.png :class: sphx-glr-single-img .. rst-class:: sphx-glr-script-out .. code-block:: none Parameter substitution in pipelines: # Available parameters: # {max_M} - Labor market matching rounds per period # {max_H} - Credit market matching rounds per period events: # 4 rounds of labor market matching - labor_market_round x {max_M} # 2 rounds of credit market matching - credit_market_round x {max_H} # Batch-sequential shopping (handles all Z visits internally) - goods_market_round # The actual values come from: # - defaults.yml (max_M: 4, max_H: 2, max_Z: 2) # - Or your custom config .. GENERATED FROM PYTHON SOURCE LINES 498-500 Cleanup ------- .. GENERATED FROM PYTHON SOURCE LINES 500-506 .. code-block:: Python import shutil shutil.rmtree(config_dir) print("\nTemp files cleaned up.") .. rst-class:: sphx-glr-script-out .. code-block:: none Temp files cleaned up. .. GENERATED FROM PYTHON SOURCE LINES 507-517 Key Takeaways ------------- - Pipelines define event execution order - Use YAML format with ``events:`` list - Special syntax: ``x N`` for repetition - Parameter substitution: ``{max_M}``, ``{max_H}`` - Remove events by commenting/deleting from YAML - Add custom events by defining with ``@event`` and including in YAML - Load via ``pipeline_path`` parameter in ``Simulation.init()`` .. rst-class:: sphx-glr-timing **Total running time of the script:** (0 minutes 1.381 seconds) .. _sphx_glr_download_auto_examples_advanced_example_custom_pipeline.py: .. only:: html .. container:: sphx-glr-footer sphx-glr-footer-example .. container:: sphx-glr-download sphx-glr-download-jupyter :download:`Download Jupyter notebook: example_custom_pipeline.ipynb ` .. container:: sphx-glr-download sphx-glr-download-python :download:`Download Python source code: example_custom_pipeline.py ` .. container:: sphx-glr-download sphx-glr-download-zip :download:`Download zipped: example_custom_pipeline.zip ` .. only:: html .. rst-class:: sphx-glr-signature `Gallery generated by Sphinx-Gallery `_