Skip to content

Commit be765a7

Browse files
committed
make lint happy
Signed-off-by: Roberto Rodriguez <[email protected]>
1 parent c4bed83 commit be765a7

File tree

7 files changed

+48
-24
lines changed

7 files changed

+48
-24
lines changed

dapr_agents/workflow/decorators/messaging.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
"Union types, forward references, and explicit Dapr workflow integration."
1717
)
1818

19+
1920
def message_router(
2021
func: Optional[Callable[..., Any]] = None,
2122
*,
@@ -46,13 +47,12 @@ def message_router(
4647
"""
4748

4849
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
49-
5050
warnings.warn(
5151
_MESSAGE_ROUTER_DEPRECATION_MESSAGE,
5252
DeprecationWarning,
5353
stacklevel=2,
5454
)
55-
55+
5656
is_workflow = hasattr(f, "_is_workflow")
5757
workflow_name = getattr(f, "_workflow_name", None)
5858

@@ -74,7 +74,8 @@ def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
7474

7575
logger.debug(
7676
"@message_router (legacy): '%s' => models %s",
77-
f.__name__, [m.__name__ for m in message_models],
77+
f.__name__,
78+
[m.__name__ for m in message_models],
7879
)
7980

8081
# Attach metadata for later registration

dapr_agents/workflow/decorators/routers.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ def on_event(message: Union[Foo, Bar]): ...
5050
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
5151
# Validate required kwargs only when decorator is used with args
5252
if pubsub is None or topic is None:
53-
raise ValueError("`pubsub` and `topic` are required when using @message_router with arguments.")
53+
raise ValueError(
54+
"`pubsub` and `topic` are required when using @message_router with arguments."
55+
)
5456

5557
sig = inspect.signature(f)
5658
if "message" not in sig.parameters:
@@ -60,7 +62,9 @@ def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
6062
try:
6163
hints = get_type_hints(f, globalns=f.__globals__)
6264
except Exception:
63-
logger.debug("Failed to fully resolve type hints for %s", f.__name__, exc_info=True)
65+
logger.debug(
66+
"Failed to fully resolve type hints for %s", f.__name__, exc_info=True
67+
)
6468
hints = getattr(f, "__annotations__", {}) or {}
6569

6670
raw_hint = hints.get("message")
@@ -72,7 +76,9 @@ def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
7276

7377
models = extract_message_models(raw_hint)
7478
if not models:
75-
raise TypeError(f"Unsupported or unresolved message type for '{f.__name__}': {raw_hint!r}")
79+
raise TypeError(
80+
f"Unsupported or unresolved message type for '{f.__name__}': {raw_hint!r}"
81+
)
7682

7783
# Optional early validation of supported schema kinds
7884
for m in models:
@@ -82,10 +88,11 @@ def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
8288
data = {
8389
"pubsub": pubsub,
8490
"topic": topic,
85-
"dead_letter_topic": dead_letter_topic or (f"{topic}_DEAD" if topic else None),
91+
"dead_letter_topic": dead_letter_topic
92+
or (f"{topic}_DEAD" if topic else None),
8693
"is_broadcast": broadcast,
87-
"message_schemas": models, # list[type]
88-
"message_types": [m.__name__ for m in models], # list[str]
94+
"message_schemas": models, # list[type]
95+
"message_types": [m.__name__ for m in models], # list[str]
8996
}
9097

9198
# Attach metadata; deepcopy for defensive isolation
@@ -94,9 +101,13 @@ def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
94101

95102
logger.debug(
96103
"@message_router: '%s' => models %s (topic=%s, pubsub=%s, broadcast=%s)",
97-
f.__name__, [m.__name__ for m in models], topic, pubsub, broadcast
104+
f.__name__,
105+
[m.__name__ for m in models],
106+
topic,
107+
pubsub,
108+
broadcast,
98109
)
99110
return f
100111

101112
# Support both @message_router(...) and bare @message_router usage
102-
return decorator if func is None else decorator(func)
113+
return decorator if func is None else decorator(func)

dapr_agents/workflow/utils/registration.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ def _iter_handlers(obj: Any):
7373
schemas: List[Type[Any]] = meta.get("message_schemas") or []
7474

7575
# Bind method to instance if needed (descriptor protocol).
76-
bound = handler if owner is None else handler.__get__(owner, owner.__class__)
76+
bound = (
77+
handler if owner is None else handler.__get__(owner, owner.__class__)
78+
)
7779

7880
async def _invoke(
7981
bound_handler: Callable[..., Any],
@@ -92,20 +94,25 @@ def _make_handler(
9294
bound_handler: Callable[..., Any],
9395
) -> Callable[[SubscriptionMessage], TopicEventResponse]:
9496
"""Create a Dapr-compatible handler for a single decorated function."""
97+
9598
def handler_fn(message: SubscriptionMessage) -> TopicEventResponse:
9699
try:
97100
# 1) Extract payload + CloudEvent metadata (bytes/str/dict are also supported by the extractor)
98101
event_data, metadata = extract_cloudevent_data(message)
99102

100103
# 2) Validate against the first matching schema (or dict as fallback)
101104
parsed = None
102-
for model in (schemas or [dict]):
105+
for model in schemas or [dict]:
103106
try:
104107
parsed = validate_message_model(model, event_data)
105108
break
106109
except Exception:
107110
# Try the next schema; log at debug for signal without noise.
108-
logger.debug("Schema %r did not match payload; trying next.", model, exc_info=True)
111+
logger.debug(
112+
"Schema %r did not match payload; trying next.",
113+
model,
114+
exc_info=True,
115+
)
109116
continue
110117

111118
if parsed is None:
@@ -125,7 +132,9 @@ def handler_fn(message: SubscriptionMessage) -> TopicEventResponse:
125132

126133
# 4) Bridge worker thread → event loop
127134
if loop and loop.is_running():
128-
fut = asyncio.run_coroutine_threadsafe(_invoke(bound_handler, parsed), loop)
135+
fut = asyncio.run_coroutine_threadsafe(
136+
_invoke(bound_handler, parsed), loop
137+
)
129138
return fut.result()
130139
return asyncio.run(_invoke(bound_handler, parsed))
131140

@@ -144,4 +153,4 @@ def handler_fn(message: SubscriptionMessage) -> TopicEventResponse:
144153
)
145154
closers.append(close_fn)
146155

147-
return closers
156+
return closers

dapr_agents/workflow/utils/routers.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ def extract_message_models(type_hint: Any) -> list[type]:
3434
origin = get_origin(type_hint)
3535
if origin in (Union, types.UnionType): # handle both `Union[...]` and `A | B`
3636
return [
37-
t for t in get_args(type_hint)
38-
if t is not NoneType and isinstance(t, type)
37+
t for t in get_args(type_hint) if t is not NoneType and isinstance(t, type)
3938
]
4039

4140
return [type_hint] if isinstance(type_hint, type) else []
@@ -154,7 +153,9 @@ def extract_cloudevent_data(
154153
raise ValueError(f"Unexpected message type: {type(message)!r}")
155154

156155
if not isinstance(event_data, dict):
157-
logger.debug("Event data is not a dict (type=%s); value=%r", type(event_data), event_data)
156+
logger.debug(
157+
"Event data is not a dict (type=%s); value=%r", type(event_data), event_data
158+
)
158159

159160
return event_data, metadata
160161

quickstarts/04-message-router-workflow/app.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def _set_stop(*_: object) -> None:
4343

4444
async def main() -> None:
4545
runtime = wf.WorkflowRuntime()
46-
46+
4747
runtime.register_workflow(blog_workflow)
4848
runtime.register_activity(create_outline)
4949
runtime.register_activity(write_post)
@@ -74,4 +74,4 @@ async def main() -> None:
7474
try:
7575
asyncio.run(main())
7676
except KeyboardInterrupt:
77-
pass
77+
pass

quickstarts/04-message-router-workflow/handlers.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ def start_blog_workflow(message: StartBlogMessage) -> TopicEventResponse:
3030
workflow=blog_workflow,
3131
input=message.model_dump(),
3232
)
33-
logger.info("Scheduled blog_workflow instance=%s topic=%s", instance_id, message.topic)
33+
logger.info(
34+
"Scheduled blog_workflow instance=%s topic=%s", instance_id, message.topic
35+
)
3436
return TopicEventResponse("success")
3537
except Exception as exc: # transient infra error → retry
3638
logger.exception("Failed to schedule blog workflow: %s", exc)
37-
return TopicEventResponse("retry")
39+
return TopicEventResponse("retry")

quickstarts/04-message-router-workflow/message_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,4 @@ def _stop(*_: object) -> None:
152152

153153

154154
if __name__ == "__main__":
155-
sys.exit(asyncio.run(main()))
155+
sys.exit(asyncio.run(main()))

0 commit comments

Comments
 (0)