Implement Record Lineage Tracking and Finalization Callback #435
base: 0.15
Conversation
Codecov Report
✅ All modified and coverable lines are covered by tests.

@@ Coverage Diff @@
##             main     #435      +/-   ##
==========================================
+ Coverage   98.26%   98.30%   +0.03%
==========================================
  Files         152      152
  Lines        6171     6247      +76
==========================================
+ Hits         6064     6141      +77
+ Misses        107      106       -1

Flags with carried forward coverage won't be shown.
I'm really excited to see how all this is coming together. Just a few notes and questions.
nodestream/pipeline/step.py
Outdated
tracks_lineage: bool = False
Would there be any developer benefit to encapsulating this as a new subclass of Step? Then finalize_record could only be declared on that interface? I feel like this could avoid confusion for Step developers that don't need this finalizing behaviour, since if that boolean is false then finalize_record is never called.
class FinalizingStep(Step):
    async def finalize_record(self, callback_token: object):
        """Finalize a record.

        This method is called when a record produced by this step has been
        fully processed by all downstream steps. It is not called for records
        that are not produced by this step.
        """
        pass
Interesting idea... thought about it a bit. Right now we have class hierarchies that look like this:
graph LR
A[Step] --> B(Transformer)
B --> C[MyAwesomeTransformer]
Let's assume that we want to add finalization to our MyAwesomeTransformer by inheriting from FinalizingStep. We'd need to have a class hierarchy like this:
graph LR
A[Step] --> B(Transformer)
B --> C[MyAwesomeTransformer]
D[FinalizingStep] --> C
A --> D
This creates a... confusing class hierarchy and can lead to weird MRO issues.
Then imagine we have an ApronSpringsStep that gets notified every time we operate on a given record.
graph LR
A[Step] --> B(Transformer)
B --> C[MyAwesomeTransformer]
D[FinalizingStep] --> C
A --> D
E[ApronSpringsStep] --> C
A --> E
This violates my personal rule for relatively shallow, flat class hierarchies. The more cases we add to this example, the more it feels that it's really the same case, with the implementer of Step choosing to do something or not depending on the case.
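A minimal Python sketch of the shape being described, using the class names from the diagrams above (the multiple-inheritance composition is hypothetical):

class Step: ...
class Transformer(Step): ...
class FinalizingStep(Step): ...
class ApronSpringsStep(Step): ...


# MyAwesomeTransformer now reaches Step through three separate paths, and its
# behaviour depends on how Python linearizes the method resolution order.
class MyAwesomeTransformer(Transformer, FinalizingStep, ApronSpringsStep): ...


print(MyAwesomeTransformer.__mro__)
# resolves to: MyAwesomeTransformer, Transformer, FinalizingStep,
#              ApronSpringsStep, Step, object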
Yep, that's a really good point. As I look at this, it feels like Finalizing is more of a Protocol than a subclass. Would that feel any better?
It's a bit of an abuse because utilizing the protocol changes the framework's treatment of Step outputs, so maybe it's still not a great idea. I think I'm just trying to address the bad feeling of a boolean behavior flag and a method that is unimportant to most use cases. I leave it to your judgement on how you want that interface and experience to work.
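For reference, a rough sketch of what a Protocol version could look like (the Finalizing name and the wants_finalization helper are hypothetical, not part of the PR):

from typing import Protocol, runtime_checkable


@runtime_checkable
class Finalizing(Protocol):
    async def finalize_record(self, callback_token: object) -> None:
        """Called once a record produced by this step has been fully processed."""
        ...


# The framework could then branch on the protocol rather than a boolean flag.
def wants_finalization(step: object) -> bool:
    return isinstance(step, Finalizing)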
I think I've come down to: there isn't really a need to distinguish a protocol or subclass to avoid the bools. Not having it in this case is the same as doing nothing. For Step there is always a reasonable default implementation that we can rely on; in this case it's just pass.
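In other words, roughly this default on Step itself (a sketch of the idea rather than the PR's exact code):

class Step:
    async def finalize_record(self, callback_token: object) -> None:
        # Default is a harmless no-op; only steps that care about lineage override it.
        pass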
@dataclass(slots=True)
class Record:
    """A `Record` is a unit of data that is processed by a pipeline."""
This is only a framework-facing class, so maybe not critical, but I wonder if naming this Record will cause some confusion. As I'm reading through the code, I'm realizing that throughout the system record is used to refer to the input and output of Steps. But now we have a new Record class where the object that is often called a record is the data property of this class. Could get confusing for folks not deeply entrenched in the framework?
That's a good point... I'll workshop a better name.
I've gone with RecordContext
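For illustration, the renamed class might look something like this, with fields inferred from the tests later in this review (not necessarily the exact definition in the PR):

from dataclasses import dataclass
from typing import Any


@dataclass(slots=True)
class RecordContext:
    """Wraps the record (`data`) flowing through the pipeline with its lineage metadata."""

    data: Any
    originating_step: Any  # the Step that emitted this record
    callback_token: Any = None  # handed back to the originating step on finalization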
nodestream/pipeline/pipeline.py
Outdated
data = callback_token = emission
if isinstance(emission, tuple) and step.tracks_lineage:
    data, callback_token = emission
Would it be beneficial for callback_token to be None when step.tracks_lineage is False? That would make it explicitly clear that no token was actually communicated out from the step.
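i.e., something along these lines, as a drop-in variant of the snippet above (a sketch of the suggestion, not the code in the PR):

data, callback_token = emission, None
if isinstance(emission, tuple) and step.tracks_lineage:
    data, callback_token = emission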
My pendulum has swung the other way on this. I think always calling it and not having a flag is the most predictable pattern.
Make sure to update docs and advertise the breaking change that any tuple returned from a step will have the last element stripped off.
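For the docs, a tiny illustration of the behaviour change might help (illustrative only, assuming the 2-tuple convention described in this thread):

my_token = object()  # whatever the step wants handed back on finalization

# A step that emits a 2-tuple...
emission = ({"name": "ada"}, my_token)

# ...previously sent the whole tuple downstream. With this change, the pipeline
# unpacks it: {"name": "ada"} flows to the next step, and my_token is held back
# and later passed to the originating step's finalize_record.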
    self.step_outbox_size, current_output_name, current_input_name
)
pipeline_output = PipelineOutput(current_input, reporter)
executors.append(Executor.pipeline_output(current_input, reporter))
I might be misremembering, but I thought we needed the reporter to be first in the list in order to fix the race?
I think it is? On line 484 we create a blank list, and line 497 is the first place we append to it, so it will have position 0. Do you think it's better if we create it there to be a touch more clear about that? Something like:
executors = [Executor.pipeline_output(current_input, reporter)]
Does that seem more appropriate?
I see. I was completely misinterpreting this section. I think I'm not fully grokking all of the Executor abstractions and channel management, and that was clouding my read of it.
All good... this PR is doing a lot.
tests/unit/pipeline/test_record.py
Outdated
async def test_record_from_step_emission_tuple_data():
    """Test Record.from_step_emission with tuple (data, token)."""
    step = Mock(spec=Step)
    data = {"test": "data"}
    token = "callback_token"

    record = Record.from_step_emission(step, (data, token))

    assert record.data == data
    assert record.callback_token == token
    assert record.originating_step == step
How does this test pass if tracks_lineage isn't set on the Step? This seems like an issue with a number of tests in this file. Maybe Mock defaults bools to True? If so, I feel like it would be clearer to be explicit about that value.
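For example, being explicit might look like this (import path assumed from the PR's diff); note that an attribute auto-created on a Mock is itself a Mock, which is truthy, which is likely why these tests pass today:

from unittest.mock import Mock

from nodestream.pipeline.step import Step  # import path assumed

step = Mock(spec=Step)
step.tracks_lineage = True  # explicit, instead of relying on a truthy child Mock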
tests/unit/pipeline/test_step.py
Outdated
    # Should not raise any exceptions and should do nothing


@pytest.mark.asyncio
If I'm not mistaken, I think this test just tests that you can call a function. The mock_finalize is defined, attached to the step object, and then called. I'm not sure this is testing anything.
Probably, the tests are pretty bad. I'll go through them.
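For what it's worth, a version that exercises the real default rather than a mock might look like this (a sketch, assuming Step is directly instantiable and finalize_record defaults to a no-op; import path assumed):

import pytest

from nodestream.pipeline.step import Step  # import path assumed


@pytest.mark.asyncio
async def test_default_finalize_record_is_a_noop():
    step = Step()
    # Should complete without raising and return nothing.
    assert await step.finalize_record(object()) is None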
@pytest.mark.asyncio
async def test_finalize_record_async_behavior():
Is this actually testing that Record.drop() calls the finalize with await?
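If the goal is to prove the await actually happens, AsyncMock can assert it directly (a sketch; the RecordContext import path, constructor, and drop signature are assumptions based on this thread):

from unittest.mock import AsyncMock, Mock

import pytest

from nodestream.pipeline.record import RecordContext  # import path assumed


@pytest.mark.asyncio
async def test_drop_awaits_finalize_record():
    step = Mock()
    step.finalize_record = AsyncMock()
    token = object()

    record = RecordContext(data={"x": 1}, originating_step=step, callback_token=token)
    await record.drop()

    step.finalize_record.assert_awaited_once_with(token)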
I think due to the complexity and nuance in this change, we are due for merging this into a new breaking release. I see this as a good opportunity to make some other small changes we need to make, à la #407.
This PR focuses on refactoring the pipeline code in order to better accommodate and implement a record lineage tracking system. The goal is to enable steps, often extractors, to free up resources associated with a yielded record and to acknowledge back to systems that a record was truly and completely processed.
Refactor
The current pipeline code is fragile and hard to introduce change into because it is implemented as a series of procedures and is not well encapsulated. To support this feature, we need to make some cross-cutting changes to refactor the pipeline code.
The refactor builds on the observation that the previous code was written as a procedure so that transitions from one step state to another could be handled carefully. To remove those procedures, the pipeline was refactored to run a state machine for each step in the pipeline, as well as for the pipeline output.
Steps transition through a state flow that looks like this:

StartStepState → ProcessRecordsState → EmitOutstandingRecordsState → StopStepExecution

and the pipeline output progresses via a state flow like this:

PipelineOutputStartState → PipelineOutputProcessRecordsState → PipelineOutputStopState
This means that both steps and the pipeline output use the same executor pattern, simplifying the overall architecture.
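A rough sketch of that executor/state pattern (class and method names here are illustrative, not the PR's actual API):

from typing import Optional


class State:
    """One lifecycle state; run() does its work and returns the next state, or None when finished."""

    async def run(self) -> Optional["State"]:
        raise NotImplementedError


class StateMachineExecutor:
    """Drives a state machine from an initial state until it terminates."""

    def __init__(self, initial_state: State) -> None:
        self._state: Optional[State] = initial_state

    async def run(self) -> None:
        while self._state is not None:
            self._state = await self._state.run()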
Lineage Tracking
The highlight of this PR is record lineage tracking. This, in short, builds a tree of the intermediary and final records produced from every single output record. In other words, we track parent-child relationships for every record in and out of every step. With this information, we are able to know when nodestream is 'done' processing a record and trigger a callback to the originating step when it is appropriate. All steps can use this following the same pattern; as an example, an extractor is shown below.
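A minimal sketch of that extractor pattern (the QueueExtractor class, its client, and the import path are hypothetical; the extract_records and finalize_record method names are assumed from the framework's extractor interface and this PR's discussion):

from nodestream.pipeline.extractors import Extractor  # import path assumed


class QueueExtractor(Extractor):
    """Hypothetical extractor that acknowledges messages only after full processing."""

    def __init__(self, client):
        self.client = client  # e.g. a message-queue client with ack semantics

    async def extract_records(self):
        async for message in self.client.poll():
            # Yield the record together with a callback token; the pipeline hands the
            # token back once every downstream step has fully processed the record.
            yield message.body, message.delivery_tag

    async def finalize_record(self, callback_token):
        # Invoked by the pipeline when the record's entire lineage is complete;
        # acknowledge it back to the source system.
        await self.client.ack(callback_token)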
Bug Fixes
As miscellany, this PR also fixes two minor issues with exception handling in the pipeline logic:
- Before the on_start callback is triggered, the CLI may not have started the progress spinner that reports error messages. This leads to a separate error on a crash that obscures the original error. The fix is to ensure that on_start is explicitly executed before actual pipeline processing begins.
- The pipeline's finalization callback (on_finish) had a try/catch block around it. That try/catch was not required because, by the time the pipeline executes this code, there is nothing else to do, so there is no reason to swallow the exception in order to protect the integrity of the pipeline. The fix was to simply remove that try/catch.

Due to this behavior, we'll likely need to make a 0.15 release.