From 1593ca0ab1758bf345d8745a1c9b57910a811f3f Mon Sep 17 00:00:00 2001 From: FrigaZzz Date: Mon, 15 Sep 2025 21:10:07 +0200 Subject: [PATCH 1/6] feat: add FastAPI modular server example with SSE streaming support - Complete modular FastAPI server implementation for ADK agents - Server-Sent Events (SSE) streaming with three optimization levels - Clean architecture with proper separation of concerns - Includes greetings agent example with streaming capabilities - Comprehensive configuration management and logging - Ready-to-use with environment configuration template - Follows ADK best practices for production deployments Features: - Real-time agent streaming via SSE endpoints - Configurable optimization levels (minimal, balanced, full compatibility) - Modular agent system with easy extensibility - Proper error handling and logging - Environment-based configuration management Files added: - Modular server structure with agents, API, config, core, and models - SSE event mapper for optimized streaming - Streaming models and request handling - Greetings agent implementation - Environment configuration template - Comprehensive README documentation --- .../fastapi_modular_server/.env.example | 16 + .../samples/fastapi_modular_server/.gitignore | 1 + .../samples/fastapi_modular_server/README.md | 329 ++++++++++++++++++ .../fastapi_modular_server/__init__.py | 0 .../app/agents/greetings_agent/__init__.py | 1 + .../agents/greetings_agent/greeting_agent.py | 9 + .../app/api/__init__.py | 0 .../app/api/custom_adk_server.py | 309 ++++++++++++++++ .../app/api/routers/__init__.py | 0 .../app/api/routers/agent_router.py | 92 +++++ .../app/config/settings.py | 58 +++ .../app/core/__init__.py | 0 .../app/core/dependencies.py | 117 +++++++ .../app/core/logging.py | 20 ++ .../app/core/mapping/__init__.py | 0 .../app/core/mapping/sse_event_mapper.py | 64 ++++ .../app/models/__init__.py | 0 .../app/models/streaming_request.py | 20 ++ .../samples/fastapi_modular_server/main.py | 80 +++++ 19 files changed, 1116 insertions(+) create mode 100644 contributing/samples/fastapi_modular_server/.env.example create mode 100644 contributing/samples/fastapi_modular_server/.gitignore create mode 100644 contributing/samples/fastapi_modular_server/README.md create mode 100644 contributing/samples/fastapi_modular_server/__init__.py create mode 100644 contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py create mode 100644 contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greeting_agent.py create mode 100644 contributing/samples/fastapi_modular_server/app/api/__init__.py create mode 100644 contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py create mode 100644 contributing/samples/fastapi_modular_server/app/api/routers/__init__.py create mode 100644 contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py create mode 100644 contributing/samples/fastapi_modular_server/app/config/settings.py create mode 100644 contributing/samples/fastapi_modular_server/app/core/__init__.py create mode 100644 contributing/samples/fastapi_modular_server/app/core/dependencies.py create mode 100644 contributing/samples/fastapi_modular_server/app/core/logging.py create mode 100644 contributing/samples/fastapi_modular_server/app/core/mapping/__init__.py create mode 100644 contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py create mode 100644 contributing/samples/fastapi_modular_server/app/models/__init__.py create mode 100644 contributing/samples/fastapi_modular_server/app/models/streaming_request.py create mode 100644 contributing/samples/fastapi_modular_server/main.py diff --git a/contributing/samples/fastapi_modular_server/.env.example b/contributing/samples/fastapi_modular_server/.env.example new file mode 100644 index 0000000000..f4dd8b3c90 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/.env.example @@ -0,0 +1,16 @@ +# Application Configuration +DEBUG=true +PORT=8881 +HOST=localhost +LOG_LEVEL=INFO +LOGGING=true + +# ADK Configuration +SERVE_WEB_INTERFACE=true +RELOAD_AGENTS=true + +# Google Gemini Configuration +GOOGLE_API_KEY=YOUR_GOOGLE_API_KEY_HERE + +# Model Configuration +MODEL_PROVIDER=google diff --git a/contributing/samples/fastapi_modular_server/.gitignore b/contributing/samples/fastapi_modular_server/.gitignore new file mode 100644 index 0000000000..397b4a7624 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/.gitignore @@ -0,0 +1 @@ +*.log diff --git a/contributing/samples/fastapi_modular_server/README.md b/contributing/samples/fastapi_modular_server/README.md new file mode 100644 index 0000000000..64ac221ccd --- /dev/null +++ b/contributing/samples/fastapi_modular_server/README.md @@ -0,0 +1,329 @@ +# 🚀 Google ADK FastAPI Modular Server + +A **production-ready template** for extending Google's Agent Development Kit (ADK) with custom FastAPI endpoints, optimized SSE streaming, and modular architecture patterns. + +## 🎯 Purpose & Value + +The **FastAPI Modular Server** serves as a **template and reference implementation** for teams who want to: + +- **Extend ADK's built-in server** without modifying core behavior +- **Accelerate production deployment** with battle-tested patterns +- **Add custom business logic** through modular router systems +- **Enable hot-reload capabilities** for faster development cycles + +## ✨ Key Features + +### 🔧 **Modular Router Architecture** +- Clean separation of concerns with dedicated router classes +- Easy to add new endpoints without touching core server code + +### ⚡ **Optimized SSE Streaming** +- **3 optimization levels** for different use cases: + - `MINIMAL`: Essential content only (author + text) + - `BALANCED`: Core data with invocation tracking + - `FULL_COMPAT`: Complete ADK event compatibility +- Reduced payload sizes for improved performance +- Custom event filtering and mapping + +### 🔄 **Hot-Reload Development** +- Automatic agent reloading on file changes +- File system monitoring with `watchdog` +- Development-friendly with production stability + + +## 📁 Project Structure + +``` +fastapi_modular_server/ +├── .env.example # Environment variables template +├── README.md # Project documentation +├── __init__.py # Package initialization +├── app/ # Main application directory +│ ├── __init__.py # App package initialization +│ ├── agents/ # Agent definitions +│ │ └── greetings_agent/ # Greetings agent module +│ │ ├── __init__.py # Agent package init +│ │ └── greetings_agent.py # Greetings agent implementation +│ ├── api/ # API layer +│ │ ├── __init__.py # API package init +│ │ ├── routers/ # API route definitions +│ │ │ ├── __init__.py # Routers package init +│ │ │ └── agent_router.py # Agent-related API routes +│ │ └── custom_adk_server.py # FastAPI server configuration +│ ├── config/ # Configuration management +│ │ └── settings.py # Application settings +│ ├── core/ # Core application components +│ │ ├── __init__.py # Core package init +│ │ ├── dependencies.py # Dependency injection +│ │ ├── logging.py # Logging configuration +│ │ └── mapping/ # Data mapping utilities +│ │ ├── __init__.py # Mapping package init +│ │ └── sse_event_mapper.py # Server-Sent Events mapper +│ └── models/ # Data models +│ ├── __init__.py # Models package init +│ └── streaming_request.py # Streaming data models +└── main.py # Application entry point +``` + +## 🚀 Quick Start + +### 1. **Configuration** +```bash +# Copy environment template +cp .env.example .env + +# Edit .env with your settings +vim .env + +# Set the API KEY + +``` + +### 3. **Run the Server** +```bash +# Development mode with hot-reload +python main.py + +# Production mode +uvicorn main:app --host 0.0.0.0 --port 8881 +``` + + +## 🔧 Customization Guide + +### **Adding New Routers** + +Create a new router following the established pattern: + +```python +# app/api/routers/my_custom_router.py +from fastapi import APIRouter, Depends +from app.core.dependencies import ADKServices, get_adk_services + +class MyCustomRouter: + def __init__(self, web_server_instance): + self.web_server = web_server_instance + self.router = APIRouter(prefix="/custom", tags=["Custom"]) + self._setup_routes() + + def _setup_routes(self): + @self.router.get("/endpoint") + async def my_endpoint( + ): + # Access any ADK service + sessions = await self.web_server.session_service.list_sessions() + return {"data": "custom response", "session_count": len(sessions)} + + def get_router(self) -> APIRouter: + return self.router +``` + +Register it in the custom server: + +```python +# In app/api/server.py - CustomAdkWebServer class +def _initialize_routers(self): + try: + self.agent_router = AgentRouter(self) + self.my_custom_router = MyCustomRouter(self) # Add this + logger.info("All routers initialized successfully.") + except Exception as e: + logger.error(f"Failed to initialize routers: {e}", exc_info=True) + +def _register_modular_routers(self, app: FastAPI): + # ... existing code ... + + if self.my_custom_router: + app.include_router(self.my_custom_router.get_router()) + logger.info("Registered MyCustomRouter.") +``` + +### **Overriding ADK Endpoints** + +#### **Method 1: Route Removal (Current Approach)** + +```python +def _register_modular_routers(self, app: FastAPI): + # Remove specific ADK routes + routes_to_remove = [] + for route in app.routes: + if route.path in [ + "/run_sse", + "/apps/{app_name}/users/{user_id}/sessions" + ] and hasattr(route, 'methods') and 'POST' in route.methods: + routes_to_remove.append(route) + + # Remove the routes + for route in routes_to_remove: + app.routes.remove(route) +``` + +#### **Method 3: Middleware Interception** + +For more complex overrides, use middleware: + +```python +from fastapi import Request, Response +from starlette.middleware.base import BaseHTTPMiddleware + +class RouteOverrideMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request: Request, call_next): + # Intercept specific routes + if request.url.path == "/run_sse" and request.method == "POST": + # Handle with custom logic + return await self.handle_custom_sse(request) + + return await call_next(request) +``` + +### **Accessing ADK Services and Runners** + +#### **From Router Classes** +```python +class AgentRouter: + def __init__(self, web_server_instance): + self.web_server = web_server_instance + + async def my_endpoint(self, adk_services: ADKServices = Depends(get_adk_services)): + # Access services + agents = self.web_server.agent_loader.list_agents() + session = await self.web_server.session_service.list_sessions() + + # Access runners through web server + runner = await self.web_server.runner_dict... + + # Access other web server properties + runners_cache = self.web_server.runners_to_clean +``` + + + +### **Optimizing SSE Streaming** + +#### **Custom Event Filtering** + +Extend the SSE mapper for more sophisticated filtering: + +```python +# app/core/mapping/sse_mapper.py +class AdvancedSSEEventMapper(SSEEventMapper): + def map_event_to_sse_message(self, event: Event, optimization_level: OptimizationLevel) -> Optional[str]: + # Custom filtering logic + if self._should_skip_event(event): + return None + + # Custom payload creation + payload = self._create_custom_payload(event, optimization_level) + + # Custom serialization + return self._serialize_payload(payload) + + def _should_skip_event(self, event: Event) -> bool: + # Skip system events, debug events, empty events, etc. + if event.author in ["system", "debug"]: + return True + if not event.content or not event.content.parts: + return True + return False + + def _create_custom_payload(self, event: Event, level: OptimizationLevel) -> Dict: + if level == OptimizationLevel.ULTRA_MINIMAL: + # Even more minimal than minimal + return {"t": self._extract_text_only(event)} + + return super()._create_minimal_payload(event) +``` + +#### **Streaming Performance Optimizations** + +1. **Batch Events**: Combine multiple streaming events (single chunk) into a single SSE message to reduce overhead. +```python +async def _generate_events_batched(self, req, sse_mapper, adk_services): + batch = [] + batch_size = 5 + + async for event in self._get_events(): + batch.append(event) + + if len(batch) >= batch_size: + # Process batch + combined_payload = self._combine_events(batch) + yield f"data: {json.dumps(combined_payload)}\n\n" + batch.clear() +``` + +2. **Compression**: +```python +import gzip +import json + +def _create_compressed_sse(self, payload): + json_str = json.dumps(payload, separators=(',', ':')) + compressed = gzip.compress(json_str.encode()) + # Use binary SSE or base64 encoding + return f"data: {base64.b64encode(compressed).decode()}\n\n" +``` + +3. **Event Deduplication**: +```python +class DedupSSEMapper(SSEEventMapper): + def __init__(self): + self._last_payloads = {} + + def map_event_to_sse_message(self, event, level): + payload = super().map_event_to_sse_message(event, level) + + # Skip if identical to last payload for this session + session_key = f"{event.session_id}_{event.author}" + if self._last_payloads.get(session_key) == payload: + return None + + self._last_payloads[session_key] = payload + return payload +``` + +### **Advanced Configuration** + +#### **Environment-Specific Settings** + +```python +# app/config/settings.py +class Settings(BaseSettings): + # SSE Optimization + sse_batch_size: int = Field(default=1, description="SSE batch size") + sse_compression: bool = Field(default=False, description="Enable SSE compression") + sse_dedupe: bool = Field(default=True, description="Enable event deduplication") + + # Performance + max_concurrent_sessions: int = Field(default=100) + session_timeout: int = Field(default=3600) + + # Custom features + enable_custom_auth: bool = Field(default=False) + custom_middleware: List[str] = Field(default=[]) + + class Config: + env_prefix = "ADK_" # Environment variables like ADK_SSE_BATCH_SIZE +``` + + +## 🤝 Contributing + +This template is designed to be extended and customized for your specific needs. Key extension points: + +1. **Router Classes**: Add domain-specific endpoints +2. **SSE Mappers**: Custom event processing and optimization +3. **Middleware**: Cross-cutting concerns +4. **Services**: Additional business logic services +5. **Configuration**: Environment-specific settings + +## 📚 Further Resources + +- **Google ADK Documentation**: https://google.github.io/adk-docs/ +- **FastAPI Documentation**: https://fastapi.tiangolo.com/ +- **Pydantic Settings**: https://docs.pydantic.dev/latest/concepts/pydantic_settings/ +- **Server-Sent Events**: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events + +--- + +**Happy coding!** 🚀 This template provides a solid foundation for building production-ready ADK extensions with modern Python patterns and performance optimizations. \ No newline at end of file diff --git a/contributing/samples/fastapi_modular_server/__init__.py b/contributing/samples/fastapi_modular_server/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py new file mode 100644 index 0000000000..9003021a66 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py @@ -0,0 +1 @@ +from app.agents.greetings_agent.greeting_agent import root_agent diff --git a/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greeting_agent.py b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greeting_agent.py new file mode 100644 index 0000000000..e5ab697686 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greeting_agent.py @@ -0,0 +1,9 @@ +from google.adk.agents import LlmAgent + +root_agent = LlmAgent( + model="gemini-2.5-flash", + name="greetings_agent", + description="A friendly Google Gemini-powered agent", + instruction="You are a helpful AI assistant powered by Google Gemini.", + tools=[], +) diff --git a/contributing/samples/fastapi_modular_server/app/api/__init__.py b/contributing/samples/fastapi_modular_server/app/api/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py new file mode 100644 index 0000000000..a1f2c1b1f7 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py @@ -0,0 +1,309 @@ +import logging +import threading +from contextlib import asynccontextmanager +from importlib.resources import files +from pathlib import Path +from typing import Any, Dict, List + +# Import the new modular routers +from app.api.routers.agent_router import AgentRouter +from app.config.settings import Settings +from app.core.dependencies import ADKServices +from fastapi import FastAPI + +# Import agent refresh capabilities +from watchdog.observers import Observer + +from google.adk.cli.adk_web_server import AdkWebServer +from google.adk.cli.utils.agent_change_handler import AgentChangeEventHandler + +# Configure logging +logger = logging.getLogger(__name__) + + +class CustomAdkWebServer(AdkWebServer): + """ + Enhanced ADK Web Server with modular router support and agent refresh capabilities. + Maintains backward compatibility while adding robust agent reloading functionality. + """ + + def __init__( + self, settings: Settings, adk_services: ADKServices, agents_dir: str + ): + """ + Initialize the custom ADK web server. + + Args: + settings: Application settings + adk_services: Container with all ADK services + agents_dir: Directory containing agents + *args: Additional arguments for parent class + **kwargs: Additional keyword arguments for parent class + """ + self.settings = settings + self.adk_services = adk_services + self.reload_agents = settings.reload_agents + self.agents_root = Path(agents_dir) + + # Extract services from container for parent class + # pass individual services as keyword arguments to the super init + super().__init__( + agent_loader=self.adk_services.agent_loader, + session_service=self.adk_services.session_service, + memory_service=self.adk_services.memory_service, + artifact_service=self.adk_services.artifact_service, + credential_service=self.adk_services.credential_service, + eval_sets_manager=self.adk_services.eval_sets_manager, + eval_set_results_manager=self.adk_services.eval_set_results_manager, + agents_dir=str(self.agents_root), + ) + + # App specific routers + self._routers: List[Dict[str, Any]] = [] + self._health_checks: List[callable] = [] + + # Modular routers + self.agent_router: AgentRouter | None = None + + # Agent refresh components + self.observer = None + self.agent_change_handler = None + self._setup_agent_refresh() + + def _setup_agent_refresh(self): + """Initialize agent refresh capabilities if enabled.""" + if not self.reload_agents: + logger.info("Agent refresh disabled.") + return + + try: + self.observer = Observer() + self.agent_change_handler = AgentChangeEventHandler( + agent_loader=self.adk_services.agent_loader, + runners_to_clean=self.runners_to_clean, + current_app_name_ref=self.current_app_name_ref, + ) + self._start_observer() + logger.info(f"Agent refresh enabled for root: {self.agents_root}") + except Exception as e: + logger.error(f"Failed to setup agent refresh: {e}", exc_info=True) + self.reload_agents = False + + def _start_observer(self): + """Start the file system observer for agent changes.""" + if not self.observer or not self.agent_change_handler: + return + + try: + if self.agents_root.exists(): + self.observer.schedule( + self.agent_change_handler, str(self.agents_root), recursive=True + ) + observer_thread = threading.Thread( + target=self.observer.start, daemon=True + ) + observer_thread.start() + logger.info(f"Started file system observer for: {self.agents_root}") + except Exception as e: + logger.error(f"Failed to start observer: {e}", exc_info=True) + + def _stop_observer(self): + """Stop the file system observer.""" + if self.observer and self.observer.is_alive(): + try: + self.observer.stop() + self.observer.join() + logger.info("Stopped file system observer") + except Exception as e: + logger.error(f"Error stopping observer: {e}", exc_info=True) + + def _initialize_routers(self): + """Initialize the modular routers.""" + try: + # Pass the web server instance, which now has proper dependency injection + self.agent_router = AgentRouter(self) + logger.info("AgentRouter initialized successfully.") + + except Exception as e: + logger.error(f"Failed to initialize modular routers: {e}", exc_info=True) + + def get_enhanced_fast_api_app(self) -> FastAPI: + """Assemble and return the enhanced FastAPI application.""" + web_assets_dir = None + if self.settings.serve_web_interface: + try: + web_assets_dir = str(files("google.adk.cli.browser").joinpath("")) + except Exception: + logger.warning( + "Could not locate ADK web assets. UI will not be served." + ) + + # Get the FastAPI app from ADK with custom lifespan + app = self.get_fast_api_app( + lifespan=self._setup_custom_lifespan(), + allow_origins=self.settings.allow_origins, + web_assets_dir=web_assets_dir, + ) + + # Add custom routes defined directly on this server + self.add_custom_routes(app) + + # Initialize and register our modular routers + self._initialize_routers() + self._register_modular_routers(app) + + return app + + def add_custom_routes(self, app: FastAPI): + """Add server-specific, non-modular routes to the app.""" + + @app.get("/diagnostic", tags=["Diagnostics"]) + async def diagnostic(): + """Provides diagnostic information about the server setup.""" + agent_loader_status = ( + "initialized" if self.adk_services.agent_loader else "not_initialized" + ) + + return { + "status": "success", + "message": "Server components are active.", + "agent_loader_status": agent_loader_status, + "settings": { + "agent_dir": str(self.settings.agent_parent_dir), + "reload_agents": self.settings.reload_agents, + "debug": self.settings.debug, + "app_name": self.settings.app_name, + }, + "services": { + "session_service": type( + self.adk_services.session_service + ).__name__, + "memory_service": type(self.adk_services.memory_service).__name__, + "artifact_service": type( + self.adk_services.artifact_service + ).__name__, + }, + } + + @app.get("/health", tags=["Health"]) + async def health_check(): + """Health check endpoint.""" + try: + # Perform basic health checks + checks = { + "agent_loader": self.adk_services.agent_loader is not None, + "session_service": self.adk_services.session_service is not None, + "memory_service": self.adk_services.memory_service is not None, + } + + all_healthy = all(checks.values()) + + return { + "status": "healthy" if all_healthy else "degraded", + "checks": checks, + "timestamp": __import__("time").time(), + } + except Exception as e: + logger.error(f"Health check failed: {e}") + return { + "status": "unhealthy", + "error": str(e), + "timestamp": __import__("time").time(), + } + + @app.get("/", include_in_schema=False) + async def root(): + """Root endpoint with API information.""" + return { + "message": "ADK Custom FastAPI server is running", + "app_name": self.settings.app_name, + "docs_url": "/docs", + "diagnostic_url": "/diagnostic", + "health_url": "/health", + } + + def _register_modular_routers(self, app: FastAPI): + """Register the main modular routers, overriding ADK defaults.""" + try: + routes_to_remove = [] + for route in app.routes: + # Identify original ADK routes by path and methods + if route.path in [ + "/run_sse", + ] and hasattr(route, "methods"): + # Check if it's a POST method route + if "POST" in route.methods: + routes_to_remove.append(route) + + for route in routes_to_remove: + app.routes.remove(route) + + if routes_to_remove: + logger.info( + f"Successfully removed {len(routes_to_remove)} original ADK routes" + " for override." + ) + + if self.agent_router: + app.include_router( + self.agent_router.get_router(), + ) + logger.info( + "Registered AgentRouter, overriding default agent endpoints." + ) + + except Exception as e: + logger.error(f"Failed to register modular routers: {e}", exc_info=True) + + def _setup_custom_lifespan(self): + """Setup custom lifespan events for startup and shutdown.""" + + @asynccontextmanager + async def custom_lifespan(app: FastAPI): + logger.info("Server startup sequence initiated...") + + # Startup logic + try: + # Validate services are properly initialized + if not self.adk_services.agent_loader: + logger.warning("Agent loader not properly initialized") + + # Log configuration + logger.info( + f"Server configured with settings: {self.settings.app_name}" + ) + logger.info(f"Agent directory: {self.agents_root}") + logger.info(f"Debug mode: {self.settings.debug}") + + except Exception as e: + logger.error(f"Error during startup: {e}", exc_info=True) + + yield + + # Shutdown logic + logger.info("Server shutdown sequence initiated...") + try: + self._stop_observer() + + # Additional cleanup can go here + logger.info("Server shutdown completed successfully") + + except Exception as e: + logger.error(f"Error during shutdown: {e}", exc_info=True) + + return custom_lifespan + + def get_service(self, service_name: str) -> Any: + """ + Get a specific service from the ADK services container. + + Args: + service_name: Name of the service to retrieve + + Returns: + The requested service + + Raises: + AttributeError: If the service doesn't exist + """ + return getattr(self.adk_services, service_name) diff --git a/contributing/samples/fastapi_modular_server/app/api/routers/__init__.py b/contributing/samples/fastapi_modular_server/app/api/routers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py new file mode 100644 index 0000000000..f3c2d9e356 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import logging +from typing import AsyncGenerator + +from app.core.dependencies import ADKServices, get_sse_event_mapper +from app.core.mapping.sse_event_mapper import SSEEventMapper +from app.models.streaming_request import RunAgentRequestOptimized +from fastapi import APIRouter, Depends, HTTPException +from starlette.responses import StreamingResponse + +from google.adk.agents.run_config import RunConfig, StreamingMode +from google.adk.utils.context_utils import Aclosing + +logger = logging.getLogger(__name__) + + +class AgentRouter: + """Agent-related endpoints router.""" + + def __init__(self, web_server_instance): + self.web_server = web_server_instance + self.router = APIRouter() + self._setup_routes() + + def _setup_routes(self): + """Setup all agent-related routes.""" + + @self.router.post("/run_sse") + async def custom_run_agent_sse( + req: RunAgentRequestOptimized, + sse_event_mapper: SSEEventMapper = Depends(get_sse_event_mapper), + ) -> StreamingResponse: + """ + Custom implementation of the run_agent_sse endpoint with enhanced logging. + """ + session = await self.web_server.session_service.get_session( + app_name=req.app_name, user_id=req.user_id, session_id=req.session_id + ) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + + logger.info( + f"Starting CUSTOM SSE for app: {req.app_name} and user: {req.user_id}" + ) + + return StreamingResponse( + self._generate_events(req, sse_event_mapper), + media_type="text/event-stream", + ) + + async def _generate_events( + self, + req: RunAgentRequestOptimized, + sse_event_mapper: SSEEventMapper, + ) -> AsyncGenerator[str, None]: + """Generate SSE events for the agent run.""" + try: + yield ( + 'data: {"status": "Starting custom SSE process.", "timestamp": "' + + str(__import__("time").time()) + + '"}\n\n' + ) + + stream_mode = StreamingMode.SSE if req.streaming else StreamingMode.NONE + + runner = await self.web_server.get_runner_async(req.app_name) + + async with Aclosing( + runner.run_async( + user_id=req.user_id, + session_id=req.session_id, + new_message=req.new_message, + state_delta=req.state_delta, + run_config=RunConfig(streaming_mode=stream_mode), + ) + ) as agen: + async for event in agen: + logger.debug(f"Received event: {event}") + sse_message = sse_event_mapper.map_event_to_sse_message( + event, req.optimization_level + ) + if sse_message: + yield sse_message + + except Exception as e: + logger.error(f"Error in SSE handler: {str(e)}", exc_info=True) + yield f'data: {{"error": "An error occurred: {str(e)}"}}' + + def get_router(self) -> APIRouter: + """Returns the configured FastAPI router.""" + return self.router diff --git a/contributing/samples/fastapi_modular_server/app/config/settings.py b/contributing/samples/fastapi_modular_server/app/config/settings.py new file mode 100644 index 0000000000..cbdc90ee5c --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/config/settings.py @@ -0,0 +1,58 @@ +from pathlib import Path +from typing import List, Optional + +from pydantic import Field +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Application settings using Pydantic for validation and environment variable support.""" + + # Application + app_name: str = "ADK Agent FastAPI Server" + debug: bool = Field(default=False, description="Enable debug mode") + port: int = Field(default=8881, description="Server port") + host: str = Field(default="localhost", description="Server host") + + # Paths + current_dir: Path = Field( + default_factory=lambda: Path(__file__).parent.parent.absolute() + ) + agent_parent_dir: Optional[Path] = None + artifact_root_path: Optional[Path] = None + + # Database + session_db_url: Optional[str] = None + + # ADK Configuration + serve_web_interface: bool = Field( + default=True, description="Serve web interface" + ) + reload_agents: bool = Field( + default=True, description="Enable agent hot-reload" + ) + + # CORS + allow_origins: List[str] = Field( + default=["*"], description="CORS allowed origins" + ) + + # Logging + log_level: str = Field(default="INFO", description="Logging level") + + model_config = { + "env_file": ".env", + "env_file_encoding": "utf-8", + "case_sensitive": False, + "extra": "allow", # Allows extra fields and makes them accessible + } + + def __init__(self, **kwargs): + super().__init__(**kwargs) + # Set computed defaults after initialization + if self.agent_parent_dir is None: + self.agent_parent_dir = self.current_dir / "agents" + + +# Global settings instance +settings = Settings() diff --git a/contributing/samples/fastapi_modular_server/app/core/__init__.py b/contributing/samples/fastapi_modular_server/app/core/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/core/dependencies.py b/contributing/samples/fastapi_modular_server/app/core/dependencies.py new file mode 100644 index 0000000000..a2a825d356 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/core/dependencies.py @@ -0,0 +1,117 @@ +# dependencies.py - Improved version +import logging +from functools import lru_cache +from typing import Dict + +from app.config.settings import Settings +from app.core.mapping.sse_event_mapper import SSEEventMapper +from fastapi import Depends + +from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService +from google.adk.auth.credential_service.in_memory_credential_service import ( + InMemoryCredentialService, +) +from google.adk.cli.utils.agent_loader import AgentLoader +from google.adk.evaluation.local_eval_set_results_manager import ( + LocalEvalSetResultsManager, +) +from google.adk.evaluation.local_eval_sets_manager import LocalEvalSetsManager +from google.adk.memory import InMemoryMemoryService +from google.adk.sessions import ( + BaseSessionService, + DatabaseSessionService, + InMemorySessionService, +) + +logger = logging.getLogger(__name__) + + +# Dependency to get settings +def get_settings() -> Settings: + """Get application settings.""" + from app.config.settings import settings # Import here to avoid circular imports + + return settings + + +class ADKServices: + """Container for ADK services to avoid long parameter lists.""" + + def __init__( + self, + agent_loader: AgentLoader, + session_service: BaseSessionService, + memory_service: InMemoryMemoryService, + artifact_service: InMemoryArtifactService, + credential_service: InMemoryCredentialService, + eval_sets_manager: LocalEvalSetsManager, + eval_set_results_manager: LocalEvalSetResultsManager, + ): + self.agent_loader = agent_loader + self.session_service = session_service + self.memory_service = memory_service + self.artifact_service = artifact_service + self.credential_service = credential_service + self.eval_sets_manager = eval_sets_manager + self.eval_set_results_manager = eval_set_results_manager + + +def _create_adk_services_impl(settings: Settings) -> ADKServices: + """Internal function to create ADK services.""" + try: + # Ensure directories exist + settings.agent_parent_dir.mkdir(parents=True, exist_ok=True) + + logger.info(f'Looking for agents in: {settings.agent_parent_dir}') + + # Create services + agent_loader = AgentLoader(agents_dir=str(settings.agent_parent_dir)) + session_service = ( + DatabaseSessionService(db_url=settings.session_db_url) + if settings.session_db_url + else InMemorySessionService() + ) + memory_service = InMemoryMemoryService() + artifact_service = InMemoryArtifactService() + credential_service = InMemoryCredentialService() + eval_sets_manager = LocalEvalSetsManager( + agents_dir=str(settings.agent_parent_dir) + ) + eval_set_results_manager = LocalEvalSetResultsManager( + agents_dir=str(settings.agent_parent_dir) + ) + + logger.info('All ADK services created successfully') + + return ADKServices( + agent_loader=agent_loader, + session_service=session_service, + memory_service=memory_service, + artifact_service=artifact_service, + credential_service=credential_service, + eval_sets_manager=eval_sets_manager, + eval_set_results_manager=eval_set_results_manager, + ) + + except Exception as e: + logger.error(f'Failed to create ADK services: {e}') + raise + + +# Cache based on settings identity to ensure singleton behavior +_adk_services_cache: Dict[int, ADKServices] = {} + + +def get_adk_services(settings: Settings = Depends(get_settings)) -> ADKServices: + """Dependency provider for ADK services as a singleton.""" + settings_id = id(settings) + if settings_id not in _adk_services_cache: + _adk_services_cache[settings_id] = _create_adk_services_impl(settings) + return _adk_services_cache[settings_id] + + +# Use lru_cache for stateless singletons +@lru_cache() +def get_sse_event_mapper() -> SSEEventMapper: + """Dependency provider for the SSEEventMapper as a singleton.""" + return SSEEventMapper() diff --git a/contributing/samples/fastapi_modular_server/app/core/logging.py b/contributing/samples/fastapi_modular_server/app/core/logging.py new file mode 100644 index 0000000000..a7990ce591 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/core/logging.py @@ -0,0 +1,20 @@ +import logging +import sys + + +def setup_logging(settings) -> None: + """Configure application logging.""" + logging.basicConfig( + level=getattr(logging, settings.log_level.upper()), + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler("app.log") + if not settings.debug + else logging.NullHandler(), + ], + ) + + # Configure specific loggers + logging.getLogger("uvicorn.access").setLevel(logging.WARNING) + logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING) diff --git a/contributing/samples/fastapi_modular_server/app/core/mapping/__init__.py b/contributing/samples/fastapi_modular_server/app/core/mapping/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py b/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py new file mode 100644 index 0000000000..9975d072e7 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py @@ -0,0 +1,64 @@ +import json +import logging +from typing import Any, Dict, Optional + +from app.models.streaming_request import OptimizationLevel + +from google.adk.events.event import Event + +logger = logging.getLogger(__name__) + + +class SSEEventMapper: + """ + Maps and optimizes ADK Events into Server-Sent Event (SSE) messages. + """ + + def map_event_to_sse_message( + self, event: Event, optimization_level: OptimizationLevel + ) -> Optional[str]: + try: + if optimization_level == OptimizationLevel.MINIMAL: + payload = self._create_minimal_payload(event) + elif optimization_level == OptimizationLevel.BALANCED: + payload = self._create_balanced_payload(event) + else: + payload = self._create_full_compat_payload(event) + + if payload is None: + return None + + sse_json = json.dumps(payload, separators=(",", ":")) + return f"data: {sse_json}\n\n" + + except (TypeError, AttributeError) as e: + logger.error(f"Failed to map event: {e}", exc_info=True) + return f"data: {event.model_dump_json(exclude_none=True)}\n\n" + + def _create_minimal_payload(self, event: Event) -> Optional[Dict[str, Any]]: + # Skip debug or system events based on author + if event.author in ["system", "debug"]: + return None + + payload = {"author": event.author} + + # Extract text content if available + if event.content and event.content.parts: + text_parts = [] + for part in event.content.parts: + if hasattr(part, "text") and part.text: + text_parts.append(part.text) + if text_parts: + payload["text"] = " ".join(text_parts) + + return payload + + def _create_balanced_payload(self, event: Event) -> Optional[Dict[str, Any]]: + payload = self._create_minimal_payload(event) + if payload is None: + return None + payload["invocation_id"] = event.invocation_id + return payload + + def _create_full_compat_payload(self, event: Event) -> Dict[str, Any]: + return event.model_dump(exclude_none=True) diff --git a/contributing/samples/fastapi_modular_server/app/models/__init__.py b/contributing/samples/fastapi_modular_server/app/models/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/contributing/samples/fastapi_modular_server/app/models/streaming_request.py b/contributing/samples/fastapi_modular_server/app/models/streaming_request.py new file mode 100644 index 0000000000..787fbf8318 --- /dev/null +++ b/contributing/samples/fastapi_modular_server/app/models/streaming_request.py @@ -0,0 +1,20 @@ +from enum import Enum + +from google.adk.cli.adk_web_server import RunAgentRequest + + +class OptimizationLevel(str, Enum): + """Enumeration for the available SSE optimization levels.""" + + MINIMAL = "minimal" + BALANCED = "balanced" + FULL_COMPAT = "full_compat" + + +class RunAgentRequestOptimized(RunAgentRequest): + """ + Request model for the enhanced SSE endpoint. + This can extend the ADK's `RunAgentRequest` if available, or be standalone. + """ + + optimization_level: OptimizationLevel = OptimizationLevel.FULL_COMPAT diff --git a/contributing/samples/fastapi_modular_server/main.py b/contributing/samples/fastapi_modular_server/main.py new file mode 100644 index 0000000000..c2e659271f --- /dev/null +++ b/contributing/samples/fastapi_modular_server/main.py @@ -0,0 +1,80 @@ +# main.py - Improved version +import logging + +import uvicorn +from app.api.custom_adk_server import CustomAdkWebServer +from app.config.settings import settings +from app.core.dependencies import get_adk_services +from app.core.logging import setup_logging +from fastapi import FastAPI + +logger = logging.getLogger(__name__) + + +def create_app() -> FastAPI: + """ + Application Factory. + + Creates and configures the main FastAPI application instance by orchestrating + the setup of logging, services, and the custom web server. + + Returns: + The fully configured FastAPI application instance. + """ + # 1. Setup logging as the very first step. + setup_logging(settings) + + logger.info( + f"Starting application factory for '{settings.app_name}' " + f"in {'DEBUG' if settings.debug else 'PRODUCTION'} mode." + ) + + try: + # 2. Create the foundational ADK services. + logger.debug("Initializing core ADK services...") + adk_services = get_adk_services(settings) + + # 3. Create an instance of your custom web server. + logger.debug("Creating CustomAdkWebServer instance...") + custom_server = CustomAdkWebServer( + settings=settings, + adk_services=adk_services, # Pass the services container + agents_dir=str(settings.agent_parent_dir), + ) + + # 4. Get the final, fully configured FastAPI app + logger.debug( + "Assembling the enhanced FastAPI app from the custom server..." + ) + app = custom_server.get_enhanced_fast_api_app() + + logger.info("FastAPI application created and configured successfully.") + return app + + except Exception as e: + logger.critical( + f"FATAL: Failed to create FastAPI application: {e}", exc_info=True + ) + raise + + +# Create the global 'app' instance by calling the factory. +app = create_app() + + +if __name__ == "__main__": + print("--- Starting ADK FastAPI Server for Development ---") + print(f"Host: http://{settings.host}:{settings.port}") + print(f"API Docs: http://{settings.host}:{settings.port}/docs") + if settings.serve_web_interface: + print(f"Web UI: http://{settings.host}:{settings.port}/ui") + print(f"Reload on changes: {settings.debug}") + print("----------------------------------------------------") + + uvicorn.run( + "main:app", + host=settings.host, + port=settings.port, + reload=settings.debug, + log_level=settings.log_level.lower(), + ) From 7beab8c8b331f3f010012ec672ee7a9600055175 Mon Sep 17 00:00:00 2001 From: FrigaZzz Date: Mon, 15 Sep 2025 22:02:38 +0200 Subject: [PATCH 2/6] refactor: replace __import__("time") with time module import Standardize time module usage across the codebase by replacing dynamic imports with direct imports. Also improve JSON error handling in SSE responses and update README numbering. --- .../samples/fastapi_modular_server/README.md | 39 ++++++------------- .../app/api/custom_adk_server.py | 5 ++- .../app/api/routers/agent_router.py | 8 ++-- 3 files changed, 20 insertions(+), 32 deletions(-) diff --git a/contributing/samples/fastapi_modular_server/README.md b/contributing/samples/fastapi_modular_server/README.md index 64ac221ccd..8ec4e1676e 100644 --- a/contributing/samples/fastapi_modular_server/README.md +++ b/contributing/samples/fastapi_modular_server/README.md @@ -79,7 +79,7 @@ vim .env ``` -### 3. **Run the Server** +### 2. **Run the Server** ```bash # Development mode with hot-reload python main.py @@ -158,7 +158,7 @@ def _register_modular_routers(self, app: FastAPI): app.routes.remove(route) ``` -#### **Method 3: Middleware Interception** +#### **Method 2: Middleware Interception** For more complex overrides, use middleware: @@ -190,7 +190,7 @@ class AgentRouter: session = await self.web_server.session_service.list_sessions() # Access runners through web server - runner = await self.web_server.runner_dict... + runner = await self.web_server.get_runner_async("your_app_name") # Access other web server properties runners_cache = self.web_server.runners_to_clean @@ -205,6 +205,15 @@ class AgentRouter: Extend the SSE mapper for more sophisticated filtering: ```python +# app/models/streaming_request.py +class OptimizationLevel(str, Enum): + """Enumeration for the available SSE optimization levels.""" + + MINIMAL = "minimal" + BALANCED = "balanced" + FULL_COMPAT = "full_compat" + ULTRA_MINIMAL = "ultra_minimal" + # app/core/mapping/sse_mapper.py class AdvancedSSEEventMapper(SSEEventMapper): def map_event_to_sse_message(self, event: Event, optimization_level: OptimizationLevel) -> Optional[str]: @@ -282,30 +291,6 @@ class DedupSSEMapper(SSEEventMapper): return payload ``` -### **Advanced Configuration** - -#### **Environment-Specific Settings** - -```python -# app/config/settings.py -class Settings(BaseSettings): - # SSE Optimization - sse_batch_size: int = Field(default=1, description="SSE batch size") - sse_compression: bool = Field(default=False, description="Enable SSE compression") - sse_dedupe: bool = Field(default=True, description="Enable event deduplication") - - # Performance - max_concurrent_sessions: int = Field(default=100) - session_timeout: int = Field(default=3600) - - # Custom features - enable_custom_auth: bool = Field(default=False) - custom_middleware: List[str] = Field(default=[]) - - class Config: - env_prefix = "ADK_" # Environment variables like ADK_SSE_BATCH_SIZE -``` - ## 🤝 Contributing diff --git a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py index a1f2c1b1f7..862fb977bb 100644 --- a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py +++ b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py @@ -1,5 +1,6 @@ import logging import threading +import time from contextlib import asynccontextmanager from importlib.resources import files from pathlib import Path @@ -201,14 +202,14 @@ async def health_check(): return { "status": "healthy" if all_healthy else "degraded", "checks": checks, - "timestamp": __import__("time").time(), + "timestamp": time.time(), } except Exception as e: logger.error(f"Health check failed: {e}") return { "status": "unhealthy", "error": str(e), - "timestamp": __import__("time").time(), + "timestamp": time.time(), } @app.get("/", include_in_schema=False) diff --git a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py index f3c2d9e356..026622dc5f 100644 --- a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py +++ b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py @@ -1,9 +1,11 @@ from __future__ import annotations +import json import logging +import time from typing import AsyncGenerator -from app.core.dependencies import ADKServices, get_sse_event_mapper +from app.core.dependencies import get_sse_event_mapper from app.core.mapping.sse_event_mapper import SSEEventMapper from app.models.streaming_request import RunAgentRequestOptimized from fastapi import APIRouter, Depends, HTTPException @@ -58,7 +60,7 @@ async def _generate_events( try: yield ( 'data: {"status": "Starting custom SSE process.", "timestamp": "' - + str(__import__("time").time()) + + str(time.time()) + '"}\n\n' ) @@ -85,7 +87,7 @@ async def _generate_events( except Exception as e: logger.error(f"Error in SSE handler: {str(e)}", exc_info=True) - yield f'data: {{"error": "An error occurred: {str(e)}"}}' + yield f'data: {json.dumps({"error": f"An error occurred: {str(e)}"})}\n\n' def get_router(self) -> APIRouter: """Returns the configured FastAPI router.""" From 0a3a0a40ee97771a22168ae1d090ec3364b9774a Mon Sep 17 00:00:00 2001 From: FrigaZzz Date: Mon, 15 Sep 2025 22:22:33 +0200 Subject: [PATCH 3/6] refactor: improve code readability and maintainability - Use json.dumps for SSE event formatting in agent_router - Replace loop with list comprehension in sse_event_mapper - Remove unused variables and outdated comments in custom_adk_server - Specify ModuleNotFoundError exception for web assets loading --- .../fastapi_modular_server/app/api/custom_adk_server.py | 8 +------- .../app/api/routers/agent_router.py | 5 ++--- .../app/core/mapping/sse_event_mapper.py | 9 +++++---- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py index 862fb977bb..f919bf671a 100644 --- a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py +++ b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py @@ -38,8 +38,6 @@ def __init__( settings: Application settings adk_services: Container with all ADK services agents_dir: Directory containing agents - *args: Additional arguments for parent class - **kwargs: Additional keyword arguments for parent class """ self.settings = settings self.adk_services = adk_services @@ -59,10 +57,6 @@ def __init__( agents_dir=str(self.agents_root), ) - # App specific routers - self._routers: List[Dict[str, Any]] = [] - self._health_checks: List[callable] = [] - # Modular routers self.agent_router: AgentRouter | None = None @@ -134,7 +128,7 @@ def get_enhanced_fast_api_app(self) -> FastAPI: if self.settings.serve_web_interface: try: web_assets_dir = str(files("google.adk.cli.browser").joinpath("")) - except Exception: + except ModuleNotFoundError: logger.warning( "Could not locate ADK web assets. UI will not be served." ) diff --git a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py index 026622dc5f..ae46fcf169 100644 --- a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py +++ b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py @@ -59,9 +59,8 @@ async def _generate_events( """Generate SSE events for the agent run.""" try: yield ( - 'data: {"status": "Starting custom SSE process.", "timestamp": "' - + str(time.time()) - + '"}\n\n' + "data:" + f" {json.dumps({'status': 'Starting custom SSE process.', 'timestamp': time.time()})}\n\n" ) stream_mode = StreamingMode.SSE if req.streaming else StreamingMode.NONE diff --git a/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py b/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py index 9975d072e7..9ad29034df 100644 --- a/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py +++ b/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py @@ -44,10 +44,11 @@ def _create_minimal_payload(self, event: Event) -> Optional[Dict[str, Any]]: # Extract text content if available if event.content and event.content.parts: - text_parts = [] - for part in event.content.parts: - if hasattr(part, "text") and part.text: - text_parts.append(part.text) + text_parts = [ + part.text + for part in event.content.parts + if hasattr(part, "text") and part.text + ] if text_parts: payload["text"] = " ".join(text_parts) From 235bca7d8821f2151a1ddf5e50b95a483cbdee88 Mon Sep 17 00:00:00 2001 From: FrigaZzz Date: Mon, 15 Sep 2025 22:33:31 +0200 Subject: [PATCH 4/6] refactor: improve type hints and code organization - Update type hints from typing.List/Dict to modern list/dict - Add TYPE_CHECKING imports where needed - Rename greetings_agent file for consistency - Expand .gitignore with common development artifacts - Clean up unused imports in multiple files --- .../samples/fastapi_modular_server/.gitignore | 19 +++++++++++++++++++ .../app/agents/greetings_agent/__init__.py | 2 +- .../{greeting_agent.py => greetings_agent.py} | 0 .../app/api/custom_adk_server.py | 2 +- .../app/api/routers/agent_router.py | 7 ++++++- .../app/config/settings.py | 4 ++-- .../app/core/dependencies.py | 3 +-- .../app/core/logging.py | 7 ++++++- 8 files changed, 36 insertions(+), 8 deletions(-) rename contributing/samples/fastapi_modular_server/app/agents/greetings_agent/{greeting_agent.py => greetings_agent.py} (100%) diff --git a/contributing/samples/fastapi_modular_server/.gitignore b/contributing/samples/fastapi_modular_server/.gitignore index 397b4a7624..e7febef8a4 100644 --- a/contributing/samples/fastapi_modular_server/.gitignore +++ b/contributing/samples/fastapi_modular_server/.gitignore @@ -1 +1,20 @@ +# Logs *.log + +# Environment variables +.env + +# Python bytecode +*.pyc +__pycache__/ + +# Test artifacts +.pytest_cache/ + +# Virtual environments +.venv/ +venv/ + +# IDE configuration +.vscode/ +.idea/ \ No newline at end of file diff --git a/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py index 9003021a66..bd486a4e89 100644 --- a/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py +++ b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/__init__.py @@ -1 +1 @@ -from app.agents.greetings_agent.greeting_agent import root_agent +from app.agents.greetings_agent.greetings_agent import root_agent diff --git a/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greeting_agent.py b/contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greetings_agent.py similarity index 100% rename from contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greeting_agent.py rename to contributing/samples/fastapi_modular_server/app/agents/greetings_agent/greetings_agent.py diff --git a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py index f919bf671a..865e387f0e 100644 --- a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py +++ b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py @@ -4,7 +4,7 @@ from contextlib import asynccontextmanager from importlib.resources import files from pathlib import Path -from typing import Any, Dict, List +from typing import Any # Import the new modular routers from app.api.routers.agent_router import AgentRouter diff --git a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py index ae46fcf169..36736ed3e5 100644 --- a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py +++ b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py @@ -1,5 +1,10 @@ from __future__ import annotations +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from app.api.custom_adk_server import CustomAdkWebServer + import json import logging import time @@ -20,7 +25,7 @@ class AgentRouter: """Agent-related endpoints router.""" - def __init__(self, web_server_instance): + def __init__(self, web_server_instance: "CustomAdkWebServer"): self.web_server = web_server_instance self.router = APIRouter() self._setup_routes() diff --git a/contributing/samples/fastapi_modular_server/app/config/settings.py b/contributing/samples/fastapi_modular_server/app/config/settings.py index cbdc90ee5c..716dc52ad3 100644 --- a/contributing/samples/fastapi_modular_server/app/config/settings.py +++ b/contributing/samples/fastapi_modular_server/app/config/settings.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import List, Optional +from typing import Optional from pydantic import Field from pydantic_settings import BaseSettings @@ -33,7 +33,7 @@ class Settings(BaseSettings): ) # CORS - allow_origins: List[str] = Field( + allow_origins: list[str] = Field( default=["*"], description="CORS allowed origins" ) diff --git a/contributing/samples/fastapi_modular_server/app/core/dependencies.py b/contributing/samples/fastapi_modular_server/app/core/dependencies.py index a2a825d356..673e1aaca2 100644 --- a/contributing/samples/fastapi_modular_server/app/core/dependencies.py +++ b/contributing/samples/fastapi_modular_server/app/core/dependencies.py @@ -1,7 +1,6 @@ # dependencies.py - Improved version import logging from functools import lru_cache -from typing import Dict from app.config.settings import Settings from app.core.mapping.sse_event_mapper import SSEEventMapper @@ -99,7 +98,7 @@ def _create_adk_services_impl(settings: Settings) -> ADKServices: # Cache based on settings identity to ensure singleton behavior -_adk_services_cache: Dict[int, ADKServices] = {} +_adk_services_cache: dict[int, ADKServices] = {} def get_adk_services(settings: Settings = Depends(get_settings)) -> ADKServices: diff --git a/contributing/samples/fastapi_modular_server/app/core/logging.py b/contributing/samples/fastapi_modular_server/app/core/logging.py index a7990ce591..1a198d79e9 100644 --- a/contributing/samples/fastapi_modular_server/app/core/logging.py +++ b/contributing/samples/fastapi_modular_server/app/core/logging.py @@ -1,8 +1,13 @@ +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from app.config.settings import Settings + import logging import sys -def setup_logging(settings) -> None: +def setup_logging(settings: Settings) -> None: """Configure application logging.""" logging.basicConfig( level=getattr(logging, settings.log_level.upper()), From a7ea7f3e34ef3bb493d470041c5034725d208f45 Mon Sep 17 00:00:00 2001 From: FrigaZzz Date: Tue, 16 Sep 2025 18:17:01 +0200 Subject: [PATCH 5/6] style: reorganize imports for better readability and consistency (./autorformat.sh) Reorganize imports to follow PEP 8 guidelines by grouping standard library, third-party, and local imports separately. This improves code readability and maintainability while keeping the same functionality. --- .../app/api/custom_adk_server.py | 12 +++++------- .../app/api/routers/agent_router.py | 10 ++++++---- .../app/core/dependencies.py | 19 ++++++------------- .../app/core/mapping/sse_event_mapper.py | 5 +++-- .../samples/fastapi_modular_server/main.py | 2 +- 5 files changed, 21 insertions(+), 27 deletions(-) diff --git a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py index 865e387f0e..9c51b90ce0 100644 --- a/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py +++ b/contributing/samples/fastapi_modular_server/app/api/custom_adk_server.py @@ -1,9 +1,9 @@ -import logging -import threading -import time from contextlib import asynccontextmanager from importlib.resources import files +import logging from pathlib import Path +import threading +import time from typing import Any # Import the new modular routers @@ -11,12 +11,10 @@ from app.config.settings import Settings from app.core.dependencies import ADKServices from fastapi import FastAPI - -# Import agent refresh capabilities -from watchdog.observers import Observer - from google.adk.cli.adk_web_server import AdkWebServer from google.adk.cli.utils.agent_change_handler import AgentChangeEventHandler +# Import agent refresh capabilities +from watchdog.observers import Observer # Configure logging logger = logging.getLogger(__name__) diff --git a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py index 36736ed3e5..46ccaa1070 100644 --- a/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py +++ b/contributing/samples/fastapi_modular_server/app/api/routers/agent_router.py @@ -13,11 +13,13 @@ from app.core.dependencies import get_sse_event_mapper from app.core.mapping.sse_event_mapper import SSEEventMapper from app.models.streaming_request import RunAgentRequestOptimized -from fastapi import APIRouter, Depends, HTTPException -from starlette.responses import StreamingResponse - -from google.adk.agents.run_config import RunConfig, StreamingMode +from fastapi import APIRouter +from fastapi import Depends +from fastapi import HTTPException +from google.adk.agents.run_config import RunConfig +from google.adk.agents.run_config import StreamingMode from google.adk.utils.context_utils import Aclosing +from starlette.responses import StreamingResponse logger = logging.getLogger(__name__) diff --git a/contributing/samples/fastapi_modular_server/app/core/dependencies.py b/contributing/samples/fastapi_modular_server/app/core/dependencies.py index 673e1aaca2..cbdc8d2a47 100644 --- a/contributing/samples/fastapi_modular_server/app/core/dependencies.py +++ b/contributing/samples/fastapi_modular_server/app/core/dependencies.py @@ -1,26 +1,19 @@ # dependencies.py - Improved version -import logging from functools import lru_cache +import logging from app.config.settings import Settings from app.core.mapping.sse_event_mapper import SSEEventMapper from fastapi import Depends - from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService -from google.adk.auth.credential_service.in_memory_credential_service import ( - InMemoryCredentialService, -) +from google.adk.auth.credential_service.in_memory_credential_service import InMemoryCredentialService from google.adk.cli.utils.agent_loader import AgentLoader -from google.adk.evaluation.local_eval_set_results_manager import ( - LocalEvalSetResultsManager, -) +from google.adk.evaluation.local_eval_set_results_manager import LocalEvalSetResultsManager from google.adk.evaluation.local_eval_sets_manager import LocalEvalSetsManager from google.adk.memory import InMemoryMemoryService -from google.adk.sessions import ( - BaseSessionService, - DatabaseSessionService, - InMemorySessionService, -) +from google.adk.sessions import BaseSessionService +from google.adk.sessions import DatabaseSessionService +from google.adk.sessions import InMemorySessionService logger = logging.getLogger(__name__) diff --git a/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py b/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py index 9ad29034df..c6fa3f17c0 100644 --- a/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py +++ b/contributing/samples/fastapi_modular_server/app/core/mapping/sse_event_mapper.py @@ -1,9 +1,10 @@ import json import logging -from typing import Any, Dict, Optional +from typing import Any +from typing import Dict +from typing import Optional from app.models.streaming_request import OptimizationLevel - from google.adk.events.event import Event logger = logging.getLogger(__name__) diff --git a/contributing/samples/fastapi_modular_server/main.py b/contributing/samples/fastapi_modular_server/main.py index c2e659271f..887083b391 100644 --- a/contributing/samples/fastapi_modular_server/main.py +++ b/contributing/samples/fastapi_modular_server/main.py @@ -1,12 +1,12 @@ # main.py - Improved version import logging -import uvicorn from app.api.custom_adk_server import CustomAdkWebServer from app.config.settings import settings from app.core.dependencies import get_adk_services from app.core.logging import setup_logging from fastapi import FastAPI +import uvicorn logger = logging.getLogger(__name__) From ff07aaeb15b73bb9f092f80677dea1dc9b3ac591 Mon Sep 17 00:00:00 2001 From: FrigaZzz Date: Wed, 17 Sep 2025 22:16:44 +0200 Subject: [PATCH 6/6] docs: Update README to reflect changes in custom server file structure and remove obsolete SSE optimization examples --- .../samples/fastapi_modular_server/README.md | 60 +------------------ 1 file changed, 3 insertions(+), 57 deletions(-) diff --git a/contributing/samples/fastapi_modular_server/README.md b/contributing/samples/fastapi_modular_server/README.md index 8ec4e1676e..863d1ae6f3 100644 --- a/contributing/samples/fastapi_modular_server/README.md +++ b/contributing/samples/fastapi_modular_server/README.md @@ -121,7 +121,7 @@ class MyCustomRouter: Register it in the custom server: ```python -# In app/api/server.py - CustomAdkWebServer class +# In app/api/custom_adk_server.py - CustomAdkWebServer class def _initialize_routers(self): try: self.agent_router = AgentRouter(self) @@ -149,7 +149,8 @@ def _register_modular_routers(self, app: FastAPI): for route in app.routes: if route.path in [ "/run_sse", - "/apps/{app_name}/users/{user_id}/sessions" + # You could add additional ADK routes here if you want to override them, + # e.g., "/apps/{app_name}/users/{user_id}/sessions" ] and hasattr(route, 'methods') and 'POST' in route.methods: routes_to_remove.append(route) @@ -196,8 +197,6 @@ class AgentRouter: runners_cache = self.web_server.runners_to_clean ``` - - ### **Optimizing SSE Streaming** #### **Custom Event Filtering** @@ -243,55 +242,6 @@ class AdvancedSSEEventMapper(SSEEventMapper): return super()._create_minimal_payload(event) ``` -#### **Streaming Performance Optimizations** - -1. **Batch Events**: Combine multiple streaming events (single chunk) into a single SSE message to reduce overhead. -```python -async def _generate_events_batched(self, req, sse_mapper, adk_services): - batch = [] - batch_size = 5 - - async for event in self._get_events(): - batch.append(event) - - if len(batch) >= batch_size: - # Process batch - combined_payload = self._combine_events(batch) - yield f"data: {json.dumps(combined_payload)}\n\n" - batch.clear() -``` - -2. **Compression**: -```python -import gzip -import json - -def _create_compressed_sse(self, payload): - json_str = json.dumps(payload, separators=(',', ':')) - compressed = gzip.compress(json_str.encode()) - # Use binary SSE or base64 encoding - return f"data: {base64.b64encode(compressed).decode()}\n\n" -``` - -3. **Event Deduplication**: -```python -class DedupSSEMapper(SSEEventMapper): - def __init__(self): - self._last_payloads = {} - - def map_event_to_sse_message(self, event, level): - payload = super().map_event_to_sse_message(event, level) - - # Skip if identical to last payload for this session - session_key = f"{event.session_id}_{event.author}" - if self._last_payloads.get(session_key) == payload: - return None - - self._last_payloads[session_key] = payload - return payload -``` - - ## 🤝 Contributing This template is designed to be extended and customized for your specific needs. Key extension points: @@ -308,7 +258,3 @@ This template is designed to be extended and customized for your specific needs. - **FastAPI Documentation**: https://fastapi.tiangolo.com/ - **Pydantic Settings**: https://docs.pydantic.dev/latest/concepts/pydantic_settings/ - **Server-Sent Events**: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events - ---- - -**Happy coding!** 🚀 This template provides a solid foundation for building production-ready ADK extensions with modern Python patterns and performance optimizations. \ No newline at end of file