@@ -19,65 +19,69 @@ async def run_task(
19
19
ctx : Context , task : Task ,
20
20
) -> str :
21
21
"""Execute any list of steps needed to complete a task. Useful for anything the user wants to do."""
22
- state = await ctx .get ("state" , default = {})
23
- task = Task .model_validate (task )
24
22
25
- state = {
26
- "steps" : [
27
- {
28
- "description" : step .description ,
29
- "status" : "pending"
30
- }
31
- for step in task .steps
32
- ]
33
- }
23
+ async with ctx .store .edit_state () as global_state :
24
+ state = global_state .get ("state" , {})
25
+ task = Task .model_validate (task )
34
26
35
- # Send initial state snapshot
36
- ctx .write_event_to_stream (
37
- StateSnapshotWorkflowEvent (
38
- snapshot = state
39
- )
40
- )
41
-
42
- # Sleep for 1 second
43
- await asyncio .sleep (1.0 )
44
-
45
- # Create a copy to track changes for JSON patches
46
- previous_state = copy .deepcopy (state )
27
+ state = {
28
+ "steps" : [
29
+ {
30
+ "description" : step .description ,
31
+ "status" : "pending"
32
+ }
33
+ for step in task .steps
34
+ ]
35
+ }
47
36
48
- # Update each step and send deltas
49
- for i , step in enumerate (state ["steps" ]):
50
- step ["status" ] = "completed"
51
-
52
- # Generate JSON patch from previous state to current state
53
- patch = jsonpatch .make_patch (previous_state , state )
54
-
55
- # Send state delta event
37
+ # Send initial state snapshot
56
38
ctx .write_event_to_stream (
57
- StateDeltaWorkflowEvent (
58
- delta = patch . patch
39
+ StateSnapshotWorkflowEvent (
40
+ snapshot = state
59
41
)
60
42
)
61
-
62
- # Update previous state for next iteration
63
- previous_state = copy .deepcopy (state )
64
-
43
+
65
44
# Sleep for 1 second
66
45
await asyncio .sleep (1.0 )
67
46
68
- # Optionally send a final snapshot to the client
69
- ctx .write_event_to_stream (
70
- StateSnapshotWorkflowEvent (
71
- snapshot = state
47
+ # Create a copy to track changes for JSON patches
48
+ previous_state = copy .deepcopy (state )
49
+
50
+ # Update each step and send deltas
51
+ for i , step in enumerate (state ["steps" ]):
52
+ step ["status" ] = "completed"
53
+
54
+ # Generate JSON patch from previous state to current state
55
+ patch = jsonpatch .make_patch (previous_state , state )
56
+
57
+ # Send state delta event
58
+ ctx .write_event_to_stream (
59
+ StateDeltaWorkflowEvent (
60
+ delta = patch .patch
61
+ )
62
+ )
63
+
64
+ # Update previous state for next iteration
65
+ previous_state = copy .deepcopy (state )
66
+
67
+ # Sleep for 1 second
68
+ await asyncio .sleep (1.0 )
69
+
70
+ # Optionally send a final snapshot to the client
71
+ ctx .write_event_to_stream (
72
+ StateSnapshotWorkflowEvent (
73
+ snapshot = state
74
+ )
72
75
)
73
- )
74
76
75
- return "Done!"
77
+ global_state ["state" ] = state
78
+
79
+ return "Task Done!"
76
80
77
81
78
82
agentic_generative_ui_router = get_ag_ui_workflow_router (
79
83
llm = OpenAI (model = "gpt-4.1" ),
80
- frontend_tools = [run_task ],
84
+ backend_tools = [run_task ],
81
85
initial_state = {},
82
86
system_prompt = (
83
87
"You are a helpful assistant that can help the user with their task. "
0 commit comments