refactor: Migrate from programmatic to streaming subscriptions #349
SoulSniper-V2 wants to merge 12 commits into dapr:main
Conversation
Force-pushed from 37c2439 to 46666cb
sicoyle
left a comment
Thank you so much for your interest in contributing and for adding tests—this is great 🙌
I have a few comments so far. Also, would you mind splitting the streaming logic out of registration.py into separate functions, or possibly a separate file (e.g., subscription.py)? With these great additions, that file is starting to get a bit heavy, and this would help keep things more maintainable.
Did you test these changes against some of the quickstarts/examples by chance?
Thanks for the review @sicoyle! 🙌 I moved the streaming logic into `subscription.py`. Ran the message router tests locally and everything looks good.
Hi @sicoyle, just checking in on this. I have addressed the feedback regarding the refactor into `subscription.py` and the cleanup of the legacy paths. Let me know if there's anything else needed to move this toward a final review or to start the CI builds.
Thank you! Could you please correct the lint/build failures? I just need to test these changes locally before merge (since I don't have automation running on PRs yet 🙃).
sicoyle
left a comment
Would you mind splitting up subscribe_message_bindings a bit? At almost 400 lines with several nested functions, it’s getting a little hard to follow and review. Breaking it into smaller pieces would make it much easier to maintain and reason about 🙏
Addressed all feedback. Split the function into smaller helpers, added constants, fixed exception handling, and renamed variables for clarity. Tests pass.
Signed-off-by: Arush Wadhawan <soulsniper@Arushs-MacBook-Air.local>
Signed-off-by: Arush Wadhawan <warush23+github@gmail.com>
- Split subscribe_message_bindings into focused helper functions
- Add module-level constants for delivery modes and status codes
- Use WorkflowStatus enum instead of magic strings
- Add proper validation for dead_letter_topics (ValueError if multiple)
- Fix _resolve_event_loop to properly raise on missing loop
- Add zombie thread detection (raise RuntimeError on timeout)
- Move json import to top level
- Rename variables for clarity:
  - grouped -> bindings_by_topic_key
  - b -> binding
  - plan -> binding_schema_pairs
  - preferred -> matching_ce_type_pairs
- Replace logger.info with logger.debug for internal operations
- Restore underscore prefixes for internal functions
- Remove nested try/except where possible
- Add proper docstrings explaining function purposes

Signed-off-by: Arush Wadhawan <warush23+github@gmail.com>
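The "zombie thread detection" item in the commit message above can be sketched as a small shutdown helper. This is an illustration, not the PR's actual code; `stop_consumer`, the `stop_event` wiring, and the timeout value are hypothetical:

```python
import threading


def stop_consumer(thread: threading.Thread, stop_event: threading.Event, timeout: float = 5.0) -> None:
    """Signal a consumer thread to stop, join it, and flag zombies.

    Hypothetical helper: the real one would live in subscription.py.
    """
    stop_event.set()          # ask the consumer loop to exit
    thread.join(timeout)      # wait for it to wind down
    if thread.is_alive():
        # The thread ignored the stop signal within the timeout: treat it
        # as a zombie rather than silently leaking it.
        raise RuntimeError(
            f"consumer thread {thread.name!r} did not stop within {timeout}s"
        )
```

Raising on timeout (instead of logging and moving on) makes leaked consumer threads visible in tests and during shutdown.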
Force-pushed from 478053b to 9143489
CasperGN
left a comment
Thank you for this!
Left an ask to use f-strings consistently rather than %s.
```python
else:
    setattr(parsed, METADATA_KEY, metadata)
except Exception:
    logger.debug("Could not attach %s to payload; continuing.", METADATA_KEY)
```
Can you convert all these strings (including the error-message ones further down) to f-strings so we're consistent? There's a mix in this file as well.
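For illustration, the requested conversion looks like this (the `METADATA_KEY` constant is assumed from the snippet above; the logger name is hypothetical):

```python
import logging

logger = logging.getLogger(__name__)
METADATA_KEY = "_message_metadata"  # assumed constant from the file under review

# %-style (lazy) formatting, as the file currently has in places:
logger.debug("Could not attach %s to payload; continuing.", METADATA_KEY)

# f-string equivalent, for consistency with the rest of the file:
logger.debug(f"Could not attach {METADATA_KEY} to payload; continuing.")
```

One trade-off worth noting: f-strings are formatted eagerly even when the log level is disabled, whereas %-style arguments are only interpolated if the record is actually emitted. For debug-level messages with cheap arguments, as here, the difference is negligible.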
Signed-off-by: Arush Wadhawan <warush23+github@gmail.com>
Converted all logging to f-strings.
@sicoyle this lgtm now. Thank you @SoulSniper-V2!
Metadata Schema Compatibility Report: Breaking Metadata Schema Changes
Pull request overview
Migrates workflow message routing from legacy programmatic pub/sub subscriptions (subscribe_with_handler) to streaming subscriptions (dapr_client.subscribe) to use persistent gRPC connections and improve reliability.
Changes:
- Added a new `workflow/utils/subscription.py` module implementing streaming subscription consumption in background threads with optional async delivery mode.
- Refactored `workflow/utils/registration.py` to build `MessageRouteBinding`s and delegate subscription setup to the new streaming utilities.
- Updated `tests/workflow/test_message_router.py` to mock `dapr_client.subscribe` instead of `subscribe_with_handler`.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| `tests/workflow/test_message_router.py` | Updates unit tests to mock streaming subscriptions and validate subscription call parameters. |
| `dapr_agents/workflow/utils/subscription.py` | New streaming subscription implementation: composite routing per topic, consumer threads, async worker queue option, shutdown helpers. |
| `dapr_agents/workflow/utils/registration.py` | Refactors binding collection and replaces legacy subscription logic with calls into the new streaming subscription module. |
Comments suppressed due to low confidence (1)
dapr_agents/workflow/utils/registration.py:316
`register_message_routes` no longer accepts a `subscribe` override, but there are still internal call sites passing `subscribe=...` (e.g. `dapr_agents/workflow/runners/base.py` calls it with `subscribe=subscribe`). This will raise `TypeError: got an unexpected keyword argument 'subscribe'` at runtime. Either update all call sites to stop passing `subscribe`, or keep the parameter (possibly deprecated) and handle/ignore it for streaming subscriptions.
```python
def register_message_routes(
    *,
    dapr_client: DaprClient,
    targets: Optional[Iterable[Any]] = None,
    routes: Optional[Iterable[PubSubRouteSpec]] = None,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    delivery_mode: Literal["sync", "async"] = "sync",
    queue_maxsize: int = 1024,
    deduper: Optional[DedupeBackend] = None,
    scheduler: Optional[SchedulerFn] = None,
    wf_client: Optional[wf.DaprWorkflowClient] = None,
    await_result: bool = False,
    await_timeout: Optional[int] = None,
    fetch_payloads: bool = True,
    log_outcome: bool = True,
) -> List[Callable[[], None]]:
```
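The "keep the parameter, possibly deprecated" option from the comment above could look like the following shim. This is a sketch, not the PR's code; the simplified signature and the returned empty list are placeholders:

```python
import warnings


def register_message_routes(*, dapr_client, subscribe=None, **kwargs):
    """Hypothetical compatibility shim for the deprecated `subscribe` override.

    Accepting and ignoring the legacy keyword keeps existing call sites
    (e.g. ones still passing subscribe=...) working instead of raising
    TypeError at runtime during the migration.
    """
    if subscribe is not None:
        warnings.warn(
            "`subscribe` is ignored now that streaming subscriptions are used",
            DeprecationWarning,
            stacklevel=2,
        )
    # The real implementation would build bindings and return close functions.
    return []
```

Once all internal call sites stop passing `subscribe`, the parameter and the warning can be deleted in a follow-up.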
```python
_ = _resolve_event_loop(
    loop
)  # Parity with message registrar; FastAPI does not require it yet.
```
_mount_http_bindings calls _resolve_event_loop(loop) purely for “parity”, but this can raise in normal FastAPI setups where routes are registered from a synchronous context (common with Python 3.12+ where asyncio.get_event_loop() can fail). Since HTTP route mounting doesn’t require an event loop here, consider removing this call or making it non-fatal (e.g., only validate in truly async-required paths).
Suggested change: delete the `_ = _resolve_event_loop(loop)` call shown above.
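A loop resolver that only fails when a loop is genuinely required would also address this. The sketch below is hypothetical; `resolve_event_loop` stands in for the module's `_resolve_event_loop`:

```python
import asyncio
from typing import Optional


def resolve_event_loop(
    loop: Optional[asyncio.AbstractEventLoop] = None,
    required: bool = True,
) -> Optional[asyncio.AbstractEventLoop]:
    """Prefer an explicit loop, fall back to the running loop.

    Only raises when a loop is actually required (e.g. async delivery
    mode); sync paths such as HTTP route mounting can pass required=False.
    """
    if loop is not None:
        return loop
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        if required:
            raise RuntimeError(
                "async delivery mode requires a running event loop "
                "or an explicit `loop` argument"
            )
        return None
```

With this shape, `_mount_http_bindings` could call `resolve_event_loop(loop, required=False)` and never fail in a synchronous FastAPI setup.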
```python
loop.call_soon_threadsafe(
    queue.put_nowait,
    (binding.handler, parsed),
)
return TopicEventResponse(STATUS_SUCCESS)
```
In async delivery mode the handler schedules queue.put_nowait(...) via loop.call_soon_threadsafe(...) and immediately returns success. If the queue is full, put_nowait will raise QueueFull in the event loop, and the message will still be ACKed (lost) with no retry. Add explicit backpressure handling: catch QueueFull and return retry (or block with an awaited put), and ensure failures to enqueue do not ACK the message.
Suggested change:

```python
if loop.is_running():
    # Backpressure-aware enqueue: block until the item is queued
    fut = asyncio.run_coroutine_threadsafe(
        queue.put((binding.handler, parsed)),
        loop,
    )
    try:
        fut.result()
    except Exception:
        logger.exception(
            "Failed to enqueue workflow task for handler %s; requesting retry.",
            binding.name,
        )
        return TopicEventResponse(STATUS_RETRY)
    return TopicEventResponse(STATUS_SUCCESS)
# If the loop is not running, fall through to the sync path below.
```
```python
_validate_delivery_mode(delivery_mode)
_validate_dead_letter_topics(bindings)

resolved_loop = _resolve_event_loop(loop)
```
subscribe_message_bindings resolves an event loop unconditionally (resolved_loop = _resolve_event_loop(loop)), even for delivery_mode='sync'. In Python 3.12+ this can raise when called from a synchronous context with loop=None, despite sync mode being able to operate via asyncio.run(...). Consider only requiring an existing/running loop for delivery_mode='async', and otherwise creating a new loop or bypassing loop resolution in sync mode.
Suggested change:

```python
if delivery_mode == DELIVERY_MODE_ASYNC:
    resolved_loop = _resolve_event_loop(loop)
else:
    # In sync mode we can rely on asyncio.run(...) and do not require
    # an existing/running event loop; avoid resolving it unconditionally.
    resolved_loop = loop
```
```python
mock_sub = MagicMock()
mock_sub.__iter__.return_value = []
mock_client.subscribe.return_value = mock_sub
```
mock_sub.__iter__.return_value is set to [], which is iterable but not an iterator; iterating the subscription in the consumer thread may raise a TypeError and the test won’t reliably exercise the streaming loop. Prefer mock_sub.__iter__.return_value = iter([]).
```python
mock_sub = MagicMock()
mock_sub.__iter__.return_value = []
mock_client.subscribe.return_value = mock_sub
```
mock_sub.__iter__.return_value = [] returns a list rather than an iterator; the consumer thread’s for msg in sub: can fail with TypeError and exit. Set this to iter([]) to accurately model an empty subscription stream.
@SoulSniper-V2 this is super close! Would you mind addressing the copilot feedback please? 🙏
Hi @SoulSniper-V2 @sicoyle, just checking - are you still planning to address the Copilot feedback on this PR?
Thank you so much for your contributions @SoulSniper-V2 🤗 I went ahead and cherry picked your changes in this PR and addressed all of the remaining PR feedback here so we can get this change into the next release 🎉
Description
Migrates message subscriptions from the legacy `subscribe_with_handler` (programmatic) to `dapr_client.subscribe` (streaming). This fixes #348 by using persistent gRPC connections, which removes the requirement for exposing app ports and improves reliability.
Issue reference
Closes: #348
Checklist
Implementation
- Replaced `subscribe_with_handler` with `dapr_client.subscribe` and a background consumer thread.
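The background-consumer pattern described above can be sketched as follows. This is an illustration only: `FakeSubscription` stands in for the object returned by `dapr_client.subscribe(...)`, whose real message and response types differ, and `start_consumer` is a hypothetical name:

```python
import queue
import threading


class FakeSubscription:
    """Stand-in for a streaming subscription: an iterable stream of messages."""

    def __init__(self, messages):
        self._messages = messages

    def __iter__(self):
        return iter(self._messages)

    def close(self):
        # The real subscription would tear down the persistent gRPC stream here.
        pass


def start_consumer(subscription, handler):
    """Consume a subscription stream on a background thread.

    Returns a close function, mirroring the PR's pattern of returning
    shutdown callables from registration.
    """

    def _consume():
        for msg in subscription:
            handler(msg)

    thread = threading.Thread(target=_consume, daemon=True)
    thread.start()

    def close():
        subscription.close()
        thread.join(timeout=5.0)

    return close


received = queue.Queue()
close = start_consumer(FakeSubscription(["a", "b"]), received.put)
close()
```

Because the stream is pulled over a persistent connection, the app no longer needs to expose an HTTP/gRPC port for the sidecar to deliver messages to, which is the reliability win the description refers to.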
- Updated `tests/workflow/test_message_router.py` to mock streaming subscriptions. All 77 unit tests passed.
- `ruff` passed.