# Advanced Agent Patterns Advanced patterns for building sophisticated multi-agent systems, complex workflows, and enterprise-grade agent applications with Embabel. ## 🤖 Multi-Agent System Patterns ### **Agent-to-Agent Communication** Embabel agents can collaborate by invoking each other through the agent platform: ```kotlin @Agent(description = "Orchestrates complex business workflows") class WorkflowOrchestratorAgent( private val agentPlatform: AgentPlatform ) { @Action suspend fun processComplexOrder(order: Order, context: OperationContext): ProcessedOrder { // Step 1: Validate order using validation agent val validation = agentPlatform.run(OrderValidationAgent::class, order) if (!validation.isValid) { return ProcessedOrder.failed(validation.issues) } // Step 2: Check inventory using inventory agent val inventory = agentPlatform.run(InventoryAgent::class, order.items) // Step 3: Process payment using payment agent val payment = agentPlatform.run(PaymentAgent::class, PaymentRequest(order, inventory)) // Step 4: Create fulfillment plan val fulfillment = agentPlatform.run(FulfillmentAgent::class, FulfillmentRequest(order, inventory, payment)) return ProcessedOrder( orderId = order.id, validation = validation, payment = payment, fulfillment = fulfillment, status = OrderStatus.COMPLETED ) } } ``` ### **Parallel Agent Execution** Execute multiple agents concurrently for improved performance: **Kotlin Version (using coroutines):** ```kotlin @Agent(description = "Performs parallel analysis tasks") class ParallelAnalysisAgent( private val agentPlatform: AgentPlatform ) { @Action suspend fun comprehensiveAnalysis(data: AnalysisData): ComprehensiveReport { // Launch multiple analyses in parallel val tasks = listOf( async { agentPlatform.run(RiskAnalysisAgent::class, data) }, async { agentPlatform.run(MarketAnalysisAgent::class, data) }, async { agentPlatform.run(TechnicalAnalysisAgent::class, data) }, async { agentPlatform.run(CompetitorAnalysisAgent::class, data) } ) // Wait for all analyses to complete val results = tasks.awaitAll() return ComprehensiveReport( riskAnalysis = results[0] as RiskAnalysis, marketAnalysis = results[1] as MarketAnalysis, technicalAnalysis = results[2] as TechnicalAnalysis, competitorAnalysis = results[3] as CompetitorAnalysis ) } } ``` **Java Version (using CompletableFuture):** ```java @Agent(description = "Performs parallel analysis tasks using Java CompletableFuture") public class ParallelAnalysisAgent { private final AgentPlatform agentPlatform; private final ExecutorService executorService; public ParallelAnalysisAgent(AgentPlatform agentPlatform) { this.agentPlatform = agentPlatform; // Create a thread pool for parallel execution this.executorService = Executors.newCachedThreadPool(); } /** * Execute multiple analysis agents in parallel and combine results. * This is the Java equivalent of the Kotlin suspend function with async blocks. */ @Action public CompletableFuture comprehensiveAnalysis(AnalysisData data) { // Launch multiple analyses in parallel using CompletableFuture List> tasks = List.of( CompletableFuture.supplyAsync(() -> agentPlatform.run(RiskAnalysisAgent.class, data), executorService), CompletableFuture.supplyAsync(() -> agentPlatform.run(MarketAnalysisAgent.class, data), executorService), CompletableFuture.supplyAsync(() -> agentPlatform.run(TechnicalAnalysisAgent.class, data), executorService), CompletableFuture.supplyAsync(() -> agentPlatform.run(CompetitorAnalysisAgent.class, data), executorService) ); // Wait for all analyses to complete and combine results return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])) .thenApply(v -> { // Collect all results List results = tasks.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); // Cast and combine results into comprehensive report return new ComprehensiveReport( (RiskAnalysis) results.get(0), (MarketAnalysis) results.get(1), (TechnicalAnalysis) results.get(2), (CompetitorAnalysis) results.get(3) ); }); } /** * Alternative synchronous approach using parallel streams. * Simpler but blocks the calling thread until all tasks complete. */ @Action public ComprehensiveReport comprehensiveAnalysisSync(AnalysisData data) { // Execute analyses in parallel using parallel streams List results = List.of( RiskAnalysisAgent.class, MarketAnalysisAgent.class, TechnicalAnalysisAgent.class, CompetitorAnalysisAgent.class ) .parallelStream() .map(agentClass -> agentPlatform.run(agentClass, data)) .collect(Collectors.toList()); return new ComprehensiveReport( (RiskAnalysis) results.get(0), (MarketAnalysis) results.get(1), (TechnicalAnalysis) results.get(2), (CompetitorAnalysis) results.get(3) ); } } ``` ### **Agent Specialization Hierarchy** Create specialized agents that inherit common behavior: ```kotlin // Base agent with common functionality abstract class BaseAnalysisAgent { @Action fun validateData(data: Any): DataValidation { // Common validation logic return DataValidation.validate(data) } @Action fun formatReport(analysis: Any): FormattedReport { // Common formatting logic return FormattedReport.format(analysis) } } @Agent(description = "Specialized financial risk analysis") class FinancialRiskAgent : BaseAnalysisAgent() { @AchievesGoal("Complete financial risk assessment") @Action fun analyzeFinancialRisk(data: FinancialData, context: OperationContext): RiskAnalysis { val validation = validateData(data) val analysis = context.ai().withDefaultLlm() .createObject(""" Analyze financial risk for the following data: - Revenue trends: ${data.revenueData} - Market conditions: ${data.marketConditions} - Regulatory environment: ${data.regulatoryFactors} Provide detailed risk assessment with probability scores. """.trimIndent()) return formatReport(analysis) as RiskAnalysis } } @Agent(description = "Specialized market opportunity analysis") class MarketOpportunityAgent : BaseAnalysisAgent() { @AchievesGoal("Identify market opportunities") @Action fun identifyOpportunities(data: MarketData, context: OperationContext): OpportunityAnalysis { val validation = validateData(data) val analysis = context.ai().withDefaultLlm() .createObject(""" Identify market opportunities from: - Market size: ${data.marketSize} - Competition: ${data.competitorLandscape} - Trends: ${data.emergingTrends} Rank opportunities by potential ROI and feasibility. """.trimIndent()) return formatReport(analysis) as OpportunityAnalysis } } ``` ## 🔄 Advanced Workflow Patterns ### **Dynamic Workflow Adaptation** Agents that adapt their workflow based on intermediate results: ```kotlin @Agent(description = "Adaptive research agent that modifies approach based on findings") class AdaptiveResearchAgent { @Action fun initialResearch(topic: String, context: OperationContext): ResearchFindings { return context.ai().withDefaultLlm() .createObject("Conduct initial research on: $topic") } @Action @ConditionalOnCondition("requiresDeepDive") fun deepDiveResearch( topic: String, initialFindings: ResearchFindings, context: OperationContext ): DetailedResearch { return context.ai() .withLlm(LlmOptions.withModel(OpenAiModels.GPT_41)) // Use powerful model .createObject(""" Based on initial findings: ${initialFindings.summary} Conduct deep dive research on: $topic Focus on areas that need more investigation. """.trimIndent()) } @Action @ConditionalOnCondition("requiresExpertValidation") fun expertValidation( research: DetailedResearch, context: OperationContext ): ValidatedResearch { return context.ai() .withLlm(LlmOptions.withModel(AnthropicModels.CLAUDE_3_OPUS)) .createObject(""" Review this research as a subject matter expert: ${research.content} Validate accuracy and provide expert commentary. """.trimIndent()) } @AchievesGoal("Complete adaptive research with validation") @Action fun finalizeResearch( topic: String, initialFindings: ResearchFindings, deepDive: DetailedResearch?, validation: ValidatedResearch?, context: OperationContext ): ComprehensiveResearch { val components = listOfNotNull(initialFindings, deepDive, validation) return context.ai().withDefaultLlm() .createObject(""" Create comprehensive research report on: $topic Integrate findings from: ${components.joinToString("\n") { "- ${it::class.simpleName}: ${it.summary}" }} Provide executive summary and actionable insights. """.trimIndent()) } @Condition("Check if topic requires deep investigation") fun requiresDeepDive(initialFindings: ResearchFindings): Boolean { return initialFindings.complexityScore > 0.7 || initialFindings.contradictoryInformation } @Condition("Check if research requires expert validation") fun requiresExpertValidation(research: DetailedResearch): Boolean { return research.confidenceScore < 0.8 || research.containsControversialClaims } } ``` ### **Saga Pattern for Complex Transactions** Implement distributed transaction patterns with compensation: ```kotlin @Agent(description = "Manages complex business transactions with rollback capability") class SagaOrchestratorAgent( private val agentPlatform: AgentPlatform ) { data class SagaContext( val transactionId: String, val steps: MutableList = mutableListOf(), val compensations: MutableList = mutableListOf() ) @AchievesGoal("Execute complex business transaction with rollback capability") @Action suspend fun executeBusinessTransaction( request: BusinessTransactionRequest, context: OperationContext ): TransactionResult { val sagaContext = SagaContext(transactionId = UUID.randomUUID().toString()) try { // Step 1: Reserve inventory val inventory = reserveInventory(request, sagaContext) // Step 2: Process payment val payment = processPayment(request, sagaContext) // Step 3: Create fulfillment order val fulfillment = createFulfillmentOrder(request, sagaContext) // Step 4: Send confirmation val confirmation = sendConfirmation(request, sagaContext) return TransactionResult.success(sagaContext.transactionId) } catch (e: Exception) { // Compensate all completed steps in reverse order compensateTransaction(sagaContext) return TransactionResult.failed(e.message, sagaContext.transactionId) } } private suspend fun reserveInventory( request: BusinessTransactionRequest, sagaContext: SagaContext ): InventoryReservation { val result = agentPlatform.run(InventoryAgent::class, request.items) sagaContext.steps.add(SagaStep("INVENTORY_RESERVED", result)) sagaContext.compensations.add(CompensationAction("RELEASE_INVENTORY", request.items)) return result } private suspend fun processPayment( request: BusinessTransactionRequest, sagaContext: SagaContext ): PaymentResult { val result = agentPlatform.run(PaymentAgent::class, request.paymentInfo) sagaContext.steps.add(SagaStep("PAYMENT_PROCESSED", result)) sagaContext.compensations.add(CompensationAction("REFUND_PAYMENT", result.transactionId)) return result } private suspend fun compensateTransaction(sagaContext: SagaContext) { // Execute compensations in reverse order sagaContext.compensations.reversed().forEach { compensation -> try { when (compensation.type) { "RELEASE_INVENTORY" -> agentPlatform.run(InventoryAgent::class, ReleaseInventoryRequest(compensation.data)) "REFUND_PAYMENT" -> agentPlatform.run(PaymentAgent::class, RefundRequest(compensation.data as String)) // Add more compensation actions as needed } } catch (e: Exception) { // Log compensation failure but continue with other compensations logger.error("Failed to compensate action ${compensation.type}", e) } } } } ``` ### **Event-Driven Agent Coordination** Agents that react to domain events: ```kotlin @Component class AgentEventCoordinator( private val agentPlatform: AgentPlatform, private val applicationEventPublisher: ApplicationEventPublisher ) { @EventListener suspend fun handleOrderCreated(event: OrderCreatedEvent) { // Trigger multiple agents in response to order creation launch { val validation = agentPlatform.run(OrderValidationAgent::class, event.order) applicationEventPublisher.publishEvent(OrderValidatedEvent(event.order, validation)) } launch { val inventory = agentPlatform.run(InventoryCheckAgent::class, event.order.items) applicationEventPublisher.publishEvent(InventoryCheckedEvent(event.order, inventory)) } launch { val fraud = agentPlatform.run(FraudDetectionAgent::class, event.order) applicationEventPublisher.publishEvent(FraudCheckCompletedEvent(event.order, fraud)) } } @EventListener suspend fun handleOrderValidated(event: OrderValidatedEvent) { if (event.validation.isValid) { agentPlatform.run(PaymentProcessingAgent::class, event.order) } else { agentPlatform.run(OrderRejectionAgent::class, OrderRejection(event.order, event.validation.issues)) } } } // Domain events data class OrderCreatedEvent(val order: Order) : ApplicationEvent(order) data class OrderValidatedEvent(val order: Order, val validation: OrderValidation) : ApplicationEvent(order) data class InventoryCheckedEvent(val order: Order, val inventory: InventoryCheck) : ApplicationEvent(order) ``` ## 🔧 Enterprise Integration Patterns ### **Agent as Microservice** Expose agents as REST APIs for microservice architectures: ```kotlin @RestController @RequestMapping("/api/agents") class AgentController( private val agentPlatform: AgentPlatform ) { @PostMapping("/analysis/financial") suspend fun financialAnalysis(@RequestBody request: FinancialAnalysisRequest): ResponseEntity { return try { val result = agentPlatform.run(FinancialAnalysisAgent::class, request) ResponseEntity.ok(result) } catch (e: Exception) { ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) .body(FinancialAnalysis.error(e.message)) } } @PostMapping("/research") suspend fun research(@RequestBody request: ResearchRequest): ResponseEntity { val result = agentPlatform.run(ResearchAgent::class, request) return ResponseEntity.ok(result) } @PostMapping("/orders/process") suspend fun processOrder(@RequestBody order: Order): ResponseEntity { val result = agentPlatform.run(OrderProcessingAgent::class, order) return ResponseEntity.ok(result) } } ``` ### **Message Queue Integration** Process agent tasks from message queues: ```kotlin @Component class AgentMessageProcessor( private val agentPlatform: AgentPlatform ) { @RabbitListener(queues = ["order.processing"]) suspend fun processOrderMessage(order: Order) { try { val result = agentPlatform.run(OrderProcessingAgent::class, order) // Send result to completion queue rabbitTemplate.convertAndSend("order.completed", result) } catch (e: Exception) { // Send to dead letter queue for manual handling rabbitTemplate.convertAndSend("order.failed", FailedOrderProcessing(order, e.message)) } } @KafkaListener(topics = ["analysis.requests"]) suspend fun processAnalysisRequest(request: AnalysisRequest) { val result = when (request.type) { AnalysisType.FINANCIAL -> agentPlatform.run(FinancialAnalysisAgent::class, request) AnalysisType.MARKET -> agentPlatform.run(MarketAnalysisAgent::class, request) AnalysisType.RISK -> agentPlatform.run(RiskAnalysisAgent::class, request) } kafkaTemplate.send("analysis.results", result) } } ``` ### **Database Integration Patterns** Agents that work with persistent data: ```kotlin @Agent(description = "Customer data analysis with persistence") class CustomerAnalysisAgent( private val customerRepository: CustomerRepository, private val analysisRepository: AnalysisRepository ) { @Action fun loadCustomerData(customerId: String): CustomerData { val customer = customerRepository.findById(customerId) ?: throw CustomerNotFoundException(customerId) return CustomerData( customer = customer, orderHistory = customerRepository.findOrderHistory(customerId), preferences = customerRepository.findPreferences(customerId) ) } @Action fun analyzeCustomerBehavior( customerData: CustomerData, context: OperationContext ): CustomerAnalysis { return context.ai().withDefaultLlm() .createObject(""" Analyze customer behavior patterns: Customer Profile: ${customerData.customer} Order History: ${customerData.orderHistory.takeLast(10)} Preferences: ${customerData.preferences} Provide insights on: - Purchase patterns - Seasonal trends - Churn risk - Upsell opportunities """.trimIndent()) } @AchievesGoal("Complete customer analysis with persistent results") @Action fun finalizeAnalysis( customerId: String, analysis: CustomerAnalysis ): PersistedAnalysis { // Save analysis results to database val savedAnalysis = analysisRepository.save( CustomerAnalysisRecord( customerId = customerId, analysis = analysis, timestamp = LocalDateTime.now(), version = 1 ) ) return PersistedAnalysis(savedAnalysis.id, analysis) } } ``` ## 🔐 Security and Compliance Patterns ### **Role-Based Agent Access** ```kotlin @Agent(description = "Financial reporting with role-based access control") @PreAuthorize("hasRole('FINANCIAL_ANALYST')") class SecureFinancialAgent { @Action @PreAuthorize("hasPermission(#companyId, 'Company', 'READ')") fun generateFinancialReport( companyId: String, reportType: ReportType, context: OperationContext ): FinancialReport { // Only accessible to users with proper permissions val securityContext = SecurityContextHolder.getContext() val username = securityContext.authentication.name auditLogger.info("Financial report requested by $username for company $companyId") return context.ai().withDefaultLlm() .createObject("Generate $reportType report for company $companyId") } } ``` ### **Audit Trail Integration** ```kotlin @Component class AgentAuditInterceptor : AgentExecutionInterceptor { override fun beforeExecution(agent: Any, action: Method, args: Array) { val auditRecord = AuditRecord( agentClass = agent::class.simpleName!!, actionName = action.name, timestamp = LocalDateTime.now(), user = getCurrentUser(), inputHash = hashInputs(args) ) auditRepository.save(auditRecord) } override fun afterExecution(agent: Any, action: Method, result: Any?, exception: Exception?) { val auditRecord = auditRepository.findLatestForUserAndAction( getCurrentUser(), action.name ) auditRecord?.let { it.completed = LocalDateTime.now() it.success = exception == null it.resultType = result?.let { it::class.simpleName } auditRepository.save(it) } } } ``` ## 🚀 Performance Optimization Patterns ### **Caching Strategy** ```kotlin @Agent(description = "Research agent with intelligent caching") class CachedResearchAgent( private val cacheManager: CacheManager ) { @Action @Cacheable(value = ["research-results"], key = "#topic + #depth") fun conductResearch( topic: String, depth: ResearchDepth, context: OperationContext ): ResearchResult { // Expensive research operation - results cached return context.ai().withDefaultLlm() .createObject("Research $topic with depth $depth") } @Action fun getUpdatedResearch( topic: String, lastUpdated: LocalDateTime, context: OperationContext ): ResearchResult { val cached = getCachedResult(topic) if (cached != null && cached.timestamp.isAfter(lastUpdated)) { return cached } // Cache miss or stale - conduct fresh research return conductResearch(topic, ResearchDepth.COMPREHENSIVE, context) } @CacheEvict(value = ["research-results"], key = "#topic + '*'", allEntries = false) fun invalidateResearch(topic: String) { // Evict all cached results for this topic } } ``` ### **Model Selection Strategy** ```kotlin @Agent(description = "Intelligent model selection based on task complexity") class SmartModelAgent { @Action fun processWithOptimalModel( task: ProcessingTask, context: OperationContext ): ProcessingResult { val modelChoice = when { task.complexity < 0.3 -> "llama3.2:1b" // Fast local model task.complexity < 0.7 -> "llama3.2" // Standard local model task.requiresReasoning -> OpenAiModels.GPT_41 // Cloud reasoning model task.requiresCreativity -> AnthropicModels.CLAUDE_3_OPUS // Creative model else -> "auto" // Let framework choose } return context.ai() .withLlm(LlmOptions.withModel(modelChoice)) .createObject("Process task: ${task.description}") } } ``` ## 📊 Monitoring and Observability ### **Metrics Collection** ```kotlin @Component class AgentMetricsCollector( private val meterRegistry: MeterRegistry ) { private val executionTimer = Timer.builder("agent.execution.time") .register(meterRegistry) private val successCounter = Counter.builder("agent.execution.success") .register(meterRegistry) private val errorCounter = Counter.builder("agent.execution.error") .register(meterRegistry) @EventListener fun handleAgentExecution(event: AgentExecutionEvent) { val sample = Timer.start(meterRegistry) try { // Agent execution happens here sample.stop(executionTimer) successCounter.increment( Tags.of( "agent", event.agentName, "action", event.actionName ) ) } catch (e: Exception) { sample.stop(executionTimer) errorCounter.increment( Tags.of( "agent", event.agentName, "action", event.actionName, "error", e::class.simpleName!! ) ) throw e } } } ``` These advanced patterns enable building production-ready, scalable agent systems that integrate seamlessly with existing enterprise architectures while maintaining the flexibility and power of the Embabel framework.