-
Notifications
You must be signed in to change notification settings - Fork 245
Advanced Agent Patterns
Advanced patterns for building sophisticated multi-agent systems, complex workflows, and enterprise-grade agent applications with Embabel.
Embabel agents can collaborate by invoking each other through the agent platform:
/**
 * Drives an order through validation, inventory, payment and fulfillment by
 * delegating each stage to a dedicated agent via the injected [AgentPlatform].
 */
@Agent(description = "Orchestrates complex business workflows")
class WorkflowOrchestratorAgent(
    private val agentPlatform: AgentPlatform
) {
    /**
     * Runs the four stage agents one after another and assembles the outcome.
     *
     * @param order the order to process
     * @param context operation context supplied to the action (not read in this body)
     * @return a failed [ProcessedOrder] carrying the validation issues when
     *   validation does not pass; otherwise a completed [ProcessedOrder]
     */
    @Action
    suspend fun processComplexOrder(order: Order, context: OperationContext): ProcessedOrder {
        // Stage 1 - validation gate: no later stage runs for an invalid order.
        val validationOutcome = agentPlatform.run(OrderValidationAgent::class, order)
        if (!validationOutcome.isValid) {
            return ProcessedOrder.failed(validationOutcome.issues)
        }

        // Stage 2 - inventory check for the ordered items.
        val stock = agentPlatform.run(InventoryAgent::class, order.items)

        // Stage 3 - payment, which takes the inventory result as input.
        val charge = agentPlatform.run(PaymentAgent::class, PaymentRequest(order, stock))

        // Stage 4 - fulfillment plan built from everything gathered so far.
        val shipmentPlan = agentPlatform.run(
            FulfillmentAgent::class,
            FulfillmentRequest(order, stock, charge)
        )

        return ProcessedOrder(
            orderId = order.id,
            validation = validationOutcome,
            payment = charge,
            fulfillment = shipmentPlan,
            status = OrderStatus.COMPLETED
        )
    }
}
Execute multiple agents concurrently for improved performance:
Kotlin Version (using coroutines):
/**
 * Runs four independent analysis agents concurrently and merges their results
 * into a single [ComprehensiveReport].
 */
@Agent(description = "Performs parallel analysis tasks")
class ParallelAnalysisAgent(
    private val agentPlatform: AgentPlatform
) {
    /**
     * Launches the risk, market, technical and competitor analyses in parallel.
     *
     * `async` is an extension on `CoroutineScope`, so it cannot be called from a
     * bare `suspend` function. `coroutineScope` (kotlinx.coroutines) supplies that
     * scope and does not return until every child has finished; a failure in
     * one child cancels the others and is rethrown to the caller.
     *
     * @param data the input shared by all four analyses
     * @return the combined report
     */
    @Action
    suspend fun comprehensiveAnalysis(data: AnalysisData): ComprehensiveReport = coroutineScope {
        // Launch multiple analyses in parallel
        val tasks = listOf(
            async { agentPlatform.run(RiskAnalysisAgent::class, data) },
            async { agentPlatform.run(MarketAnalysisAgent::class, data) },
            async { agentPlatform.run(TechnicalAnalysisAgent::class, data) },
            async { agentPlatform.run(CompetitorAnalysisAgent::class, data) }
        )

        // awaitAll keeps the launch order, so destructuring replaces index lookups.
        val (risk, market, technical, competitor) = tasks.awaitAll()

        ComprehensiveReport(
            riskAnalysis = risk as RiskAnalysis,
            marketAnalysis = market as MarketAnalysis,
            technicalAnalysis = technical as TechnicalAnalysis,
            competitorAnalysis = competitor as CompetitorAnalysis
        )
    }
}
Java Version (using CompletableFuture):
/**
 * Java variant of the parallel analysis agent: fans four analysis agents out
 * over a thread pool and merges their results into one report.
 */
@Agent(description = "Performs parallel analysis tasks using Java CompletableFuture")
public class ParallelAnalysisAgent {

    private final AgentPlatform agentPlatform;

    // NOTE(review): no shutdown of this pool appears in this class - confirm
    // its lifecycle is managed elsewhere.
    private final ExecutorService executorService;

    /**
     * @param agentPlatform platform used to invoke the individual analysis agents
     */
    public ParallelAnalysisAgent(AgentPlatform agentPlatform) {
        this.agentPlatform = agentPlatform;
        this.executorService = Executors.newCachedThreadPool();
    }

    /**
     * Starts all four analyses on the executor and returns a future that
     * completes with the combined report once every analysis has finished.
     *
     * @param data input shared by all analyses
     * @return future of the combined report
     */
    @Action
    public CompletableFuture<ComprehensiveReport> comprehensiveAnalysis(AnalysisData data) {
        CompletableFuture<Object> risk = CompletableFuture.supplyAsync(
                () -> agentPlatform.run(RiskAnalysisAgent.class, data), executorService);
        CompletableFuture<Object> market = CompletableFuture.supplyAsync(
                () -> agentPlatform.run(MarketAnalysisAgent.class, data), executorService);
        CompletableFuture<Object> technical = CompletableFuture.supplyAsync(
                () -> agentPlatform.run(TechnicalAnalysisAgent.class, data), executorService);
        CompletableFuture<Object> competitor = CompletableFuture.supplyAsync(
                () -> agentPlatform.run(CompetitorAnalysisAgent.class, data), executorService);

        // allOf finishes only after every future has; the join() calls below
        // therefore return immediately with the already-computed values.
        return CompletableFuture.allOf(risk, market, technical, competitor)
                .thenApply(ignored -> new ComprehensiveReport(
                        (RiskAnalysis) risk.join(),
                        (MarketAnalysis) market.join(),
                        (TechnicalAnalysis) technical.join(),
                        (CompetitorAnalysis) competitor.join()
                ));
    }

    /**
     * Blocking alternative: runs the analyses through a parallel stream and
     * returns once all of them are done.
     *
     * @param data input shared by all analyses
     * @return the combined report
     */
    @Action
    public ComprehensiveReport comprehensiveAnalysisSync(AnalysisData data) {
        // The collected list keeps the order of the class list, so the
        // positional casts below line up with the agents listed here.
        List<Object> outcomes = List.of(
                        RiskAnalysisAgent.class,
                        MarketAnalysisAgent.class,
                        TechnicalAnalysisAgent.class,
                        CompetitorAnalysisAgent.class
                )
                .parallelStream()
                .map(type -> agentPlatform.run(type, data))
                .collect(Collectors.toList());

        return new ComprehensiveReport(
                (RiskAnalysis) outcomes.get(0),
                (MarketAnalysis) outcomes.get(1),
                (TechnicalAnalysis) outcomes.get(2),
                (CompetitorAnalysis) outcomes.get(3)
        );
    }
}
Create specialized agents that inherit common behavior:
/**
 * Shared base for analysis agents: provides validation and report-formatting
 * actions that subclasses inherit.
 */
abstract class BaseAnalysisAgent {

    /** Delegates to [DataValidation.validate] for the given [data]. */
    @Action
    fun validateData(data: Any): DataValidation = DataValidation.validate(data)

    /** Delegates to [FormattedReport.format] for the given [analysis]. */
    @Action
    fun formatReport(analysis: Any): FormattedReport = FormattedReport.format(analysis)
}
/**
 * Financial-risk specialization of [BaseAnalysisAgent]: validates the input,
 * asks the default LLM for a risk assessment and formats the answer.
 */
@Agent(description = "Specialized financial risk analysis")
class FinancialRiskAgent : BaseAnalysisAgent() {

    /**
     * Produces a risk analysis for [data].
     *
     * @param data revenue, market-condition and regulatory inputs interpolated into the prompt
     * @param context provides access to the LLM
     * @return the formatted report cast to [RiskAnalysis]
     */
    @AchievesGoal("Complete financial risk assessment")
    @Action
    fun analyzeFinancialRisk(data: FinancialData, context: OperationContext): RiskAnalysis {
        // The validation result is not inspected here; the call itself is kept.
        validateData(data)

        // Prompt lines stay flush-left, so trimIndent() only strips the blank
        // first and last lines.
        val llmAnswer = context.ai().withDefaultLlm()
            .createObject(
                """
Analyze financial risk for the following data:
- Revenue trends: ${data.revenueData}
- Market conditions: ${data.marketConditions}
- Regulatory environment: ${data.regulatoryFactors}
Provide detailed risk assessment with probability scores.
                """.trimIndent()
            )

        // NOTE(review): formatReport is declared to return FormattedReport -
        // confirm that value really is a RiskAnalysis at runtime.
        return formatReport(llmAnswer) as RiskAnalysis
    }
}
/**
 * Market-opportunity specialization of [BaseAnalysisAgent]: validates the
 * input, asks the default LLM to rank opportunities and formats the answer.
 */
@Agent(description = "Specialized market opportunity analysis")
class MarketOpportunityAgent : BaseAnalysisAgent() {

    /**
     * Identifies opportunities in [data].
     *
     * @param data market size, competitor landscape and trend inputs interpolated into the prompt
     * @param context provides access to the LLM
     * @return the formatted report cast to [OpportunityAnalysis]
     */
    @AchievesGoal("Identify market opportunities")
    @Action
    fun identifyOpportunities(data: MarketData, context: OperationContext): OpportunityAnalysis {
        // The validation result is not inspected here; the call itself is kept.
        validateData(data)

        // Prompt lines stay flush-left, so trimIndent() only strips the blank
        // first and last lines.
        val llmAnswer = context.ai().withDefaultLlm()
            .createObject(
                """
Identify market opportunities from:
- Market size: ${data.marketSize}
- Competition: ${data.competitorLandscape}
- Trends: ${data.emergingTrends}
Rank opportunities by potential ROI and feasibility.
                """.trimIndent()
            )

        // NOTE(review): formatReport is declared to return FormattedReport -
        // confirm that value really is an OpportunityAnalysis at runtime.
        return formatReport(llmAnswer) as OpportunityAnalysis
    }
}
Agents that adapt their workflow based on intermediate results:
/**
 * Research agent whose optional steps (deep dive, expert validation) are tied
 * to named conditions evaluated on earlier results.
 */
@Agent(description = "Adaptive research agent that modifies approach based on findings")
class AdaptiveResearchAgent {

    /** First pass over [topic] using the default LLM. */
    @Action
    fun initialResearch(topic: String, context: OperationContext): ResearchFindings =
        context.ai().withDefaultLlm()
            .createObject("Conduct initial research on: $topic")

    /**
     * Follow-up research tied to the "requiresDeepDive" condition; runs on
     * the GPT-4.1 model option and feeds the initial summary into the prompt.
     */
    @Action
    @ConditionalOnCondition("requiresDeepDive")
    fun deepDiveResearch(
        topic: String,
        initialFindings: ResearchFindings,
        context: OperationContext
    ): DetailedResearch =
        context.ai()
            .withLlm(LlmOptions.withModel(OpenAiModels.GPT_41))
            .createObject(
                """
Based on initial findings: ${initialFindings.summary}
Conduct deep dive research on: $topic
Focus on areas that need more investigation.
                """.trimIndent()
            )

    /**
     * Review step tied to the "requiresExpertValidation" condition; runs on
     * the Claude 3 Opus model option.
     */
    @Action
    @ConditionalOnCondition("requiresExpertValidation")
    fun expertValidation(
        research: DetailedResearch,
        context: OperationContext
    ): ValidatedResearch =
        context.ai()
            .withLlm(LlmOptions.withModel(AnthropicModels.CLAUDE_3_OPUS))
            .createObject(
                """
Review this research as a subject matter expert:
${research.content}
Validate accuracy and provide expert commentary.
                """.trimIndent()
            )

    /**
     * Merges whichever research artifacts are present into the final report.
     *
     * @param deepDive optional deep-dive result; skipped when null
     * @param validation optional expert validation; skipped when null
     */
    @AchievesGoal("Complete adaptive research with validation")
    @Action
    fun finalizeResearch(
        topic: String,
        initialFindings: ResearchFindings,
        deepDive: DetailedResearch?,
        validation: ValidatedResearch?,
        context: OperationContext
    ): ComprehensiveResearch {
        // Null artifacts are dropped, so only completed steps reach the prompt.
        // NOTE(review): the prompt reads `.summary` on every element - confirm
        // the three types share a supertype that declares it.
        val components = listOfNotNull(initialFindings, deepDive, validation)
        return context.ai().withDefaultLlm()
            .createObject(
                """
Create comprehensive research report on: $topic
Integrate findings from:
${components.joinToString("\n") { "- ${it::class.simpleName}: ${it.summary}" }}
Provide executive summary and actionable insights.
                """.trimIndent()
            )
    }

    /** True when complexity exceeds 0.7 or the findings contradict each other. */
    @Condition("Check if topic requires deep investigation")
    fun requiresDeepDive(initialFindings: ResearchFindings): Boolean =
        initialFindings.complexityScore > 0.7 || initialFindings.contradictoryInformation

    /** True when confidence is below 0.8 or the research has controversial claims. */
    @Condition("Check if research requires expert validation")
    fun requiresExpertValidation(research: DetailedResearch): Boolean =
        research.confidenceScore < 0.8 || research.containsControversialClaims
}
Implement distributed transaction patterns with compensation:
/**
 * Saga-style orchestrator: runs a sequence of business steps and, when one of
 * them fails, replays the recorded compensations in reverse order.
 */
@Agent(description = "Manages complex business transactions with rollback capability")
class SagaOrchestratorAgent(
    private val agentPlatform: AgentPlatform
) {
    /**
     * Mutable bookkeeping for one saga run.
     *
     * @property transactionId identifier reported back in the [TransactionResult]
     * @property steps steps completed so far
     * @property compensations undo actions recorded for the completed steps
     */
    data class SagaContext(
        val transactionId: String,
        val steps: MutableList<SagaStep> = mutableListOf(),
        val compensations: MutableList<CompensationAction> = mutableListOf()
    )

    /**
     * Executes the four saga steps in order.
     *
     * On an ordinary failure the recorded compensations run and a failed
     * result is returned. On coroutine cancellation the compensations run
     * inside `NonCancellable`, so they are not cut short, and the
     * `CancellationException` is then rethrown so the caller sees the
     * cancellation instead of a "failed" result.
     *
     * @param request the transaction to execute
     * @param context operation context supplied to the action (not read in this body)
     * @return success or failure, both carrying the saga's transaction id
     */
    @AchievesGoal("Execute complex business transaction with rollback capability")
    @Action
    suspend fun executeBusinessTransaction(
        request: BusinessTransactionRequest,
        context: OperationContext
    ): TransactionResult {
        val sagaContext = SagaContext(transactionId = UUID.randomUUID().toString())
        try {
            // Step 1: Reserve inventory
            reserveInventory(request, sagaContext)
            // Step 2: Process payment
            processPayment(request, sagaContext)
            // Step 3: Create fulfillment order
            // NOTE(review): createFulfillmentOrder and sendConfirmation are not
            // defined in this class as shown - confirm they exist elsewhere.
            createFulfillmentOrder(request, sagaContext)
            // Step 4: Send confirmation
            sendConfirmation(request, sagaContext)
            return TransactionResult.success(sagaContext.transactionId)
        } catch (e: CancellationException) {
            // Cancellation must propagate, but completed steps still need undoing.
            withContext(NonCancellable) { compensateTransaction(sagaContext) }
            throw e
        } catch (e: Exception) {
            // Compensate all completed steps in reverse order
            compensateTransaction(sagaContext)
            return TransactionResult.failed(e.message, sagaContext.transactionId)
        }
    }

    /** Runs the inventory agent, then records the step and its "RELEASE_INVENTORY" undo action. */
    private suspend fun reserveInventory(
        request: BusinessTransactionRequest,
        sagaContext: SagaContext
    ): InventoryReservation {
        val result = agentPlatform.run(InventoryAgent::class, request.items)
        sagaContext.steps.add(SagaStep("INVENTORY_RESERVED", result))
        sagaContext.compensations.add(CompensationAction("RELEASE_INVENTORY", request.items))
        return result
    }

    /** Runs the payment agent, then records the step and its "REFUND_PAYMENT" undo action. */
    private suspend fun processPayment(
        request: BusinessTransactionRequest,
        sagaContext: SagaContext
    ): PaymentResult {
        val result = agentPlatform.run(PaymentAgent::class, request.paymentInfo)
        sagaContext.steps.add(SagaStep("PAYMENT_PROCESSED", result))
        sagaContext.compensations.add(CompensationAction("REFUND_PAYMENT", result.transactionId))
        return result
    }

    /**
     * Replays the recorded compensations, newest first. A failing compensation
     * is logged and the remaining ones still run; cancellation is rethrown.
     */
    private suspend fun compensateTransaction(sagaContext: SagaContext) {
        sagaContext.compensations.reversed().forEach { compensation ->
            try {
                when (compensation.type) {
                    "RELEASE_INVENTORY" -> agentPlatform.run(
                        InventoryAgent::class,
                        ReleaseInventoryRequest(compensation.data)
                    )
                    "REFUND_PAYMENT" -> agentPlatform.run(
                        PaymentAgent::class,
                        RefundRequest(compensation.data as String)
                    )
                    // Add more compensation actions as needed
                }
            } catch (e: CancellationException) {
                // Never treat cancellation as a compensation failure.
                throw e
            } catch (e: Exception) {
                // Log compensation failure but continue with other compensations
                // NOTE(review): `logger` is not declared in this class as shown.
                logger.error("Failed to compensate action ${compensation.type}", e)
            }
        }
    }
}
Agents that react to domain events:
/**
 * Listens for order events and triggers the matching agents, publishing
 * follow-up events with their results.
 */
@Component
class AgentEventCoordinator(
    private val agentPlatform: AgentPlatform,
    private val applicationEventPublisher: ApplicationEventPublisher
) {
    /**
     * Runs validation, inventory check and fraud detection concurrently for a
     * newly created order and publishes one event per result.
     *
     * `launch` is an extension on `CoroutineScope`, so it needs a scope;
     * `coroutineScope` (kotlinx.coroutines) provides one. The handler therefore
     * returns only after all three children finish, and a failure in one
     * cancels the others and is rethrown.
     */
    @EventListener
    suspend fun handleOrderCreated(event: OrderCreatedEvent) {
        coroutineScope {
            // Trigger multiple agents in response to order creation
            launch {
                val validation = agentPlatform.run(OrderValidationAgent::class, event.order)
                applicationEventPublisher.publishEvent(OrderValidatedEvent(event.order, validation))
            }
            launch {
                val inventory = agentPlatform.run(InventoryCheckAgent::class, event.order.items)
                applicationEventPublisher.publishEvent(InventoryCheckedEvent(event.order, inventory))
            }
            launch {
                val fraud = agentPlatform.run(FraudDetectionAgent::class, event.order)
                applicationEventPublisher.publishEvent(FraudCheckCompletedEvent(event.order, fraud))
            }
        }
    }

    /**
     * Routes a validated order: valid orders go to payment processing,
     * invalid ones to the rejection agent together with the validation issues.
     */
    @EventListener
    suspend fun handleOrderValidated(event: OrderValidatedEvent) {
        if (event.validation.isValid) {
            agentPlatform.run(PaymentProcessingAgent::class, event.order)
        } else {
            agentPlatform.run(
                OrderRejectionAgent::class,
                OrderRejection(event.order, event.validation.issues)
            )
        }
    }
}
// Domain events
// Each event carries the Order it concerns and passes that order to the
// ApplicationEvent constructor.
// NOTE(review): AgentEventCoordinator also publishes a FraudCheckCompletedEvent,
// which is not declared in this group - confirm it is defined elsewhere.

/** Event type handled by the order-created listener. */
data class OrderCreatedEvent(val order: Order) : ApplicationEvent(order)

/** Carries an order together with the outcome of its validation. */
data class OrderValidatedEvent(val order: Order, val validation: OrderValidation) : ApplicationEvent(order)

/** Carries an order together with the result of its inventory check. */
data class InventoryCheckedEvent(val order: Order, val inventory: InventoryCheck) : ApplicationEvent(order)
Expose agents as REST APIs for microservice architectures:
/**
 * REST facade that exposes selected agents under `/api/agents`.
 */
@RestController
@RequestMapping("/api/agents")
class AgentController(
    private val agentPlatform: AgentPlatform
) {
    /**
     * Runs the financial analysis agent.
     *
     * @return 200 with the analysis, or 500 with an error payload built from
     *   the exception message when the agent fails
     */
    @PostMapping("/analysis/financial")
    suspend fun financialAnalysis(@RequestBody request: FinancialAnalysisRequest): ResponseEntity<FinancialAnalysis> {
        return try {
            val result = agentPlatform.run(FinancialAnalysisAgent::class, request)
            ResponseEntity.ok(result)
        } catch (e: CancellationException) {
            // Cancellation of the handler coroutine is not an agent failure;
            // let it propagate instead of turning it into a 500 response.
            throw e
        } catch (e: Exception) {
            ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(FinancialAnalysis.error(e.message))
        }
    }

    /** Runs the research agent and returns its result with status 200. */
    @PostMapping("/research")
    suspend fun research(@RequestBody request: ResearchRequest): ResponseEntity<ResearchResult> {
        val result = agentPlatform.run(ResearchAgent::class, request)
        return ResponseEntity.ok(result)
    }

    /** Runs the order processing agent and returns its result with status 200. */
    @PostMapping("/orders/process")
    suspend fun processOrder(@RequestBody order: Order): ResponseEntity<ProcessedOrder> {
        val result = agentPlatform.run(OrderProcessingAgent::class, order)
        return ResponseEntity.ok(result)
    }
}
Process agent tasks from message queues:
/**
 * Consumes agent work items from RabbitMQ and Kafka and publishes the results.
 *
 * NOTE(review): `rabbitTemplate` and `kafkaTemplate` are used below but are not
 * declared in this class as shown - confirm how they are provided.
 */
@Component
class AgentMessageProcessor(
    private val agentPlatform: AgentPlatform
) {
    /**
     * Processes one order from the "order.processing" queue. The result is
     * sent to "order.completed"; on failure the order and the exception
     * message are sent to "order.failed".
     */
    @RabbitListener(queues = ["order.processing"])
    suspend fun processOrderMessage(order: Order) {
        try {
            val result = agentPlatform.run(OrderProcessingAgent::class, order)
            // Send result to completion queue
            rabbitTemplate.convertAndSend("order.completed", result)
        } catch (e: CancellationException) {
            // A cancelled listener has not "failed" the order; propagate the
            // cancellation rather than routing the order to the failure queue.
            throw e
        } catch (e: Exception) {
            // Send to dead letter queue for manual handling
            rabbitTemplate.convertAndSend(
                "order.failed",
                FailedOrderProcessing(order, e.message)
            )
        }
    }

    /**
     * Dispatches an analysis request from the "analysis.requests" topic to the
     * agent matching its type and sends the result to "analysis.results".
     */
    @KafkaListener(topics = ["analysis.requests"])
    suspend fun processAnalysisRequest(request: AnalysisRequest) {
        val result = when (request.type) {
            AnalysisType.FINANCIAL -> agentPlatform.run(FinancialAnalysisAgent::class, request)
            AnalysisType.MARKET -> agentPlatform.run(MarketAnalysisAgent::class, request)
            AnalysisType.RISK -> agentPlatform.run(RiskAnalysisAgent::class, request)
        }
        kafkaTemplate.send("analysis.results", result)
    }
}
Agents that work with persistent data:
/**
 * Loads a customer's data from the repositories, asks the default LLM to
 * analyze it and persists the resulting analysis.
 */
@Agent(description = "Customer data analysis with persistence")
class CustomerAnalysisAgent(
    private val customerRepository: CustomerRepository,
    private val analysisRepository: AnalysisRepository
) {
    /**
     * Gathers the customer record, order history and preferences.
     *
     * @throws CustomerNotFoundException when the repository lookup yields null
     */
    @Action
    fun loadCustomerData(customerId: String): CustomerData {
        val record = customerRepository.findById(customerId)
            ?: throw CustomerNotFoundException(customerId)
        return CustomerData(
            customer = record,
            orderHistory = customerRepository.findOrderHistory(customerId),
            preferences = customerRepository.findPreferences(customerId)
        )
    }

    /**
     * Builds the behavior-analysis prompt (profile, the last ten order-history
     * entries and preferences) and sends it to the default LLM.
     */
    @Action
    fun analyzeCustomerBehavior(
        customerData: CustomerData,
        context: OperationContext
    ): CustomerAnalysis =
        // Prompt lines stay flush-left, so trimIndent() only strips the blank
        // first and last lines.
        context.ai().withDefaultLlm()
            .createObject(
                """
Analyze customer behavior patterns:
Customer Profile: ${customerData.customer}
Order History: ${customerData.orderHistory.takeLast(10)}
Preferences: ${customerData.preferences}
Provide insights on:
- Purchase patterns
- Seasonal trends
- Churn risk
- Upsell opportunities
                """.trimIndent()
            )

    /**
     * Stores the analysis as a version-1 record stamped with the current time
     * and returns the stored record's id together with the analysis.
     */
    @AchievesGoal("Complete customer analysis with persistent results")
    @Action
    fun finalizeAnalysis(
        customerId: String,
        analysis: CustomerAnalysis
    ): PersistedAnalysis {
        val record = CustomerAnalysisRecord(
            customerId = customerId,
            analysis = analysis,
            timestamp = LocalDateTime.now(),
            version = 1
        )
        val stored = analysisRepository.save(record)
        return PersistedAnalysis(stored.id, analysis)
    }
}
@Agent(description = "Financial reporting with role-based access control")
// Financial reporting agent guarded by method-security annotations: the class
// requires the FINANCIAL_ANALYST role and the action additionally requires
// READ permission on the requested company.
@PreAuthorize("hasRole('FINANCIAL_ANALYST')")
class SecureFinancialAgent {

    /**
     * Writes an audit log entry naming the requesting user and company, then
     * asks the default LLM to generate the report.
     *
     * @param companyId company the report is for; also the permission target
     * @param reportType kind of report, interpolated into the prompt
     * @param context provides access to the LLM
     */
    @Action
    @PreAuthorize("hasPermission(#companyId, 'Company', 'READ')")
    fun generateFinancialReport(
        companyId: String,
        reportType: ReportType,
        context: OperationContext
    ): FinancialReport {
        // The caller's name is read from the current security context.
        val username = SecurityContextHolder.getContext().authentication.name
        // NOTE(review): `auditLogger` is not declared in this class as shown.
        auditLogger.info("Financial report requested by $username for company $companyId")

        return context.ai().withDefaultLlm()
            .createObject("Generate $reportType report for company $companyId")
    }
}
@Component
// Execution interceptor that writes an audit record before each agent action
// and completes it afterwards.
// NOTE(review): `auditRepository`, `getCurrentUser` and `hashInputs` are not
// declared in this class as shown - confirm where they come from.
class AgentAuditInterceptor : AgentExecutionInterceptor {

    /**
     * Saves a new audit record with the agent class, action name, current
     * time, current user and a hash of the inputs.
     */
    override fun beforeExecution(agent: Any, action: Method, args: Array<Any?>) {
        val auditRecord = AuditRecord(
            // simpleName is null for anonymous and local classes; fall back to
            // the JVM class name instead of throwing.
            agentClass = agent::class.simpleName ?: agent.javaClass.name,
            actionName = action.name,
            timestamp = LocalDateTime.now(),
            user = getCurrentUser(),
            inputHash = hashInputs(args)
        )
        auditRepository.save(auditRecord)
    }

    /**
     * Looks up the latest audit record for the current user and action and,
     * when one is found, stamps it with completion time, success flag and the
     * result's type name.
     */
    override fun afterExecution(agent: Any, action: Method, result: Any?, exception: Exception?) {
        // NOTE(review): matching by "latest for user and action" may pick the
        // wrong record when the same user runs the same action concurrently.
        val auditRecord = auditRepository.findLatestForUserAndAction(
            getCurrentUser(),
            action.name
        )
        auditRecord?.let { record ->
            record.completed = LocalDateTime.now()
            record.success = exception == null
            // Named lambda parameters avoid shadowing the outer receiver.
            record.resultType = result?.let { value -> value::class.simpleName }
            auditRepository.save(record)
        }
    }
}
@Agent(description = "Research agent with intelligent caching")
// Research agent that caches results in the "research-results" cache, keyed
// by topic concatenated with depth.
class CachedResearchAgent(
    private val cacheManager: CacheManager
) {
    /**
     * Runs the research prompt on the default LLM. Results are cached under
     * the key `topic + depth`.
     */
    @Action
    @Cacheable(value = ["research-results"], key = "#topic + #depth")
    fun conductResearch(
        topic: String,
        depth: ResearchDepth,
        context: OperationContext
    ): ResearchResult {
        // Expensive research operation - results cached
        return context.ai().withDefaultLlm()
            .createObject("Research $topic with depth $depth")
    }

    /**
     * Returns the cached result when it is newer than [lastUpdated];
     * otherwise conducts comprehensive research.
     */
    @Action
    fun getUpdatedResearch(
        topic: String,
        lastUpdated: LocalDateTime,
        context: OperationContext
    ): ResearchResult {
        // NOTE(review): `getCachedResult` is not declared in this class as shown.
        val cached = getCachedResult(topic)
        if (cached != null && cached.timestamp.isAfter(lastUpdated)) {
            return cached
        }
        // Cache miss or stale - conduct fresh research
        // NOTE(review): this is a call on `this`; with proxy-based Spring
        // caching such self-invocations bypass @Cacheable - confirm.
        return conductResearch(topic, ResearchDepth.COMPREHENSIVE, context)
    }

    /**
     * Evicts the cached result of every research depth for [topic].
     *
     * Cache keys are exact values, so the former `#topic + '*'` key evicted
     * only the literal entry "<topic>*" rather than every depth. The keys are
     * rebuilt here the same way `conductResearch` builds them.
     * Assumes ResearchDepth is an enum and CacheManager is Spring's - TODO confirm.
     */
    fun invalidateResearch(topic: String) {
        val cache = cacheManager.getCache("research-results") ?: return
        enumValues<ResearchDepth>().forEach { depth -> cache.evict("$topic$depth") }
    }
}
@Agent(description = "Intelligent model selection based on task complexity")
// Agent that picks an LLM per task from the task's complexity and flags, then
// runs the task on that model.
class SmartModelAgent {

    /**
     * Processes [task] on the model selected for it.
     *
     * @param task supplies complexity, reasoning/creativity flags and description
     * @param context provides access to the LLM
     */
    @Action
    fun processWithOptimalModel(
        task: ProcessingTask,
        context: OperationContext
    ): ProcessingResult {
        val chosenModel = pickModel(task)
        return context.ai()
            .withLlm(LlmOptions.withModel(chosenModel))
            .createObject("Process task: ${task.description}")
    }

    // Branches are tried top to bottom, so the complexity thresholds take
    // precedence over the reasoning/creativity flags.
    private fun pickModel(task: ProcessingTask) = when {
        task.complexity < 0.3 -> "llama3.2:1b"
        task.complexity < 0.7 -> "llama3.2"
        task.requiresReasoning -> OpenAiModels.GPT_41
        task.requiresCreativity -> AnthropicModels.CLAUDE_3_OPUS
        else -> "auto"
    }
}
@Component
// Records Micrometer metrics (execution timer, success and error counters)
// for agent execution events.
class AgentMetricsCollector(
    private val meterRegistry: MeterRegistry
) {
    private val executionTimer = Timer.builder("agent.execution.time")
        .register(meterRegistry)

    /**
     * Updates the timer and increments a success or error counter tagged with
     * the agent and action names (plus the exception type on errors).
     *
     * Micrometer fixes a meter's tags at registration and `Counter.increment`
     * takes no tags, so each tagged counter is resolved through
     * `MeterRegistry.counter(name, tags)`.
     */
    @EventListener
    fun handleAgentExecution(event: AgentExecutionEvent) {
        val sample = Timer.start(meterRegistry)
        try {
            // Agent execution happens here
            // NOTE(review): nothing runs between start and stop in this
            // placeholder, so the timer records almost no time - confirm
            // where the measured work is meant to go.
            sample.stop(executionTimer)
            meterRegistry.counter(
                "agent.execution.success",
                Tags.of(
                    "agent", event.agentName,
                    "action", event.actionName
                )
            ).increment()
        } catch (e: Exception) {
            sample.stop(executionTimer)
            meterRegistry.counter(
                "agent.execution.error",
                Tags.of(
                    "agent", event.agentName,
                    "action", event.actionName,
                    // simpleName is null for anonymous classes; fall back to
                    // the JVM class name instead of throwing.
                    "error", e::class.simpleName ?: e.javaClass.name
                )
            ).increment()
            throw e
        }
    }
}
These advanced patterns enable building production-ready, scalable agent systems that integrate seamlessly with existing enterprise architectures while maintaining the flexibility and power of the Embabel framework.
(c) Embabel Software Inc 2024-2025.