@zprobst zprobst commented Aug 26, 2025

This PR introduces record lineage tracking. The core value is a new hook on Step that is called back once a record has been fully processed.

class SomeStepThatIsProbablyAnExtractor:
    def process_record(self, record):
        results = self.actually_process_record(record)
        # Pair each result with a callback token (here, its index).
        for i, result in enumerate(results):
            yield result, i

    def finalize_record(self, token: int):
        # do something with the value you got.
        pass

NOTE: This is currently a draft for comment purposes. Tests will not pass yet; I need to add tests to cover this case, and (I think) there are a few edge cases to work out, but this is the crux of the solution.

@zprobst zprobst requested a review from ccloes as a code owner August 26, 2025 21:40
@zprobst zprobst marked this pull request as draft August 26, 2025 21:42
Comment on lines +40 to +46
The `emission` can either be a single value or a tuple of two values.
If it is a single value, then it is assumed to be the data for the
record. If it is a tuple of two values, then the first value is
assumed to be the data for the record and the second value is assumed
to be the callback token for the record. If any other value is
provided, the data and callback token are both set to the value
provided.

This is how I figured it would work. Documentation will have to clearly state that, after this change, tuple is not an acceptable record type unless the step provides some kind of callback token. That is to say, tuple record types will only work if the emission type is tuple[tuple, Any].
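To make the tuple ambiguity concrete, here is a minimal sketch of the unpacking rule described in the docstring above. The function name `split_emission` is hypothetical and does not come from the PR; the sketch only assumes the rule as stated (a two-tuple is read as data plus token, anything else is used for both).

```python
from typing import Any, Tuple


def split_emission(emission: Any) -> Tuple[Any, Any]:
    """Return (data, callback_token) for a value yielded by a step.

    Hypothetical helper illustrating the docstring's rule, not the PR's code.
    """
    if isinstance(emission, tuple) and len(emission) == 2:
        data, callback_token = emission
        return data, callback_token
    # Any other value serves as both the data and the callback token.
    return emission, emission


# A record that is itself a 2-tuple is misread as (data, token):
data, token = split_emission(("a", "b"))
# data is "a" and token is "b", so the original record ("a", "b") is lost.

# Wrapping it as tuple[tuple, Any] keeps the record intact:
data, token = split_emission((("a", "b"), "my-token"))
# data is ("a", "b") and token is "my-token".
```

This is why a tuple record only survives when the step emits it together with a callback token.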

Member Author

Agreed...

Member Author

Probably worth cutting a breaking change here.

Comment on lines +76 to +79
# If we are being told to drop, then we need to run our callback so
# that the step that created us can clean up any resources it has
# allocated for this record.
await self.originating_step.finalize_record(self.callback_token)

Should the call to finalize_record be skipped if data == callback_token? From lines 58-60, I figured that was the intent. Or is finalize_record always called, with the callback_token just a way for the step author to have the callback invoked with different data than the original record?

Member Author

@zprobst zprobst Aug 27, 2025

Yeah, I think the code shows that I am of two minds here.

At first, I was thinking finalize_record would only be called when you provide a token, but there is some chance you'd want to operate on the record itself when handling finalize_record. One may argue that it's speculative generality, but it actually (assuming the comments get cleaned up) produces cleaner code to not treat passing a token as a special case and instead always call finalize_record, with a default implementation of pass.

Do you have a feeling one way or another? I think for most cases, it is functionally immaterial.
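For reference, the "always call it" option could look roughly like the sketch below. The `Step` class here is a stand-in written for this example (not nodestream's real base class), and `finish_record` is a hypothetical name for whatever the pipeline does when a record is done.

```python
import asyncio
from typing import Any, List


class Step:
    """Stand-in base class (hypothetical, not nodestream's real Step)."""

    async def finalize_record(self, token: Any) -> None:
        # Default implementation: nothing to clean up.
        pass


class TrackingStep(Step):
    def __init__(self) -> None:
        self.finalized: List[Any] = []

    async def finalize_record(self, token: Any) -> None:
        self.finalized.append(token)


async def finish_record(step: Step, callback_token: Any) -> None:
    # No special case for "token provided": the hook is always awaited.
    await step.finalize_record(callback_token)


async def demo() -> List[Any]:
    plain, tracking = Step(), TrackingStep()
    await finish_record(plain, {"id": 1})  # default hook, no effect
    await finish_record(tracking, 42)      # overridden hook sees the token
    return tracking.finalized
```

Steps that do not care about the callback simply inherit the no-op, so the pipeline needs no branch on whether a token was supplied.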

@cbadke cbadke Aug 27, 2025

I can see the argument and temptation to always call finalize_record but to change what is passed.

The concern I have with calling finalize in either case is the inconsistency and element of surprise. If I return one thing from process_record I get one behaviour; if I return a magic tuple, I get a different behaviour. This is technically true in either scenario...

My feeling is that there should generally be one way for things to work. If you only call finalize when they return a tuple with a callback object, the system is explicitly forcing the developer to say "let me know when this is finished processing". If they want the original message, they can return (record, record).

Maybe it should even be a flaggable feature but that would complicate matters I imagine.

No matter which way you choose, the tuple vs not tuple behaviour is going to confuse someone at some point. :-/

Member Author

I agree with your thought process here. I've solicited @jbristow to get another perspective. I definitely see this both ways.

Contributor

Thinking about this as if I were writing Haskell or F# code, it feels like there should be a "current-context" struct passed from step to step that contains the result, rather than passing a bare result.

That way you could bind things to the context that would be called on specific lifecycle events. Maybe make some global hooks like "after-successful-ingest" or whatever.

I apologize for bringing up monad-adjacent thoughts while discussing Python code.

Contributor

@jbristow jbristow Aug 28, 2025

What's the difference between passing:

from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class Context:
    data: dict[str, Any]

@dataclass
class FinalizingContext(Context):
    finalize_fn: Callable[[Context], None]

and what we do now other than one level of wrapping?

Contributor

I mean, we could probably make it a bit more elegant by modeling it after the old state-hook approach from Javaland.

Have a dict[NodestreamState, list[HookFn]] so that a state handler can pick up the context object and say "I AM STATE X! EXECUTE ANY HOOKS YOU HAVE FOR ME".
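As a sketch of that idea: the enum members, the `on`/`fire` method names, and the shape of `Context` below are all assumptions made for illustration; only the dict[NodestreamState, list[HookFn]] layout comes from the comment above.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Callable, Dict, List


class NodestreamState(Enum):
    # Hypothetical lifecycle states, named for this example only.
    RECORD_DROPPED = auto()
    INGEST_SUCCEEDED = auto()


HookFn = Callable[["Context"], None]


@dataclass
class Context:
    data: Any
    hooks: Dict[NodestreamState, List[HookFn]] = field(default_factory=dict)

    def on(self, state: NodestreamState, hook: HookFn) -> None:
        """Bind a hook to a lifecycle state."""
        self.hooks.setdefault(state, []).append(hook)

    def fire(self, state: NodestreamState) -> None:
        """Called by a state handler: run every hook bound to `state`."""
        for hook in self.hooks.get(state, []):
            hook(self)


seen: List[int] = []
ctx = Context(data={"id": 1})
ctx.on(NodestreamState.INGEST_SUCCEEDED, lambda c: seen.append(c.data["id"]))
ctx.fire(NodestreamState.RECORD_DROPPED)    # no hooks bound, nothing happens
ctx.fire(NodestreamState.INGEST_SUCCEEDED)  # runs the bound hook
```

Because the hooks travel with the context, no step has to return a specially shaped value to request a callback.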

Member Author

That really got me thinking... a potential compromise is if we force extractors to look like this:

class SomeExtractor(Extractor):
    async def extract_records(self):
        for i in range(1000):
            yield self.record(data=i)  # if you do not want callbacks
            yield self.managed_record(data=i, callback_token=i)  # if you do want callbacks

Then we have some polymorphism on the Record type. Thoughts?
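One way the polymorphism might be sketched, under stated assumptions: `Record`, `ManagedRecord`, the `finalize` method, and this stand-in `Extractor` base are hypothetical and written only for this example; `record`, `managed_record`, and `finalize_record` follow the names used in this thread.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, List


@dataclass
class Record:
    data: Any

    async def finalize(self, step: "Extractor") -> None:
        """Plain records request no callback."""


@dataclass
class ManagedRecord(Record):
    callback_token: Any

    async def finalize(self, step: "Extractor") -> None:
        await step.finalize_record(self.callback_token)


class Extractor:
    """Stand-in base class (hypothetical, not nodestream's real Extractor)."""

    def record(self, data: Any) -> Record:
        return Record(data)

    def managed_record(self, data: Any, callback_token: Any) -> ManagedRecord:
        return ManagedRecord(data, callback_token)

    async def finalize_record(self, token: Any) -> None:
        pass


class SomeExtractor(Extractor):
    def __init__(self) -> None:
        self.finalized: List[Any] = []

    async def extract_records(self) -> AsyncIterator[Record]:
        for i in range(3):
            if i % 2 == 0:
                yield self.managed_record(data=i, callback_token=f"token-{i}")
            else:
                yield self.record(data=i)

    async def finalize_record(self, token: Any) -> None:
        self.finalized.append(token)


async def run(extractor: SomeExtractor) -> List[Any]:
    async for record in extractor.extract_records():
        # The record type decides whether a callback happens; no tuple sniffing.
        await record.finalize(extractor)
    return extractor.finalized
```

With a wrapper like this, a record whose data is itself a tuple is no longer ambiguous, since the token lives in its own field.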

I had a similar thought early on but wasn't sure about the appetite to change the interface. I think it could work.

  1. I'm not sure we should be singularly focused on Extractors for this feature; there might be cases where other constructs (like Transforms?) want access to this mechanism once it's available.
  2. It could feel strange to have components return this record object but receive just the data element from the upstream step. Should all steps receive this Record wrapper instead? I would guess the interface would use the base class.


self.call_handling_errors(self.reporter.on_finish_callback, metrics)
class Exectutor:

should be Executor 😄

codecov bot commented Aug 28, 2025

Codecov Report

❌ Patch coverage is 97.05882% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 98.17%. Comparing base (466466a) to head (a4f0925).

Files with missing lines | Patch % | Lines
nodestream/pipeline/pipeline.py | 97.01% | 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #435      +/-   ##
==========================================
- Coverage   98.26%   98.17%   -0.10%     
==========================================
  Files         152      152              
  Lines        6171     6248      +77     
==========================================
+ Hits         6064     6134      +70     
- Misses        107      114       +7     
Flag | Coverage Δ
3.10-macos-latest | ?
3.10-ubuntu-latest | ?
3.10-windows-latest | 98.15% <97.05%> (-0.08%) ⬇️
3.11-macos-latest | ?
3.11-ubuntu-latest | ?
3.11-windows-latest | 98.15% <97.05%> (-0.08%) ⬇️
3.12-macos-latest | ?
3.12-ubuntu-latest | ?
3.12-windows-latest | 98.15% <97.05%> (-0.08%) ⬇️
3.13-macos-latest | ?
3.13-ubuntu-latest | ?
3.13-windows-latest | 98.15% <97.05%> (-0.08%) ⬇️
