Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 28 additions & 67 deletions python/uagents-adapter/src/uagents_adapter/langchain/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
import time
from datetime import datetime
from typing import Any, Dict
import logging
import inspect

logger = logging.getLogger(__name__)

DEFAULT_PORT_RANGE = (8000, 9000) # inclusive start, exclusive end
HTTP_TIMEOUT = 5 # seconds


import requests
from langchain_core.callbacks import CallbackManagerForToolRun
Expand Down Expand Up @@ -121,6 +129,17 @@ def _find_available_port(
f"Could not find an available port in range {start_range}-{end_range}"
)


def _exec_langchain(agent: Any, query: str):
"""Try every known LangChain verb once and return the result."""
if inspect.iscoroutinefunction(agent):
return agent(query) # coroutine, caller must await
for m in ("arun", "ainvoke", "run", "invoke"):
if hasattr(agent, m):
return getattr(agent, m)(query)
return agent({"input": query}) # fallback for simple chains


def _langchain_to_uagent(
self,
agent_obj: Any,
Expand Down Expand Up @@ -190,29 +209,8 @@ async def handle_query(ctx: Context, sender: str, msg: QueryMessage):
try:
result = None

# Check if agent is a coroutine function (async function)
import inspect

if inspect.iscoroutinefunction(agent):
# Agent is async, await it
result = await agent(msg.query)
else:
# Try different sync methods
if hasattr(agent, "arun"):
# Try .arun() method first for async support
result = await agent.arun(msg.query)
elif hasattr(agent, "ainvoke"):
# Try .ainvoke() for newer async agent versions
result = await agent.ainvoke(msg.query)
elif hasattr(agent, "run"):
# Try .run() method (most common with agents)
result = agent.run(msg.query)
elif hasattr(agent, "invoke"):
# Try .invoke() for newer agent versions
result = agent.invoke(msg.query)
else:
# Fall back to direct call for chains
result = agent({"input": msg.query})
raw = _exec_langchain(agent, msg.query)
+ result = await raw if inspect.isawaitable(raw) else raw

# Handle different return types
if isinstance(result, dict):
Expand Down Expand Up @@ -269,27 +267,9 @@ async def handle_message(ctx: Context, sender: str, msg: ChatMessage):
# Check if agent is a coroutine function (async function)
import inspect

if inspect.iscoroutinefunction(agent):
# Agent is async, await it
result = await agent(item.text)
# Try different sync/async methods
elif hasattr(agent, "arun"):
# Try .arun() method first for async support
result = await agent.arun(item.text)
elif hasattr(agent, "ainvoke"):
# Try .ainvoke() for newer async agent versions
result = await agent.ainvoke(item.text)
elif hasattr(agent, "invoke"):
result = agent.invoke(item.text)
elif hasattr(agent, "run"):
result = agent.run(item.text)
else:
result = agent({"input": item.text})
if isinstance(result, dict):
if "output" in result:
result = result["output"]
elif "text" in result:
result = result["text"]
raw = _exec_langchain(agent, msg.query)
result = await raw if inspect.isawaitable(raw) else raw


await ctx.send(sender, create_text_chat(str(result)))
except Exception as e:
Expand Down Expand Up @@ -359,27 +339,8 @@ async def handle_structured_output_response(
# Check if agent is a coroutine function (async function)
import inspect

if inspect.iscoroutinefunction(agent):
# Agent is async, await it
result = await agent(query.query)
# Try different sync/async methods
elif hasattr(agent, "arun"):
# Try .arun() method first for async support
result = await agent.arun(query.query)
elif hasattr(agent, "ainvoke"):
# Try .ainvoke() for newer async agent versions
result = await agent.ainvoke(query.query)
elif hasattr(agent, "invoke"):
result = agent.invoke(query.query)
elif hasattr(agent, "run"):
result = agent.run(query.query)
else:
result = agent({"input": query.query})
if isinstance(result, dict):
if "output" in result:
result = result["output"]
elif "text" in result:
result = result["text"]
raw = _exec_langchain(agent, query.query)
result = await raw if inspect.isawaitable(raw) else raw

await ctx.send(session_sender, create_text_chat(str(result)))
except Exception as e:
Expand Down Expand Up @@ -464,7 +425,7 @@ def _register_agent_with_agentverse(self, agent_info):

try:
connect_response = requests.post(
connect_url, json=connect_payload, headers=headers
connect_url, json=connect_payload, headers=headers, timeout=HTTP_TIMEOUT
)
if connect_response.status_code == 200:
print(f"Successfully connected agent '{name}' to Agentverse")
Expand Down Expand Up @@ -508,7 +469,7 @@ class ResponseMessage(Model):

try:
update_response = requests.put(
update_url, json=update_payload, headers=headers
update_url, json=update_payload, headers=headers, timeout=HTTP_TIMEOUT
)
if update_response.status_code == 200:
print(f"Successfully updated agent '{name}' README on Agentverse")
Expand Down
Loading