245 changes: 245 additions & 0 deletions camel/agents/chat_agent.py
@@ -5208,3 +5208,248 @@
mcp_server.tool()(get_available_tools)

return mcp_server

@dependencies_required("fastapi")
def to_openai_compatible_server(self) -> Any:
r"""Create an OpenAI-compatible FastAPI server for this ChatAgent.
Returns:
FastAPI: A FastAPI application that can be served to provide
OpenAI-compatible API endpoints for this ChatAgent.

Example:
```python
agent = ChatAgent(model="gpt-4")
app = agent.to_openai_compatible_server()
# Serve with uvicorn
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
```
"""
import asyncio
import json
import time

from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

# Define Pydantic models for request/response
class ChatMessage(BaseModel):
Comment on lines +5258 to +5259
Member: Seems `ChatMessage` is defined but not used.

role: str
content: str = ""
name: Optional[str] = None
tool_calls: Optional[List[Dict[str, Any]]] = None

app = FastAPI(
title="CAMEL OpenAI-compatible API",
description="OpenAI-compatible API for CAMEL ChatAgent",
)

@app.post("/v1/chat/completions")
async def chat_completions(request_data: dict):
try:
print("\n" + "=" * 80)
print(f"[{time.strftime('%H:%M:%S')}] 📨 Received Request:")
print(json.dumps(request_data, indent=2, ensure_ascii=False))
print("=" * 80)

messages = request_data.get("messages", [])
model = request_data.get("model", "camel-model")
stream = request_data.get("stream", False)
functions = request_data.get("functions")
tools = request_data.get("tools")
Comment on lines +5278 to +5282
Member: The agent's state persists across different API requests, which would cause conversations from different clients to mix and could leak memory. Would it be better to add `self.reset()` and `self._external_tool_schemas.clear()` at the start of each request?
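A minimal sketch of that per-request reset at the top of the handler, assuming `ChatAgent` exposes the `reset()` method and `_external_tool_schemas` mapping named above (not verified against the rest of the codebase):

```python
@app.post("/v1/chat/completions")
async def chat_completions(request_data: dict):
    # Drop state carried over from a previous request so conversations
    # from different clients do not mix and tools do not accumulate.
    self.reset()
    self._external_tool_schemas.clear()

    messages = request_data.get("messages", [])
    ...  # continue with the existing handler body
```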


# Convert OpenAI messages to CAMEL format and record in memory
current_user_message = None
for msg in messages:
msg_role = msg.get("role", "")
msg_content = msg.get("content", "")

if msg_role == "user":
user_msg = BaseMessage.make_user_message(
role_name="User", content=msg_content
)
# Record all but the last user message in memory
# The last user message will be passed to step()
if current_user_message is not None:
self.update_memory(
current_user_message, OpenAIBackendRole.USER
)
current_user_message = user_msg
elif msg_role == "assistant":
# Record previous assistant messages
assistant_msg = BaseMessage.make_assistant_message(
role_name="Assistant", content=msg_content
)
self.record_message(assistant_msg)
elif msg_role == "tool":
# Handle tool response messages if needed
pass
Member: Should we also support taking a system message, to align with the OpenAI interface?
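One possible way to handle it, sketched under the assumption that `OpenAIBackendRole.SYSTEM` exists and that an assistant-style `BaseMessage` can carry the system content (neither is verified against CAMEL's API):

```python
elif msg_role == "system":
    # Hedged sketch: record the OpenAI-style system message in the
    # agent's memory; CAMEL may offer a more direct way to set the
    # agent's system prompt.
    system_msg = BaseMessage.make_assistant_message(
        role_name="System", content=msg_content
    )
    self.update_memory(system_msg, OpenAIBackendRole.SYSTEM)
```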


# Process tools/functions if provided
if tools or functions:
tools_to_use = tools if tools else functions
# Type guard to ensure tools_to_use is not None
if tools_to_use is not None:
for tool in tools_to_use:
self.add_external_tool(tool)
Member: It seems the tools added here accumulate on the agent. Should we clean them up after the request ends?
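A small sketch of that cleanup, assuming the same `_external_tool_schemas` attribute named in the earlier comment:

```python
try:
    ...  # existing request handling
finally:
    # Clear tools registered for this request so they do not leak
    # into the next one.
    self._external_tool_schemas.clear()
```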


# Get the response from the agent
if current_user_message is not None:
if stream:
return StreamingResponse(
_stream_response(
current_user_message, request_data
),
media_type="text/event-stream",
)
else:
agent_response = await self.astep(current_user_message)

# Convert CAMEL response to OpenAI format
if not agent_response.msgs:
# Empty response or error
content = "No response generated"
finish_reason = "error"
else:
content = agent_response.msgs[0].content
finish_reason = "stop"

# Check for tool calls
tool_calls_response = None
external_tool_requests = agent_response.info.get(
"external_tool_call_requests"
)
if external_tool_requests:
tool_calls_response = []
for tool_call in external_tool_requests:
tool_calls_response.append(
{
"id": (
tool_call.tool_call_id
or f"call_{int(time.time())}"
),
"type": "function",
"function": {
"name": tool_call.tool_name,
"arguments": json.dumps(
tool_call.args
),
},
}
)
finish_reason = "tool_calls"

usage = agent_response.info.get("token_usage") or {
"prompt_tokens": agent_response.info.get(
"prompt_tokens", 0
),
"completion_tokens": agent_response.info.get(
"completion_tokens", 0
),
"total_tokens": agent_response.info.get(
"total_tokens", 0
),
}

response = {
"id": agent_response.info.get(
"response_id", f"chatcmpl-{int(time.time())}"
),
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": (
content
if not tool_calls_response
else None
),
"tool_calls": tool_calls_response,
},
"finish_reason": finish_reason,
}
],
"usage": usage,
}

print(f"[{time.strftime('%H:%M:%S')}] 💬 Response:")
print(
json.dumps(response, indent=2, ensure_ascii=False)
)
print("=" * 80 + "\n")

return response
else:
# No user message provided
return JSONResponse(
status_code=400,
content={"error": "No user message provided"},
)
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": f"Internal server error: {e!s}"},

Check warning from Code scanning / CodeQL: Information exposure through an exception (Medium). Stack trace information flows to this location and may be exposed to an external user.
Copilot Autofix (AI, 7 days ago):
The ideal fix is to avoid sending the exception string to the user and instead provide a generic error message such as "Internal server error." To maintain developer visibility, the exception (including stack trace) should be logged using the existing logging facility (get_logger).

Specifically:

  • In the except Exception as e: block on line 5432, change the response at line 5435 to have a generic message.
  • Add a line to log the exception using the project logger (logger.error), capturing the stack trace with exc_info=True.
  • Ensure logger is defined; from the code, get_logger is imported—so obtain a logger instance in this scope if not present.
  • Do not otherwise alter application logic.

All changes must be in the shown snippet(s), i.e., entirely within camel/agents/chat_agent.py.


Suggested changeset 1: camel/agents/chat_agent.py

Autofix patch. Run the following command in your local git repository to apply this patch:
cat << 'EOF' | git apply
diff --git a/camel/agents/chat_agent.py b/camel/agents/chat_agent.py
--- a/camel/agents/chat_agent.py
+++ b/camel/agents/chat_agent.py
@@ -5430,9 +5430,11 @@
                         content={"error": "No user message provided"},
                     )
             except Exception as e:
+                logger = get_logger()  # Ensure logger in scope
+                logger.error("Exception in chat_completions handler", exc_info=True)
                 return JSONResponse(
                     status_code=500,
-                    content={"error": f"Internal server error: {e!s}"},
+                    content={"error": "Internal server error"},
                 )
 
         async def _stream_response(message: BaseMessage, request_data: dict):
EOF
)

async def _stream_response(message: BaseMessage, request_data: dict):
Member: The streaming implementation awaits the full response first, then splits it into words and sends them one by one with artificial delays. This defeats the purpose of streaming.
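A sketch of what relay-style streaming could look like instead, assuming a hypothetical async generator (`agent_content_deltas`) that yields content fragments as the model produces them; CAMEL's actual streaming interface may differ:

```python
async def _stream_response(message: BaseMessage, request_data: dict):
    # Announce the assistant role first, as OpenAI-compatible clients expect.
    yield "data: " + json.dumps({
        "object": "chat.completion.chunk",
        "choices": [{"index": 0, "delta": {"role": "assistant"},
                     "finish_reason": None}],
    }) + "\n\n"
    # Forward each delta as soon as it arrives instead of awaiting the
    # full response and re-chunking it word by word.
    async for delta in agent_content_deltas(message):  # hypothetical helper
        yield "data: " + json.dumps({
            "object": "chat.completion.chunk",
            "choices": [{"index": 0, "delta": {"content": delta},
                         "finish_reason": None}],
        }) + "\n\n"
    yield "data: " + json.dumps({
        "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
    }) + "\n\n"
    yield "data: [DONE]\n\n"
```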

# Await the full agent response (no incremental streaming from the model here)
agent_response = await self.astep(message)

if not agent_response.msgs:
# Stream an error message if no response
error_data = {'error': 'No response generated'}
yield f"data: {json.dumps(error_data)}\n\n"
return

content = agent_response.msgs[0].content
# Split the completed response into words to emulate streaming
# without complex token handling
words = content.split()

# Send the first event with model info
first_chunk = {
'id': f'chatcmpl-{int(time.time())}',
'object': 'chat.completion.chunk',
'created': int(time.time()),
'model': request_data.get("model", "camel-model"),
'choices': [
{
'index': 0,
'delta': {'role': 'assistant'},
'finish_reason': None,
}
],
}
yield f"data: {json.dumps(first_chunk)}\n\n"

# Stream the content word by word
for i, word in enumerate(words):
# Add space before each word except the first
word_content = word if i == 0 else f" {word}"
word_chunk = {
'choices': [
{
'index': 0,
'delta': {'content': word_content},
'finish_reason': None,
}
]
}
yield f"data: {json.dumps(word_chunk)}\n\n"
await asyncio.sleep(0.05) # Reasonable streaming speed

# Send the final event
final_chunk = {
'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"

return app
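For reference, once this server is running, any OpenAI-compatible client can talk to it. A minimal sketch using the stock `openai` Python package, assuming the server from the docstring example is listening on localhost:8000:

```python
from openai import OpenAI

# The API key is arbitrary: the server above does not check it.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")

response = client.chat.completions.create(
    model="camel-model",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)
```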
94 changes: 94 additions & 0 deletions camel/schemas/openai_schema.py
Member: It seems the content in this file is not used in this PR.

@@ -0,0 +1,94 @@
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========


# camel/schemas/openai_schema.py
from typing import Any, Dict, List, Literal, Optional, Union

from pydantic import BaseModel


class ChatCompletionMessage(BaseModel):
role: str
content: Optional[str] = None
name: Optional[str] = None
refusal: Optional[str] = None
tool_calls: Optional[List[Dict[str, Any]]] = None
tool_call_id: Optional[str] = None


class ChatCompletionRequest(BaseModel):
model: str
messages: List[ChatCompletionMessage]
audio: Optional[Dict[str, Any]] = None
frequency_penalty: Optional[float] = None
function_call: Optional[Dict[str, Any]] = None
functions: Optional[List[Dict[str, Any]]] = None
logit_bias: Optional[Dict[str, int]] = None
logprobs: Optional[bool] = None
max_completion_tokens: Optional[int] = None
max_tokens: Optional[int] = None
metadata: Optional[Dict[str, Any]] = None
modalities: Optional[List[Literal["text", "audio"]]] = None
n: Optional[int] = None
parallel_tool_calls: Optional[bool] = None
prediction: Optional[Dict[str, Any]] = None
presence_penalty: Optional[float] = None
prompt_cache_key: Optional[str] = None
reasoning_effort: Optional[str] = None
response_format: Optional[Dict[str, Any]] = None
safety_identifier: Optional[str] = None
seed: Optional[int] = None
service_tier: Optional[
Literal["auto", "default", "flex", "scale", "priority"]
] = None
stop: Optional[Union[str, List[str]]] = None
store: Optional[bool] = None
stream: Optional[bool] = False
stream_options: Optional[Dict[str, Any]] = None
temperature: Optional[float] = 1.0
tool_choice: Optional[Any] = None
tools: Optional[List[Dict[str, Any]]] = None
top_logprobs: Optional[int] = None
top_p: Optional[float] = 1.0
user: Optional[str] = None
verbosity: Optional[Literal["low", "medium", "high"]] = None
web_search_options: Optional[Dict[str, Any]] = None


class ChatCompletionChoice(BaseModel):
index: int
message: ChatCompletionMessage
finish_reason: Literal[
"stop", "length", "tool_calls", "content_filter", "function_call"
]


class ChatCompletionUsage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int


class ChatCompletionResponse(BaseModel):
id: str
object: Literal["chat.completion"] = "chat.completion"
created: int
model: str
choices: List[ChatCompletionChoice]
service_tier: Optional[
Literal["auto", "default", "flex", "scale", "priority"]
] = None
system_fingerprint: Optional[str] = None
usage: Optional[ChatCompletionUsage] = None
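If these models were wired into the endpoint in `chat_agent.py` (which currently accepts a raw `dict`), FastAPI could validate requests against the schema. A hedged sketch, assuming the import path matches this file's location and pydantic v2's `model_dump()`:

```python
from camel.schemas.openai_schema import ChatCompletionRequest

@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    # FastAPI rejects malformed payloads before this handler runs.
    messages = [m.model_dump() for m in request.messages]
    stream = bool(request.stream)
    ...  # continue with the existing handler logic
```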