Skip to content

Commit 9c8126b

Browse files
sicoyleyaron2
andauthored
feat: add readme + imgs + todos + cleanup (#136)
* feat: add readme + imgs + todos + cleanup Signed-off-by: Samantha Coyle <[email protected]> * style: lint things Signed-off-by: Samantha Coyle <[email protected]> * fix: address feedback Signed-off-by: Samantha Coyle <[email protected]> * style: rm comment Signed-off-by: Samantha Coyle <[email protected]> * style: update comments Signed-off-by: Samantha Coyle <[email protected]> * fix(build): tox -e ruff Signed-off-by: Samantha Coyle <[email protected]> --------- Signed-off-by: Samantha Coyle <[email protected]> Co-authored-by: Yaron Schneider <[email protected]>
1 parent c2eff2b commit 9c8126b

File tree

12 files changed

+96
-23
lines changed

12 files changed

+96
-23
lines changed

dapr_agents/agent/actor/base.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@ async def _on_activate(self) -> None:
3838
if not has_state:
3939
# Initialize state with default values if it doesn't exist
4040
logger.info(f"Initializing state for {self.actor_id}")
41-
self.state = AgentActorState(overall_status=AgentStatus.IDLE)
41+
self.agent_state = AgentActorState(status=AgentStatus.IDLE)
4242
await self._state_manager.set_state(
43-
self.agent_state_key, self.state.model_dump()
43+
self.agent_state_key, self.agent_state.model_dump()
4444
)
4545
await self._state_manager.save_state()
4646
else:
4747
# Load existing state
4848
logger.info(f"Loading existing state for {self.actor_id}")
4949
logger.debug(f"Existing state for {self.actor_id}: {state_data}")
50-
self.state = AgentActorState(**state_data)
50+
self.agent_state = AgentActorState(**state_data)
5151

5252
async def _on_deactivate(self) -> None:
5353
"""
@@ -61,9 +61,9 @@ async def set_status(self, status: AgentStatus) -> None:
6161
"""
6262
Sets the current operational status of the agent and saves the state.
6363
"""
64-
self.state.overall_status = status
64+
self.agent_state.status = status
6565
await self._state_manager.set_state(
66-
self.agent_state_key, self.state.model_dump()
66+
self.agent_state_key, self.agent_state.model_dump()
6767
)
6868
await self._state_manager.save_state()
6969

@@ -101,11 +101,11 @@ async def invoke_task(self, task: Optional[str] = None) -> str:
101101
input=task_entry_input,
102102
status=AgentTaskStatus.IN_PROGRESS,
103103
)
104-
self.state.task_history.append(task_entry)
104+
self.agent_state.task_history.append(task_entry)
105105

106106
# Save initial task state with IN_PROGRESS status
107107
await self._state_manager.set_state(
108-
self.agent_state_key, self.state.model_dump()
108+
self.agent_state_key, self.agent_state.model_dump()
109109
)
110110
await self._state_manager.save_state()
111111

@@ -134,7 +134,7 @@ async def invoke_task(self, task: Optional[str] = None) -> str:
134134
finally:
135135
# Ensure the final state of the task is saved
136136
await self._state_manager.set_state(
137-
self.agent_state_key, self.state.model_dump()
137+
self.agent_state_key, self.agent_state.model_dump()
138138
)
139139
await self._state_manager.save_state()
140140
# Revert the agent's status to idle
@@ -152,12 +152,12 @@ async def add_message(self, message: Union[AgentActorMessage, dict]) -> None:
152152
message = AgentActorMessage(**message)
153153

154154
# Add the new message to the state
155-
self.state.messages.append(message)
156-
self.state.message_count += 1
155+
self.agent_state.messages.append(message)
156+
self.agent_state.message_count += 1
157157

158158
# Save state back to Dapr
159159
await self._state_manager.set_state(
160-
self.agent_state_key, self.state.model_dump()
160+
self.agent_state_key, self.agent_state.model_dump()
161161
)
162162
await self._state_manager.save_state()
163163

@@ -178,7 +178,6 @@ async def get_messages(self) -> List[dict]:
178178
# Return the list of messages as dictionaries (timestamp will be automatically serialized to ISO format)
179179
return [message.model_dump() for message in state.messages]
180180
except ValidationError as e:
181-
# Handle validation errors
182181
print(f"Validation error: {e}")
183182
return []
184183
return []

dapr_agents/agent/patterns/react/base.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class ReActAgent(AgentBase):
3636

3737
model_config = ConfigDict(arbitrary_types_allowed=True)
3838

39+
# TODO: build this from a template file
40+
# TODO: tool for evaluating prompt updates here and catching err drift
3941
def construct_system_prompt(self) -> str:
4042
"""
4143
Constructs a system prompt in the ReAct reasoning-action format based on the agent's attributes and tools.
@@ -54,7 +56,6 @@ def construct_system_prompt(self) -> str:
5456
prompt_parts.append("## Role\nYour role is {{role}}.")
5557
prompt_parts.append("## Goal\n{{goal}}.")
5658

57-
# Append instructions if provided
5859
if self.instructions:
5960
prompt_parts.append("## Instructions\n{{instructions}}")
6061

@@ -68,7 +69,7 @@ def construct_system_prompt(self) -> str:
6869
tools_section.rstrip()
6970
) # Trim any trailing newlines from tools_section
7071

71-
# Additional Guidelines
72+
# TODO: move this to a separate file
7273
additional_guidelines = textwrap.dedent(
7374
"""
7475
If you think about using tool, it must use the correct tool JSON blob format as shown below:
@@ -83,6 +84,7 @@ def construct_system_prompt(self) -> str:
8384
prompt_parts.append(additional_guidelines)
8485

8586
# ReAct specific guidelines
87+
# TODO: move this to a separate file
8688
react_guidelines = textwrap.dedent(
8789
"""
8890
## ReAct Format
@@ -157,7 +159,6 @@ async def run(self, input_data: Optional[Union[str, Dict[str, Any]]] = None) ->
157159
if user_message:
158160
self.text_formatter.print_message(user_message)
159161

160-
# Get Tool Names to validate tool selection
161162
available_tools = self.tool_executor.get_tool_names()
162163

163164
# Initialize react_loop for iterative reasoning
@@ -215,12 +216,11 @@ async def run(self, input_data: Optional[Union[str, Dict[str, Any]]] = None) ->
215216
"No action specified; continuing with further reasoning."
216217
)
217218
react_loop += f"Thought:{thought_action}\n"
218-
continue # Proceed to the next iteration
219+
continue
219220

220221
action_name = action["name"]
221222
action_args = action["arguments"]
222223

223-
# Print Action
224224
self.text_formatter.print_react_part("Action", json.dumps(action))
225225

226226
if action_name not in available_tools:
@@ -229,7 +229,6 @@ async def run(self, input_data: Optional[Union[str, Dict[str, Any]]] = None) ->
229229
logger.info(f"Executing {action_name} with arguments {action_args}")
230230
result = await self.tool_executor.run_tool(action_name, **action_args)
231231

232-
# Print Observation
233232
self.text_formatter.print_react_part("Observation", result)
234233
react_loop += f"Thought:{thought_action}\nAction:{json.dumps(action)}\nObservation:{result}\n"
235234

dapr_agents/service/fastapi/base.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ def model_post_init(self, __context: Any) -> None:
5858
lifespan=self.lifespan,
5959
)
6060

61-
# Configure CORS settings
6261
self.app.add_middleware(
6362
CORSMiddleware,
6463
allow_origins=self.cors_origins,

dapr_agents/types/agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,6 @@ class AgentActorState(BaseModel):
7878
task_history: Optional[List[AgentTaskEntry]] = Field(
7979
default_factory=list, description="History of tasks the agent has performed"
8080
)
81-
overall_status: AgentStatus = Field(
81+
status: AgentStatus = Field(
8282
AgentStatus.IDLE, description="Current operational status of the agent"
8383
)

dapr_agents/workflow/agentic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class AgenticWorkflow(WorkflowApp, DaprPubSub, MessageRoutingMixin):
7474
agents_registry_key: str = Field(
7575
default="agents_registry", description="Key for agents registry in state store."
7676
)
77+
# TODO: test this is respected by runtime.
7778
max_iterations: int = Field(
7879
default=20, description="Maximum iterations for workflows.", ge=1
7980
)
@@ -109,6 +110,7 @@ def model_post_init(self, __context: Any) -> None:
109110
self._text_formatter = ColorTextFormatter()
110111

111112
# Initialize state store client (used for persisting workflow state to Dapr)
113+
# Why make a state store client and dapr_client here?
112114
self._state_store_client = DaprStateStore(store_name=self.state_store_name)
113115
logger.info(f"State store '{self.state_store_name}' initialized.")
114116

dapr_agents/workflow/base.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class WorkflowApp(BaseModel):
4444
llm: Optional[ChatClientBase] = Field(
4545
default=None, description="The default LLM client for all LLM-based tasks."
4646
)
47+
# TODO: I think this should be within the wf client or wf runtime...?
4748
timeout: int = Field(
4849
default=300,
4950
description="Default timeout duration in seconds for workflow tasks.",
@@ -181,6 +182,21 @@ def wrapper(ctx: WorkflowActivityContext, *args, **kwargs):
181182

182183
return wrapper
183184

185+
# TODO: workflow discovery can also come from dapr runtime
186+
# Python workflows can be registered in a variety of ways, and we need to support all of them.
187+
# This supports decorator-based registration;
188+
# however, there is also manual registration approach.
189+
# See example below:
190+
# def setup_workflow_runtime():
191+
# wf_runtime = WorkflowRuntime()
192+
# wf_runtime.register_workflow(order_processing_workflow)
193+
# wf_runtime.register_workflow(fulfillment_workflow)
194+
# wf_runtime.register_activity(process_payment)
195+
# wf_runtime.register_activity(send_notification)
196+
# return wf_runtime
197+
198+
# runtime = setup_workflow_runtime()
199+
# runtime.start()
184200
def _discover_workflows(self) -> Dict[str, Callable]:
185201
"""Gather all @workflow-decorated functions and methods."""
186202
module = sys.modules["__main__"]
@@ -236,7 +252,8 @@ def register_agent(
236252
key (str): The key to update.
237253
data (dict): The data to update the store with.
238254
"""
239-
# retry the entire operation up to ten times sleeping 1 second between each attempt
255+
# retry the entire operation up to ten times sleeping 1 second between each
256+
# TODO: rm the custom retry logic here and use the DaprClient retry_policy instead.
240257
for attempt in range(1, 11):
241258
try:
242259
response: StateResponse = self.client.get_state(
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Orchestrators
2+
3+
Available Workflow options to orchestrate communication between agents:
4+
5+
- LLM-based: Uses a large language model (e.g., GPT-4o) to determine the most suitable agent based on the message and context.
6+
- Random: Selects an agent randomly for each task.
7+
- RoundRobin: Cycles through agents in a fixed order, ensuring each agent gets an equal opportunity to process tasks.
8+
9+
## Visual representation of each orchestration option:
10+
![Orchestrator workflows visualized](./orchestratorWorkflows.png)

dapr_agents/workflow/orchestrators/llm/orchestrator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ def model_post_init(self, __context: Any) -> None:
6060

6161
@message_router
6262
@workflow(name="LLMWorkflow")
63+
# TODO: set retry policies on the activities!
64+
# TODO: utilize prompt verdict value of failed as we do not currently use.
65+
# https://github.com/dapr/dapr-agents/pull/136#discussion_r2175751545
6366
def main_workflow(self, ctx: DaprWorkflowContext, message: TriggerAction):
6467
"""
6568
Executes an LLM-driven agentic workflow where the next agent is dynamically selected
@@ -710,6 +713,7 @@ async def finish_workflow(
710713
# Store the final summary and verdict in workflow state
711714
await self.update_workflow_state(instance_id=instance_id, final_output=summary)
712715

716+
# TODO: this should be a compensating activity called in the event of an error from any other activity.
713717
async def update_workflow_state(
714718
self,
715719
instance_id: str,
1.83 MB
Loading

dapr_agents/workflow/orchestrators/orchestratorWorkflows.svg

Lines changed: 31 additions & 0 deletions
Loading

0 commit comments

Comments
 (0)