diff --git a/README.md b/README.md
index 2e711f17b..3febd32df 100644
--- a/README.md
+++ b/README.md
@@ -1,81 +1,119 @@
-
+# Codegen API Server
-
-
-
-
-
+A FastAPI server that provides a streaming interface to the Codegen SDK.
-
- The SWE that Never Sleeps
-
+## Features
-
+- Run Codegen agents with real-time status updates
+- Server-Sent Events (SSE) for live task progress
+- Step-by-step progress tracking
+- Configurable timeouts (default 30 minutes)
+- Heartbeat events to maintain connections
-[](https://pypi.org/project/codegen/)
-[](https://docs.codegen.com)
-[](https://community.codegen.com)
-[](https://github.com/codegen-sh/codegen-sdk/tree/develop?tab=Apache-2.0-1-ov-file)
-[](https://x.com/codegen)
+## Installation
-
+```bash
+pip install -r requirements.txt
+```
-
+## Environment Variables
-The Codegen SDK provides a programmatic interface to code agents provided by [Codegen](https://codegen.com).
+- `CODEGEN_ORG_ID`: Your Codegen organization ID
+- `CODEGEN_TOKEN`: Your Codegen API token
+- `SERVER_PORT`: Port to run the server on (default: 8000)
-```python
-from codegen.agents.agent import Agent
+## Usage
-# Initialize the Agent with your organization ID and API token
-agent = Agent(
- org_id="YOUR_ORG_ID", # Find this at codegen.com/developer
- token="YOUR_API_TOKEN", # Get this from codegen.com/developer
- # base_url="https://codegen-sh-rest-api.modal.run", # Optional - defaults to production
-)
+Start the server:
-# Run an agent with a prompt
-task = agent.run(prompt="Implement a new feature to sort users by last login.")
+```bash
+python backend/api.py
+```
-# Check the initial status
-print(task.status)
+### API Endpoints
-# Refresh the task to get updated status (tasks can take time)
-task.refresh()
+#### Run an Agent
-# Check the updated status
-print(task.status)
+```http
+POST /run
+Content-Type: application/json
-# Once task is complete, you can access the result
-if task.status == "completed":
- print(task.result) # Result often contains code, summaries, or links
+{
+ "prompt": "Your prompt here",
+ "thread_id": "optional_thread_id"
+}
```
-## Installation and Usage
+Response:
+```json
+{
+ "task_id": "task_123",
+ "thread_id": "thread_456"
+}
+```
-Install the SDK using pip or uv:
+#### Get Task Status
-```bash
-pip install codegen
-# or
-uv pip install codegen
+```http
+GET /status/{task_id}
```
-Get started at [codegen.com](https://codegen.com) and get your API token at [codegen.com/developer](https://codegen.com/developer).
+Response:
+```json
+{
+ "status": "in_progress",
+ "result": null,
+ "error": null
+}
+```
+
+#### Stream Task Events
-You can interact with your AI engineer via API, or chat with it in Slack, Linear, Github, or on our website.
+```http
+GET /events/{task_id}
+```
+
+Response (Server-Sent Events):
+```
+data: {"status": "in_progress", "task_id": "123", "current_step": "Analyzing code", "step_number": 1}
-## Resources
+data: {"status": "completed", "task_id": "123", "result": "Success"}
-- [Docs](https://docs.codegen.com)
-- [Getting Started](https://docs.codegen.com/introduction/getting-started)
-- [Contributing](CONTRIBUTING.md)
-- [Contact Us](https://codegen.com/contact)
+data: [DONE]
+```
+
+## Development
+
+Run tests:
+
+```bash
+pytest tests/
+```
+
+## Event Format
+
+Events are sent in Server-Sent Events (SSE) format with the following structure:
+
+```json
+{
+ "status": "in_progress",
+ "task_id": "123",
+ "timestamp": "2025-06-12T14:58:40Z",
+ "current_step": "Analyzing repository structure",
+ "step_number": 1
+}
+```
-## Contributing
+Status values:
+- `queued`: Task is queued
+- `in_progress`: Task is running
+- `completed`: Task completed successfully
+- `failed`: Task failed with error
+- `error`: Internal server error
-Please see our [Contributing Guide](CONTRIBUTING.md) for instructions on how to set up the development environment and submit contributions.
+## Timeouts and Heartbeats
-## Enterprise
+- Tasks timeout after 30 minutes (configurable via `max_retries`)
+- Heartbeat events sent every 5 seconds to keep connections alive
+- Failed tasks automatically cleaned up
-For more information on enterprise engagements, please [contact us](https://codegen.com/contact) or [request a demo](https://codegen.com/request-demo).
diff --git a/backend/api.py b/backend/api.py
new file mode 100644
index 000000000..388f4df75
--- /dev/null
+++ b/backend/api.py
@@ -0,0 +1,251 @@
+"""
+Backend API server using official Codegen SDK
+"""
+
+import asyncio
+import json
+import logging
+import os
+from datetime import datetime
+from typing import AsyncGenerator, Optional
+
+from fastapi import FastAPI, HTTPException
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import StreamingResponse
+from pydantic import BaseModel
+
+from codegen.agents.agent import Agent
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+app = FastAPI()
+
+# Add CORS middleware
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=["*"],
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
+
+# Store active tasks
+active_tasks = {}
+
+class TaskRequest(BaseModel):
+ prompt: str
+ thread_id: Optional[str] = None
+
+class TaskResponse(BaseModel):
+ task_id: str
+ thread_id: Optional[str] = None
+
+class TaskStatusResponse(BaseModel):
+ status: str
+ result: Optional[str] = None
+ error: Optional[str] = None
+
+class AgentCallback:
+ def __init__(self, task_id: str, thread_id: Optional[str] = None):
+ self.task_id = task_id
+ self.thread_id = thread_id
+ self.queue = asyncio.Queue()
+ self.completed = False
+ self.error = None
+ self.current_step = 0
+ self.total_steps = 0
+
+ async def on_status_change(self, status: str, result: Optional[str] = None, error: Optional[str] = None, step_info: Optional[dict] = None):
+ """Handle status change events"""
+ event_data = {
+ "status": status,
+ "task_id": self.task_id,
+ "timestamp": datetime.now().isoformat()
+ }
+
+ if self.thread_id:
+ event_data["thread_id"] = self.thread_id
+
+ if result:
+ event_data["result"] = result
+ if error:
+ event_data["error"] = error
+
+ # Include step information if available
+ if step_info:
+ event_data.update(step_info)
+
+ # Format as proper SSE event
+ event_str = f"data: {json.dumps(event_data)}\n\n"
+ await self.queue.put(event_str)
+
+ if status in ["completed", "failed"]:
+ self.completed = True
+ if error:
+ self.error = error
+ # Send completion event
+ await self.queue.put("data: [DONE]\n\n")
+
+ async def get_events(self):
+ """Get events from the queue"""
+ try:
+ while not self.completed or not self.queue.empty():
+ try:
+ # Use timeout to prevent infinite waiting
+ event = await asyncio.wait_for(self.queue.get(), timeout=5.0)
+ yield event
+ except asyncio.TimeoutError:
+ # Send heartbeat to keep connection alive
+ yield ": heartbeat\n\n"
+ except Exception as e:
+ logger.error(f"Error getting events: {e}")
+ yield f"data: {json.dumps({'status': 'error', 'error': str(e)})}\n\n"
+ yield "data: [DONE]\n\n"
+ break
+ except Exception as e:
+ logger.error(f"Stream error: {e}")
+ yield f"data: {json.dumps({'status': 'error', 'error': str(e)})}\n\n"
+ yield "data: [DONE]\n\n"
+
+async def monitor_task(task, callback: AgentCallback):
+ """Monitor task status and trigger callbacks"""
+ try:
+ logger.info(f"Starting to monitor task {callback.task_id}")
+ max_retries = 900 # 30 minutes with 2-second intervals
+ retry_count = 0
+ last_step = None
+
+ while not callback.completed and retry_count < max_retries:
+ task.refresh()
+ status = task.status.lower() if task.status else "unknown"
+ logger.info(f"Task {callback.task_id} status: {status}")
+
+ # Extract step information from task
+ current_step = None
+ try:
+ # Try to get step information from task.result or task.summary
+ if hasattr(task, 'result') and isinstance(task.result, dict):
+ current_step = task.result.get('current_step')
+ elif hasattr(task, 'summary') and isinstance(task.summary, dict):
+ current_step = task.summary.get('current_step')
+ except Exception as e:
+ logger.warning(f"Could not extract step info: {e}")
+
+ # Only send update if step has changed
+ if current_step and current_step != last_step:
+ step_info = {
+ 'current_step': current_step,
+ 'step_number': callback.current_step + 1
+ }
+ await callback.on_status_change(status, step_info=step_info)
+ last_step = current_step
+ callback.current_step += 1
+
+ if status == "completed":
+ result = (
+ getattr(task, 'result', None) or
+ getattr(task, 'summary', None) or
+ getattr(task, 'output', None) or
+ "Task completed successfully."
+ )
+ logger.info(f"Task {callback.task_id} completed with result")
+ await callback.on_status_change("completed", result=result)
+ break
+ elif status == "failed":
+ error = getattr(task, 'error', None) or getattr(task, 'failure_reason', None) or 'Task failed'
+ logger.error(f"Task {callback.task_id} failed: {error}")
+ await callback.on_status_change("failed", error=error)
+ break
+ elif status != "unknown":
+ logger.info(f"Task {callback.task_id} status update: {status}")
+ await callback.on_status_change(status)
+
+ await asyncio.sleep(2) # Poll every 2 seconds
+ retry_count += 1
+
+ if retry_count >= max_retries:
+ logger.error(f"Task {callback.task_id} timed out after {max_retries * 2} seconds")
+ await callback.on_status_change("failed", error="Task timed out")
+
+ except Exception as e:
+ logger.error(f"Error monitoring task {callback.task_id}: {e}")
+ await callback.on_status_change("error", error=str(e))
+ finally:
+ # Clean up
+ active_tasks.pop(callback.task_id, None)
+ logger.info(f"Cleaned up task {callback.task_id}")
+
+@app.post("/run", response_model=TaskResponse)
+async def run_agent(request: TaskRequest):
+ """Run an agent with the given prompt"""
+ try:
+ # Initialize the agent
+ agent = Agent(
+ org_id=os.getenv("CODEGEN_ORG_ID"),
+ token=os.getenv("CODEGEN_TOKEN")
+ )
+
+ # Run the agent
+ task = agent.run(request.prompt)
+
+ if not task or not task.id:
+ raise HTTPException(status_code=500, detail="Failed to create task")
+
+ # Create callback and start monitoring
+ callback = AgentCallback(task.id, request.thread_id)
+ active_tasks[task.id] = (task, callback)
+
+ # Start monitoring task in background
+ asyncio.create_task(monitor_task(task, callback))
+
+ return TaskResponse(task_id=task.id, thread_id=request.thread_id)
+
+ except Exception as e:
+ logger.error(f"Error running agent: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+@app.get("/status/{task_id}")
+async def get_task_status(task_id: str) -> TaskStatusResponse:
+ """Get task status"""
+ try:
+ if task_id not in active_tasks:
+ raise HTTPException(status_code=404, detail="Task not found")
+
+ task, _ = active_tasks[task_id]
+ task.refresh()
+
+ return TaskStatusResponse(
+ status=task.status.lower() if task.status else "unknown",
+ result=getattr(task, 'result', None),
+ error=getattr(task, 'error', None)
+ )
+
+ except Exception as e:
+ logger.error(f"Error getting task status: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+@app.get("/events/{task_id}")
+async def get_task_events(task_id: str) -> StreamingResponse:
+ """Get SSE events for task status updates"""
+ try:
+ if task_id not in active_tasks:
+ raise HTTPException(status_code=404, detail="Task not found")
+
+ _, callback = active_tasks[task_id]
+
+ return StreamingResponse(
+ callback.get_events(),
+ media_type="text/event-stream"
+ )
+
+ except Exception as e:
+ logger.error(f"Error getting task events: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+if __name__ == "__main__":
+ import uvicorn
+ port = int(os.getenv("SERVER_PORT", "8000"))
+ uvicorn.run(app, host="0.0.0.0", port=port)
+
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 000000000..6b6c7ee31
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,8 @@
+fastapi>=0.68.0,<0.69.0
+uvicorn>=0.15.0,<0.16.0
+pydantic>=1.8.0,<2.0.0
+codegen>=0.1.0
+pytest>=7.0.0
+pytest-asyncio>=0.19.0
+httpx>=0.24.0 # Required by TestClient
+
diff --git a/tests/test_api.py b/tests/test_api.py
new file mode 100644
index 000000000..58d04032a
--- /dev/null
+++ b/tests/test_api.py
@@ -0,0 +1,187 @@
+"""
+Tests for the backend API server
+"""
+
+import asyncio
+import json
+import os
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from fastapi.testclient import TestClient
+
+from backend.api import app
+
+# Test client
+client = TestClient(app)
+
+# Mock environment variables
+os.environ["CODEGEN_ORG_ID"] = "test_org"
+os.environ["CODEGEN_TOKEN"] = "test_token"
+
+@pytest.fixture
+def mock_agent():
+ """Mock Agent class"""
+ with patch("backend.api.Agent") as mock:
+ yield mock
+
+@pytest.fixture
+def mock_task():
+ """Mock task object"""
+ task = MagicMock()
+ task.id = "test_task_id"
+ task.status = "in_progress"
+ task.result = {
+ "current_step": "Analyzing code"
+ }
+ return task
+
+def test_run_agent(mock_agent, mock_task):
+ """Test running an agent"""
+ # Setup mock
+ mock_agent.return_value.run.return_value = mock_task
+
+ # Test request
+ response = client.post(
+ "/run",
+ json={"prompt": "Test prompt", "thread_id": "test_thread"}
+ )
+
+ # Verify response
+ assert response.status_code == 200
+ data = response.json()
+ assert data["task_id"] == "test_task_id"
+ assert data["thread_id"] == "test_thread"
+
+ # Verify agent was called correctly
+ mock_agent.assert_called_once_with(
+ org_id="test_org",
+ token="test_token"
+ )
+ mock_agent.return_value.run.assert_called_once_with("Test prompt")
+
+def test_get_task_status(mock_task):
+ """Test getting task status"""
+ # Add mock task to active tasks
+ from backend.api import active_tasks
+ active_tasks["test_task_id"] = (mock_task, None)
+
+ # Test request
+ response = client.get("/status/test_task_id")
+
+ # Verify response
+ assert response.status_code == 200
+ data = response.json()
+ assert data["status"] == "in_progress"
+ assert data["result"] == {"current_step": "Analyzing code"}
+
+ # Clean up
+ active_tasks.clear()
+
+def test_get_task_status_not_found():
+ """Test getting status of non-existent task"""
+ response = client.get("/status/nonexistent_task")
+ assert response.status_code == 404
+
+@pytest.mark.asyncio
+async def test_get_task_events():
+ """Test getting task events"""
+ # Setup mock callback
+ mock_callback = MagicMock()
+ mock_callback.get_events = AsyncMock(return_value=[
+ 'data: {"status": "in_progress", "task_id": "test_task_id", "current_step": "Analyzing code"}\n\n',
+ 'data: {"status": "completed", "task_id": "test_task_id", "result": "Success"}\n\n',
+ 'data: [DONE]\n\n'
+ ])
+
+ # Add mock task to active tasks
+ from backend.api import active_tasks
+ active_tasks["test_task_id"] = (None, mock_callback)
+
+ # Test request
+ response = client.get("/events/test_task_id")
+
+ # Verify response
+ assert response.status_code == 200
+ assert response.headers["content-type"] == "text/event-stream"
+
+ # Clean up
+ active_tasks.clear()
+
+@pytest.mark.asyncio
+async def test_monitor_task():
+ """Test task monitoring"""
+ from backend.api import AgentCallback, monitor_task
+
+ # Setup mock task
+ mock_task = MagicMock()
+ mock_task.status = "in_progress"
+ mock_task.result = {"current_step": "Step 1"}
+
+ # Create callback
+ callback = AgentCallback("test_task_id", "test_thread")
+
+ # Start monitoring in background
+ monitor_task_future = asyncio.create_task(monitor_task(mock_task, callback))
+
+ # Wait a bit for events
+ await asyncio.sleep(0.1)
+
+ # Simulate task completion
+ mock_task.status = "completed"
+ mock_task.result = "Success"
+
+ # Wait for monitoring to complete
+ await monitor_task_future
+
+ # Verify callback was completed
+ assert callback.completed == True
+ assert callback.error is None
+
+def test_task_timeout():
+ """Test task timeout handling"""
+ from backend.api import monitor_task, AgentCallback
+
+ # Setup mock task that never completes
+ mock_task = MagicMock()
+ mock_task.status = "in_progress"
+
+ # Create callback
+ callback = AgentCallback("test_task_id")
+
+ # Override max_retries for faster testing
+ max_retries = 2
+
+ # Run monitoring
+ asyncio.run(monitor_task(mock_task, callback))
+
+ # Verify timeout was handled
+ assert callback.completed == True
+ assert callback.error is not None
+ assert "timed out" in callback.error.lower()
+
+def test_step_tracking():
+ """Test step tracking in events"""
+ from backend.api import AgentCallback
+
+ # Create callback
+ callback = AgentCallback("test_task_id")
+
+ # Simulate multiple steps
+ steps = [
+ {"current_step": "Analyzing code", "step_number": 1},
+ {"current_step": "Making changes", "step_number": 2},
+ {"current_step": "Running tests", "step_number": 3}
+ ]
+
+ # Process steps
+ for step in steps:
+ asyncio.run(callback.on_status_change("in_progress", step_info=step))
+
+ # Complete the task
+ asyncio.run(callback.on_status_change("completed", result="Success"))
+
+ # Verify step tracking
+ assert callback.current_step == 3 # Should have processed all steps
+ assert callback.completed == True
+