Skip to content

Conversation

@vaibhavtiwari33
Copy link
Contributor

@vaibhavtiwari33 vaibhavtiwari33 commented Dec 23, 2025

This PR aims to add support for on_success sink in pynumaflow (pynumaflow-lite already has it)

Fixes: #298

Testing:

  • Added unit tests
  • Tested by running a working example

Local Testing

Pipeline spec:

Using rust sdk for logger for onSuccess udsink since server info prevents this python SDK version to be deployed as onSuccess ud sink container.

spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 1
          duration: 1s
          msgSize: 10
    - name: out
      sink:
        udsink:
          container:
            args:
            - python
            - example.py
            image: quay.io/numaio/numaflow-python/all-sinks:stable
            imagePullPolicy: IfNotPresent
            env:
              - name: PYTHONDEBUG
                value: "true"
              - name: INVOKE
                value: "func_handler"
        fallback:
          udsink:
            container:
              image: quay.io/numaio/numaflow-python/sink-log:stable
              imagePullPolicy: IfNotPresent
        onSuccess:
          udsink:
            container:
              image: quay.io/numaio/numaflow-rs/sink-log:stable
              imagePullPolicy: IfNotPresent
    - name: log-output
      sink:
        log: {}
  edges:
    - from: in
      to: out
    - from: in
      to: log-output

UD sink logs:

DEBUG:grpc._cython.cygrpc:[_cygrpc] Loaded running loop: id(loop)=94599583797344
INFO:__main__:Write to User Defined Sink failed, writing {"id":"1766511375495770916-0","value":1766511375495769284,"padding":[212,112]} to fallback sink
INFO:__main__:Write to User Defined Sink failed, writing {"id":"1766511376495585228-0","value":1766511376495583491,"padding":[36,34]} to fallback sink
INFO:__main__:Write to User Defined Sink failed, writing {"id":"1766511377495386195-0","value":1766511377495384457,"padding":[9,24]} to fallback sink
INFO:__main__:Write to User Defined Sink failed, writing {"id":"1766511378495878824-0","value":1766511378495876973,"padding":[59,81]} to fallback sink
INFO:__main__:Write to User Defined Sink succeeded, writing {"id":"1766511379495795898-0","value":1766511379495794333,"padding":[178,17]} to onSuccess sink
INFO:__main__:Write to User Defined Sink succeeded, writing {"id":"1766511380496136496-0","value":1766511380496134038,"padding":[116,59]} to onSuccess sink
INFO:__main__:Write to User Defined Sink failed, writing {"id":"1766511381496074389-0","value":1766511381496072745,"padding":[40,0]} to fallback sink

OnSuccess sink logs:

{"id":"1766512461495670560-0","value":1766512461495668458,"padding":[46,44]}
{"id":"1766512462495658834-0","value":1766512462495656702,"padding":[213,95]}
{"id":"1766512464495421493-0","value":1766512464495419882,"padding":[94,183]}
{"id":"1766512465495725587-0","value":1766512465495723714,"padding":[55,78]}
{"id":"1766512466495434411-0","value":1766512466495432709,"padding":[20,21]}
{"id":"1766512468495633684-0","value":1766512468495631095,"padding":[84,56]}
{"id":"1766512470495307685-0","value":1766512470495305948,"padding":[0,253]}
{"id":"1766512471495301394-0","value":1766512471495299930,"padding":[170,162]}

Numa container logs:

{"timestamp":"2025-12-23T17:38:00.496106Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}
{"timestamp":"2025-12-23T17:38:01.496039Z","level":"INFO","message":"Processed messages per second","processed":"1","target":"numaflow_core::tracker"}
{"timestamp":"2025-12-23T17:38:02.496557Z","level":"INFO","message":"Processed messages per second","processed":"2","target":"numaflow_core::tracker"}
{"timestamp":"2025-12-23T17:38:03.496244Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}
{"timestamp":"2025-12-23T17:38:04.496469Z","level":"INFO","message":"Processed messages per second","processed":"1","target":"numaflow_core::tracker"}
{"timestamp":"2025-12-23T17:38:05.496596Z","level":"INFO","message":"Processed messages per second","processed":"2","target":"numaflow_core::tracker"}
{"timestamp":"2025-12-23T17:38:06.496168Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}

@codecov
Copy link

codecov bot commented Dec 23, 2025

Codecov Report

❌ Patch coverage is 93.10345% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.85%. Comparing base (5500636) to head (68c7f27).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
packages/pynumaflow/pynumaflow/sinker/_dtypes.py 89.18% 4 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #302   +/-   ##
=======================================
  Coverage   93.85%   93.85%           
=======================================
  Files          66       66           
  Lines        3009     3058   +49     
  Branches      155      159    +4     
=======================================
+ Hits         2824     2870   +46     
- Misses        135      138    +3     
  Partials       50       50           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Vaibhav Tiwari <[email protected]>
@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as ready for review December 23, 2025 21:09
Comment on lines +42 to +48
def with_keys(self, keys: Optional[list[str]]):
self._keys = keys
return self

def with_user_metadata(self, user_metadata: Optional[UserMetadata]):
self._user_metadata = user_metadata
return self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python's design choice is to return None if a method mutates the object in place (as per std lib methods like list.sort(), list.append(), list.extend(), dict.update(), set.add()).

my_list.sort()
# vs
s_list = sorted(my_list)

I'm not sure what to do here since we follow this pattern for Message object in other modules too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it as is for now. I can create a separate issue to fix this, will need to call it out in the next patch release as it is a breaking change for anyone already using previous way.

@vaibhavtiwari33 vaibhavtiwari33 merged commit 2ebe47f into main Jan 4, 2026
13 checks passed
@vaibhavtiwari33 vaibhavtiwari33 deleted the on-success-sink branch January 4, 2026 21:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Python SDK Changes to Support OnSuccess Sink

4 participants