Skip to content

Commit 1b162ca

Browse files
committed
Added new message_router decorator
Signed-off-by: Roberto Rodriguez <[email protected]>
1 parent 3cf2532 commit 1b162ca

File tree

1 file changed

+102
-0
lines changed

1 file changed

+102
-0
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
from __future__ import annotations
2+
3+
import inspect
4+
import logging
5+
from copy import deepcopy
6+
from typing import (
7+
Any,
8+
Callable,
9+
Optional,
10+
get_type_hints,
11+
)
12+
13+
from dapr_agents.workflow.utils.core import is_supported_model
14+
from dapr_agents.workflow.utils.routers import extract_message_models
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
def message_router(
20+
func: Optional[Callable[..., Any]] = None,
21+
*,
22+
pubsub: Optional[str] = None,
23+
topic: Optional[str] = None,
24+
dead_letter_topic: Optional[str] = None,
25+
broadcast: bool = False,
26+
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
27+
"""
28+
Decorate a message handler with routing metadata.
29+
30+
The handler must accept a parameter named `message`. Its type hint defines the
31+
expected payload model(s), e.g.:
32+
33+
@message_router(pubsub="pubsub", topic="orders")
34+
def on_order(message: OrderCreated): ...
35+
36+
@message_router(pubsub="pubsub", topic="events")
37+
def on_event(message: Union[Foo, Bar]): ...
38+
39+
Args:
40+
func: (optional) bare-decorator form support.
41+
pubsub: Name of the Dapr pub/sub component (required when used with args).
42+
topic: Topic name to subscribe to (required when used with args).
43+
dead_letter_topic: Optional dead-letter topic (defaults to f"{topic}_DEAD").
44+
broadcast: Optional flag you can use downstream for fan-out semantics.
45+
46+
Returns:
47+
The original function tagged with `_message_router_data`.
48+
"""
49+
50+
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
51+
# Validate required kwargs only when decorator is used with args
52+
if pubsub is None or topic is None:
53+
raise ValueError("`pubsub` and `topic` are required when using @message_router with arguments.")
54+
55+
sig = inspect.signature(f)
56+
if "message" not in sig.parameters:
57+
raise ValueError(f"'{f.__name__}' must have a 'message' parameter.")
58+
59+
# Resolve forward refs under PEP 563 / future annotations
60+
try:
61+
hints = get_type_hints(f, globalns=f.__globals__)
62+
except Exception:
63+
logger.debug("Failed to fully resolve type hints for %s", f.__name__, exc_info=True)
64+
hints = getattr(f, "__annotations__", {}) or {}
65+
66+
raw_hint = hints.get("message")
67+
if raw_hint is None:
68+
raise TypeError(
69+
f"'{f.__name__}' must type-hint the 'message' parameter "
70+
"(e.g., 'message: MyModel' or 'message: Union[A, B]')"
71+
)
72+
73+
models = extract_message_models(raw_hint)
74+
if not models:
75+
raise TypeError(f"Unsupported or unresolved message type for '{f.__name__}': {raw_hint!r}")
76+
77+
# Optional early validation of supported schema kinds
78+
for m in models:
79+
if not is_supported_model(m):
80+
raise TypeError(f"Unsupported model type in '{f.__name__}': {m!r}")
81+
82+
data = {
83+
"pubsub": pubsub,
84+
"topic": topic,
85+
"dead_letter_topic": dead_letter_topic or (f"{topic}_DEAD" if topic else None),
86+
"is_broadcast": broadcast,
87+
"message_schemas": models, # list[type]
88+
"message_types": [m.__name__ for m in models], # list[str]
89+
}
90+
91+
# Attach metadata; deepcopy for defensive isolation
92+
setattr(f, "_is_message_handler", True)
93+
setattr(f, "_message_router_data", deepcopy(data))
94+
95+
logger.debug(
96+
"@message_router: '%s' => models %s (topic=%s, pubsub=%s, broadcast=%s)",
97+
f.__name__, [m.__name__ for m in models], topic, pubsub, broadcast
98+
)
99+
return f
100+
101+
# Support both @message_router(...) and bare @message_router usage
102+
return decorator if func is None else decorator(func)

0 commit comments

Comments
 (0)