146 changes: 78 additions & 68 deletions camel/agents/chat_agent.py
@@ -1957,26 +1957,6 @@ def append_to_system_message(
if reset_memory:
self.init_messages()

- def reset_system_message(
- self, content: str, reset_memory: bool = True
- ) -> None:
- """Reset context to new system message.
-
- Args:
- content (str): The new system message.
- reset_memory (bool):
- Whether to reinitialize conversation messages after appending
- additional context. Defaults to True.
- """
- self._original_system_message = BaseMessage.make_system_message(
- content
- )
- self._system_message = (
- self._generate_system_message_for_output_language()
- )
- if reset_memory:
- self.init_messages()
-
def reset_to_original_system_message(self) -> None:
r"""Reset system message to original, removing any appended context.

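With `reset_system_message` removed, its job folds into `update_system_message`, which (judging from the call later in this diff) takes the new content plus a flag controlling whether memory is reinitialized. A minimal migration sketch, assuming `update_system_message(content, reset_memory)` keeps that signature and that `ChatAgent` accepts a plain-string system message:

```python
from camel.agents import ChatAgent

agent = ChatAgent("You are a terse assistant.")

# Previously: agent.reset_system_message("New prompt.", reset_memory=True)
# Assumed equivalent: replace the system message and clear memory
agent.update_system_message("New prompt.", True)

# Or swap the system prompt while preserving conversation history
agent.update_system_message("New prompt, old history.", False)
```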
@@ -5247,20 +5227,11 @@ def to_openai_compatible_server(self) -> Any:
uvicorn.run(app, host="0.0.0.0", port=8000)
```
"""
- import asyncio
import json
import time

from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
- from pydantic import BaseModel
-
- # Define Pydantic models for request/response
- class ChatMessage(BaseModel):
- role: str
- content: str = ""
- name: Optional[str] = None
- tool_calls: Optional[List[Dict[str, Any]]] = None

app = FastAPI(
title="CAMEL OpenAI-compatible API",
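The docstring above serves this app with uvicorn on port 8000. A hedged client-side sketch, assuming the handler below is mounted at the conventional `/v1/chat/completions` path (the route decorator is collapsed in this diff):

```python
# Point the official openai SDK at the locally served CAMEL agent.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")

resp = client.chat.completions.create(
    model="camel-model",  # echoed back; the agent's own backend does the work
    messages=[{"role": "user", "content": "Hello!"}],
)
print(resp.choices[0].message.content)
```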
@@ -5275,6 +5246,12 @@ async def chat_completions(request_data: dict):
print(json.dumps(request_data, indent=2, ensure_ascii=False))
print("=" * 80)

+ # Reset agent state for stateless API behavior
+ # Each request should start with a clean slate
+ self.reset()
+ # Clear external tools from previous requests
+ self._external_tool_schemas.clear()
+
messages = request_data.get("messages", [])
model = request_data.get("model", "camel-model")
stream = request_data.get("stream", False)
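Because `self.reset()` wipes agent state on every call, the endpoint is stateless in the same way the real OpenAI API is: each request must carry the entire conversation, which the role loop below replays into the agent's memory. An illustrative payload:

```python
request_data = {
    "model": "camel-model",
    "stream": False,
    "messages": [
        {"role": "system", "content": "You are helpful."},
        {"role": "user", "content": "What is CAMEL?"},
        {"role": "assistant", "content": "A multi-agent framework."},
        {"role": "user", "content": "Who maintains it?"},  # current turn
    ],
}
```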
@@ -5299,11 +5276,9 @@ async def chat_completions(request_data: dict):
)
current_user_message = user_msg
elif msg_role == "system":
- sys_msg = BaseMessage.make_system_message(
- role_name="System", content=msg_content
- )
- self.update_memory(sys_msg, OpenAIBackendRole.SYSTEM)
- self.reset_system_message(msg_content, True)
+ # Update system message without clearing memory
+ # to preserve conversation history
+ self.update_system_message(msg_content, False)
elif msg_role == "assistant":
# Record previous assistant messages
assistant_msg = BaseMessage.make_assistant_message(
@@ -5436,57 +5411,92 @@ async def chat_completions(request_data: dict):
)

async def _stream_response(message: BaseMessage, request_data: dict):
- # Start a separate task for the agent processing
- agent_response = await self.astep(message)
-
- if not agent_response.msgs:
- # Stream an error message if no response
- error_data = {'error': 'No response generated'}
- yield f"data: {json.dumps(error_data)}\n\n"
- return
+ # Use the agent's real streaming capability
+ model = request_data.get("model", "camel-model")
+ chunk_id = f'chatcmpl-{int(time.time())}'
+ created_time = int(time.time())

- content = agent_response.msgs[0].content
- # This provides a good streaming experience without complex
- # token handling
- words = content.split()
-
- # Send the first event with model info
+ # Send the first chunk with role
first_chunk = {
- 'id': f'chatcmpl-{int(time.time())}',
+ 'id': chunk_id,
'object': 'chat.completion.chunk',
- 'created': int(time.time()),
- 'model': request_data.get("model", "camel-model"),
+ 'created': created_time,
+ 'model': model,
'choices': [
{
'index': 0,
- 'delta': {'role': 'assistant'},
+ 'delta': {'role': 'assistant', 'content': ''},
'finish_reason': None,
}
],
}
yield f"data: {json.dumps(first_chunk)}\n\n"

- # Stream the content word by word
- for i, word in enumerate(words):
- # Add space before each word except the first
- word_content = word if i == 0 else f" {word}"
- word_chunk = {
+ # Stream chunks from the agent
+ accumulated_content = ""
+ last_response = None
+ try:
+ async for chunk_response in self._astream(message):
+ last_response = chunk_response
+ if chunk_response.msgs:
+ # Get the new content from this chunk
+ current_content = chunk_response.msgs[0].content
+ # Calculate delta content
+ if len(current_content) > len(accumulated_content):
+ delta_content = current_content[
+ len(accumulated_content) :
+ ]
+ accumulated_content = current_content
+
+ # Send content chunk
+ content_chunk = {
+ 'id': chunk_id,
+ 'object': 'chat.completion.chunk',
+ 'created': created_time,
+ 'model': model,
+ 'choices': [
+ {
+ 'index': 0,
+ 'delta': {'content': delta_content},
+ 'finish_reason': None,
+ }
+ ],
+ }
+ yield f"data: {json.dumps(content_chunk)}\n\n"
+
+ # Determine finish reason
+ finish_reason = 'stop'
+ if last_response and last_response.info.get(
+ 'external_tool_call_requests'
+ ):
+ finish_reason = 'tool_calls'
+
+ # Send the final chunk
+ final_chunk = {
+ 'id': chunk_id,
+ 'object': 'chat.completion.chunk',
+ 'created': created_time,
+ 'model': model,
'choices': [
{
'index': 0,
- 'delta': {'content': word_content},
- 'finish_reason': None,
+ 'delta': {},
+ 'finish_reason': finish_reason,
}
- ]
+ ],
}
yield f"data: {json.dumps(word_chunk)}\n\n"
await asyncio.sleep(0.05) # Reasonable streaming speed
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"

- # Send the final event
- final_chunk = {
- 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]
- }
- yield f"data: {json.dumps(final_chunk)}\n\n"
- yield "data: [DONE]\n\n"
+ except Exception as e:
+ # Stream error
+ error_chunk = {
+ 'error': {
+ 'message': str(e),
+ 'type': 'server_error',
+ }
+ }
+ yield f"data: {json.dumps(error_chunk)}\n\n"
+ yield "data: [DONE]\n\n"

return app
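With this change `_stream_response` emits genuine incremental events: a role-bearing first chunk, content deltas, a final chunk whose `finish_reason` is `stop` or `tool_calls`, then the `[DONE]` sentinel. A hedged consumer sketch using `httpx`, again assuming the `/v1/chat/completions` route:

```python
import json

import httpx

payload = {
    "model": "camel-model",
    "stream": True,
    "messages": [{"role": "user", "content": "Stream me a haiku."}],
}

with httpx.stream(
    "POST", "http://localhost:8000/v1/chat/completions", json=payload, timeout=60
) as response:
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue  # skip blank separators between events
        data = line[len("data: "):]
        if data == "[DONE]":  # sentinel ending the stream
            break
        chunk = json.loads(data)
        if "error" in chunk:  # mid-stream errors arrive as data events too
            raise RuntimeError(chunk["error"])
        delta = chunk["choices"][0]["delta"]
        print(delta.get("content", ""), end="", flush=True)
print()
```

Note the delta arithmetic on the server side: since `_astream` yields cumulative message content, the handler slices off the already-sent `accumulated_content` prefix and forwards only the new suffix, so a client can simply concatenate the `delta` content values it receives.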
94 changes: 0 additions & 94 deletions camel/schemas/openai_schema.py

This file was deleted.