-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Closed as not planned
Labels
feature:chat-completionsfeature:lite-llmneeds-more-infoWaiting for a reply/more info from the authorWaiting for a reply/more info from the authorstale
Description
Describe the bug
Whatever I do I always encounter this error: agents.exceptions.MaxTurnsExceeded: Max turns (10) exceeded. I am returning dummy string and output type is correct. I did try to increase/decrease max_turns parameter too.
Debug information
openai -> 1.99.1
openai-agents -> 0.2.5
python -> 3..11.9
This is my code:
import asyncio, json
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
import httpx
import mlflow
from openai import AsyncOpenAI
from agents import Agent, OpenAIChatCompletionsModel, Runner, WebSearchTool, enable_verbose_stdout_logging, function_tool, set_default_openai_api, set_default_openai_client, set_trace_processors
from pydantic import BaseModel, Field
import requests
enable_verbose_stdout_logging()
set_trace_processors([]) # disable OpenAI tracing
# # Enable auto tracing for OpenAI Agents SDK
mlflow.openai.autolog()
# Optional: Set a tracking URI and an experiment
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("OpenAI Agent Plan")
class Step(BaseModel):
agent: str
input: Any = "last" # could be "last" or an object
class Plan(BaseModel):
stages: List[List[Step]] = Field(default_factory=list)
target_language: Optional[str] = None
class WebSearchResult(BaseModel):
result: str
local_client = AsyncOpenAI(
base_url= llama_3_3_url,
api_key="local",
http_client=httpx.AsyncClient(verify = False),
timeout=10
)
set_default_openai_client(local_client)
set_default_openai_api("chat_completions")
model = OpenAIChatCompletionsModel(model=llama_3_3_model_name, openai_client=local_client)
# Agent registry (add/remove freely)
@dataclass
class AgentSpec:
name: str
model: str
instructions: str
capabilities: List[str] # natural-language descriptors, e.g. ["retrieve fresh info from the web", "web search"]
input_kind: str # "text", "list[doc]", "text+lang", etc.
output_kind: str # "text", "list[doc]", etc.
tools: list = field(default_factory=list)
output_type: Any = str
def build(self) -> Agent:
return Agent(name=self.name, model=self.model, instructions=self.instructions, tools=self.tools, output_type=self.output_type)
REGISTRY: Dict[str, AgentSpec] = {}
def register(spec: AgentSpec): REGISTRY[spec.name] = spec
def remove(name: str): REGISTRY.pop(name, None)
@function_tool
def web_search_tool(query: str) -> str:
# Simulate a deterministic result
result = (
"AI-First apps becoming default; rise of AI agents; "
"multimodal reasoning in production; small specialized models; "
"governance & evals as first-class."
)
return WebSearchResult(result=result)
# Register ONLY the two you want for now
register(AgentSpec(
name="WebSearch",
model=model,
instructions=(
"Call web_search_tool exactly once with the user query. "
"Then RETURN ONLY the tool's string result as your final answer. "
"Do not add any extra words, formatting, or analysis. Do not call any other tools."
),
capabilities=[
"find fresh information on the internet",
"retrieve reports, news, rankings, analyses",
"produce a combined string of web search results"
],
input_kind="text",
output_kind="text",
output_type=WebSearchResult,
tools=[web_search_tool],
))
register(AgentSpec(
name="Translator",
model=model,
instructions="Translate given text into a specified target language. Preserve structure and bullets.",
capabilities=[
"translate text to a target language",
"preserve formatting while translating"
],
input_kind="text+target_language",
output_kind="text"
))
# ── Neutral, capability-driven planner (no agent-specific rules) ────────────
PLANNER = Agent(
name="Planner",
model=model,
instructions=(
"You design a minimal pipeline from a live registry of agents.\n"
"Inputs:\n"
" - user_query: free text\n"
" - target_language: optional language name (null if not requested)\n"
" - registry: list of agents with fields {name, capabilities[], input_kind, output_kind}\n\n"
"Constraints and objectives:\n"
" - Choose steps whose capabilities semantically satisfy the user intent.\n"
" - Ensure I/O compatibility between consecutive steps (output_kind → input_kind).\n"
" - Prefer the fewest steps that satisfy the intent; parallelize only if it reduces total work.\n"
" - If a step requires a parameter (e.g., target language), include it in the step input object.\n"
" - Do NOT rely on specific agent names; select by capability semantics.\n"
"Return ONLY valid JSON matching this schema: "
'{"stages":[[{"agent":"<name>","input":"last" or object}]],"target_language":string|null}'
),
output_type=Plan
)
async def run(agent: Agent, content: Any) -> Any:
if not isinstance(content, (str, list)):
content = json.dumps(content, ensure_ascii=False)
stream = Runner.run_streamed(agent, content)
async for _ in stream.stream_events(): pass
return stream.final_output
async def plan_and_execute(user_query: str, target_language: Optional[str] = None):
# Build real agents
built = {name: spec.build() for name, spec in REGISTRY.items()}
# Prepare registry view (names included for selection, but planner is told not to rely on them)
registry_view = [{
"name": spec.name,
"capabilities": spec.capabilities,
"input_kind": spec.input_kind,
"output_kind": spec.output_kind
} for spec in REGISTRY.values()]
plan_input = {
"user_query": user_query,
"target_language": target_language,
"registry": registry_view
}
plan_list = json.dumps(plan_input)
plan_json = await run(PLANNER, plan_list)
plan = json.loads(plan_json) if isinstance(plan_json, str) else plan_json
last = None
for stage in plan.stages:
tasks = []
for step in stage:
name = step.agent
payload = getattr(step, "input", "last")
if payload == "last":
payload = last
# Small helper to pass target language when required by capability
if name == "Translator" and isinstance(payload, dict) is False:
payload = {"text": last if last is not None else user_query,
"target_language": target_language or "en"}
tasks.append(run(built[name], payload))
results = await asyncio.gather(*tasks)
last = results if len(results) > 1 else results[0]
return {"plan": plan, "result": last}
# Demo
if __name__ == "__main__":
async def main():
# Example 1: “find and summarize” – with only two agents, planner should pick WebSearch;
# (No explicit summarizer registered yet.)
q1 = "what are the most popular AI trends now? find them and give a concise output"
o1 = await plan_and_execute(q1)
print("\n=== PLAN 1 ==="); print(o1["plan"])
print("\n=== RESULT 1 ==="); print(o1["result"])
asyncio.run(main())Metadata
Metadata
Assignees
Labels
feature:chat-completionsfeature:lite-llmneeds-more-infoWaiting for a reply/more info from the authorWaiting for a reply/more info from the authorstale