Skip to content

Latest commit

 

History

History

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 

README.md

BatchNode Example

This example demonstrates how to build a BatchNode component that processes multiple inputs through a Graph or Workflow with configurable concurrency and interrupt/resume support.

Overview

BatchNode is a reusable component that:

  • Accepts []I (slice of inputs) and returns []O (slice of outputs)
  • Runs a Graph or Workflow for each input item
  • Supports configurable concurrency (sequential or parallel)
  • Handles errors and interrupts from individual tasks
  • Integrates with Eino's callback and checkpoint systems

Business Scenario

Document Review Pipeline: A compliance team needs to review multiple documents. Each document goes through an automated review workflow, with high-priority documents requiring human approval before completion.

Quick Start

cd compose/batch
go run .

Project Structure

compose/batch/
├── batch/
│   ├── types.go    # Type definitions (NodeConfig, NodeInterruptState, etc.)
│   ├── options.go  # Batch invocation options (WithInnerOptions)
│   ├── store.go    # Internal checkpoint store for sub-tasks
│   └── node.go     # Core BatchNode implementation
├── main.go         # Example scenarios
└── README.md

Key Concepts

1. Creating a BatchNode

batchNode := batch.NewBatchNode(&batch.NodeConfig[ReviewRequest, ReviewResult]{
    Name:           "DocumentReviewer",
    InnerTask:      workflow,        // Graph or Workflow
    MaxConcurrency: 3,               // 0=sequential, >0=parallel limit
    InnerCompileOptions: []compose.GraphCompileOption{
        compose.WithGraphName("SingleDocReview"),
    },
})

2. Concurrency Control

MaxConcurrency Behavior
0 Sequential: process one task at a time
>0 Concurrent: up to N parallel tasks (first task runs on main goroutine)

3. Options

Compile-time options (in NodeConfig.InnerCompileOptions):

  • Applied when compiling the inner Graph/Workflow
  • Example: compose.WithGraphName("...")

Request-time options (via batch.WithInnerOptions):

  • Applied to each inner task invocation
  • Example: compose.WithCallbacks(handler)
results, err := batchNode.Invoke(ctx, inputs,
    batch.WithInnerOptions(
        compose.WithCallbacks(progressHandler),
    ),
)

4. Error Handling

  • Normal errors: BatchNode returns the first error encountered
  • Interrupt errors: Collected and bundled via compose.CompositeInterrupt

5. Interrupt & Resume

BatchNode supports human-in-the-loop workflows:

// In your inner workflow's lambda:
if needsHumanReview {
    wasInterrupted, _, _ := compose.GetInterruptState[any](ctx)
    if !wasInterrupted {
        // First run: interrupt for human review
        return Result{}, compose.Interrupt(ctx, map[string]string{
            "document_id": docID,
            "reason":      "Requires human approval",
        })
    }
    
    // Resume: get human decision
    isTarget, hasData, decision := compose.GetResumeContext[*Decision](ctx)
    if isTarget && hasData && decision != nil {
        return Result{Approved: decision.Approved}, nil
    }
}

Resume with approval decisions:

// Extract interrupt contexts
info, _ := compose.ExtractInterruptInfo(err)

// Prepare resume data (keyed by interrupt ID)
resumeData := make(map[string]any)
for _, iCtx := range info.InterruptContexts {
    resumeData[iCtx.ID] = &Decision{Approved: true}
}

// Resume
resumeCtx := compose.BatchResumeWithData(ctx, resumeData)
results, err = runner.Invoke(resumeCtx, nil, compose.WithCheckPointID(checkpointID))

Scenarios

Scenario 1: Basic Sequential Processing

Process documents one at a time with MaxConcurrency: 0.

Scenario 2: Concurrent Processing

Process multiple documents in parallel with MaxConcurrency: 3.

Scenario 3: With Compile Options

Configure inner workflow at compile time using InnerCompileOptions.

Scenario 4: With Invoke Options (Callbacks)

Add callbacks for monitoring using callbacks.InitCallbacks.

Scenario 5: Normal Error Handling

Demonstrates how BatchNode handles errors from individual tasks.

Scenario 6: Interrupt & Resume

Human-in-the-loop workflow:

  1. High-priority documents interrupt for human review
  2. Extract interrupt contexts with document IDs
  3. Resume with approval decisions using BatchResumeWithData

Scenario 7: Parent Graph with Reduce Node

  • Integrate BatchNode in a larger pipeline
  • Use WithInnerOptions for progress tracking callbacks
  • Reduce pattern: aggregate batch results into a summary report

Key APIs Used

API Purpose
compose.NewWorkflow Create inner workflow
compose.AppendAddressSegment Create unique address for each sub-task
compose.GetInterruptState Check if resuming from interrupt
compose.GetResumeContext Get resume data for this component
compose.Interrupt Interrupt execution for human input
compose.CompositeInterrupt Bundle multiple interrupt errors
compose.ExtractInterruptInfo Extract interrupt contexts from error
compose.BatchResumeWithData Resume with data for multiple targets
compose.WithCheckPointStore Enable checkpoint persistence
compose.WithCheckPointID Identify checkpoint for resume
callbacks.EnsureRunInfo Setup callback context
callbacks.OnStart/OnEnd/OnError Trigger callbacks
schema.RegisterName Register types for serialization

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      BatchNode                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Input: []ReviewRequest                              │    │
│  └─────────────────────────────────────────────────────┘    │
│                           │                                  │
│                           ▼                                  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Concurrency Control (MaxConcurrency)               │    │
│  │  - Sequential (0): one at a time                    │    │
│  │  - Concurrent (>0): parallel with semaphore         │    │
│  └─────────────────────────────────────────────────────┘    │
│                           │                                  │
│         ┌─────────────────┼─────────────────┐               │
│         ▼                 ▼                 ▼               │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐       │
│  │ Inner Task  │   │ Inner Task  │   │ Inner Task  │       │
│  │ (index: 0)  │   │ (index: 1)  │   │ (index: 2)  │       │
│  │             │   │             │   │             │       │
│  │ Workflow/   │   │ Workflow/   │   │ Workflow/   │       │
│  │ Graph       │   │ Graph       │   │ Graph       │       │
│  └─────────────┘   └─────────────┘   └─────────────┘       │
│         │                 │                 │               │
│         ▼                 ▼                 ▼               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Result Collection                                   │    │
│  │  - Success: store in outputs[index]                 │    │
│  │  - Error: return first error                        │    │
│  │  - Interrupt: collect for CompositeInterrupt        │    │
│  └─────────────────────────────────────────────────────┘    │
│                           │                                  │
│                           ▼                                  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Output: []ReviewResult                              │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

Interrupt & Resume Flow

First Invocation:
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  DOC-001 │    │  DOC-002 │    │  DOC-003 │    │  DOC-004 │
│  (high)  │    │ (medium) │    │  (high)  │    │  (low)   │
└────┬─────┘    └────┬─────┘    └────┬─────┘    └────┬─────┘
     │               │               │               │
     ▼               ▼               ▼               ▼
 INTERRUPT       COMPLETE        INTERRUPT       COMPLETE
     │               │               │               │
     └───────────────┴───────────────┴───────────────┘
                           │
                           ▼
              ┌────────────────────────┐
              │   CompositeInterrupt   │
              │   - InterruptContexts  │
              │   - NodeInterruptState │
              └────────────────────────┘

Resume with Approval:
┌──────────┐                    ┌──────────┐
│  DOC-001 │                    │  DOC-003 │
│  (high)  │                    │  (high)  │
└────┬─────┘                    └────┬─────┘
     │                               │
     ▼                               ▼
 GetResumeContext               GetResumeContext
 → Decision{Approved: true}     → Decision{Approved: true}
     │                               │
     ▼                               ▼
 COMPLETE                        COMPLETE
     │                               │
     └───────────────┬───────────────┘
                     │
                     ▼
         ┌─────────────────────┐
         │   Final Results     │
         │   DOC-001: ✓        │
         │   DOC-002: ✓        │
         │   DOC-003: ✓        │
         │   DOC-004: ✓        │
         └─────────────────────┘

Sample Output

=== Document Review Pipeline Example ===

--- Scenario 6: Interrupt & Resume ---
First invocation (will interrupt for high priority docs):
    Document DOC-001 requires human review (high priority)
    Document DOC-003 requires human review (high priority)

  Interrupt detected! Found 2 interrupt context(s):
    1. ID=fd49cbc4-deca-4f02-bdf9-02f921c0c1f5
       Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:0;...
       DocumentID=DOC-001, Reason=High priority document requires human approval
    2. ID=af4a3f99-2414-4d6c-9c06-b9b4b1786044
       Address=runnable:InterruptResumeDemo;node:batch_review;batch_process:2;...
       DocumentID=DOC-003, Reason=High priority document requires human approval

  Resuming with approval decisions...
    Document DOC-001 resumed with decision: approved=true
    Document DOC-003 resumed with decision: approved=true

  Final results after resume:
    - DOC-001: approved=true, comments=Human review: Approved by supervisor
    - DOC-002: approved=true, comments=Auto-approved (non-high priority)
    - DOC-003: approved=true, comments=Human review: Approved by supervisor
    - DOC-004: approved=true, comments=Auto-approved (non-high priority)

Learn More