basic_pipeline.py
# Copyright (c) Microsoft. All rights reserved.

"""Basic sequential pipeline using the functional workflow API.

The simplest possible workflow: plain async functions orchestrated by @workflow.
No @step decorator needed — just write Python.
"""

import asyncio

from agent_framework import workflow


# These are plain async functions — no decorators needed.
# They run normally inside the workflow, just like any other Python function.
async def fetch_data(url: str) -> dict[str, str | int]:
    """Simulate fetching data from a URL."""
    return {"url": url, "content": f"Data from {url}", "status": 200}


async def transform_data(data: dict[str, str | int]) -> str:
    """Transform raw data into a summary string."""
    return f"[{data['status']}] {data['content']}"


# @workflow turns this async function into a FunctionalWorkflow object.
# Without it, this is just a normal async function. With it, you get:
#   - .run(), which returns a WorkflowRunResult with events and outputs
#   - .run(stream=True) for streaming events in real time
#   - .as_agent() to use this workflow anywhere an agent is expected
#
# The function's first parameter receives the input from .run("...").
# Add a `ctx: RunContext` parameter only if you need human-in-the-loop (HITL)
# interaction, state, or custom events.
@workflow
async def data_pipeline(url: str) -> str:
    """A simple sequential data pipeline."""
    raw = await fetch_data(url)
    summary = await transform_data(raw)

    # This is just a function — plain Python works between calls.
    # No need to wrap every operation in a separate async function.
    is_valid = len(summary) > 0 and "[200]" in summary
    tag = "VALID" if is_valid else "INVALID"

    # Returning a value automatically emits it as an output.
    # Callers retrieve it via result.get_outputs().
    return f"[{tag}] {summary}"
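

# .as_agent() (noted in the comment block above) wraps this workflow so it
# can be passed anywhere an agent is expected. A hedged sketch, commented out:
# the zero-argument call below is an assumption, not a confirmed signature.
# pipeline_agent = data_pipeline.as_agent()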


async def main():
    # .run() is provided by @workflow — a plain async function wouldn't have it.
    result = await data_pipeline.run("https://example.com/api/data")
    print("Output:", result.get_outputs()[0])
    print("State:", result.get_final_state())
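
    # Streaming variant, per the .run(stream=True) note on @workflow above.
    # A hedged sketch, commented out: iterating the stream and the shape of
    # each event are assumptions here, not confirmed API.
    # async for event in data_pipeline.run("https://example.com/api/data", stream=True):
    #     print("Event:", event)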


if __name__ == "__main__":
    asyncio.run(main())