Skip to content

Commit 7cb97d6

Browse files
authored
Merge branch 'main' into fix-revert-so-easier-for-big-pr
2 parents 93f4477 + 259c504 commit 7cb97d6

File tree

14 files changed

+2102
-2
lines changed

14 files changed

+2102
-2
lines changed

dapr_agents/workflow/decorators/messaging.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,21 @@
11
import logging
2+
import warnings
23
from copy import deepcopy
34
from typing import Any, Callable, Optional, get_type_hints
5+
46
from dapr_agents.workflow.utils.core import is_valid_routable_model
57
from dapr_agents.workflow.utils.messaging import extract_message_models
68

79
logger = logging.getLogger(__name__)
810

11+
_MESSAGE_ROUTER_DEPRECATION_MESSAGE = (
12+
"@message_router (legacy version from dapr_agents.workflow.decorators.messaging) "
13+
"is deprecated and will be removed in a future release. "
14+
"Please migrate to the updated decorator in "
15+
"`dapr_agents.workflow.decorators.routers`, which supports "
16+
"Union types, forward references, and explicit Dapr workflow integration."
17+
)
18+
919

1020
def message_router(
1121
func: Optional[Callable[..., Any]] = None,
@@ -16,7 +26,8 @@ def message_router(
1626
broadcast: bool = False,
1727
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
1828
"""
19-
Decorator for registering message handlers by inspecting type hints on the 'message' argument.
29+
[DEPRECATED] Legacy decorator for registering message handlers by inspecting type hints
30+
on the 'message' argument.
2031
2132
This decorator:
2233
- Extracts the expected message model type from function annotations.
@@ -36,6 +47,12 @@ def message_router(
3647
"""
3748

3849
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
50+
warnings.warn(
51+
_MESSAGE_ROUTER_DEPRECATION_MESSAGE,
52+
DeprecationWarning,
53+
stacklevel=2,
54+
)
55+
3956
is_workflow = hasattr(f, "_is_workflow")
4057
workflow_name = getattr(f, "_workflow_name", None)
4158

@@ -56,7 +73,9 @@ def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
5673
)
5774

5875
logger.debug(
59-
f"@message_router: '{f.__name__}' => models {[m.__name__ for m in message_models]}"
76+
"@message_router (legacy): '%s' => models %s",
77+
f.__name__,
78+
[m.__name__ for m in message_models],
6079
)
6180

6281
# Attach metadata for later registration
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
from __future__ import annotations
2+
3+
import inspect
4+
import logging
5+
from copy import deepcopy
6+
from typing import (
7+
Any,
8+
Callable,
9+
Optional,
10+
get_type_hints,
11+
)
12+
13+
from dapr_agents.workflow.utils.core import is_supported_model
14+
from dapr_agents.workflow.utils.routers import extract_message_models
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
def message_router(
20+
func: Optional[Callable[..., Any]] = None,
21+
*,
22+
pubsub: Optional[str] = None,
23+
topic: Optional[str] = None,
24+
dead_letter_topic: Optional[str] = None,
25+
broadcast: bool = False,
26+
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
27+
"""
28+
Decorate a message handler with routing metadata.
29+
30+
The handler must accept a parameter named `message`. Its type hint defines the
31+
expected payload model(s), e.g.:
32+
33+
@message_router(pubsub="pubsub", topic="orders")
34+
def on_order(message: OrderCreated): ...
35+
36+
@message_router(pubsub="pubsub", topic="events")
37+
def on_event(message: Union[Foo, Bar]): ...
38+
39+
Args:
40+
func: (optional) bare-decorator form support.
41+
pubsub: Name of the Dapr pub/sub component (required when used with args).
42+
topic: Topic name to subscribe to (required when used with args).
43+
dead_letter_topic: Optional dead-letter topic (defaults to f"{topic}_DEAD").
44+
broadcast: Optional flag you can use downstream for fan-out semantics.
45+
46+
Returns:
47+
The original function tagged with `_message_router_data`.
48+
"""
49+
50+
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
51+
# Validate required kwargs only when decorator is used with args
52+
if pubsub is None or topic is None:
53+
raise ValueError(
54+
"`pubsub` and `topic` are required when using @message_router with arguments."
55+
)
56+
57+
sig = inspect.signature(f)
58+
if "message" not in sig.parameters:
59+
raise ValueError(f"'{f.__name__}' must have a 'message' parameter.")
60+
61+
# Resolve forward refs under PEP 563 / future annotations
62+
try:
63+
hints = get_type_hints(f, globalns=f.__globals__)
64+
except Exception:
65+
logger.debug(
66+
"Failed to fully resolve type hints for %s", f.__name__, exc_info=True
67+
)
68+
hints = getattr(f, "__annotations__", {}) or {}
69+
70+
raw_hint = hints.get("message")
71+
if raw_hint is None:
72+
raise TypeError(
73+
f"'{f.__name__}' must type-hint the 'message' parameter "
74+
"(e.g., 'message: MyModel' or 'message: Union[A, B]')"
75+
)
76+
77+
models = extract_message_models(raw_hint)
78+
if not models:
79+
raise TypeError(
80+
f"Unsupported or unresolved message type for '{f.__name__}': {raw_hint!r}"
81+
)
82+
83+
# Optional early validation of supported schema kinds
84+
for m in models:
85+
if not is_supported_model(m):
86+
raise TypeError(f"Unsupported model type in '{f.__name__}': {m!r}")
87+
88+
data = {
89+
"pubsub": pubsub,
90+
"topic": topic,
91+
"dead_letter_topic": dead_letter_topic
92+
or (f"{topic}_DEAD" if topic else None),
93+
"is_broadcast": broadcast,
94+
"message_schemas": models, # list[type]
95+
"message_types": [m.__name__ for m in models], # list[str]
96+
}
97+
98+
# Attach metadata; deepcopy for defensive isolation
99+
setattr(f, "_is_message_handler", True)
100+
setattr(f, "_message_router_data", deepcopy(data))
101+
102+
logger.debug(
103+
"@message_router: '%s' => models %s (topic=%s, pubsub=%s, broadcast=%s)",
104+
f.__name__,
105+
[m.__name__ for m in models],
106+
topic,
107+
pubsub,
108+
broadcast,
109+
)
110+
return f
111+
112+
# Support both @message_router(...) and bare @message_router usage
113+
return decorator if func is None else decorator(func)
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import inspect
5+
import logging
6+
from typing import Any, Callable, Iterable, List, Optional, Type
7+
8+
from dapr.clients import DaprClient
9+
from dapr.clients.grpc._response import TopicEventResponse
10+
from dapr.common.pubsub.subscription import SubscriptionMessage
11+
12+
from dapr_agents.workflow.utils.messaging import (
13+
extract_cloudevent_data,
14+
validate_message_model,
15+
)
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
def register_message_handlers(
21+
targets: Iterable[Any],
22+
dapr_client: DaprClient,
23+
*,
24+
loop: Optional[asyncio.AbstractEventLoop] = None,
25+
) -> List[Callable[[], None]]:
26+
"""Discover and subscribe handlers decorated with `@message_router`.
27+
28+
Scans each target:
29+
- If the target itself is a decorated function (has `_message_router_data`), it is registered.
30+
- If the target is an object, all its attributes are scanned for decorated callables.
31+
32+
Subscriptions use Dapr's streaming API (`subscribe_with_handler`) which invokes your handler
33+
on a background thread. This function returns a list of "closer" callables. Invoking a closer
34+
will unsubscribe the corresponding handler.
35+
36+
Args:
37+
targets: Functions and/or instances to inspect for `_message_router_data`.
38+
dapr_client: Active Dapr client used to create subscriptions.
39+
loop: Event loop to await async handlers. If omitted, uses the running loop
40+
or falls back to `asyncio.get_event_loop()`.
41+
42+
Returns:
43+
A list of callables. Each callable, when invoked, closes the associated subscription.
44+
"""
45+
# Resolve loop strategy once up front.
46+
if loop is None:
47+
try:
48+
loop = asyncio.get_running_loop()
49+
except RuntimeError:
50+
loop = asyncio.get_event_loop()
51+
52+
closers: List[Callable[[], None]] = []
53+
54+
def _iter_handlers(obj: Any):
55+
"""Yield (owner, fn) pairs for decorated handlers on `obj`.
56+
57+
If `obj` is itself a decorated function, yield (None, obj).
58+
If `obj` is an instance, scan its attributes for decorated callables.
59+
"""
60+
meta = getattr(obj, "_message_router_data", None)
61+
if callable(obj) and meta:
62+
yield None, obj
63+
return
64+
65+
for name in dir(obj):
66+
fn = getattr(obj, name)
67+
if callable(fn) and getattr(fn, "_message_router_data", None):
68+
yield obj, fn
69+
70+
for target in targets:
71+
for owner, handler in _iter_handlers(target):
72+
meta = getattr(handler, "_message_router_data")
73+
schemas: List[Type[Any]] = meta.get("message_schemas") or []
74+
75+
# Bind method to instance if needed (descriptor protocol).
76+
bound = (
77+
handler if owner is None else handler.__get__(owner, owner.__class__)
78+
)
79+
80+
async def _invoke(
81+
bound_handler: Callable[..., Any],
82+
parsed: Any,
83+
) -> TopicEventResponse:
84+
"""Invoke the user handler (sync or async) and normalize the result."""
85+
result = bound_handler(parsed)
86+
if inspect.iscoroutine(result):
87+
result = await result
88+
if isinstance(result, TopicEventResponse):
89+
return result
90+
# Treat any truthy/None return as success unless user explicitly returns a response.
91+
return TopicEventResponse("success")
92+
93+
def _make_handler(
94+
bound_handler: Callable[..., Any],
95+
) -> Callable[[SubscriptionMessage], TopicEventResponse]:
96+
"""Create a Dapr-compatible handler for a single decorated function."""
97+
98+
def handler_fn(message: SubscriptionMessage) -> TopicEventResponse:
99+
try:
100+
# 1) Extract payload + CloudEvent metadata (bytes/str/dict are also supported by the extractor)
101+
event_data, metadata = extract_cloudevent_data(message)
102+
103+
# 2) Validate against the first matching schema (or dict as fallback)
104+
parsed = None
105+
for model in schemas or [dict]:
106+
try:
107+
parsed = validate_message_model(model, event_data)
108+
break
109+
except Exception:
110+
# Try the next schema; log at debug for signal without noise.
111+
logger.debug(
112+
"Schema %r did not match payload; trying next.",
113+
model,
114+
exc_info=True,
115+
)
116+
continue
117+
118+
if parsed is None:
119+
# Permanent schema mismatch → drop (DLQ if configured by Dapr)
120+
logger.warning(
121+
"No matching schema for message on topic %r; dropping. Raw payload: %r",
122+
meta["topic"],
123+
event_data,
124+
)
125+
return TopicEventResponse("drop")
126+
127+
# 3) Attach CE metadata for downstream consumers
128+
if isinstance(parsed, dict):
129+
parsed["_message_metadata"] = metadata
130+
else:
131+
setattr(parsed, "_message_metadata", metadata)
132+
133+
# 4) Bridge worker thread → event loop
134+
if loop and loop.is_running():
135+
fut = asyncio.run_coroutine_threadsafe(
136+
_invoke(bound_handler, parsed), loop
137+
)
138+
return fut.result()
139+
return asyncio.run(_invoke(bound_handler, parsed))
140+
141+
except Exception:
142+
# Transient failure (I/O, handler crash, etc.) → retry
143+
logger.exception("Message handler error; requesting retry.")
144+
return TopicEventResponse("retry")
145+
146+
return handler_fn
147+
148+
close_fn = dapr_client.subscribe_with_handler(
149+
pubsub_name=meta["pubsub"],
150+
topic=meta["topic"],
151+
handler_fn=_make_handler(bound),
152+
dead_letter_topic=meta.get("dead_letter_topic"),
153+
)
154+
closers.append(close_fn)
155+
156+
return closers

0 commit comments

Comments
 (0)