# Agent System
The Lobster AI agent system is built on a hierarchical multi-agent architecture using LangGraph for coordination. The system features a centralized agent registry, dynamic tool generation, and specialized domain experts that work together to provide comprehensive bioinformatics analysis capabilities.
The heart of the agent system is the centralized Agent Registry, which serves as the single source of truth for all agent configurations and eliminates code duplication.
```mermaid
graph TB
    subgraph "Agent Registry Core"
        REGISTRY[Agent Registry<br/>📋 Single Source of Truth]
        CONFIG[AgentConfig Objects<br/>🔧 Metadata & Factory Functions]
        HELPERS[Helper Functions<br/>🛠️ Dynamic Loading & Discovery]
    end

    subgraph "System Integration"
        GRAPH[Graph Creation<br/>🕸️ LangGraph Agent Network]
        CALLBACKS[Callback System<br/>📊 Monitoring & Events]
        HANDOFFS[Handoff Tools<br/>🔄 Agent Communication]
    end

    subgraph "Dynamic Operations"
        IMPORT[Dynamic Import<br/>📦 Runtime Agent Loading]
        TOOLS[Tool Generation<br/>🔧 Automatic Handoff Creation]
        DETECT[Agent Detection<br/>🔍 System Discovery]
    end

    REGISTRY --> CONFIG
    CONFIG --> HELPERS
    HELPERS --> GRAPH
    HELPERS --> CALLBACKS
    CONFIG --> HANDOFFS
    CONFIG --> IMPORT
    HANDOFFS --> TOOLS
    HELPERS --> DETECT

    classDef registry fill:#e8f5e8,stroke:#2e7d32,stroke-width:3px
    classDef integration fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef dynamic fill:#fff3e0,stroke:#f57c00,stroke-width:2px

    class REGISTRY,CONFIG,HELPERS registry
    class GRAPH,CALLBACKS,HANDOFFS integration
    class IMPORT,TOOLS,DETECT dynamic
```
Each agent is defined using a structured configuration object:
```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class AgentRegistryConfig:
    """Configuration for an agent in the system."""
    name: str                                # Unique agent identifier
    display_name: str                        # Human-readable name
    description: str                         # Agent's capabilities description
    factory_function: str                    # Module path to factory function
    handoff_tool_name: Optional[str]         # Name of the generated handoff tool
    handoff_tool_description: Optional[str]  # Handoff tool description
```

The Supervisor Agent serves as the orchestrator and decision-maker for the entire system:
- Request Routing - Analyzes user queries and delegates to appropriate specialists
- Workflow Coordination - Maintains logical analysis sequences across agents
- Context Management - Ensures coherent conversation flow and data consistency
- Direct Response - Handles general questions without delegation
```mermaid
flowchart TD
    USER[User Query] --> SUPERVISOR{Supervisor Agent}
    SUPERVISOR --> |General Questions| DIRECT[Direct Response]
    SUPERVISOR --> |Data Operations| DATA_EXPERT[Data Expert Agent]
    SUPERVISOR --> |Literature Search| RESEARCH[Research Agent]
    SUPERVISOR --> |Transcriptomics Analysis| TRANS_EXPERT[Transcriptomics Expert]
    SUPERVISOR --> |Proteomics Analysis| PROT_EXPERT[Proteomics Expert]
    SUPERVISOR --> |ML Tasks| ML_EXPERT[ML Expert]

    classDef supervisor fill:#4caf50,stroke:#2e7d32,stroke-width:3px
    classDef agent fill:#81c784,stroke:#388e3c,stroke-width:2px
    classDef response fill:#ffb74d,stroke:#f57c00,stroke-width:2px

    class SUPERVISOR supervisor
    class DATA_EXPERT,RESEARCH,TRANS_EXPERT,PROT_EXPERT,ML_EXPERT agent
    class DIRECT response
```
Each specialist agent focuses on a specific domain of bioinformatics analysis:
Data Expert Agent:
- Data Discovery - Locating and cataloging biological datasets
- Format Handling - Supporting multiple input formats (CSV, H5AD, 10X MTX, Excel)
- Quality Assessment - Initial data validation and profiling
- Workspace Management - Organizing datasets and maintaining data lineage
Research Agent:
- Literature Mining - PubMed, bioRxiv, medRxiv search capabilities
- Dataset Discovery - Direct GEO DataSets search with advanced filtering
- Publication Analysis - DOI/PMID to dataset association
- Marker Gene Discovery - Literature-based gene signature extraction
Transcriptomics Expert: a unified agent handling both single-cell and bulk RNA-seq analysis.
Single-Cell Capabilities:
- Quality Control - Comprehensive cell and gene filtering
- Preprocessing - Normalization, batch correction, doublet detection
- Dimensionality Reduction - PCA, UMAP, t-SNE implementation
- Clustering Analysis - Leiden/Louvain clustering with resolution optimization
- Cell Type Annotation - Manual and automated cell type assignment
- Visualization - QC plots, UMAP plots, feature plots, heatmaps
Bulk RNA-seq Capabilities:
- Sample QC - Sequencing depth and quality metrics
- Differential Expression - pyDESeq2 integration with statistical rigor
- Pathway Analysis - GO, KEGG, Reactome enrichment
- Formula Construction - R-style design matrices with agent guidance
- Iterative Analysis - Comparative DE analysis workflows
Proteomics Expert: a unified agent handling both mass spectrometry and affinity proteomics analysis.
Mass Spectrometry Capabilities:
- DDA/DIA Workflows - MaxQuant and Spectronaut output processing
- Missing Value Handling - MNAR/MCAR pattern analysis (30-70% missing typical)
- Intensity Normalization - TMM, quantile, VSN methods
- Statistical Testing - Linear models with empirical Bayes
- Pathway Enrichment - Protein-centric pathway analysis
Affinity Proteomics Capabilities:
- Targeted Panels - Olink NPX processing and antibody array analysis
- Low Missing Values - Optimized for <30% missing data
- CV Analysis - Coefficient of variation assessment
- Antibody Validation - Quality control metrics for targeted assays
- Panel Harmonization - Cross-platform data integration
Method extraction: merged into the Research Agent. All method extraction capabilities are now handled by the Research Agent with Phase 1 auto-resolution.
ML Expert:
- Data Preparation - Feature selection and normalization for ML
- Framework Export - sklearn, PyTorch, TensorFlow format conversion
- Model Readiness - Data quality assessment for ML workflows
- Split Generation - Stratified train/validation/test splits
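The split-generation step above can be sketched in plain Python. This is an illustrative sketch only, not Lobster's actual implementation; a production system would more likely use sklearn's `train_test_split` with `stratify=`.

```python
import random
from collections import defaultdict

def stratified_split(labels, test_frac=0.2, seed=0):
    """Split sample indices into train/test while keeping label proportions.

    Illustrative sketch: groups indices per label, shuffles each group,
    and sends a fixed fraction (at least one sample) to the test set.
    """
    rng = random.Random(seed)
    by_label = defaultdict(list)
    for idx, label in enumerate(labels):
        by_label[label].append(idx)

    train, test = [], []
    for label, indices in by_label.items():
        rng.shuffle(indices)
        n_test = max(1, round(len(indices) * test_frac))
        test.extend(indices[:n_test])
        train.extend(indices[n_test:])
    return sorted(train), sorted(test)

# Toy example: 8 B cells and 2 T cells; every label appears in the test set
labels = ["B"] * 8 + ["T"] * 2
train, test = stratified_split(labels, test_frac=0.2)
```

Guaranteeing at least one test sample per label keeps rare cell types represented in evaluation, at the cost of slightly distorting proportions for tiny groups.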
The agent system is built on LangGraph's state machine framework:
```mermaid
stateDiagram-v2
    [*] --> Supervisor
    Supervisor --> DataExpert: Data tasks
    Supervisor --> Research: Literature tasks
    Supervisor --> Transcriptomics: Transcriptomics tasks (single-cell/bulk)
    Supervisor --> Proteomics: Proteomics tasks (MS/affinity)
    Supervisor --> MLExpert: ML tasks
    DataExpert --> Supervisor: Results
    Research --> Supervisor: Results
    Transcriptomics --> Supervisor: Results
    Proteomics --> Supervisor: Results
    MLExpert --> Supervisor: Results
    Supervisor --> [*]
```
The system creates the LangGraph dynamically based on the Agent Registry:
```python
# Dynamic agent loading from the registry
worker_agents = get_worker_agents()

for agent_name, agent_config in worker_agents.items():
    # Import the factory function dynamically
    factory_function = import_agent_factory(agent_config.factory_function)

    # Create the agent instance
    agent = factory_function(
        data_manager=data_manager,
        callback_handler=callback_handler,
        agent_name=agent_config.name,
        handoff_tools=None,
    )

    # Create the corresponding handoff tool
    handoff_tool = create_custom_handoff_tool(
        agent_name=agent_config.name,
        name=agent_config.handoff_tool_name,
        description=agent_config.handoff_tool_description,
    )
```

Agents communicate through handoff tools that are automatically generated from the registry:
```mermaid
sequenceDiagram
    participant User
    participant Supervisor
    participant DataExpert
    participant Transcriptomics
    participant DataManagerV2

    User->>Supervisor: "Analyze my single-cell data"
    Note over Supervisor: Analyzes request and checks available modalities
    Supervisor->>DataManagerV2: list_available_modalities()
    DataManagerV2-->>Supervisor: Available datasets

    alt Data needs loading
        Supervisor->>DataExpert: handoff_to_data_expert("Load dataset")
        DataExpert->>DataManagerV2: load_modality()
        DataManagerV2-->>DataExpert: Dataset loaded
        DataExpert-->>Supervisor: "Data ready for analysis"
    end

    Supervisor->>Transcriptomics: handoff_to_transcriptomics_expert("Perform analysis")
    Transcriptomics->>DataManagerV2: get_modality() + analysis
    DataManagerV2-->>Transcriptomics: Processed results
    Transcriptomics-->>Supervisor: "Analysis complete with visualizations"
    Supervisor-->>User: Comprehensive results
```
Each agent maintains state through the shared DataManagerV2 instance:
- Modality Access - Agents retrieve and store data through named modalities
- Tool Usage Logging - All operations are tracked for provenance
- Plot Management - Visualizations are centrally managed and accessible
- Metadata Preservation - Analysis parameters and results are stored
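The modality-centric flow above can be sketched with a minimal stand-in for DataManagerV2. The class below is a simplified mock, not the real implementation; only the method names (`list_modalities`, `get_modality`, `log_tool_usage`) mirror those used elsewhere in this document.

```python
class MockDataManager:
    """Minimal stand-in for DataManagerV2, for illustration only."""

    def __init__(self):
        self.modalities = {}  # named datasets shared across agents
        self.tool_log = []    # provenance record of every operation

    def list_modalities(self):
        return list(self.modalities)

    def get_modality(self, name):
        return self.modalities[name]

    def log_tool_usage(self, tool, params, stats):
        self.tool_log.append({"tool": tool, "params": params, "stats": stats})

# One agent stores a result under a descriptive name;
# another agent later retrieves it by that name.
dm = MockDataManager()
dm.modalities["pbmc_raw"] = {"n_cells": 5000}
dm.modalities["pbmc_raw_filtered"] = {"n_cells": 4200}
dm.log_tool_usage("quality_control", {"min_genes": 200}, {"removed": 800})
```

Because all agents share one such instance, no agent needs to pass data to another directly; they exchange modality names, and the data manager keeps the provenance trail.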
The Lobster platform features a sophisticated expert-to-expert handoff system that enables true agent collaboration with context preservation, automatic return flow management, and type-safe parameter passing.
```mermaid
graph TB
    subgraph "Enhanced Handoff Infrastructure"
        EHM[ExpertHandoffManager<br/>🎯 Central Coordinator]
        EHT[Enhanced Handoff Tools<br/>🔧 Type-Safe Validation]
        HP[Handoff Patterns<br/>📋 Standardized Collaborations]
    end

    subgraph "Core Features"
        CP[Context Preservation<br/>💾 Parameter Passing]
        RF[Return Flow Logic<br/>🔄 A→B→A Patterns]
        VL[Schema Validation<br/>✅ Type Safety]
        EH[Error Handling<br/>🛡️ Graceful Degradation]
    end

    subgraph "Registry Integration"
        AR[Agent Registry<br/>📋 Available Agents]
        AT[Automatic Tools<br/>⚡ Dynamic Generation]
        PM[Pattern Matching<br/>🎯 Smart Routing]
    end

    EHM --> CP
    EHM --> RF
    EHT --> VL
    EHT --> EH
    HP --> PM
    AR --> AT
    AT --> EHT

    classDef infrastructure fill:#e8f5e8,stroke:#2e7d32,stroke-width:3px
    classDef features fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef registry fill:#fff3e0,stroke:#f57c00,stroke-width:2px

    class EHM,EHT,HP infrastructure
    class CP,RF,VL,EH features
    class AR,AT,PM registry
```
The ExpertHandoffManager serves as the central coordinator for all inter-agent communications:
```python
from lobster.tools.expert_handoff_manager import expert_handoff_manager

# Create a context-preserving handoff
handoff_context = create_handoff_context(
    from_expert="transcriptomics_expert",
    to_expert="machine_learning_expert",
    task_type="scvi_training",
    parameters={"modality_name": "data", "n_latent": 10},
    return_expectations={"embedding_key": "X_scvi"},
)

# Execute the handoff with tracking
command = expert_handoff_manager.create_context_preserving_handoff(
    to_expert="machine_learning_expert",
    context=handoff_context,
    return_to_sender=True,
)
```

Key Capabilities:
- Context Preservation: Full parameter and state passing between agents
- Chain Tracking: Supports A→B→C→A patterns with loop prevention
- Return Flow Management: Automatic routing back to sender or supervisor
- Audit Trail: Complete handoff history for debugging and monitoring
- Concurrent Support: Multiple simultaneous handoffs without interference
The system includes 15+ pre-defined handoff patterns for common expert collaborations:
```python
EXPERT_HANDOFF_PATTERNS = {
    # Transcriptomics Expert → ML Expert (scVI training)
    "transcriptomics_to_ml": {
        "task_types": ["scvi_training", "deep_learning_embedding"],
        "context_schema": SCVI_CONTEXT_SCHEMA,
        "return_flow": "sender",
        "priority": 10,
    },
    # Data Expert → Research Agent (dataset discovery)
    "data_to_research": {
        "task_types": ["dataset_search", "metadata_extraction"],
        "context_schema": DATA_LOADING_SCHEMA,
        "return_flow": "sender",
        "priority": 7,
    },
}
```

Example: Transcriptomics Expert → ML Expert → Transcriptomics Expert (scVI training)
```mermaid
sequenceDiagram
    participant TE as Transcriptomics Expert
    participant EHM as ExpertHandoffManager
    participant ML as ML Expert
    participant DM as DataManagerV2

    Note over TE: User requests scVI embeddings
    TE->>TE: Validate modality and parameters
    TE->>EHM: Create handoff with context
    Note over EHM: handoff_id: abc-123<br/>task_type: scvi_training<br/>parameters: {modality, n_latent, ...}
    EHM->>EHM: Track active handoff
    EHM-->>TE: LangGraph Command(goto=ML)
    TE->>ML: Handoff with preserved context
    Note over ML: Receives validated parameters<br/>and task description
    ML->>DM: Train scVI model
    DM-->>ML: Store embeddings in obsm['X_scvi']
    ML->>EHM: Complete handoff with results
    EHM->>EHM: Determine return path
    EHM-->>ML: Command(goto=Transcriptomics)
    ML->>TE: Return with results
    Note over TE: Process handoff results<br/>Verify embeddings stored<br/>Continue analysis
    TE-->>EHM: Cleanup completed handoff
```
All handoffs use schema-based validation for type safety:
```python
from typing import Any, Dict, Optional, Type

# Schema definitions for different handoff types
SCVI_CONTEXT_SCHEMA = {
    "modality_name": str,
    "n_latent": int,
    "batch_key": Optional[str],
    "max_epochs": int,
    "use_gpu": bool,
}

# Validation with detailed error messages
def validate_context_schema(context: Dict[str, Any], schema: Dict[str, Type]) -> Dict[str, Any]:
    """Validate context against schema with detailed error reporting."""
    validated = {}
    errors = []
    for field_name, field_type in schema.items():
        if field_name not in context and not _is_optional(field_type):
            errors.append(f"Required field '{field_name}' is missing")
        elif field_name in context:
            value = context[field_name]
            if not _validate_type(value, field_type):
                errors.append(f"Field '{field_name}' must be {field_type}")
            else:
                validated[field_name] = value
    if errors:
        raise ValueError(f"Context validation failed: {'; '.join(errors)}")
    return validated
```

The Agent Registry System automatically creates handoff tools based on available agents:
```python
# Automatic handoff tool creation
def create_expert_handoff_tools(available_agents: List[str]) -> Dict[str, BaseTool]:
    """Create handoff tools for all compatible expert pairs."""
    handoff_tools = {}
    for pattern_name, pattern in EXPERT_HANDOFF_PATTERNS.items():
        if both_experts_available(pattern, available_agents):
            for task_type in pattern.task_types:
                tool_name = f"handoff_{pattern.from_expert}_to_{pattern.to_expert}_{task_type}"
                handoff_tools[tool_name] = create_expert_handoff_tool(
                    from_expert=pattern.from_expert,
                    to_expert=pattern.to_expert,
                    task_type=task_type,
                    context_schema=pattern.context_schema,
                    return_to_sender=(pattern.return_flow == "sender"),
                )
    return handoff_tools
```

The enhanced handoff system includes comprehensive error handling:
```python
# Graceful error handling in handoff tools
try:
    # Validate the context against the schema
    validated_context = validate_context_schema(context, SCVI_CONTEXT_SCHEMA)

    # Create and execute the handoff
    handoff_command = expert_handoff_manager.create_context_preserving_handoff(
        to_expert="machine_learning_expert",
        context=handoff_context,
        return_to_sender=True,
    )
    return handoff_command
except ValueError as e:
    # Context validation failed
    return Command(
        goto="__end__",
        update={
            "messages": state["messages"] + [
                AIMessage(content=f"❌ Handoff validation failed: {str(e)}")
            ],
            "handoff_error": str(e),
        },
    )
except Exception as e:
    # Unexpected error: degrade gracefully
    return Command(
        goto="__end__",
        update={
            "messages": state["messages"] + [
                AIMessage(content=f"❌ Handoff failed: {str(e)}")
            ]
        },
    )
```

The handoff system provides comprehensive monitoring capabilities:
```python
# Get active handoffs for monitoring
active_handoffs = expert_handoff_manager.get_active_handoffs()

# Get handoff history for debugging
history = expert_handoff_manager.get_handoff_history(limit=50)

# Registry summary for a system overview
from lobster.config.agent_registry import get_handoff_registry_summary
summary = get_handoff_registry_summary()

# Example summary:
{
    "total_patterns": 15,
    "available_agents": 8,
    "patterns_by_priority": {10: [...], 9: [...], 8: [...]},
    "handoff_matrix": {
        "transcriptomics_expert": {
            "machine_learning_expert": True,
            "proteomics_expert": True,
            "research_agent": False
        }
    }
}
```

Performance characteristics:
- Handoff Overhead: <100ms for context passing
- Memory Usage: Minimal - only active contexts stored
- Scalability: Supports concurrent handoffs without interference
- Error Recovery: Automatic cleanup and rollback on failures
- Chain Protection: Maximum depth limit prevents infinite loops (default: 10)
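The chain-protection behavior can be illustrated with a small sketch. The function below is an assumption about how such a guard might work, not the actual ExpertHandoffManager internals; it permits return-to-sender patterns (A→B→A) while blocking self-handoffs and over-deep chains.

```python
MAX_CHAIN_DEPTH = 10  # assumed default, mirroring the limit described above

def check_handoff_allowed(chain, to_expert, max_depth=MAX_CHAIN_DEPTH):
    """Decide whether a new handoff may be appended to an active chain.

    Sketch of loop prevention: reject chains at the depth limit and
    immediate self-handoffs, but allow returning to an earlier expert
    (so A -> B -> A return flows still work).
    """
    if len(chain) >= max_depth:
        return False, f"chain depth limit ({max_depth}) reached"
    if chain and chain[-1] == to_expert:
        return False, f"self-handoff to {to_expert} rejected"
    return True, "ok"

# A -> B, now B hands back to A: allowed
ok, reason = check_handoff_allowed(
    ["transcriptomics_expert", "machine_learning_expert"],
    "transcriptomics_expert",
)
```

A depth limit rather than strict no-revisit checking is the simpler design here, since legitimate collaborations revisit experts; the limit merely bounds pathological ping-ponging.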
All agents follow a consistent tool implementation pattern:
```python
@tool
def analyze_data(modality_name: str, **params) -> str:
    """Standard agent tool pattern."""
    try:
        # 1. Validate that the modality exists
        if modality_name not in data_manager.list_modalities():
            raise ModalityNotFoundError(f"Modality '{modality_name}' not found")

        # 2. Get the data from the modality system
        adata = data_manager.get_modality(modality_name)

        # 3. Call a stateless service for processing
        result_adata, statistics = service.analyze(adata, **params)

        # 4. Store results under a descriptive name
        new_modality = f"{modality_name}_analyzed"
        data_manager.modalities[new_modality] = result_adata

        # 5. Log the operation for provenance
        data_manager.log_tool_usage("analyze_data", params, statistics)

        # 6. Return a formatted response
        return format_analysis_response(statistics, new_modality)
    except ServiceError as e:
        logger.error(f"Service error: {e}")
        return f"Analysis failed: {str(e)}"
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return f"Unexpected error: {str(e)}"
```

The centralized registry makes adding new agents straightforward:
Before (Legacy System), adding agents required updating:

```
├── lobster/agents/graph.py      # Import statements
├── lobster/agents/graph.py      # Agent creation code
├── lobster/agents/graph.py      # Handoff tool definitions
├── lobster/utils/callbacks.py   # Hardcoded agent name list
└── Multiple imports throughout the codebase
```

After (Registry System), adding agents only requires:

```
└── lobster/config/agent_registry.py   # Single registry entry
```

Everything else is automatic:

```
├── ✅ Dynamic agent loading
├── ✅ Automatic handoff tool creation
├── ✅ Callback system integration
├── ✅ Type-safe configuration
└── ✅ Professional error handling
```
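Concretely, a single registry entry might look like the following sketch. The metabolomics agent, its module path, and the default values are hypothetical, invented for illustration; the field names match the `AgentRegistryConfig` dataclass shown earlier, redefined here so the snippet is self-contained.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class AgentRegistryConfig:
    """Trimmed copy of the registry config for this example."""
    name: str
    display_name: str
    description: str
    factory_function: str
    handoff_tool_name: Optional[str] = None
    handoff_tool_description: Optional[str] = None

# Hypothetical new agent: a metabolomics specialist
metabolomics = AgentRegistryConfig(
    name="metabolomics_expert",
    display_name="Metabolomics Expert",
    description="LC-MS metabolite quantification and pathway mapping",
    factory_function="lobster.agents.metabolomics_expert.metabolomics_expert",
    handoff_tool_name="handoff_to_metabolomics_expert",
    handoff_tool_description="Delegate metabolomics analysis tasks",
)
```

Registering this one object is, per the design above, all that is needed: loading, handoff tool creation, and callback wiring follow from it automatically.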
The registry provides utility functions for system integration:
```python
# Get all worker agents with their configurations
worker_agents = get_worker_agents()
# Returns: Dict[str, AgentRegistryConfig]

# Get all agent names (including system agents)
all_agents = get_all_agent_names()
# Returns: List[str]

# Dynamically import an agent factory
factory = import_agent_factory('lobster.agents.data_expert.data_expert')
# Returns: Callable
```

The agent system implements comprehensive error handling:
- Agent-Level Errors - Tool failures, validation errors, service exceptions
- Communication Errors - Handoff failures, state corruption, timeout issues
- System-Level Errors - Registry failures, import errors, configuration issues
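One way to picture the three layers is a small exception hierarchy. The class names below are illustrative assumptions, not Lobster's actual exception types (only `ModalityNotFoundError`, used in the tool pattern later in this document, is taken from the source).

```python
class AgentError(Exception):
    """Agent-level failures: tool failures, validation errors, service exceptions."""

class HandoffError(AgentError):
    """Communication-level failures: handoff failures, state corruption, timeouts."""

class RegistryError(Exception):
    """System-level failures: registry failures, import errors, misconfiguration."""

def classify(exc: Exception) -> str:
    """Map an exception to the error layer it belongs to.

    Most-specific check first, since HandoffError subclasses AgentError.
    """
    if isinstance(exc, HandoffError):
        return "communication"
    if isinstance(exc, AgentError):
        return "agent"
    if isinstance(exc, RegistryError):
        return "system"
    return "unexpected"
```

Layering the hierarchy this way lets agent-level handlers catch `AgentError` broadly while the graph-level code still distinguishes communication failures for return-flow cleanup.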
The callback system monitors agent activities:
```python
# Agent activity tracking
callback.on_agent_start(agent_name, input_data)
callback.on_tool_start(tool_name, input_args)
callback.on_tool_end(tool_name, output)
callback.on_agent_end(agent_name, output)

# Error tracking
callback.on_agent_error(agent_name, error)
callback.on_tool_error(tool_name, error)
```

Performance optimizations include:
- Lazy Loading - Agents are created only when needed
- Stateless Design - Agents don't maintain persistent state beyond DataManagerV2
- Resource Cleanup - Automatic cleanup of temporary resources
- Memory Efficiency - Shared data structures through DataManagerV2
Parallel processing:
- Independent Operations - Agents can process different modalities simultaneously
- Batch Processing - Support for bulk operations across multiple datasets
- Async Communication - Non-blocking agent interactions where possible
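The non-blocking interaction pattern can be sketched with asyncio. The agent coroutines below are placeholders for illustration, not Lobster APIs.

```python
import asyncio

async def run_agent(name: str, delay: float) -> str:
    """Placeholder agent task: pretend to analyze one modality."""
    await asyncio.sleep(delay)  # stands in for real analysis work
    return f"{name}: done"

async def main():
    # Agents working on different modalities run concurrently,
    # so neither blocks the other while it waits.
    return await asyncio.gather(
        run_agent("transcriptomics_expert", 0.01),
        run_agent("proteomics_expert", 0.01),
    )

results = asyncio.run(main())
```

`asyncio.gather` preserves argument order in its result list, so callers can match each result back to the agent that produced it.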
Registry behavior is validated with unit tests:

```python
def test_agent_registry():
    """Test the agent registry functionality."""
    # Verify that all agents are registered
    worker_agents = get_worker_agents()
    assert len(worker_agents) > 0

    # Validate factory function imports
    for agent_name, config in worker_agents.items():
        factory = import_agent_factory(config.factory_function)
        assert callable(factory)

    # Check agent name consistency
    all_agents = get_all_agent_names()
    assert 'data_expert_agent' in all_agents
    assert 'transcriptomics_expert' in all_agents
    assert 'proteomics_expert' in all_agents
```

Integration tests cover:
- End-to-End Workflows - Complete analysis pipelines
- Agent Communication - Handoff mechanism validation
- Error Recovery - Graceful handling of failures
- State Consistency - DataManagerV2 integration testing
This agent system architecture provides a robust, extensible, and maintainable foundation for complex bioinformatics workflows while maintaining clear separation of concerns and professional software engineering practices.