Skip to content

Commit 37c2439

Browse files
Arush WadhawanArush Wadhawan
authored andcommitted
refactor: migrate from programmatic to streaming pub/sub subscriptions
1 parent a72b6e4 commit 37c2439

File tree

1 file changed

+100
-17
lines changed

1 file changed

+100
-17
lines changed

dapr_agents/workflow/utils/registration.py

Lines changed: 100 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,7 @@ def _subscribe_message_bindings(
239239
return []
240240

241241
loop = _resolve_loop(loop)
242-
if subscribe is None:
243-
subscribe = dapr_client.subscribe_with_handler # type: ignore[assignment]
242+
# subscribe defaults to None (handled in loop for streaming)
244243
if delivery_mode not in ("sync", "async"):
245244
raise ValueError("delivery_mode must be 'sync' or 'async'")
246245

@@ -486,22 +485,106 @@ def handler(message: SubscriptionMessage) -> TopicEventResponse:
486485

487486
# subscribe one composite handler per (pubsub, topic)
488487
for (pubsub_name, topic_name), group in grouped.items():
488+
# Validate dead_letter_topic consistency
489+
dl_topics = {b.dead_letter_topic for b in group}
490+
if len(dl_topics) > 1:
491+
logger.warning(
492+
"Multiple dead_letter_topics found for %s:%s %s. Using %s.",
493+
pubsub_name,
494+
topic_name,
495+
dl_topics,
496+
group[0].dead_letter_topic,
497+
)
498+
489499
handler_fn = _composite_handler_fn(group)
490-
close_fn = subscribe( # type: ignore[misc]
491-
pubsub_name=pubsub_name,
492-
topic=topic_name,
493-
handler_fn=handler_fn,
494-
dead_letter_topic=group[0].dead_letter_topic,
495-
)
496-
logger.info(
497-
"Subscribed COMPOSITE(%d handlers) to pubsub=%s topic=%s (delivery=%s await=%s)",
498-
len(group),
499-
pubsub_name,
500-
topic_name,
501-
delivery_mode,
502-
await_result,
503-
)
504-
closers.append(close_fn)
500+
501+
if subscribe is not None:
502+
# Legacy/Custom subscription injection
503+
close_fn = subscribe(
504+
pubsub_name=pubsub_name,
505+
topic=topic_name,
506+
handler_fn=handler_fn,
507+
dead_letter_topic=group[0].dead_letter_topic,
508+
)
509+
logger.info(
510+
"Subscribed (Custom/Legacy) to pubsub=%s topic=%s",
511+
pubsub_name,
512+
topic_name,
513+
)
514+
closers.append(close_fn)
515+
else:
516+
# Streaming subscription (Default)
517+
subscription = dapr_client.subscribe(
518+
pubsub_name=pubsub_name,
519+
topic=topic_name,
520+
dead_letter_topic=group[0].dead_letter_topic,
521+
)
522+
523+
def _consumer_loop(
524+
sub: Any,
525+
h_fn: Callable[[SubscriptionMessage], TopicEventResponse],
526+
p_name: str,
527+
t_name: str
528+
) -> None:
529+
logger.info("Starting stream consumer for %s:%s", p_name, t_name)
530+
try:
531+
for message in sub:
532+
if message is None:
533+
continue
534+
try:
535+
response = h_fn(message)
536+
if response.status == "success":
537+
sub.respond_success(message)
538+
elif response.status == "retry":
539+
sub.respond_retry(message)
540+
elif response.status == "drop":
541+
sub.respond_drop(message)
542+
else:
543+
logger.warning("Unknown status %s, retrying", response.status)
544+
sub.respond_retry(message)
545+
except Exception:
546+
logger.exception("Handler exception in stream %s:%s", p_name, t_name)
547+
# Default to retry on handler crash
548+
try:
549+
sub.respond_retry(message)
550+
except Exception:
551+
logger.warning("Failed to send retry response for %s:%s", p_name, t_name, exc_info=True)
552+
except Exception as e:
553+
# If closed explicitly, we might get an error or simple exit
554+
logger.debug("Stream consumer %s:%s exited: %s", p_name, t_name, e)
555+
finally:
556+
sub.close()
557+
558+
# Start consumer in a separate thread to avoid blocking the event loop
559+
import threading
560+
t = threading.Thread(
561+
target=_consumer_loop,
562+
args=(subscription, handler_fn, pubsub_name, topic_name),
563+
daemon=True # Ensure process can exit if thread hangs
564+
)
565+
t.start()
566+
567+
def _make_streaming_closer(sub: Any, thread: threading.Thread) -> Callable[[], None]:
568+
def _close() -> None:
569+
try:
570+
sub.close() # Signal the subscription to stop
571+
except Exception:
572+
logger.debug("Error closing subscription", exc_info=True)
573+
574+
# Wait for thread to finish with a timeout
575+
thread.join(timeout=10.0)
576+
if thread.is_alive():
577+
logger.warning("Consumer thread for %s:%s did not stop within timeout; abandoning.", pubsub_name, topic_name)
578+
return _close
579+
580+
closers.append(_make_streaming_closer(subscription, t))
581+
logger.info(
582+
"Subscribed STREAMING to pubsub=%s topic=%s (delivery=%s await=%s)",
583+
pubsub_name,
584+
topic_name,
585+
delivery_mode,
586+
await_result,
587+
)
505588

506589
if worker_tasks:
507590

0 commit comments

Comments
 (0)