
Improved ExecutionPlan/TermGraph performance #289

Open
everling wants to merge 4 commits into stefan-jansen:main from everling:planning-perf

Conversation

@everling

Hi,
For sufficiently large pipelines, the execution-planning stage becomes a bottleneck. I've made three changes in zipline/pipeline/graph.py that reduce redundant operations and improve the time complexity of graph construction and extra-rows planning.

Changes:

  • Graph nodes are added only once (_add_to_graph): a term that is already present in the graph is skipped instead of being re-traversed. The cyclic dependency check should still work, since a fully inserted subgraph is already known to be acyclic.
  • _ensure_extra_rows no longer materializes dict(self.graph.nodes())[term] on every call; the dict is initialized once and reused.
  • set_extra_rows stops early if the term has already been ensured for at least as many rows as the current min_extra_rows parameter, so its dependencies are not re-visited. (A sketch of all three changes follows this list.)
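
For illustration, here is a minimal sketch of the three ideas, not the actual diff: the method names mirror zipline/pipeline/graph.py, but the class skeleton, the simplified signatures, and the _node_attrs cache name are stand-ins I introduce here.

import networkx as nx


class CyclicDependency(Exception):
    """Stand-in for the exception raised by the real module."""


class TermGraphSketch:
    def __init__(self, terms):
        self.graph = nx.DiGraph()
        for term in terms:
            self._add_to_graph(term, parents=set())
        # Change 2: materialize the term -> attributes mapping once.
        # dict(self.graph.nodes()) is O(V), so rebuilding it inside every
        # _ensure_extra_rows call made each lookup linear in the graph size.
        self._node_attrs = dict(self.graph.nodes(data=True))

    def _add_to_graph(self, term, parents):
        if term in self.graph:
            # Change 1: a term whose subtree was already inserted is skipped.
            # A fully inserted subtree is already known to be acyclic, so the
            # early return does not weaken the cycle check below.
            return
        if term in parents:
            raise CyclicDependency(term)
        parents.add(term)
        self.graph.add_node(term)
        for dependency in term.dependencies:
            self._add_to_graph(dependency, parents)
            self.graph.add_edge(dependency, term)
        parents.remove(term)

    def _ensure_extra_rows(self, term, n):
        # dict(...) above keeps references to the graph's live attribute
        # dicts, so mutating attrs here updates the graph itself.
        attrs = self._node_attrs[term]
        attrs["extra_rows"] = max(n, attrs.get("extra_rows", 0))

    def set_extra_rows(self, term, min_extra_rows):
        # Change 3: if this term already carries at least min_extra_rows, its
        # dependencies were already visited with an equal-or-larger
        # requirement, so the recursion can stop here.
        if self._node_attrs[term].get("extra_rows", -1) >= min_extra_rows:
            return
        self._ensure_extra_rows(term, min_extra_rows)
        for dependency, extra in term.dependencies.items():
            self.set_extra_rows(dependency, min_extra_rows + extra)

The early return in _add_to_graph is the key change: it turns the traversal from one visit per root-to-leaf path (exponential in depth for the benchmark DAG below) into one visit per node and edge.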

I'm pretty sure functionality remains intact, but it would be great if someone could have a look, as the performance gains are tremendous. I've compared pipeline execution times across commits and pipeline depths:

[Figure: pipeline execution time vs. pipeline depth, compared across commits]

The pipeline is a simple branching DAG:

import time

import numpy as np
import pandas as pd
import sqlalchemy as sa

from zipline.assets import AssetDBWriter, AssetFinder
from zipline.pipeline import Pipeline, SimplePipelineEngine
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.domain import US_EQUITIES
from zipline.pipeline.loaders.frame import DataFrameLoader

# make dummy asset metadata in an in-memory SQLite database
db_engine = sa.create_engine("sqlite:///:memory:")
writer = AssetDBWriter(db_engine)

asset_start = pd.Timestamp("2000-01-01", tz='UTC')
asset_end = pd.Timestamp("2050-12-31", tz='UTC')

equities_data = pd.DataFrame(
    {
        "sid": [1, 2],
        "symbol": ["AAPL", "GOOG"],
        "asset_name": ["Apple Inc.", "Alphabet Inc."],
        "start_date": [asset_start, asset_start],
        "end_date": [asset_end, asset_end],
        "first_traded": [asset_start, asset_start],
        "exchange": ["NYSE", "NASDAQ"],
        "security_end_date": [asset_end, asset_end],
    }
)
exchanges_data = pd.DataFrame(
    {
        "exchange": ["NYSE", "NASDAQ"],
        "country_code": ["US", "US"],
    }
)
writer.write(
    equities=equities_data,
    exchanges=exchanges_data,
)

dates = US_EQUITIES.calendar.sessions_in_range("2000", "2026")
# baseline columns must be the sids written above, not the frame's positional index
baseline = pd.DataFrame(
    index=dates,
    columns=equities_data["sid"],
    data=np.random.random((len(dates), len(equities_data))),
)
frame_loader = DataFrameLoader(column=USEquityPricing.close, baseline=baseline)

def get_dummy_loader(column):
    # every pipeline column resolves to the same in-memory loader
    return frame_loader

asset_finder = AssetFinder(db_engine)
engine = SimplePipelineEngine(get_loader=get_dummy_loader, asset_finder=asset_finder)
start_date = pd.Timestamp("2022-01-03")
end_date = pd.Timestamp("2024-01-03")
stats = {}

# make increasingly deep pipelines; each level adds a filter (out > r), a
# halved branch (out / 2), and an if_else over the shared previous output,
# so the number of distinct paths through the DAG grows exponentially with depth
for depth in range(1, 13):
    out = USEquityPricing.close.latest
    for i in range(depth):
        out = (out > np.random.random()).if_else(out, out / 2)
    p = Pipeline(columns={"out": out})
    start = time.time()
    df = engine.run_pipeline(p, start_date, end_date)
    stats[depth] = time.time() - start

print(stats)

@everling
Author

everling commented Aug 14, 2025

@stefan-jansen I have been running zipline-reloaded with these edits for quite some time and haven't noticed any issues.
