diff --git a/python/fastapi-uvicorn/.gitignore b/python/fastapi-uvicorn/.gitignore index 838e0c2..9fd34ff 100644 --- a/python/fastapi-uvicorn/.gitignore +++ b/python/fastapi-uvicorn/.gitignore @@ -1,3 +1,5 @@ .venv +venv .env -__pycache__ \ No newline at end of file +__pycache__ +*.pyc \ No newline at end of file diff --git a/python/fastapi-uvicorn/README.md b/python/fastapi-uvicorn/README.md index e3cd1c3..a5d612b 100644 --- a/python/fastapi-uvicorn/README.md +++ b/python/fastapi-uvicorn/README.md @@ -1,242 +1,410 @@ -# Auto instrumentating FastAPI application using OpenTelemetry +# FastAPI with OpenTelemetry and Last9 GenAI Tracking -This example demonstrates how to instrument a simple FastAPI application with -OpenTelemetry. +This example demonstrates how to instrument a FastAPI application with OpenTelemetry and the Last9 GenAI SDK for comprehensive LLM cost tracking and observability. -1. Create a virtual environment and install the dependencies: +## Features + +- **Standard OpenTelemetry instrumentation** for FastAPI endpoints +- **Last9 GenAI SDK integration** for LLM cost tracking +- **Automatic cost calculation** for Claude, GPT-4, Gemini, and other models +- **Workflow-level cost aggregation** across multi-step processes +- **Multi-turn conversation tracking** with context preservation +- **Tool/function call tracking** with performance metrics +- **Content events** for prompt/completion tracking + +## Prerequisites + +1. **Python 3.7+** installed +2. **Last9 account** - Get your OTLP credentials from [Last9 Dashboard](https://app.last9.io) +3. **Anthropic API key** - Get from [Anthropic Console](https://console.anthropic.com/) + +## Quick Start + +### 1. 
Create virtual environment and install dependencies ```bash -python -m venv .venv +cd opentelemetry-examples/python/fastapi-uvicorn + +# Create and activate virtual environment +python3 -m venv .venv source .venv/bin/activate + +# Install dependencies pip install -r requirements.txt ``` -2. Install the Auto Instrumentation packages using the `opentelemetry-bootstrap` - tool: +### 2. Configure environment variables + +Copy the example environment file and fill in your credentials: ```bash -opentelemetry-bootstrap -a requirements +cp .env.example .env ``` -It will output the packages that you can add to `requirements.txt`. + +Edit `.env` and set: ```bash -opentelemetry-api>=1.15.0 -opentelemetry-sdk>=1.15.0 -opentelemetry-instrumentation-fastapi>=0.36b0 -opentelemetry-exporter-otlp>=1.15.0 -opentelemetry-instrumentation-requests>=0.36b0 -opentelemetry-distro==0.48b0 +# Last9 Configuration +OTEL_SERVICE_NAME=fastapi-genai-app +OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp.last9.io +OTEL_EXPORTER_OTLP_HEADERS=Authorization=Basic%20 +OTEL_TRACES_EXPORTER=otlp +OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf + +# Anthropic API Key +ANTHROPIC_API_KEY= ``` -Additionally, install these optional packages for enhanced functionality: +**Important:** The `OTEL_EXPORTER_OTLP_HEADERS` value must be URL encoded. Replace spaces with `%20`. -```bash -# AWS SDK extension for better AWS resource detection -pip install opentelemetry-sdk-extension-aws +#### How to get Last9 credentials: -# Container ID resource detector for containerized environments -pip install opentelemetry-resource-detector-containerid -``` +1. Go to [Last9 Dashboard](https://app.last9.io) +2. Navigate to Settings → OTLP +3. Copy the Basic Auth header +4. 
URL encode it (replace spaces with `%20`) + +Example: +- Original: `Basic dXNlcjpwYXNz` +- Encoded: `Basic%20dXNlcjpwYXNz` -For more details on the container ID resource detector, see: https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/resource/opentelemetry-resource-detector-containerid +For more details: [Python OTEL SDK - Whitespace in OTLP Headers](https://last9.io/blog/whitespace-in-otlp-headers-and-opentelemetry-python-sdk/) -Copy these packages to your `requirements.txt` file and run the command again to install the packages. +### 3. Run the application + +Load environment variables and start: ```bash -pip install -r requirements.txt +# Load and export environment variables so start.sh (a child process) can see them +set -a; source .env; set +a + +# Start the application +./start.sh ``` -3. Obtain the OTLP Auth Header from the [Last9 dashboard](https://app.last9.io). - The Auth header is required in the next step. +The script will automatically: +- Use Gunicorn + Uvicorn workers in production mode (if `OTEL_EXPORTER_OTLP_ENDPOINT` is set) +- Use simple Uvicorn for local development (if endpoint not set) -4. Next, run the commands below to set the environment variables. +### 4. Test the API +**Check health:** ```bash -export OTEL_SERVICE_NAME=fastapi-app -export OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp.last9.io -export OTEL_EXPORTER_OTLP_HEADERS="Authorization=" -export OTEL_TRACES_EXPORTER=otlp -export OTEL_EXPORTER_OTLP_PROTOCOL="http/protobuf" +curl http://localhost:8000/health ``` -> Note: `BASIC_AUTH_HEADER` should be replaced with the URL encoded value of the -> basic authorization header. Read this post to know how -> [Python Otel SDK](https://last9.io/blog/whitespace-in-otlp-headers-and-opentelemetry-python-sdk/) -> handles whitespace in headers for more details. 
+**Simple chat (with cost tracking):** +```bash +curl -X POST http://localhost:8000/chat \ + -H "Content-Type: application/json" \ + -d '{ + "message": "What is the capital of France?", + "model": "claude-sonnet-4-5-20250929", + "max_tokens": 100 + }' +``` -5. Run the FastAPI application: +Response: +```json +{ + "response": "The capital of France is Paris...", + "model": "claude-sonnet-4-5-20250929", + "cost": 0.000045, + "input_tokens": 15, + "output_tokens": 25 +} +``` -**Local Development (Simple Uvicorn):** +**Multi-step workflow (with cost aggregation):** ```bash -./start.sh +curl -X POST http://localhost:8000/workflow \ + -H "Content-Type: application/json" \ + -d '{ + "task": "Analyze the benefits of serverless architecture" + }' +``` + +Response: +```json +{ + "result": "Based on the analysis, serverless architecture offers...", + "workflow_id": "workflow_1234567890", + "total_cost": 0.000123, + "llm_calls": 2, + "tool_calls": 1 +} ``` -**Production (Gunicorn + Auto Instrumentation):** +**Multi-turn conversation:** ```bash -OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 ./start.sh +curl -X POST http://localhost:8000/conversation \ + -H "Content-Type: application/json" \ + -d '{ + "conversation_id": "conv_user123_session456", + "message": "Hello, how are you?" + }' + +# Second turn +curl -X POST http://localhost:8000/conversation \ + -H "Content-Type: application/json" \ + -d '{ + "conversation_id": "conv_user123_session456", + "message": "Tell me about AI" + }' ``` -The start script automatically detects the presence of `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable: -- **If set**: Uses gunicorn with uvicorn workers and full OpenTelemetry auto instrumentation -- **If not set**: Uses simple uvicorn for local development +### 5. View traces in Last9 -6. Once the server is running, you can access the application at - `http://127.0.0.1:8000` by default. The API endpoints are: +1. Go to [Last9 Dashboard](https://app.last9.io) +2. 
Navigate to APM → Traces +3. You should see traces with: + - Cost attributes for each LLM call + - Workflow-level cost aggregation + - Conversation tracking + - Content events (prompts/completions) + - Tool call events -- GET `/` - Hello World -- GET `/items/:id` - Get items by ID +## API Endpoints -7. Sign in to [Last9 Dashboard](https://app.last9.io) and visit the APM - dashboard to see the traces in action. +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/` | GET | API information and available endpoints | +| `/health` | GET | Health check | +| `/chat` | POST | Simple LLM chat with cost tracking | +| `/workflow` | POST | Multi-step workflow with cost aggregation | +| `/conversation` | POST | Multi-turn conversation tracking | -![Traces](./traces.png) +## Last9 GenAI SDK Features -## How the Conditional Startup Works +The `last9_genai_attributes.py` file provides: -The `app.py` file contains a simple FastAPI application with a main function: +### 1. Automatic Cost Tracking ```python -if __name__ == "__main__": - uvicorn.run(app, host="0.0.0.0", port=8000) +cost_breakdown = last9_genai.add_llm_cost_attributes( + span=span, + model="claude-sonnet-4-5-20250929", + usage={ + "input_tokens": 100, + "output_tokens": 200 + } +) ``` -**Important**: This main function only executes during local development when you run `python app.py` directly. In production: - -- Gunicorn imports the `app` object from `app.py` -- When imported, `__name__ == "app"` (not `"__main__"`) -- The main function block is **never executed** -- Gunicorn manages server startup using uvicorn workers -- OpenTelemetry initialization happens via the `post_fork` hook in `gunicorn.conf.py` +Supports 20+ models: +- Anthropic: Claude 3.5 Sonnet, Claude 3 Opus, Claude 3 Haiku +- OpenAI: GPT-4o, GPT-4, GPT-3.5 Turbo +- Google: Gemini Pro, Gemini 1.5 Pro/Flash +- Cohere: Command R, Command R+ -## Running with Circus and Gunicorn for Production +### 2. 
Workflow Cost Aggregation -For production deployments with multiple workers and process management, you can use Circus + Gunicorn instead of running Uvicorn directly. This setup properly handles OpenTelemetry auto-instrumentation with forked worker processes. +```python +# Initialize workflow +last9_genai.add_workflow_attributes( + span=span, + workflow_id="workflow_123", + workflow_type="customer_support" +) + +# Add costs from multiple LLM calls +last9_genai.add_llm_cost_attributes( + span=span1, + model="claude-sonnet-4-5-20250929", + usage=usage1, + workflow_id="workflow_123" +) + +last9_genai.add_llm_cost_attributes( + span=span2, + model="claude-3-haiku-20240307", + usage=usage2, + workflow_id="workflow_123" +) + +# Get total cost +workflow_cost = global_workflow_tracker.get_workflow_cost("workflow_123") +# Returns: {total_cost: 0.000123, llm_call_count: 2, tool_call_count: 0} +``` -### Why Circus + Gunicorn? +### 3. Conversation Tracking -- **Process Management**: Circus provides robust process monitoring and management -- **Multi-Worker Support**: Gunicorn handles multiple workers better than Uvicorn's `--workers` option -- **OpenTelemetry Compatibility**: Auto-instrumentation works correctly with Gunicorn's worker forking model -- **Production Ready**: Better resource management and fault tolerance +```python +# Start conversation +global_conversation_tracker.start_conversation( + conversation_id="conv_123", + user_id="user_456" +) + +# Add turns +global_conversation_tracker.add_turn( + conversation_id="conv_123", + user_message="Hello", + assistant_message="Hi there!", + model="claude-sonnet-4-5-20250929", + usage=usage, + cost=0.000012 +) + +# Get stats +stats = global_conversation_tracker.get_conversation_stats("conv_123") +# Returns: {turn_count: 5, total_cost: 0.000234, ...} +``` -### Setup Steps +### 4. Tool/Function Call Tracking -1. 
**Update requirements.txt** to include Gunicorn: -```bash -# Add to your requirements.txt -gunicorn>=20.0.0 -circus>=0.18.0 +```python +last9_genai.add_tool_attributes( + span=span, + tool_name="database_query", + function_name="get_user_profile", + arguments={"user_id": "123"}, + result={"name": "John", "email": "john@example.com"}, + workflow_id="workflow_123" +) ``` -2. **Create gunicorn.conf.py** configuration: +### 5. Content Events + ```python -import os +# Add prompt/completion as span events +last9_genai.add_content_events( + span=span, + prompt="What is AI?", + completion="Artificial Intelligence is...", + truncate_at=1000 # Optional truncation +) +``` -# Server socket -bind = "0.0.0.0:8000" -backlog = 2048 +## Last9 Attributes Reference -# Worker processes -workers = 2 -worker_class = "uvicorn.workers.UvicornWorker" -worker_connections = 1000 -timeout = 60 -keepalive = 2 +The SDK adds these custom attributes to your traces: -# Restart workers after this many requests, to prevent memory leaks -max_requests = 5000 -max_requests_jitter = 1000 +| Attribute | Description | Example | +|-----------|-------------|---------| +| `gen_ai.l9.span.kind` | Span classification | `llm`, `tool`, `prompt` | +| `gen_ai.l9.cost.input` | Input cost in USD | `0.000015` | +| `gen_ai.l9.cost.output` | Output cost in USD | `0.000030` | +| `gen_ai.l9.cost.total` | Total cost in USD | `0.000045` | +| `gen_ai.l9.workflow.id` | Workflow identifier | `workflow_123` | +| `gen_ai.l9.workflow.type` | Workflow type | `customer_support` | +| `gen_ai.l9.workflow.cost.total` | Total workflow cost | `0.000234` | +| `gen_ai.l9.conversation.id` | Conversation ID | `conv_user123` | +| `gen_ai.l9.conversation.turn` | Turn number | `3` | +| `gen_ai.l9.tool.name` | Tool/function name | `database_query` | +| `gen_ai.l9.performance.response_time` | Response time (seconds) | `1.234` | -# Logging -loglevel = "info" -errorlog = "-" -accesslog = "-" +## Architecture -# Process naming -proc_name = 
"fastapi-otel-app" +``` +FastAPI Application + ↓ +OpenTelemetry Auto-Instrumentation + ↓ +Last9 GenAI SDK (Manual Instrumentation) + ├─→ Cost Calculation + ├─→ Workflow Tracking + ├─→ Conversation Tracking + └─→ Content Events + ↓ +OTLP Exporter → Last9 Backend +``` -# Server mechanics -daemon = False -pidfile = "/tmp/gunicorn.pid" -preload_app = False +## Local Development (Console Exporter) -``` +For local testing without sending to Last9: + +```bash +# Don't set OTEL_EXPORTER_OTLP_ENDPOINT +unset OTEL_EXPORTER_OTLP_ENDPOINT -3. **Create circus.ini** configuration: -```ini -[circus] -check_delay = 5 -endpoint = tcp://127.0.0.1:5555 -pubsub_endpoint = tcp://127.0.0.1:5556 -stats_endpoint = tcp://127.0.0.1:5557 +# Set console exporter +export OTEL_TRACES_EXPORTER=console +export OTEL_SERVICE_NAME=fastapi-genai-app -[watcher:fastapi-app] -cmd = opentelemetry-instrument gunicorn -c gunicorn.conf.py app:app -numprocesses = 1 -copy_env = True +# Set Anthropic key +export ANTHROPIC_API_KEY= -[env:fastapi-app] -OTEL_EXPORTER_OTLP_TRACES_EXPORTER = console -OTEL_SERVICE_NAME = fastapi-gunicorn-app -OTEL_SERVICE_VERSION = 1.0.0 +# Run +./start.sh ``` -4. **Create start.sh** script: -```bash -#!/bin/bash +This will print traces to console instead of sending to Last9. -# Set OpenTelemetry environment variables -export OTEL_SERVICE_NAME="${OTEL_SERVICE_NAME:-fastapi-gunicorn-app}" -export OTEL_SERVICE_VERSION="${OTEL_SERVICE_VERSION:-1.0.0}" -export OTEL_RESOURCE_ATTRIBUTES="service.name=${OTEL_SERVICE_NAME},service.version=${OTEL_SERVICE_VERSION}" -export OTEL_EXPORTER_OTLP_ENDPOINT="${OTEL_EXPORTER_OTLP_ENDPOINT:-http://localhost:4317}" +## Production Deployment -# Start Circus with the configuration -echo "Starting FastAPI application with Circus (Gunicorn 2 workers) and OpenTelemetry auto-instrumentation..." -circusd circus.ini -``` +### Using Gunicorn + Circus -5. 
**Simplify your app.py** (remove manual instrumentation): -```python -from fastapi import FastAPI +The included configuration supports production deployment with: -# Create a FastAPI application -app = FastAPI() +- **Gunicorn** with Uvicorn workers (2 workers by default) +- **Circus** process manager for monitoring and restarts +- **OpenTelemetry auto-instrumentation** via `opentelemetry-instrument` wrapper +- **Graceful worker lifecycle management** -@app.get("/") -async def root(): - return {"message": "Hello World from FastAPI with auto-instrumentation!"} +Configuration files: +- `gunicorn.conf.py` - Gunicorn configuration (workers, timeouts, etc.) +- `circus.ini` - Circus process manager configuration +- `start.sh` - Startup script with conditional logic -@app.get("/items/{item_id}") -async def read_item(item_id: int): - return {"item_id": item_id} -``` +### Start in production mode: -6. **Start the application**: ```bash -chmod +x start.sh +# Ensure OTEL_EXPORTER_OTLP_ENDPOINT is set +export OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp.last9.io +export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic%20" +export ANTHROPIC_API_KEY= + +# Start ./start.sh ``` -### Key Differences from Uvicorn +## Troubleshooting + +### Issue: "Anthropic client not configured" + +**Solution:** Set the `ANTHROPIC_API_KEY` environment variable: +```bash +export ANTHROPIC_API_KEY= +``` + +### Issue: Traces not showing in Last9 + +**Solutions:** +1. Check `OTEL_EXPORTER_OTLP_ENDPOINT` is set correctly +2. Verify `OTEL_EXPORTER_OTLP_HEADERS` is URL encoded +3. Check Last9 credentials are valid +4. 
Look for errors in application logs -| Aspect | Uvicorn Direct | Circus + Gunicorn | -|--------|----------------|-------------------| -| **Workers** | `uvicorn --workers 2` (problematic with OTEL) | Gunicorn manages 2 workers properly | -| **Process Management** | Basic process handling | Circus provides monitoring, restart, scaling | -| **OpenTelemetry** | Auto-instrumentation breaks with `--workers` | Works correctly with worker forking | -| **Production Ready** | Basic development server | Full production deployment setup | -| **Resource Management** | Limited control | Fine-grained worker lifecycle management | +### Issue: Import error for `last9_genai_attributes` -### Benefits +**Solution:** Ensure `last9_genai_attributes.py` is in the same directory as `app.py` -- ✅ **Full OpenTelemetry auto-instrumentation** works with multiple workers -- ✅ **Process monitoring** and automatic restarts via Circus -- ✅ **Load balancing** across multiple Gunicorn workers -- ✅ **Production-grade** setup with proper resource management -- ✅ **Easy scaling** by adjusting worker count in configuration -- ✅ **macOS Compatibility** - Automatically sets `NO_PROXY=*` to prevent proxy-related crashes on macOS +### Issue: Cost calculation seems wrong + +**Solution:** Check model name matches exactly. The SDK uses model name to look up pricing. 
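The URL-encoding step from the troubleshooting list above can be verified mechanically. A minimal sketch, using the non-secret example token from this README (only the space between `Basic` and the credential needs escaping):

```python
# Build the OTEL_EXPORTER_OTLP_HEADERS value the Python SDK accepts:
# raw whitespace is mishandled (see the blog post linked above), so the
# space in "Basic <token>" must become %20.
raw = "Basic dXNlcjpwYXNz"           # example token from this README, not a real credential
encoded = raw.replace(" ", "%20")    # per the README: replace spaces with %20
print(f"Authorization={encoded}")    # Authorization=Basic%20dXNlcjpwYXNz
```

If the printed value differs from what is in your `.env`, re-encode the header before restarting the app.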
+ +## Additional Resources + +- [Last9 Documentation](https://docs.last9.io/) +- [OpenTelemetry Python Docs](https://opentelemetry.io/docs/instrumentation/python/) +- [Anthropic API Reference](https://docs.anthropic.com/) +- [Last9 GenAI SDK Repository](https://github.com/last9/ai/tree/master/sdk/python) + +## Project Structure + +``` +fastapi-uvicorn/ +├── app.py # FastAPI application with GenAI endpoints +├── last9_genai_attributes.py # Last9 GenAI SDK (single file utility) +├── requirements.txt # Python dependencies +├── .env.example # Environment configuration template +├── start.sh # Startup script +├── gunicorn.conf.py # Gunicorn configuration +├── circus.ini # Circus process manager config +└── README.md # This file +``` -### Testing +## License -After starting, you should see traces being generated for each request, with different worker processes handling the load balancing. +MIT diff --git a/python/fastapi-uvicorn/app.py b/python/fastapi-uvicorn/app.py index 9d17ec3..0562624 100644 --- a/python/fastapi-uvicorn/app.py +++ b/python/fastapi-uvicorn/app.py @@ -1,16 +1,478 @@ +import os +import time import uvicorn -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from typing import Optional, List +from opentelemetry import trace +from opentelemetry.trace import Status, StatusCode + +# Import Last9 GenAI SDK +from last9_genai_attributes import ( + Last9GenAI, + global_workflow_tracker, + global_conversation_tracker, +) + +# Import Anthropic SDK (optional, graceful fallback if not installed) +try: + from anthropic import Anthropic + ANTHROPIC_AVAILABLE = True +except ImportError: + ANTHROPIC_AVAILABLE = False + print("⚠️ Anthropic SDK not installed. 
Install with: pip install anthropic") # Create a FastAPI application -app = FastAPI() +app = FastAPI(title="Last9 GenAI FastAPI Example") + +# Initialize Last9 GenAI utility +last9_genai = Last9GenAI() + +# Get tracer +tracer = trace.get_tracer(__name__) + +# Initialize Anthropic client if available +anthropic_client = None +if ANTHROPIC_AVAILABLE and os.getenv("ANTHROPIC_API_KEY"): + anthropic_client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) + print("✅ Anthropic client initialized") +else: + print("⚠️ Anthropic API key not set. Set ANTHROPIC_API_KEY environment variable.") + + +# Request/Response Models +class ChatRequest(BaseModel): + message: str + model: Optional[str] = "claude-sonnet-4-5-20250929" + max_tokens: Optional[int] = 1024 + + +class ChatResponse(BaseModel): + response: str + model: str + cost: float + input_tokens: int + output_tokens: int + +class WorkflowRequest(BaseModel): + task: str + + +class WorkflowResponse(BaseModel): + result: str + workflow_id: str + total_cost: float + llm_calls: int + tool_calls: int + + +class ConversationRequest(BaseModel): + conversation_id: str + message: str + model: Optional[str] = "claude-sonnet-4-5-20250929" + + +class ConversationResponse(BaseModel): + response: str + conversation_id: str + turn_count: int + total_cost: float + + +# Basic endpoints @app.get("/") async def root(): - return {"message": "Hello World from FastAPI with auto-instrumentation!"} + return { + "message": "Last9 GenAI FastAPI Example", + "endpoints": { + "/chat": "POST - Simple LLM chat with cost tracking", + "/workflow": "POST - Multi-step workflow with cost aggregation", + "/conversation": "POST - Multi-turn conversation tracking", + "/health": "GET - Health check" + }, + "anthropic_available": ANTHROPIC_AVAILABLE and anthropic_client is not None + } + + +@app.get("/health") +async def health(): + return { + "status": "healthy", + "anthropic_configured": ANTHROPIC_AVAILABLE and anthropic_client is not None + } + + 
+@app.post("/chat", response_model=ChatResponse) +async def chat(request: ChatRequest): + """ + Simple chat endpoint with Last9 GenAI cost tracking. + + This demonstrates: + - LLM span creation and classification + - Automatic cost calculation + - Token usage tracking + - Content events (prompt/completion) + """ + if not anthropic_client: + raise HTTPException( + status_code=503, + detail="Anthropic client not configured. Set ANTHROPIC_API_KEY environment variable." + ) + + # Create a span for the LLM operation + with tracer.start_as_current_span("chat_endpoint") as parent_span: + try: + # Classify the span and add workflow attributes + last9_genai.set_span_kind(parent_span, "llm") + + # Create LLM span + with tracer.start_as_current_span("anthropic.chat") as llm_span: + try: + # Set span kind as LLM + last9_genai.set_span_kind(llm_span, "llm") + + # Add standard LLM attributes + last9_genai.add_standard_llm_attributes( + span=llm_span, + model=request.model, + operation="chat", + request_params={ + "max_tokens": request.max_tokens, + "temperature": 1.0 + } + ) + + # Add prompt as content event + last9_genai.add_content_events( + span=llm_span, + prompt=request.message, + completion=None # Will add after response + ) + + # Track performance + start_time = time.time() + + # Call Anthropic API + response = anthropic_client.messages.create( + model=request.model, + max_tokens=request.max_tokens, + messages=[{"role": "user", "content": request.message}] + ) + + response_time = time.time() - start_time + + # Extract response + completion_text = response.content[0].text + + # Add completion as content event + last9_genai.add_content_events( + span=llm_span, + prompt=None, + completion=completion_text + ) + + # Add cost attributes + usage = { + "input_tokens": response.usage.input_tokens, + "output_tokens": response.usage.output_tokens + } + + cost_breakdown = last9_genai.add_llm_cost_attributes( + span=llm_span, + model=request.model, + usage=usage + ) + + # Add 
performance attributes + last9_genai.add_performance_attributes( + span=llm_span, + response_time_ms=response_time * 1000, # Convert to milliseconds + response_size_bytes=len(completion_text) + ) + + # Set span status as OK + llm_span.set_status(Status(StatusCode.OK)) + parent_span.set_status(Status(StatusCode.OK)) + + return ChatResponse( + response=completion_text, + model=request.model, + cost=cost_breakdown.total, + input_tokens=usage["input_tokens"], + output_tokens=usage["output_tokens"] + ) + + except Exception as e: + # Set child span status as error + llm_span.set_status(Status(StatusCode.ERROR, str(e))) + llm_span.record_exception(e) + # Propagate to parent span handler + raise + + except Exception as e: + # Set parent span status as error + parent_span.set_status(Status(StatusCode.ERROR, str(e))) + parent_span.record_exception(e) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/workflow", response_model=WorkflowResponse) +async def workflow(request: WorkflowRequest): + """ + Multi-step workflow with cost aggregation. + + This demonstrates: + - Workflow-level cost tracking + - Multiple LLM calls aggregated + - Tool calls within workflow + - Final cost summary + """ + if not anthropic_client: + raise HTTPException( + status_code=503, + detail="Anthropic client not configured. Set ANTHROPIC_API_KEY environment variable." 
+ ) + + workflow_id = f"workflow_{int(time.time())}" + + with tracer.start_as_current_span("workflow_endpoint") as parent_span: + try: + # Initialize workflow tracking + last9_genai.add_workflow_attributes( + span=parent_span, + workflow_id=workflow_id, + workflow_type="task_processing", + user_id="user_123", + session_id="session_456" + ) + # Step 1: Analyze task (LLM call) + with tracer.start_as_current_span("step_1_analyze") as step1_span: + last9_genai.set_span_kind(step1_span, "llm") + last9_genai.add_standard_llm_attributes( + span=step1_span, + model="claude-sonnet-4-5-20250929", + operation="chat" + ) + + analyze_response = anthropic_client.messages.create( + model="claude-sonnet-4-5-20250929", + max_tokens=200, + messages=[{ + "role": "user", + "content": f"Analyze this task and provide 3 key points: {request.task}" + }] + ) + + usage1 = { + "input_tokens": analyze_response.usage.input_tokens, + "output_tokens": analyze_response.usage.output_tokens + } + + last9_genai.add_llm_cost_attributes( + span=step1_span, + model="claude-sonnet-4-5-20250929", + usage=usage1, + workflow_id=workflow_id + ) + + analysis = analyze_response.content[0].text + + # Step 2: Simulate tool call (database lookup) + with tracer.start_as_current_span("step_2_database_lookup") as step2_span: + last9_genai.set_span_kind(step2_span, "tool") + + start_time = time.time() + time.sleep(0.1) # Simulate database query + duration_ms = (time.time() - start_time) * 1000 + + last9_genai.add_tool_attributes( + span=step2_span, + tool_name="database_query", + tool_type="datastore", + description="Lookup context from database", + arguments={"query": "task_context"}, + result={"context": "Additional context from database"}, + duration_ms=duration_ms, + workflow_id=workflow_id + ) + + # Step 3: Generate final response (LLM call - using faster Haiku model) + with tracer.start_as_current_span("step_3_generate_response") as step3_span: + last9_genai.set_span_kind(step3_span, "llm") + 
last9_genai.add_standard_llm_attributes( + span=step3_span, + model="claude-haiku-4-5-20251001", + operation="chat" + ) + + final_response = anthropic_client.messages.create( + model="claude-haiku-4-5-20251001", + max_tokens=300, + messages=[{ + "role": "user", + "content": f"Based on this analysis, provide a concise solution: {analysis}" + }] + ) + + usage2 = { + "input_tokens": final_response.usage.input_tokens, + "output_tokens": final_response.usage.output_tokens + } + + last9_genai.add_llm_cost_attributes( + span=step3_span, + model="claude-haiku-4-5-20251001", + usage=usage2, + workflow_id=workflow_id + ) + + solution = final_response.content[0].text + + # Get workflow cost summary + workflow_cost = global_workflow_tracker.get_workflow_cost(workflow_id) + + # Clean up workflow tracking + global_workflow_tracker.delete_workflow(workflow_id) + + # Set span status as OK + parent_span.set_status(Status(StatusCode.OK)) + + return WorkflowResponse( + result=solution, + workflow_id=workflow_id, + total_cost=workflow_cost["total_cost"], + llm_calls=workflow_cost["llm_call_count"], + tool_calls=workflow_cost["tool_call_count"] + ) + + except Exception as e: + # Set parent span status as error + parent_span.set_status(Status(StatusCode.ERROR, str(e))) + parent_span.record_exception(e) + raise HTTPException(status_code=500, detail=str(e)) + + +@app.post("/conversation", response_model=ConversationResponse) +async def conversation(request: ConversationRequest): + """ + Multi-turn conversation tracking. + + This demonstrates: + - Conversation ID tracking + - Turn-by-turn cost tracking + - Conversation statistics + - Context preservation + """ + if not anthropic_client: + raise HTTPException( + status_code=503, + detail="Anthropic client not configured. Set ANTHROPIC_API_KEY environment variable." 
+ ) + + with tracer.start_as_current_span("conversation_endpoint") as parent_span: + try: + # Start or continue conversation + if not global_conversation_tracker.get_conversation_stats(request.conversation_id): + global_conversation_tracker.start_conversation( + conversation_id=request.conversation_id, + user_id="user_123", + metadata={"session_id": "session_456"} + ) + + with tracer.start_as_current_span("conversation_turn") as turn_span: + try: + last9_genai.set_span_kind(turn_span, "llm") + + # Determine the turn number via the tracker's public stats API + existing_stats = global_conversation_tracker.get_conversation_stats(request.conversation_id) + turn_number = (existing_stats["turn_count"] + 1) if existing_stats else 1 + last9_genai.add_conversation_tracking( + span=turn_span, + conversation_id=request.conversation_id, + turn_number=turn_number, + user_id="user_123" + ) + + last9_genai.add_standard_llm_attributes( + span=turn_span, + model=request.model, + operation="chat" + ) + + # Add content events + last9_genai.add_content_events( + span=turn_span, + prompt=request.message, + completion=None + ) + + # Call Anthropic API + start_time = time.time() + response = anthropic_client.messages.create( + model=request.model, + max_tokens=1024, + messages=[{"role": "user", "content": request.message}] + ) + response_time = time.time() - start_time + + completion_text = response.content[0].text + + # Add completion event + last9_genai.add_content_events( + span=turn_span, + prompt=None, + completion=completion_text + ) + + # Calculate costs + usage = { + "input_tokens": response.usage.input_tokens, + "output_tokens": response.usage.output_tokens + } + + cost_breakdown = last9_genai.add_llm_cost_attributes( + span=turn_span, + model=request.model, + usage=usage + ) + + # Record latency and response size, mirroring the /chat endpoint + last9_genai.add_performance_attributes( + span=turn_span, + response_time_ms=response_time * 1000, + response_size_bytes=len(completion_text) + ) + + # Add to conversation tracker (cost is the numeric total, as in the SDK example) + global_conversation_tracker.add_turn( + conversation_id=request.conversation_id, + user_message=request.message, + assistant_message=completion_text, + model=request.model, + usage=usage, + cost=cost_breakdown.total + ) + + # Get conversation stats + conv_stats = 
global_conversation_tracker.get_conversation_stats(request.conversation_id) + + # Set span status as OK + turn_span.set_status(Status(StatusCode.OK)) + parent_span.set_status(Status(StatusCode.OK)) + + return ConversationResponse( + response=completion_text, + conversation_id=request.conversation_id, + turn_count=conv_stats["turn_count"], + total_cost=conv_stats["total_cost"] + ) + + except Exception as e: + # Set child span status as error + turn_span.set_status(Status(StatusCode.ERROR, str(e))) + turn_span.record_exception(e) + # Propagate to parent span handler + raise + + except Exception as e: + # Set parent span status as error + parent_span.set_status(Status(StatusCode.ERROR, str(e))) + parent_span.record_exception(e) + raise HTTPException(status_code=500, detail=str(e)) -@app.get("/items/{item_id}") -async def read_item(item_id: int): - return {"item_id": item_id} if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/python/fastapi-uvicorn/circus.ini b/python/fastapi-uvicorn/circus.ini index 583f354..db0c336 100644 --- a/python/fastapi-uvicorn/circus.ini +++ b/python/fastapi-uvicorn/circus.ini @@ -10,6 +10,6 @@ numprocesses = 1 copy_env = True [env:fastapi-app] -OTEL_EXPORTER_OTLP_TRACES_EXPORTER = console -OTEL_SERVICE_NAME = fastapi-gunicorn-app +OTEL_TRACES_EXPORTER = console +OTEL_SERVICE_NAME = fastapi-genai-app OTEL_SERVICE_VERSION = 1.0.0 \ No newline at end of file diff --git a/python/fastapi-uvicorn/gunicorn.conf.py b/python/fastapi-uvicorn/gunicorn.conf.py index 010343f..21eb028 100644 --- a/python/fastapi-uvicorn/gunicorn.conf.py +++ b/python/fastapi-uvicorn/gunicorn.conf.py @@ -11,7 +11,7 @@ os.environ["NO_PROXY"] = "*" # Server socket -bind = "0.0.0.0:8005" +bind = "0.0.0.0:8000" backlog = 2048 # Worker processes (set to 1 on macOS to avoid fork issues) diff --git a/python/fastapi-uvicorn/last9_genai_attributes.py b/python/fastapi-uvicorn/last9_genai_attributes.py new file mode 100644 index 0000000..a553af2 --- 
/dev/null +++ b/python/fastapi-uvicorn/last9_genai_attributes.py @@ -0,0 +1,985 @@ +#!/usr/bin/env python3 +""" +Last9 GenAI Attributes for Python OpenTelemetry + +This utility provides Last9-specific gen_ai attributes that complement the standard +OpenTelemetry gen_ai semantic conventions. It adds cost tracking, workflow management, +and enhanced observability features similar to the last9-node-agent. + +Usage: + from last9_genai_attributes import Last9GenAI, model_pricing + + # Initialize the utility + l9_genai = Last9GenAI() + + # Add Last9 attributes to your spans + l9_genai.add_llm_cost_attributes(span, model_name, usage_data) + l9_genai.set_span_kind(span, 'llm') + l9_genai.add_workflow_attributes(span, workflow_id='my-workflow') + +Requirements: + pip install opentelemetry-api opentelemetry-sdk +""" + +import hashlib +import time +from datetime import datetime +from typing import Dict, Any, Optional, Union, List +from dataclasses import dataclass, field +import json +import logging + +try: + from opentelemetry.trace import Span + from opentelemetry import trace + from opentelemetry.trace.status import Status, StatusCode +except ImportError: + raise ImportError( + "OpenTelemetry packages not found. 
Install with: " + "pip install opentelemetry-api opentelemetry-sdk" + ) + +logger = logging.getLogger(__name__) + +# ============================================================================ +# LAST9 GenAI Semantic Conventions - Python Implementation +# Based on last9-node-agent/src/semantic/gen-ai.ts +# ============================================================================ + +class GenAIAttributes: + """OpenTelemetry GenAI semantic convention constants""" + + # Standard OpenTelemetry GenAI attributes (v1.28.0) + PROVIDER_NAME = 'gen_ai.provider.name' + OPERATION_NAME = 'gen_ai.operation.name' + CONVERSATION_ID = 'gen_ai.conversation.id' + + # Request attributes + REQUEST_MODEL = 'gen_ai.request.model' + REQUEST_MAX_TOKENS = 'gen_ai.request.max_tokens' + REQUEST_TEMPERATURE = 'gen_ai.request.temperature' + REQUEST_TOP_P = 'gen_ai.request.top_p' + REQUEST_FREQUENCY_PENALTY = 'gen_ai.request.frequency_penalty' + REQUEST_PRESENCE_PENALTY = 'gen_ai.request.presence_penalty' + + # Response attributes + RESPONSE_ID = 'gen_ai.response.id' + RESPONSE_MODEL = 'gen_ai.response.model' + RESPONSE_FINISH_REASONS = 'gen_ai.response.finish_reasons' + + # Usage attributes (v1.28.0 standard) + USAGE_INPUT_TOKENS = 'gen_ai.usage.input_tokens' + USAGE_OUTPUT_TOKENS = 'gen_ai.usage.output_tokens' + USAGE_TOTAL_TOKENS = 'gen_ai.usage.total_tokens' + + # Cost tracking (Last9 custom) + USAGE_COST_USD = 'gen_ai.usage.cost_usd' + USAGE_COST_INPUT_USD = 'gen_ai.usage.cost_input_usd' + USAGE_COST_OUTPUT_USD = 'gen_ai.usage.cost_output_usd' + + # Prompt attributes + PROMPT = 'gen_ai.prompt' + COMPLETION = 'gen_ai.completion' + + # Prompt versioning (Last9 custom) + PROMPT_TEMPLATE = 'gen_ai.prompt.template' + PROMPT_VERSION = 'gen_ai.prompt.version' + PROMPT_HASH = 'gen_ai.prompt.hash' + PROMPT_TEMPLATE_ID = 'gen_ai.prompt.template_id' + + # Tool attributes + TOOL_NAME = 'gen_ai.tool.name' + TOOL_TYPE = 'gen_ai.tool.type' + TOOL_DESCRIPTION = 'gen_ai.tool.description' + +class 
Last9Attributes: + """Last9-specific extensions to OpenTelemetry GenAI conventions""" + + # Span classification + L9_SPAN_KIND = 'gen_ai.l9.span.kind' + + # Workflow attributes + WORKFLOW_ID = 'workflow.id' + WORKFLOW_TYPE = 'workflow.type' + WORKFLOW_USER_ID = 'workflow.user_id' + WORKFLOW_SESSION_ID = 'workflow.session_id' + WORKFLOW_TOTAL_COST_USD = 'workflow.total_cost_usd' + WORKFLOW_LLM_CALLS = 'workflow.llm_calls' + WORKFLOW_TOOL_CALLS = 'workflow.tool_calls' + + # Advanced AI attributes + CAPABILITY_NAME = 'gen_ai.capability.name' + STEP_NAME = 'gen_ai.step.name' + AGENT_TYPE = 'gen_ai.agent.type' + CHAIN_TYPE = 'gen_ai.chain.type' + + # Function/tool calling + FUNCTION_CALL_NAME = 'gen_ai.function.call.name' + FUNCTION_CALL_ARGUMENTS = 'gen_ai.function.call.arguments' + FUNCTION_CALL_RESULT = 'gen_ai.function.call.result' + FUNCTION_CALL_DURATION_MS = 'gen_ai.function.call.duration_ms' + + # Performance metrics + RESPONSE_TIME_MS = 'gen_ai.response.time_ms' + RESPONSE_SIZE_BYTES = 'gen_ai.response.size_bytes' + REQUEST_SIZE_BYTES = 'gen_ai.request.size_bytes' + QUALITY_SCORE = 'gen_ai.quality.score' + +class SpanKinds: + """Last9 span kind values for gen_ai.l9.span.kind""" + LLM = 'llm' + TOOL = 'tool' + PROMPT = 'prompt' + +class Operations: + """Standard GenAI operation names""" + CHAT_COMPLETIONS = 'chat.completions' + EMBEDDINGS = 'embeddings' + TEXT_COMPLETION = 'text.completion' + TOOL_CALL = 'tool.call' + +class Providers: + """AI provider names""" + ANTHROPIC = 'anthropic' + OPENAI = 'openai' + GOOGLE = 'google' + COHERE = 'cohere' + HUGGINGFACE = 'huggingface' + +class EventNames: + """Event names for span events (matches Node.js agent)""" + GEN_AI_CONTENT_PROMPT = 'gen_ai.content.prompt' + GEN_AI_CONTENT_COMPLETION = 'gen_ai.content.completion' + GEN_AI_TOOL_CALL = 'gen_ai.tool.call' + GEN_AI_TOOL_RESULT = 'gen_ai.tool.result' + GEN_AI_PROMPT_VERSION = 'gen_ai.prompt.version' + +# 
============================================================================ +# Model Pricing Configuration +# Based on last9-node-agent/src/config/defaults.js +# ============================================================================ + +@dataclass +class ModelPricing: + """Pricing structure for AI models (USD per million tokens)""" + input: float + output: float + +# Default model pricing (USD per million tokens) +# Based on last9-node-agent/src/config/defaults.js +MODEL_PRICING = { + # Anthropic Models - Claude 4.5 Series + 'claude-sonnet-4-5-20250929': ModelPricing(input=3.0, output=15.0), + 'claude-haiku-4-5-20251001': ModelPricing(input=0.25, output=1.25), + + # Anthropic Models - Claude 3.x Series + 'claude-3-5-sonnet': ModelPricing(input=3.0, output=15.0), + 'claude-3-5-sonnet-20241022': ModelPricing(input=3.0, output=15.0), + 'claude-3-5-sonnet-20240620': ModelPricing(input=3.0, output=15.0), + 'claude-3-opus': ModelPricing(input=15.0, output=75.0), + 'claude-3-haiku': ModelPricing(input=0.25, output=1.25), + 'claude-3-haiku-20240307': ModelPricing(input=0.25, output=1.25), + + # OpenAI Models + 'gpt-4o': ModelPricing(input=2.50, output=10.0), + 'gpt-4o-mini': ModelPricing(input=0.15, output=0.60), + 'gpt-4': ModelPricing(input=30.0, output=60.0), + 'gpt-4-turbo': ModelPricing(input=10.0, output=30.0), + 'gpt-3.5-turbo': ModelPricing(input=0.50, output=1.50), + 'gpt-3.5-turbo-instruct': ModelPricing(input=1.50, output=2.0), + + # Google Models + 'gemini-pro': ModelPricing(input=0.50, output=1.50), + 'gemini-1.5-pro': ModelPricing(input=3.50, output=10.50), + 'gemini-1.5-flash': ModelPricing(input=0.075, output=0.30), + + # Cohere Models + 'command-r': ModelPricing(input=0.50, output=1.50), + 'command-r-plus': ModelPricing(input=3.0, output=15.0), + + # Default fallback pricing + 'default': ModelPricing(input=1.0, output=3.0) +} + +# ============================================================================ +# Cost Calculation Utilities +# Based on 
last9-node-agent/src/costing/token-calculator.js +# ============================================================================ + +@dataclass +class CostBreakdown: + """Cost breakdown for LLM operations""" + input: float = 0.0 + output: float = 0.0 + total: float = 0.0 + +def calculate_llm_cost( + model: str, + usage: Dict[str, int], + custom_pricing: Optional[Dict[str, ModelPricing]] = None +) -> CostBreakdown: + """ + Calculate cost for LLM operation based on token usage + + Args: + model: Model name + usage: Token usage dict with keys like 'input_tokens', 'output_tokens', + 'prompt_tokens', 'completion_tokens' + custom_pricing: Optional custom pricing override + + Returns: + CostBreakdown with input, output, and total costs in USD + """ + pricing_table = custom_pricing or MODEL_PRICING + + # Get model pricing, fallback to default + pricing = pricing_table.get(model, MODEL_PRICING['default']) + + # Extract token counts (handle both old and new naming conventions) + input_tokens = usage.get('input_tokens', usage.get('prompt_tokens', 0)) + output_tokens = usage.get('output_tokens', usage.get('completion_tokens', 0)) + + if input_tokens == 0 and output_tokens == 0: + return CostBreakdown() + + # Calculate costs (pricing is per million tokens) + input_cost = (input_tokens / 1_000_000) * pricing.input + output_cost = (output_tokens / 1_000_000) * pricing.output + total_cost = input_cost + output_cost + + return CostBreakdown( + input=round(input_cost, 6), + output=round(output_cost, 6), + total=round(total_cost, 6) + ) + +def detect_ai_provider(model: str) -> Optional[str]: + """ + Detect AI provider from model name + Based on last9-node-agent/src/spans/llm.js:detectAISystem + """ + if not model: + return None + + model_lower = model.lower() + + if 'claude' in model_lower: + return Providers.ANTHROPIC + elif 'gpt' in model_lower: + return Providers.OPENAI + elif 'gemini' in model_lower: + return Providers.GOOGLE + elif 'command' in model_lower: + return 
Providers.COHERE + + return None + +def estimate_tokens(text: str) -> int: + """ + Rough estimation of token count from text + Based on ~4 characters per token average + """ + return len(text) // 4 + +# ============================================================================ +# Workflow Cost Tracking +# Based on last9-node-agent/src/costing/workflow-costs.js +# ============================================================================ + +@dataclass +class WorkflowCost: + """Workflow cost tracking""" + workflow_id: str + metadata: Dict[str, Any] = field(default_factory=dict) + costs: List[Dict[str, Any]] = field(default_factory=list) + total_cost: float = 0.0 + llm_calls: int = 0 + tool_calls: int = 0 + created_at: datetime = field(default_factory=datetime.now) + +class WorkflowCostTracker: + """ + Track costs across workflow operations + Based on last9-node-agent/src/costing/workflow-costs.js + """ + + def __init__(self): + self._workflows: Dict[str, WorkflowCost] = {} + + def initialize_workflow(self, workflow_id: str, metadata: Optional[Dict[str, Any]] = None) -> None: + """Initialize a new workflow for cost tracking""" + if workflow_id not in self._workflows: + self._workflows[workflow_id] = WorkflowCost( + workflow_id=workflow_id, + metadata=metadata or {} + ) + + def add_cost(self, workflow_id: str, cost: CostBreakdown, operation_type: str = 'llm') -> None: + """Add cost to workflow""" + if workflow_id not in self._workflows: + self.initialize_workflow(workflow_id) + + workflow = self._workflows[workflow_id] + workflow.costs.append({ + 'cost': cost, + 'operation_type': operation_type, + 'timestamp': datetime.now() + }) + workflow.total_cost += cost.total + + if operation_type == 'llm': + workflow.llm_calls += 1 + elif operation_type == 'tool': + workflow.tool_calls += 1 + + def get_workflow_cost(self, workflow_id: str) -> Optional[WorkflowCost]: + """Get workflow cost summary""" + return self._workflows.get(workflow_id) + + def get_all_workflows(self) -> 
Dict[str, WorkflowCost]: + """Get all workflow cost summaries""" + return self._workflows.copy() + + def delete_workflow(self, workflow_id: str) -> bool: + """Delete workflow from tracking""" + if workflow_id in self._workflows: + del self._workflows[workflow_id] + return True + return False + +# Global workflow tracker instance +global_workflow_tracker = WorkflowCostTracker() + +# ============================================================================ +# Main Last9 GenAI Utility Class +# ============================================================================ + +class Last9GenAI: + """ + Last9 GenAI attributes utility for Python OpenTelemetry users + + This class provides methods to add Last9-specific gen_ai attributes + to existing OpenTelemetry spans, complementing the standard gen_ai + semantic conventions with cost tracking, workflow management, and + enhanced observability features. + """ + + def __init__(self, + custom_pricing: Optional[Dict[str, ModelPricing]] = None, + workflow_tracker: Optional[WorkflowCostTracker] = None): + """ + Initialize Last9 GenAI utility + + Args: + custom_pricing: Custom model pricing configuration + workflow_tracker: Custom workflow cost tracker instance + """ + self.model_pricing = custom_pricing or MODEL_PRICING + self.workflow_tracker = workflow_tracker or global_workflow_tracker + self.logger = logging.getLogger(__name__) + + def set_span_kind(self, span: Span, kind: str) -> None: + """ + Set Last9 span kind classification + + Args: + span: OpenTelemetry span + kind: Span kind ('llm', 'tool', 'prompt') + """ + if kind in [SpanKinds.LLM, SpanKinds.TOOL, SpanKinds.PROMPT]: + span.set_attribute(Last9Attributes.L9_SPAN_KIND, kind) + else: + self.logger.warning(f"Unknown span kind: {kind}") + + def add_llm_cost_attributes(self, + span: Span, + model: str, + usage: Dict[str, int], + workflow_id: Optional[str] = None) -> CostBreakdown: + """ + Add LLM cost tracking attributes to span + + Args: + span: OpenTelemetry span + 
model: Model name + usage: Token usage dictionary + workflow_id: Optional workflow ID for cost aggregation + + Returns: + CostBreakdown with calculated costs + """ + cost = calculate_llm_cost(model, usage, self.model_pricing) + + if cost.total > 0: + span.set_attribute(GenAIAttributes.USAGE_COST_USD, cost.total) + span.set_attribute(GenAIAttributes.USAGE_COST_INPUT_USD, cost.input) + span.set_attribute(GenAIAttributes.USAGE_COST_OUTPUT_USD, cost.output) + + # Add to workflow cost tracking if workflow_id provided + if workflow_id: + self.workflow_tracker.add_cost(workflow_id, cost, 'llm') + + return cost + + def add_workflow_attributes(self, + span: Span, + workflow_id: str, + workflow_type: Optional[str] = None, + user_id: Optional[str] = None, + session_id: Optional[str] = None) -> None: + """ + Add workflow-level attributes to span + + Args: + span: OpenTelemetry span + workflow_id: Unique workflow identifier + workflow_type: Type of workflow + user_id: User identifier + session_id: Session identifier + """ + span.set_attribute(Last9Attributes.WORKFLOW_ID, workflow_id) + + if workflow_type: + span.set_attribute(Last9Attributes.WORKFLOW_TYPE, workflow_type) + if user_id: + span.set_attribute(Last9Attributes.WORKFLOW_USER_ID, user_id) + if session_id: + span.set_attribute(Last9Attributes.WORKFLOW_SESSION_ID, session_id) + + # Initialize workflow tracking + self.workflow_tracker.initialize_workflow(workflow_id) + + # Add aggregated cost if available + workflow_cost = self.workflow_tracker.get_workflow_cost(workflow_id) + if workflow_cost: + span.set_attribute(Last9Attributes.WORKFLOW_TOTAL_COST_USD, workflow_cost.total_cost) + span.set_attribute(Last9Attributes.WORKFLOW_LLM_CALLS, workflow_cost.llm_calls) + span.set_attribute(Last9Attributes.WORKFLOW_TOOL_CALLS, workflow_cost.tool_calls) + + def add_prompt_versioning(self, + span: Span, + prompt_template: str, + template_id: Optional[str] = None, + version: Optional[str] = None) -> str: + """ + Add prompt versioning 
attributes + + Args: + span: OpenTelemetry span + prompt_template: Prompt template content + template_id: Template identifier + version: Template version + + Returns: + Generated hash of the prompt template + """ + # Generate hash of template content + prompt_hash = hashlib.sha256(prompt_template.encode()).hexdigest()[:16] + + span.set_attribute(GenAIAttributes.PROMPT_TEMPLATE, prompt_template) + span.set_attribute(GenAIAttributes.PROMPT_HASH, prompt_hash) + + if template_id: + span.set_attribute(GenAIAttributes.PROMPT_TEMPLATE_ID, template_id) + if version: + span.set_attribute(GenAIAttributes.PROMPT_VERSION, version) + + return prompt_hash + + def add_tool_attributes(self, + span: Span, + tool_name: str, + tool_type: Optional[str] = None, + description: Optional[str] = None, + arguments: Optional[Dict[str, Any]] = None, + result: Optional[Any] = None, + duration_ms: Optional[float] = None, + workflow_id: Optional[str] = None) -> None: + """ + Add tool/function call attributes + + Args: + span: OpenTelemetry span + tool_name: Name of the tool/function + tool_type: Type of tool (e.g., 'datastore', 'api') + description: Tool description + arguments: Tool call arguments + result: Tool execution result + duration_ms: Execution duration in milliseconds + workflow_id: Optional workflow ID for tracking + """ + span.set_attribute(GenAIAttributes.TOOL_NAME, tool_name) + self.set_span_kind(span, SpanKinds.TOOL) + + if tool_type: + span.set_attribute(GenAIAttributes.TOOL_TYPE, tool_type) + if description: + span.set_attribute(GenAIAttributes.TOOL_DESCRIPTION, description) + if arguments: + span.set_attribute(Last9Attributes.FUNCTION_CALL_ARGUMENTS, json.dumps(arguments)) + if result: + span.set_attribute(Last9Attributes.FUNCTION_CALL_RESULT, str(result)) + if duration_ms: + span.set_attribute(Last9Attributes.FUNCTION_CALL_DURATION_MS, duration_ms) + + # Track tool cost in workflow (tools typically have no direct cost) + if workflow_id: + 
self.workflow_tracker.add_cost(workflow_id, CostBreakdown(), 'tool') + + def add_performance_attributes(self, + span: Span, + response_time_ms: Optional[float] = None, + request_size_bytes: Optional[int] = None, + response_size_bytes: Optional[int] = None, + quality_score: Optional[float] = None) -> None: + """ + Add performance and quality metrics + + Args: + span: OpenTelemetry span + response_time_ms: Response time in milliseconds + request_size_bytes: Request size in bytes + response_size_bytes: Response size in bytes + quality_score: Quality score (0.0-1.0) + """ + if response_time_ms is not None: + span.set_attribute(Last9Attributes.RESPONSE_TIME_MS, response_time_ms) + if request_size_bytes is not None: + span.set_attribute(Last9Attributes.REQUEST_SIZE_BYTES, request_size_bytes) + if response_size_bytes is not None: + span.set_attribute(Last9Attributes.RESPONSE_SIZE_BYTES, response_size_bytes) + if quality_score is not None: + span.set_attribute(Last9Attributes.QUALITY_SCORE, quality_score) + + def add_standard_llm_attributes(self, + span: Span, + model: str, + operation: str = Operations.CHAT_COMPLETIONS, + conversation_id: Optional[str] = None, + request_params: Optional[Dict[str, Any]] = None, + response_data: Optional[Dict[str, Any]] = None, + usage: Optional[Dict[str, int]] = None) -> None: + """ + Add standard OpenTelemetry GenAI attributes + + Args: + span: OpenTelemetry span + model: Model name + operation: Operation type + conversation_id: Conversation/session ID + request_params: Request parameters (max_tokens, temperature, etc.) + response_data: Response metadata (id, finish_reason, etc.) 
+ usage: Token usage data + """ + # Set basic attributes + span.set_attribute(GenAIAttributes.REQUEST_MODEL, model) + span.set_attribute(GenAIAttributes.OPERATION_NAME, operation) + + # Set provider based on model + provider = detect_ai_provider(model) + if provider: + span.set_attribute(GenAIAttributes.PROVIDER_NAME, provider) + + if conversation_id: + span.set_attribute(GenAIAttributes.CONVERSATION_ID, conversation_id) + + # Set request parameters + if request_params: + if 'max_tokens' in request_params: + span.set_attribute(GenAIAttributes.REQUEST_MAX_TOKENS, request_params['max_tokens']) + if 'temperature' in request_params: + span.set_attribute(GenAIAttributes.REQUEST_TEMPERATURE, request_params['temperature']) + if 'top_p' in request_params: + span.set_attribute(GenAIAttributes.REQUEST_TOP_P, request_params['top_p']) + if 'frequency_penalty' in request_params: + span.set_attribute(GenAIAttributes.REQUEST_FREQUENCY_PENALTY, request_params['frequency_penalty']) + if 'presence_penalty' in request_params: + span.set_attribute(GenAIAttributes.REQUEST_PRESENCE_PENALTY, request_params['presence_penalty']) + + # Set response data + if response_data: + if 'id' in response_data: + span.set_attribute(GenAIAttributes.RESPONSE_ID, response_data['id']) + if 'model' in response_data: + span.set_attribute(GenAIAttributes.RESPONSE_MODEL, response_data['model']) + if 'finish_reason' in response_data: + span.set_attribute(GenAIAttributes.RESPONSE_FINISH_REASONS, [response_data['finish_reason']]) + + # Set usage attributes + if usage: + input_tokens = usage.get('input_tokens', usage.get('prompt_tokens', 0)) + output_tokens = usage.get('output_tokens', usage.get('completion_tokens', 0)) + total_tokens = usage.get('total_tokens', input_tokens + output_tokens) + + if input_tokens > 0: + span.set_attribute(GenAIAttributes.USAGE_INPUT_TOKENS, input_tokens) + if output_tokens > 0: + span.set_attribute(GenAIAttributes.USAGE_OUTPUT_TOKENS, output_tokens) + if total_tokens > 0: + 
span.set_attribute(GenAIAttributes.USAGE_TOTAL_TOKENS, total_tokens) + + def add_conversation_tracking(self, + span: Span, + conversation_id: str, + user_id: Optional[str] = None, + session_id: Optional[str] = None, + turn_number: Optional[int] = None) -> None: + """ + Add conversation tracking attributes to span + + Args: + span: OpenTelemetry span + conversation_id: Unique conversation identifier + user_id: User identifier + session_id: Session identifier + turn_number: Turn number in the conversation + """ + span.set_attribute(GenAIAttributes.CONVERSATION_ID, conversation_id) + + if user_id: + span.set_attribute(Last9Attributes.WORKFLOW_USER_ID, user_id) + if session_id: + span.set_attribute(Last9Attributes.WORKFLOW_SESSION_ID, session_id) + if turn_number is not None: + span.set_attribute('gen_ai.conversation.turn_number', turn_number) + + def add_content_events(self, + span: Span, + prompt: Optional[str] = None, + completion: Optional[str] = None, + truncate_length: int = 1000) -> None: + """ + Add content events for input/output prompts (matches Node.js agent functionality) + + Args: + span: OpenTelemetry span + prompt: User prompt/input text + completion: LLM completion/response text + truncate_length: Maximum length before truncation (default: 1000) + """ + if prompt: + truncated_prompt = ( + prompt[:truncate_length] + '...' + if len(prompt) > truncate_length + else prompt + ) + + # Add prompt content as span event + span.add_event(EventNames.GEN_AI_CONTENT_PROMPT, { + GenAIAttributes.PROMPT: truncated_prompt, + 'gen_ai.prompt.length': len(prompt), + 'gen_ai.prompt.truncated': len(prompt) > truncate_length + }) + + if completion: + truncated_completion = ( + completion[:truncate_length] + '...' 
+ if len(completion) > truncate_length + else completion + ) + + # Add completion content as span event + span.add_event(EventNames.GEN_AI_CONTENT_COMPLETION, { + GenAIAttributes.COMPLETION: truncated_completion, + 'gen_ai.completion.length': len(completion), + 'gen_ai.completion.truncated': len(completion) > truncate_length + }) + + def add_tool_call_events(self, + span: Span, + tool_name: str, + tool_arguments: Optional[Dict[str, Any]] = None, + tool_result: Optional[Any] = None) -> None: + """ + Add tool call and result events to span + + Args: + span: OpenTelemetry span + tool_name: Name of the tool being called + tool_arguments: Tool call arguments + tool_result: Tool execution result + """ + if tool_arguments: + span.add_event(EventNames.GEN_AI_TOOL_CALL, { + GenAIAttributes.TOOL_NAME: tool_name, + Last9Attributes.FUNCTION_CALL_ARGUMENTS: json.dumps(tool_arguments) if tool_arguments else None + }) + + if tool_result: + span.add_event(EventNames.GEN_AI_TOOL_RESULT, { + GenAIAttributes.TOOL_NAME: tool_name, + Last9Attributes.FUNCTION_CALL_RESULT: str(tool_result) + }) + + def create_conversation_span(self, + tracer, + conversation_id: str, + model: str, + user_id: Optional[str] = None, + turn_number: Optional[int] = None) -> Span: + """ + Create a conversation-aware LLM span with tracking + + Args: + tracer: OpenTelemetry tracer + conversation_id: Unique conversation identifier + model: Model name + user_id: User identifier + turn_number: Turn number in conversation + + Returns: + Configured span with conversation tracking + """ + span = tracer.start_span("gen_ai.chat.completions") + + # Add standard LLM attributes + self.add_standard_llm_attributes( + span, model, + conversation_id=conversation_id + ) + + # Add Last9 attributes + self.set_span_kind(span, SpanKinds.LLM) + + # Add conversation tracking + self.add_conversation_tracking( + span, conversation_id, user_id=user_id, turn_number=turn_number + ) + + return span + +# 
============================================================================ +# Conversation Management Utilities +# ============================================================================ + +@dataclass +class ConversationTurn: + """Represents a single turn in a conversation""" + turn_number: int + user_message: str + assistant_message: str + model: str + usage: Dict[str, int] + cost: CostBreakdown + timestamp: datetime = field(default_factory=datetime.now) + +class ConversationTracker: + """ + Track multi-turn conversations with cost aggregation + Similar to workflow tracking but specifically for conversations + """ + + def __init__(self): + self._conversations: Dict[str, List[ConversationTurn]] = {} + self._conversation_metadata: Dict[str, Dict[str, Any]] = {} + + def start_conversation(self, + conversation_id: str, + user_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None) -> None: + """Start tracking a new conversation""" + if conversation_id not in self._conversations: + self._conversations[conversation_id] = [] + self._conversation_metadata[conversation_id] = { + 'user_id': user_id, + 'started_at': datetime.now(), + **(metadata or {}) + } + + def add_turn(self, + conversation_id: str, + user_message: str, + assistant_message: str, + model: str, + usage: Dict[str, int], + cost: CostBreakdown) -> int: + """Add a turn to the conversation""" + if conversation_id not in self._conversations: + self.start_conversation(conversation_id) + + turn_number = len(self._conversations[conversation_id]) + 1 + turn = ConversationTurn( + turn_number=turn_number, + user_message=user_message, + assistant_message=assistant_message, + model=model, + usage=usage, + cost=cost + ) + + self._conversations[conversation_id].append(turn) + return turn_number + + def get_conversation(self, conversation_id: str) -> Optional[List[ConversationTurn]]: + """Get conversation history""" + return self._conversations.get(conversation_id) + + def get_conversation_cost(self, 
conversation_id: str) -> float: + """Get total cost for a conversation""" + turns = self._conversations.get(conversation_id, []) + return sum(turn.cost.total for turn in turns) + + def get_conversation_stats(self, conversation_id: str) -> Optional[Dict[str, Any]]: + """Get conversation statistics""" + turns = self._conversations.get(conversation_id) + if not turns: + return None + + total_cost = sum(turn.cost.total for turn in turns) + total_input_tokens = sum(turn.usage.get('input_tokens', 0) for turn in turns) + total_output_tokens = sum(turn.usage.get('output_tokens', 0) for turn in turns) + + return { + 'conversation_id': conversation_id, + 'turn_count': len(turns), + 'total_cost': total_cost, + 'total_input_tokens': total_input_tokens, + 'total_output_tokens': total_output_tokens, + 'models_used': list(set(turn.model for turn in turns)), + 'started_at': self._conversation_metadata.get(conversation_id, {}).get('started_at'), + 'user_id': self._conversation_metadata.get(conversation_id, {}).get('user_id') + } + +# Global conversation tracker instance +global_conversation_tracker = ConversationTracker() + +# ============================================================================ +# Convenience Functions for Common Use Cases +# ============================================================================ + +def create_llm_span(tracer, + span_name: str, + model: str, + operation: str = Operations.CHAT_COMPLETIONS, + workflow_id: Optional[str] = None, + conversation_id: Optional[str] = None, + l9_genai: Optional[Last9GenAI] = None) -> Span: + """ + Create an LLM span with standard Last9 attributes + + Args: + tracer: OpenTelemetry tracer + span_name: Name of the span + model: Model name + operation: Operation type + workflow_id: Workflow ID + conversation_id: Conversation ID + l9_genai: Last9GenAI instance (creates default if not provided) + + Returns: + Configured span with Last9 attributes + """ + if l9_genai is None: + l9_genai = Last9GenAI() + + span = 
tracer.start_span(span_name)
+
+    # Add standard attributes
+    l9_genai.add_standard_llm_attributes(
+        span, model, operation, conversation_id
+    )
+
+    # Add Last9 attributes
+    l9_genai.set_span_kind(span, SpanKinds.LLM)
+
+    if workflow_id:
+        l9_genai.add_workflow_attributes(span, workflow_id)
+
+    return span
+
+def create_tool_span(tracer,
+                     tool_name: str,
+                     tool_type: Optional[str] = None,
+                     workflow_id: Optional[str] = None,
+                     l9_genai: Optional[Last9GenAI] = None) -> Span:
+    """
+    Create a tool/function call span with Last9 attributes
+
+    Args:
+        tracer: OpenTelemetry tracer
+        tool_name: Name of the tool
+        tool_type: Type of tool
+        workflow_id: Workflow ID
+        l9_genai: Last9GenAI instance
+
+    Returns:
+        Configured span for tool usage
+    """
+    if l9_genai is None:
+        l9_genai = Last9GenAI()
+
+    span = tracer.start_span(f"gen_ai.tool.{tool_name}")
+
+    l9_genai.add_tool_attributes(
+        span, tool_name, tool_type=tool_type, workflow_id=workflow_id
+    )
+
+    return span
+
+# ============================================================================
+# Example Usage and Testing
+# ============================================================================
+
+def example_usage():
+    """Example usage of Last9 GenAI attributes"""
+
+    # Initialize OpenTelemetry tracer (you'll already have this in your app)
+    tracer = trace.get_tracer(__name__)
+
+    # Initialize Last9 GenAI utility
+    l9_genai = Last9GenAI()
+
+    # Example 1: LLM call with cost tracking
+    with tracer.start_span("gen_ai.chat.completions") as span:
+        model = "claude-3-5-sonnet"
+        usage = {"input_tokens": 150, "output_tokens": 250}
+
+        # Add standard OpenTelemetry GenAI attributes
+        l9_genai.add_standard_llm_attributes(
+            span, model,
+            conversation_id="session_123",
+            request_params={"max_tokens": 1000, "temperature": 0.7},
+            usage=usage
+        )
+
+        # Add Last9-specific attributes
+        l9_genai.set_span_kind(span, SpanKinds.LLM)
+        cost = l9_genai.add_llm_cost_attributes(span, model, usage, "workflow_456")
+        l9_genai.add_workflow_attributes(span, "workflow_456", "chat", "user_789")
+
+        print(f"LLM call cost: ${cost.total:.6f}")
+
+    # Example 2: Tool call
+    with tracer.start_span("gen_ai.tool.database_query") as span:
+        l9_genai.add_tool_attributes(
+            span, "database_query",
+            tool_type="datastore",
+            description="Query user preferences",
+            arguments={"table": "users", "user_id": 123},
+            result="Found 1 record",
+            duration_ms=45.2,
+            workflow_id="workflow_456"
+        )
+
+    # Example 3: Prompt versioning
+    with tracer.start_span("gen_ai.prompt.template") as span:
+        prompt_template = "You are a helpful AI assistant. User question: {question}"
+        l9_genai.set_span_kind(span, SpanKinds.PROMPT)
+        prompt_hash = l9_genai.add_prompt_versioning(
+            span, prompt_template,
+            template_id="assistant_v1",
+            version="1.2.3"
+        )
+        print(f"Prompt hash: {prompt_hash}")
+
+    # View workflow cost summary
+    workflow = l9_genai.workflow_tracker.get_workflow_cost("workflow_456")
+    if workflow:
+        print(f"Workflow total cost: ${workflow.total_cost:.6f}")
+        print(f"LLM calls: {workflow.llm_calls}, Tool calls: {workflow.tool_calls}")
+
+if __name__ == "__main__":
+    # Run example usage
+    print("Last9 GenAI Attributes for Python - Example Usage")
+    print("=" * 50)
+    example_usage()
\ No newline at end of file
diff --git a/python/fastapi-uvicorn/requirements.txt b/python/fastapi-uvicorn/requirements.txt
index c6e8a74..13807cc 100644
--- a/python/fastapi-uvicorn/requirements.txt
+++ b/python/fastapi-uvicorn/requirements.txt
@@ -8,5 +8,9 @@ opentelemetry-sdk>=1.15.0
 opentelemetry-instrumentation-fastapi>=0.36b0
 opentelemetry-exporter-otlp>=1.15.0
 opentelemetry-instrumentation-requests>=0.36b0
+requests>=2.28.0
 opentelemetry-distro>=0.48b0
-opentelemetry-instrumentation[otlp]>=0.36b0
\ No newline at end of file
+opentelemetry-instrumentation[otlp]>=0.36b0
+
+# GenAI/LLM dependencies
+anthropic>=0.3.0
\ No newline at end of file
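The per-token pricing added in `calculate_llm_cost` (rates in `MODEL_PRICING` are USD per million tokens, costs rounded to six decimal places) can be sanity-checked in isolation. A minimal standalone sketch of that arithmetic, using the `claude-3-5-sonnet` rates from the pricing table ($3.0/M input, $15.0/M output); the helper name `cost_usd` is illustrative, not part of the module:

```python
# Standalone sketch of the cost arithmetic in calculate_llm_cost:
# pricing is USD per million tokens, so cost = (tokens / 1_000_000) * rate,
# rounded to 6 decimal places.
def cost_usd(input_tokens: int, output_tokens: int,
             input_rate: float, output_rate: float):
    input_cost = (input_tokens / 1_000_000) * input_rate
    output_cost = (output_tokens / 1_000_000) * output_rate
    return (round(input_cost, 6), round(output_cost, 6),
            round(input_cost + output_cost, 6))

# claude-3-5-sonnet rates from MODEL_PRICING: $3.0/M input, $15.0/M output
print(cost_usd(150, 250, 3.0, 15.0))  # (0.00045, 0.00375, 0.0042)
```

For 150 input and 250 output tokens (the token counts used in `example_usage`), this matches the `CostBreakdown` that `add_llm_cost_attributes` attaches to the span.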