-
Notifications
You must be signed in to change notification settings - Fork 104
Description
Maybe I am doing something wrong, but here is my code, which gives the correct answer but never stops.
The LLM Orchestrator continuously repeats the following output:
== APP == INFO:dapr_agents.workflow.task:Invoking Task with LLM...
== APP == INFO:dapr_agents.llm.utils.request:Structured Mode Activated! Mode=json.
== APP == INFO:dapr_agents.llm.openai.chat:Invoking ChatCompletion API.
== APP == INFO:httpx:HTTP Request: POST https://******.azure.com/openai/deployments/gpt-4o-mini/chat/completions?api-version=2025-01-01-preview "HTTP/1.1 200 OK"
== APP == INFO:dapr_agents.llm.openai.chat:Chat completion retrieved successfully.
== APP == INFO:dapr_agents.llm.utils.response:Structured output was successfully validated.
WARN[0103] dffe06c53d8746bea5a87a5208e1aff8::2::1: execution failed with a recoverable error and will be retried later: failed to invoke 'AddWorkflowEvent' method on workflow actor: error from worfklow actor: no such instance exists app_id=OrchestratorApp instance=ignatiev-hzfsnddv01 scope=dapr.runtime.actors.targets.workflow type=log ver=1.15.4
ERRO[0103] failed to invoke scheduled actor reminder named: run-activity due to: failed to invoke 'AddWorkflowEvent' method on workflow actor: error from worfklow actor: no such instance exists app_id=OrchestratorApp instance=ignatiev-hzfsnddv01 scope=dapr.runtime.scheduler.cluster type=log ver=1.15.4
It retries the same operation.
At orchestrator application launch I also get:
ERRO[0013] failed to invoke scheduled actor reminder named: run-activity due to: api error: code = FailedPrecondition desc = did not find address for actor app_id=OrchestratorApp instance=ignatiev-hzfsnddv01 scope=dapr.runtime.scheduler.cluster type=log ver=1.15.4
Messages and state
agents_registry:
{"calculator_assistant": {"role": "Calculator Assistant", "goal": "Assist Humans with calculation tasks.", "topic_name": "calculator_assistant", "pubsub_name": "pubsub", "orchestrator": false, "name": "calculator_assistant"}, "LLMOrchestrator": {"name": "LLMOrchestrator", "topic_name": "LLMOrchestrator", "pubsub_name": "pubsub", "orchestrator": true}}
LLMOrchestrator Topic:
{"cloudevent.source":"calculator_assistant","cloudevent.type":"AgentTaskResponse"}
{"data":{"content":"The result of the addition operation has been recorded:\n\n- Result: 2\n\n**Status:** Completed.\n\nNow, I will verify the accuracy of the result obtained from the addition operation. Let's move on to that step.","name":"calculator_assistant","role":"user","workflow_instance_id":"cfa4eeeebaac47e3b423b6f4a91ef8ec"},"datacontenttype":"application/json","id":"20a77efa-a93c-4421-9fe2-df886a019fc0","pubsubname":"pubsub","source":"calculator_assistant","specversion":"1.0","time":"2025-04-09T22:50:24+02:00","topic":"LLMOrchestrator","traceid":"00-09b2e7b70ff16f08314aacd300424d0d-a6c0107f20138e37-01","traceparent":"00-09b2e7b70ff16f08314aacd300424d0d-a6c0107f20138e37-01","tracestate":"","type":"AgentTaskResponse"}
DAPR Version
➜ ~ dapr --version
CLI version: 1.15.0
Runtime version: 1.15.4
Code
Calculator Agent:
from dapr_agents import tool
from dapr_agents.llm import OpenAIChatClient
from dapr_agents import AgentActor, AssistantAgent
from pydantic import BaseModel, Field
from dapr_agents import Agent
from dotenv import load_dotenv
import logging
import asyncio
import os
# Argument model handed to `@tool(args_model=...)` for the `add` tool.
# Documented with comments rather than a class docstring so the model's
# generated schema stays exactly as it was.
class AddSchema(BaseModel):
    # Left-hand operand of the addition.
    a: float = Field(description="first number to add")
    # Right-hand operand of the addition.
    b: float = Field(description="second number to add")
@tool(args_model=AddSchema)
def add(a: float, b: float) -> float:
    """Add two numbers."""
    # NOTE(review): the docstring is presumably surfaced by @tool as the
    # tool description, so its wording is left untouched; confirm.
    total = a + b
    return total
# Argument model handed to `@tool(args_model=...)` for the `sub` tool.
# Documented with comments rather than a class docstring so the model's
# generated schema stays exactly as it was.
class SubSchema(BaseModel):
    # Value that is subtracted from (the minuend).
    a: float = Field(description="first number to subtract")
    # Value that is taken away (the subtrahend).
    b: float = Field(description="second number to subtract")
@tool(args_model=SubSchema)
def sub(a: float, b: float) -> float:
    """Subtract two numbers."""
    # NOTE(review): the docstring is presumably surfaced by @tool as the
    # tool description, so its wording is left untouched; confirm.
    difference = a - b
    return difference
async def main():
    """Build the calculator agent, wrap it in an AgentActor, and start it.

    The chat client is configured from the AZURE_OPENAI_* environment
    variables; any variable that is unset is passed through as None.
    """
    chat_client = OpenAIChatClient(
        azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT"),
        azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    )

    # Agent definition: identity, instructions, and the two arithmetic tools.
    agent_instructions = [
        "Get accurate calculation results",
        "Break down the calculation into smaller steps.",
    ]
    agent = Agent(
        name="calculator_assistant",
        role="Calculator Assistant",
        llm=chat_client,
        goal="Assist Humans with calculation tasks.",
        instructions=agent_instructions,
        tools=[add, sub],
    )

    # Service wrapper: pub/sub component, agents registry location, and port.
    service = AgentActor(
        agent=agent,
        message_bus_name="pubsub",
        agents_registry_key="agents_registry",
        agents_registry_store_name="agentstatestore",
        service_port=8002,
    )

    await service.start()
if __name__ == "__main__":
load_dotenv()
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
LLM orchestrator:
from dapr_agents import LLMOrchestrator
from dapr_agents.llm import OpenAIChatClient
from dotenv import load_dotenv
import asyncio
import logging
import os
async def main():
    """Configure the LLM orchestrator, expose it as a service on port 8004, and start it.

    Any exception raised while building or running the service is caught and
    reported on stdout; it is not re-raised.
    """
    try:
        # Chat client settings come from the AZURE_OPENAI_* environment variables.
        chat_client = OpenAIChatClient(
            azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
        )

        orchestrator = LLMOrchestrator(
            name="LLMOrchestrator",
            llm=chat_client,
            message_bus_name="pubsub",
            state_store_name="workflowstatestore",
            state_key="workflow_state",
            agents_registry_store_name="agentstatestore",
            agents_registry_key="agents_registry",
            max_iterations=3,
        )
        orchestrator_service = orchestrator.as_service(port=8004)

        await orchestrator_service.start()
    except Exception as exc:
        print(f"Error starting service: {exc}")
if __name__ == "__main__":
load_dotenv()
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
Client:
#!/usr/bin/env python3
import json
import sys
import time
from dapr.clients import DaprClient
# Default Pub/Sub component
PUBSUB_NAME = "pubsub"
def main(orchestrator_topic, max_attempts=10, retry_delay=1):
    """
    Publishes a task to a specified Dapr Pub/Sub topic with retries.

    Exits the process with status 0 as soon as one publish call succeeds, or
    with status 1 once every attempt has failed.

    Args:
        orchestrator_topic (str): The name of the orchestrator topic.
        max_attempts (int): Maximum number of retry attempts.
        retry_delay (int): Delay in seconds between attempts.
    """
    task_message = {
        "task": "What is 1 + 1?",
    }

    # Fixed pause before the first publish attempt
    # (presumably gives the sidecar/services time to come up; TODO confirm).
    time.sleep(5)

    for attempt in range(1, max_attempts + 1):
        try:
            print(f"📢 Attempt {attempt}: Publishing to topic '{orchestrator_topic}'...")

            with DaprClient() as client:
                client.publish_event(
                    pubsub_name=PUBSUB_NAME,
                    topic_name=orchestrator_topic,
                    data=json.dumps(task_message),
                    data_content_type="application/json",
                    publish_metadata={
                        "cloudevent.type": "TriggerAction",
                    },
                )

            print(f"✅ Successfully published request to '{orchestrator_topic}'")
            # SystemExit does not derive from Exception, so the handler
            # below does not intercept this exit.
            sys.exit(0)
        except Exception as e:
            print(f"❌ Request failed: {e}")

        # Only wait when another attempt will actually follow.
        if attempt < max_attempts:
            print(f"⏳ Waiting {retry_delay}s before next attempt...")
            time.sleep(retry_delay)

    print(f"❌ Maximum attempts ({max_attempts}) reached without success.")
    sys.exit(1)
# Script entry point: publish the sample task to the orchestrator's topic.
if __name__ == "__main__":
    main("LLMOrchestrator")