Skip to content

Commit c4bed83

Browse files
committed
Added new message router workflow quickstart
Signed-off-by: Roberto Rodriguez <[email protected]>
1 parent 160004e commit c4bed83

File tree

9 files changed

+898
-0
lines changed

9 files changed

+898
-0
lines changed

quickstarts/04-message-router-workflow/README.md

Lines changed: 539 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import logging
5+
import signal
6+
7+
import dapr.ext.workflow as wf
8+
from dapr.clients import DaprClient
9+
from dotenv import load_dotenv
10+
from handlers import start_blog_workflow
11+
from workflow import (
12+
blog_workflow,
13+
create_outline,
14+
write_post,
15+
)
16+
17+
from dapr_agents.workflow.utils.registration import register_message_handlers
18+
19+
# Load environment variables (Dapr / LLM settings) from a local .env file.
load_dotenv()

# Module-wide logging: INFO level, logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
23+
24+
25+
async def _wait_for_shutdown() -> None:
26+
"""Block until Ctrl+C or SIGTERM."""
27+
loop = asyncio.get_running_loop()
28+
stop = asyncio.Event()
29+
30+
def _set_stop(*_: object) -> None:
31+
stop.set()
32+
33+
try:
34+
loop.add_signal_handler(signal.SIGINT, _set_stop)
35+
loop.add_signal_handler(signal.SIGTERM, _set_stop)
36+
except NotImplementedError:
37+
# Windows fallback
38+
signal.signal(signal.SIGINT, lambda *_: _set_stop())
39+
signal.signal(signal.SIGTERM, lambda *_: _set_stop())
40+
41+
await stop.wait()
42+
43+
44+
async def main() -> None:
    """Host process: run the workflow runtime and the pub/sub message routers.

    Registers the blog workflow and its activities, starts the Dapr workflow
    runtime, wires streaming subscriptions for the message handlers, then
    blocks until SIGINT/SIGTERM before tearing everything down.
    """
    runtime = wf.WorkflowRuntime()

    runtime.register_workflow(blog_workflow)
    runtime.register_activity(create_outline)
    runtime.register_activity(write_post)

    runtime.start()

    try:
        with DaprClient() as client:
            # Wire streaming subscriptions for our router(s); each closer
            # tears down one subscription.
            closers = register_message_handlers(
                targets=[start_blog_workflow],
                dapr_client=client,
            )

            try:
                await _wait_for_shutdown()
            finally:
                # Close every subscription; one failure must not stop the rest.
                for close in closers:
                    try:
                        close()
                    except Exception:
                        logger.exception("Error while closing subscription")
    finally:
        # Always stop the workflow runtime, even if setup failed above.
        runtime.shutdown()
71+
72+
73+
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C during startup/teardown: exit quietly without a traceback.
        pass
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Dapr Conversation API component backed by OpenAI.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: openai
spec:
  type: conversation.openai
  version: v1
  metadata:
    - name: key
      # Templated in from the environment at setup time; never commit a real key.
      value: "{{OPENAI_API_KEY}}"
    - name: model
      value: gpt-5-mini
    - name: temperature
      # Sampling temperature passed through to the model.
      value: 1
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Redis-backed pub/sub component used for the blog.requests topic.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagepubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      # Local Redis instance (e.g. the one started by `dapr init`).
      value: localhost:6379
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Redis-backed state store used by the Dapr workflow runtime.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: workflowstatestore
spec:
  type: state.redis
  version: v1
  metadata:
    - name: redisHost
      value: localhost:6379
    - name: actorStateStore
      # Workflows run on actors, so this store must be actor-enabled.
      value: "true"
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
5+
import dapr.ext.workflow as wf
6+
from dapr.clients.grpc._response import TopicEventResponse
7+
from pydantic import BaseModel, Field
8+
9+
from dapr_agents.workflow.decorators.routers import message_router
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class StartBlogMessage(BaseModel):
    """Pub/sub payload requesting a new blog post workflow."""

    # Non-empty blog topic/title; enforced by Pydantic at the edge.
    topic: str = Field(min_length=1, description="Blog topic/title")
16+
17+
18+
# Import the workflow after defining models to avoid circular import surprises
19+
from workflow import blog_workflow # noqa: E402
20+
21+
22+
@message_router(pubsub="messagepubsub", topic="blog.requests")
def start_blog_workflow(message: StartBlogMessage) -> TopicEventResponse:
    """Pub/sub entry point: schedule a blog_workflow run for one message.

    The router decorator deserializes and validates the payload into a
    StartBlogMessage before this function is invoked.
    """
    try:
        instance_id = wf.DaprWorkflowClient().schedule_new_workflow(
            workflow=blog_workflow,
            input=message.model_dump(),
        )
        logger.info("Scheduled blog_workflow instance=%s topic=%s", instance_id, message.topic)
        return TopicEventResponse("success")
    except Exception as exc:
        # NOTE(review): every failure is treated as transient and redelivered;
        # confirm the broker has a retry limit / dead-letter topic so a
        # permanently failing message cannot loop forever.
        logger.exception("Failed to schedule blog workflow: %s", exc)
        return TopicEventResponse("retry")
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import json
5+
import logging
6+
import os
7+
import random
8+
import signal
9+
import sys
10+
from typing import Any, Dict
11+
12+
from dapr.clients import DaprClient
13+
14+
# ---------------------------
# Config via environment vars
# ---------------------------
PUBSUB_NAME = os.getenv("PUBSUB_NAME", "messagepubsub")
TOPIC_NAME = os.getenv("TOPIC_NAME", "blog.requests")
BLOG_TOPIC = os.getenv("BLOG_TOPIC", "AI Agents")  # used when RAW_DATA is not provided
RAW_DATA = os.getenv("RAW_DATA")  # if set, must be a JSON object (string)
CONTENT_TYPE = os.getenv("CONTENT_TYPE", "application/json")
CE_TYPE = os.getenv("CLOUDEVENT_TYPE")  # optional CloudEvent 'type' metadata

# Publish behavior: one-shot by default; periodic when PUBLISH_ONCE=false.
PUBLISH_ONCE = os.getenv("PUBLISH_ONCE", "true").lower() in {"1", "true", "yes"}
INTERVAL_SEC = float(os.getenv("INTERVAL_SEC", "0"))  # used when PUBLISH_ONCE=false
MAX_ATTEMPTS = int(os.getenv("MAX_ATTEMPTS", "8"))  # retries per publish
INITIAL_DELAY = float(os.getenv("INITIAL_DELAY", "0.5"))  # seconds before first retry
BACKOFF_FACTOR = float(os.getenv("BACKOFF_FACTOR", "2.0"))  # delay multiplier per retry
JITTER_FRAC = float(os.getenv("JITTER_FRAC", "0.2"))  # +/- fraction of randomness on each delay

# Optional warmup (give sidecar/broker a moment)
STARTUP_DELAY = float(os.getenv("STARTUP_DELAY", "1.0"))

logger = logging.getLogger("publisher")
36+
37+
38+
async def _backoff_sleep(delay: float, jitter: float, factor: float) -> float:
39+
"""Sleep for ~delay seconds with ±jitter% randomness, then return the next delay."""
40+
actual = max(0.0, delay * (1 + random.uniform(-jitter, jitter)))
41+
if actual:
42+
await asyncio.sleep(actual)
43+
return delay * factor
44+
45+
46+
def _build_payload() -> Dict[str, Any]:
    """
    Construct the message body:
      - RAW_DATA set  -> parse it as JSON; it must decode to an object
      - RAW_DATA unset -> {"topic": BLOG_TOPIC}
    """
    if not RAW_DATA:
        return {"topic": BLOG_TOPIC}

    try:
        parsed = json.loads(RAW_DATA)
    except Exception as exc:  # noqa: BLE001
        raise ValueError(f"Invalid RAW_DATA JSON: {exc}") from exc
    if not isinstance(parsed, dict):
        raise ValueError("RAW_DATA must be a JSON object")
    return parsed
62+
63+
64+
def _encode_payload(payload: Dict[str, Any]) -> bytes:
65+
"""Encode the payload as UTF-8 JSON bytes."""
66+
return json.dumps(payload, ensure_ascii=False).encode("utf-8")
67+
68+
69+
async def publish_once(client: DaprClient, payload: Dict[str, Any]) -> None:
    """Publish ``payload`` once, retrying with exponential backoff.

    Raises the last publish error after MAX_ATTEMPTS failed tries.
    """
    body = _encode_payload(payload)
    delay = INITIAL_DELAY
    attempt = 0

    while True:
        attempt += 1
        try:
            logger.info("publish attempt %d → %s/%s", attempt, PUBSUB_NAME, TOPIC_NAME)
            client.publish_event(
                pubsub_name=PUBSUB_NAME,
                topic_name=TOPIC_NAME,
                data=body,
                data_content_type=CONTENT_TYPE,
                publish_metadata=({"cloudevent.type": CE_TYPE} if CE_TYPE else None),
            )
        except Exception as exc:  # noqa: BLE001
            logger.warning("publish failed: %s", exc)
            if attempt >= MAX_ATTEMPTS:
                # Out of retries: surface the last error to the caller.
                raise
            logger.info("retrying in ~%.2fs …", delay)
            delay = await _backoff_sleep(delay, JITTER_FRAC, BACKOFF_FACTOR)
        else:
            logger.info("published successfully")
            return
92+
93+
94+
async def main() -> int:
    """Run the publisher and return a process exit code.

    Exit codes: 0 = success, 1 = fatal error, 2 = bad configuration,
    130 = interrupted by the user.
    """
    logging.basicConfig(level=logging.INFO)
    stop_event = asyncio.Event()

    # Signal-aware shutdown
    loop = asyncio.get_running_loop()

    def _stop(*_: object) -> None:
        stop_event.set()

    try:
        loop.add_signal_handler(signal.SIGINT, _stop)
        loop.add_signal_handler(signal.SIGTERM, _stop)
    except NotImplementedError:
        # Windows fallback: event-loop signal handlers are unsupported there.
        signal.signal(signal.SIGINT, lambda *_: _stop())
        signal.signal(signal.SIGTERM, lambda *_: _stop())

    # Optional warmup (give sidecar/broker a moment)
    if STARTUP_DELAY > 0:
        await asyncio.sleep(STARTUP_DELAY)

    payload = _build_payload()
    logger.info("payload: %s", payload)

    try:
        with DaprClient() as client:
            # One-shot mode: publish a single message and exit.
            if PUBLISH_ONCE:
                await publish_once(client, payload)
                # brief wait so logs flush nicely under dapr
                await asyncio.sleep(0.2)
                return 0

            # periodic mode
            if INTERVAL_SEC <= 0:
                logger.error("INTERVAL_SEC must be > 0 when PUBLISH_ONCE=false")
                return 2

            logger.info("starting periodic publisher every %.2fs", INTERVAL_SEC)
            while not stop_event.is_set():
                try:
                    await publish_once(client, payload)
                except Exception as exc:  # noqa: BLE001
                    # A failed tick is logged but does not stop the publisher.
                    logger.error("giving up after %d attempts: %s", MAX_ATTEMPTS, exc)

                # wait for next tick or shutdown
                try:
                    await asyncio.wait_for(stop_event.wait(), timeout=INTERVAL_SEC)
                except asyncio.TimeoutError:
                    pass

            logger.info("shutdown requested; exiting")
            return 0

    except KeyboardInterrupt:
        return 130
    except Exception as exc:  # noqa: BLE001
        logger.exception("fatal error: %s", exc)
        return 1
152+
153+
154+
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    sys.exit(asyncio.run(main()))
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from __future__ import annotations
2+
3+
from dapr.ext.workflow import DaprWorkflowContext
4+
from dotenv import load_dotenv
5+
6+
from dapr_agents.llm.dapr import DaprChatClient
7+
from dapr_agents.workflow.decorators import llm_activity
8+
9+
# Pull OPENAI_API_KEY and related settings from .env before building the client.
load_dotenv()

# LLM client bound to the "openai" Dapr conversation component.
llm = DaprChatClient(component_name="openai")
13+
14+
15+
def blog_workflow(ctx: DaprWorkflowContext, wf_input: dict) -> str:
    """
    Durable workflow: outline the requested topic, then draft the post.

    ``wf_input`` must be JSON-serializable, shaped like {"topic": "<string>"}.
    """
    requested_topic = wf_input["topic"]
    outline_text = yield ctx.call_activity(create_outline, input={"topic": requested_topic})
    blog_post = yield ctx.call_activity(write_post, input={"outline": outline_text})
    return blog_post
24+
25+
26+
@llm_activity(
    prompt="Create a short outline about {topic}. Output 3-5 bullet points.",
    llm=llm,
)
async def create_outline(ctx, topic: str) -> str:
    """Activity: ask the LLM for a 3-5 bullet outline of ``topic``.

    The @llm_activity decorator renders the prompt from the arguments and
    returns the completion, so the body is intentionally empty.
    """
    # Implemented by the decorator; body can be empty.
    pass
33+
34+
35+
@llm_activity(
    prompt="Write a short blog post following this outline:\n{outline}",
    llm=llm,
)
async def write_post(ctx, outline: str) -> str:
    """Activity: ask the LLM to draft a blog post from ``outline``.

    The @llm_activity decorator renders the prompt from the arguments and
    returns the completion, so the body is intentionally empty.
    """
    # Implemented by the decorator; body can be empty.
    pass

quickstarts/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,19 @@ This quickstart demonstrates how to design and run **agent-based workflows**, st
117117

118118
[Go to Agent-based Workflow Patterns](./04-agent-based-workflows/)
119119

120+
### Message Router Workflow
121+
122+
Learn how to trigger Dapr Workflows via Pub/Sub messages using the `@message_router` decorator.
123+
This pattern connects event-driven systems with LLM-powered workflows, validating and routing structured messages to durable workflow executions.
124+
125+
- **Event-Driven Orchestration**: Start workflows automatically when messages arrive on a topic
126+
- **Edge Validation**: Enforce schema integrity with Pydantic before invoking workflows
127+
- **Seamless Integration**: Combine Dapr Pub/Sub, Workflow Runtime, and LLM activities for resilient automation
128+
129+
This quickstart demonstrates how to design a message-driven workflow in which each published event triggers a new workflow instance — for example, one that creates a blog post outline and draft powered by an LLM.
130+
131+
[Go to Message Router Workflow](./04-message-router-workflow/)
132+
120133
### Multi-Agent Workflows
121134

122135
Advanced example of event-driven workflows with multiple autonomous agents:

0 commit comments

Comments
 (0)