475 changes: 392 additions & 83 deletions nodestream/pipeline/pipeline.py

Large diffs are not rendered by default.

17 changes: 12 additions & 5 deletions nodestream/pipeline/step.py
@@ -1,10 +1,6 @@
from typing import AsyncGenerator, Optional

from ..metrics import (
FATAL_ERRORS,
NON_FATAL_ERRORS,
Metrics,
)
from ..metrics import FATAL_ERRORS, NON_FATAL_ERRORS, Metrics
from .object_storage import ObjectStore
from .progress_reporter import PipelineProgressReporter

@@ -110,6 +106,8 @@ class Step:
asynchronous context. They can process records and emit new records.
"""

tracks_lineage: bool = False

Would there be any developer benefit to encapsulating this as a new subclass of Step? Then finalize_record could only be declared on that interface?

I feel like this could avoid confusion for Step developers who don't need this finalizing behaviour, since if that boolean is false then finalize_record is never called.

class FinalizingStep(Step):
    async def finalize_record(self, callback_token: object):
        """Finalize a record.

        This method is called when a record produced by this step has been
        fully processed by all downstream steps. It is not called for records
        that are not produced by this step.
        """
        pass

@zprobst (Member Author), Sep 29, 2025:

Interesting idea... thought about it a bit. Right now we have class hierarchies that look like this:

graph LR
    A[Step] --> B(Transformer)
    B --> C[MyAwesomeTransformer]

Let's assume that we want to add finalization to our MyAwesomeTransformer by inheriting from FinalizingStep. We'd need a class hierarchy like this:

graph LR
    A[Step] --> B(Transformer)
    B --> C[MyAwesomeTransformer]
    D[FinalizingStep] --> C
    A --> D

This creates a... confusing class hierarchy and can lead to weird MRO issues.

Then imagine we have an ApronSpringsStep that gets notified every time we operate on a given record.

graph LR
    A[Step] --> B(Transformer)
    B --> C[MyAwesomeTransformer]
    D[FinalizingStep] --> C
    A --> D
    E[ApronSpringsStep] --> C
    A --> E

This violates my personal rule of keeping class hierarchies relatively shallow and flat. The more cases we add to this example, the more it feels like it's really the same case, with the implementer of Step choosing to do something or not depending on the situation.
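
For concreteness, a minimal Python sketch of the shape being argued against here, assuming the hypothetical FinalizingStep and ApronSpringsStep bases existed (on_record_operated is an invented hook name, not part of nodestream):

from nodestream.pipeline.step import Step
from nodestream.pipeline.transformers import Transformer


class FinalizingStep(Step):
    async def finalize_record(self, callback_token: object) -> None:
        pass


class ApronSpringsStep(Step):
    async def on_record_operated(self, record: object) -> None:  # invented hook name
        pass


# Opting in to both behaviours pushes multiple inheritance onto the concrete
# step, producing the diamond shape in the diagrams above.
class MyAwesomeTransformer(Transformer, FinalizingStep, ApronSpringsStep):
    async def process_record(self, record, context):
        yield record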


Yep, that's a really good point. As I look at this, it feels like Finalizing is more of a Protocol than a SubClass. Would that feel any better?

It's a bit of an abuse, because utilizing the protocol changes the framework's treatment of Step outputs, so maybe it's still not a great idea. I think I'm just trying to address the bad feeling of a boolean behavior flag and a method that is unimportant to most use cases.

I leave it to your judgement on how you want that interface and experience to work.
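
For reference, a minimal sketch of that Protocol idea, purely illustrative; Finalizable is a hypothetical name and nothing in the codebase actually checks for it:

from typing import Protocol, runtime_checkable


@runtime_checkable
class Finalizable(Protocol):
    """Hypothetical protocol: a step opts in by declaring finalize_record."""

    async def finalize_record(self, callback_token: object) -> None: ...


# The framework could then gate the cleanup machinery on a structural check,
# e.g. `if isinstance(step, Finalizable): ...`, instead of a boolean flag.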

@zprobst (Member Author):


I think I've come down to the view that there isn't really a need to distinguish a protocol or subclass to avoid the bools. Not having it in this case is the same as doing nothing. For Step there is always a reasonable default implementation that we can rely on; in this case it's just pass.

async def start(self, context: StepContext):
"""Start the step.

@@ -149,6 +147,15 @@ async def finish(self, context: StepContext):
"""
pass

async def finalize_record(self, record_or_token: object):
"""Finalize a record.

This method is called when a record produced by this step has been
fully processed by all downstream steps. It is not called for records
that are not produced by this step.
"""
pass


class PassStep(Step):
"""A `PassStep` passes records through."""
@@ -63,8 +63,9 @@ async def test_pipeline_interpretation_snapshot(
snapshot.snapshot_dir = "tests/integration/snapshots"
pipeline_file = get_pipeline_fixture_file_by_name(pipeline_name)
definition = PipelineDefinition.from_path(pipeline_file)
results = await drive_definition_to_completion(definition)
results_as_json = json.dumps(
[asdict(r) for r in (await drive_definition_to_completion(definition))],
[asdict(r) for r in results],
default=set_default,
indent=4,
sort_keys=True,
315 changes: 315 additions & 0 deletions tests/integration/test_pipeline_cleanup_flow.py
@@ -0,0 +1,315 @@
from unittest.mock import Mock

import pytest

from nodestream.pipeline.extractors import Extractor
from nodestream.pipeline.object_storage import ObjectStore
from nodestream.pipeline.pipeline import Pipeline
from nodestream.pipeline.progress_reporter import PipelineProgressReporter
from nodestream.pipeline.transformers import Transformer
from nodestream.pipeline.writers import Writer


class ResourceTrackingExtractor(Extractor):
"""Extractor that tracks resource allocation and cleanup."""

tracks_lineage: bool = True

def __init__(self, data_items):
self.data_items = data_items
self.allocated_resources = {}
self.finalized_tokens = []

async def extract_records(self):
for i, item in enumerate(self.data_items):
# Simulate resource allocation
token = f"extractor_resource_{i}"
self.allocated_resources[token] = f"resource_for_{item}"
yield (item, token) # Emit tuple with callback token

async def finalize_record(self, record_token):
"""Clean up resources allocated for this record."""
if record_token in self.allocated_resources:
del self.allocated_resources[record_token]
self.finalized_tokens.append(record_token)


class ResourceTrackingTransformer(Transformer):
"""Transformer that tracks resource allocation and cleanup."""

tracks_lineage: bool = True

def __init__(self):
self.allocated_resources = {}
self.finalized_tokens = []
self.processed_records = []

async def process_record(self, record, context):
# Track the record we processed
self.processed_records.append(record)

# Simulate resource allocation for transformation
token = f"transformer_resource_{id(record)}"
self.allocated_resources[token] = f"transform_resource_for_{record}"

# Transform the record
transformed = f"transformed_{record}"
yield (transformed, token) # Emit with callback token

async def finalize_record(self, record_token):
"""Clean up transformation resources."""
if record_token in self.allocated_resources:
del self.allocated_resources[record_token]
self.finalized_tokens.append(record_token)


class ResourceTrackingWriter(Writer):
"""Writer that tracks resource allocation and cleanup."""

tracks_lineage: bool = True

def __init__(self):
self.allocated_resources = {}
self.finalized_tokens = []
self.written_records = []

async def write_record(self, record):
# Track what we wrote
self.written_records.append(record)

# Simulate resource allocation for writing
token = f"writer_resource_{id(record)}"
self.allocated_resources[token] = f"write_resource_for_{record}"
return token # Return token for cleanup

async def process_record(self, record, context):
# Write the record and get cleanup token
token = await self.write_record(record)
yield (record, token) # Pass through with cleanup token

async def finalize_record(self, record_token):
"""Clean up writing resources."""
if record_token in self.allocated_resources:
del self.allocated_resources[record_token]
self.finalized_tokens.append(record_token)


@pytest.mark.asyncio
async def test_end_to_end_cleanup_flow():
"""Test complete cleanup flow through extractor -> transformer -> writer."""
# Create steps with resource tracking
extractor = ResourceTrackingExtractor(["item1", "item2", "item3"])
transformer = ResourceTrackingTransformer()
writer = ResourceTrackingWriter()

# Create pipeline
steps = (extractor, transformer, writer)
object_store = Mock(spec=ObjectStore)
object_store.namespaced = Mock(return_value=Mock())

pipeline = Pipeline(steps, step_outbox_size=10, object_store=object_store)

# Create progress reporter
reporter = PipelineProgressReporter.for_testing([])

# Run pipeline
await pipeline.run(reporter)

# Verify all records were processed
assert len(transformer.processed_records) == 3
assert len(writer.written_records) == 3

# Verify finalize_record was called for writer (final step)
# Note: In multi-step pipelines, only the final step gets cleanup calls
# because intermediate records are transformed, not dropped
assert len(writer.finalized_tokens) == 3

# Writer resources should be cleaned up
assert len(writer.allocated_resources) == 0


@pytest.mark.asyncio
async def test_cleanup_flow_with_filtering():
"""Test cleanup flow when some records are filtered out."""

class FilteringTransformer(Transformer):
tracks_lineage: bool = True

def __init__(self):
self.allocated_resources = {}
self.finalized_tokens = []

async def process_record(self, record, context):
# Allocate resource for processing
token = f"filter_resource_{id(record)}"
self.allocated_resources[token] = f"resource_for_{record}"

# Only pass through records containing "keep"
if "keep" in str(record):
yield (f"filtered_{record}", token)
# If we don't yield, the record will be dropped and finalized

async def finalize_record(self, record_token):
if record_token in self.allocated_resources:
del self.allocated_resources[record_token]
self.finalized_tokens.append(record_token)

# Create steps
extractor = ResourceTrackingExtractor(["keep1", "drop1", "keep2", "drop2"])
filter_transformer = FilteringTransformer()
writer = ResourceTrackingWriter()

steps = (extractor, filter_transformer, writer)
object_store = Mock(spec=ObjectStore)
object_store.namespaced = Mock(return_value=Mock())

pipeline = Pipeline(steps, step_outbox_size=10, object_store=object_store)
reporter = PipelineProgressReporter.for_testing([])

await pipeline.run(reporter)

# Verify only "keep" records made it to writer
assert len(writer.written_records) == 2
assert all("keep" in str(record) for record in writer.written_records)

# Verify writer resources were cleaned up
assert len(writer.allocated_resources) == 0

# Verify finalize_record was called for writer (final step)
assert len(writer.finalized_tokens) == 2 # Only 2 kept records


@pytest.mark.asyncio
async def test_cleanup_flow_with_record_multiplication():
"""Test cleanup flow when one record generates multiple records."""

class MultiplyingTransformer(Transformer):
tracks_lineage: bool = True

def __init__(self):
self.allocated_resources = {}
self.finalized_tokens = []

async def process_record(self, record, context):
# Allocate resource for processing
token = f"multiply_resource_{id(record)}"
self.allocated_resources[token] = f"resource_for_{record}"

# Generate multiple records from one input
for i in range(3):
yield (f"{record}_copy_{i}", token)

async def finalize_record(self, record_token):
if record_token in self.allocated_resources:
del self.allocated_resources[record_token]
self.finalized_tokens.append(record_token)

# Create steps
extractor = ResourceTrackingExtractor(["item1", "item2"])
multiplier = MultiplyingTransformer()
writer = ResourceTrackingWriter()

steps = (extractor, multiplier, writer)
object_store = Mock(spec=ObjectStore)
object_store.namespaced = Mock(return_value=Mock())

pipeline = Pipeline(steps, step_outbox_size=10, object_store=object_store)
reporter = PipelineProgressReporter.for_testing([])

await pipeline.run(reporter)

# Verify multiplication worked
assert len(writer.written_records) == 6 # 2 input * 3 copies each

# Verify writer resources were cleaned up
assert len(writer.allocated_resources) == 0

# Verify finalize_record calls for writer (final step)
assert len(writer.finalized_tokens) == 6 # 6 output records


@pytest.mark.asyncio
async def test_cleanup_flow_with_exception():
"""Test cleanup flow when an exception occurs during processing."""

class FailingTransformer(Transformer):
tracks_lineage: bool = True

def __init__(self):
self.allocated_resources = {}
self.finalized_tokens = []

async def process_record(self, record, context):
# Allocate resource
token = f"failing_resource_{id(record)}"
self.allocated_resources[token] = f"resource_for_{record}"

if "fail" in str(record):
raise ValueError(f"Processing failed for {record}")

yield (f"processed_{record}", token)

async def finalize_record(self, record_token):
if record_token in self.allocated_resources:
del self.allocated_resources[record_token]
self.finalized_tokens.append(record_token)

# Create steps
extractor = ResourceTrackingExtractor(["good1", "fail1", "good2"])
failing_transformer = FailingTransformer()
writer = ResourceTrackingWriter()

steps = (extractor, failing_transformer, writer)
object_store = Mock(spec=ObjectStore)
object_store.namespaced = Mock(return_value=Mock())

pipeline = Pipeline(steps, step_outbox_size=10, object_store=object_store)

# Use a reporter that doesn't raise on fatal errors for this test
reporter = PipelineProgressReporter(
on_fatal_error_callback=lambda ex: None # Don't raise
)

await pipeline.run(reporter)

# The pipeline should handle the exception and stop processing
# Writer should have processed at least one successful record before failure
assert len(writer.written_records) >= 1 # At least one successful record


@pytest.mark.asyncio
async def test_cleanup_flow_performance():
"""Test cleanup flow performance with many records."""
# Create a large number of records to test performance
large_dataset = [f"item_{i}" for i in range(100)]

extractor = ResourceTrackingExtractor(large_dataset)
transformer = ResourceTrackingTransformer()
writer = ResourceTrackingWriter()

steps = (extractor, transformer, writer)
object_store = Mock(spec=ObjectStore)
object_store.namespaced = Mock(return_value=Mock())

pipeline = Pipeline(steps, step_outbox_size=10, object_store=object_store)
reporter = PipelineProgressReporter.for_testing([])

# Measure execution time
import time

start_time = time.time()

await pipeline.run(reporter)

end_time = time.time()
execution_time = end_time - start_time

# Verify all records were processed and cleaned up
assert len(writer.written_records) == 100
assert len(writer.allocated_resources) == 0

# Verify cleanup calls were made for writer (final step)
assert len(writer.finalized_tokens) == 100

# Performance should be reasonable (adjust threshold as needed)
assert execution_time < 5.0 # Should complete within 5 seconds