|
| 1 | +# API/CLI Shared Task Execution Engine |
| 2 | + |
| 3 | +## Problem Statement |
| 4 | + |
| 5 | +The API and CLI currently have completely different task execution implementations, with the API missing critical task lifecycle management that the CLI has. This causes the API to create Task instances but not properly execute them, leading to tasks that appear to hang without any actual processing. |
| 6 | + |
| 7 | +**Current State:** |
| 8 | + |
| 9 | +- **CLI**: Has sophisticated task execution with completion detection, event handling, timeouts, and proper cleanup in `BatchProcessor.executeTaskWithCompletionDetection()` |
| 10 | +- **API**: Simply creates a Task instance and waits on the promise with no event handling or lifecycle management |
| 11 | +- **Result**: API tasks don't actually execute the task loop, leading to hanging behavior |
| 12 | + |
| 13 | +**Evidence from Logs:** |
| 14 | + |
| 15 | +- CLI shows detailed task lifecycle: `[TaskLifecycle] Starting task`, `[Task] Initiating task loop`, `[BatchProcessor] executeTaskWithCompletionDetection` |
| 16 | +- API shows only: `Task.create() completed`, `Started job`, then nothing - no task loop initiation |
| 17 | + |
| 18 | +## Root Cause Analysis |
| 19 | + |
| 20 | +The [`Task.create()`](src/core/task/Task.ts:592) method creates a Task instance and returns a promise, but the promise only represents the initial task setup, not the complete execution. The actual task execution requires: |
| 21 | + |
| 22 | +1. **Event Handling**: Listening for `taskCompleted`, `taskAborted`, `message` events |
| 23 | +2. **Completion Detection**: Detecting when informational queries are complete |
| 24 | +3. **Timeout Management**: Emergency timeouts and query-specific timeouts |
| 25 | +4. **Resource Cleanup**: Proper disposal of Task resources |
| 26 | +5. **Response Monitoring**: For informational queries, detecting when responses are complete |
| 27 | + |
| 28 | +The CLI's [`BatchProcessor.executeTaskWithCompletionDetection()`](src/cli/commands/batch.ts:274) implements all of this, while the API has none of it. |
| 29 | + |
| 30 | +## Solution: Extract Shared Task Execution Orchestrator |
| 31 | + |
| 32 | +### Architecture Overview |
| 33 | + |
| 34 | +```mermaid |
| 35 | +graph TB |
| 36 | + subgraph "Current State" |
| 37 | + CLI1[CLI BatchProcessor] --> TaskExec1[executeTaskWithCompletionDetection] |
| 38 | + API1[API FastifyServer] --> TaskCreate1[Task.create() + wait promise] |
| 39 | + TaskExec1 --> Task1[Task Instance] |
| 40 | + TaskCreate1 --> Task2[Task Instance] |
| 41 | + end |
| 42 | +
|
| 43 | + subgraph "Proposed State" |
| 44 | + CLI2[CLI BatchProcessor] --> SharedOrch[TaskExecutionOrchestrator] |
| 45 | + API2[API FastifyServer] --> SharedOrch |
| 46 | + SharedOrch --> Task3[Task Instance] |
| 47 | + SharedOrch --> EventHandling[Event Handling] |
| 48 | + SharedOrch --> TimeoutMgmt[Timeout Management] |
| 49 | + SharedOrch --> CompletionDetection[Completion Detection] |
| 50 | + SharedOrch --> Cleanup[Resource Cleanup] |
| 51 | + end |
| 52 | +``` |
| 53 | + |
| 54 | +### Implementation Plan |
| 55 | + |
| 56 | +#### Phase 1: Extract Core Execution Logic |
| 57 | + |
| 58 | +**Story 1.1: Create TaskExecutionOrchestrator** |
| 59 | + |
| 60 | +- Extract [`executeTaskWithCompletionDetection`](src/cli/commands/batch.ts:274) logic from CLI BatchProcessor |
| 61 | +- Create `src/core/task/TaskExecutionOrchestrator.ts` |
| 62 | +- Abstract output/interaction handlers via interfaces |
| 63 | +- Maintain all existing completion detection and timeout logic |
| 64 | + |
| 65 | +**Story 1.2: Create Execution Interfaces** |
| 66 | + |
| 67 | +- `ITaskExecutionHandler` interface for output/interaction |
| 68 | +- `TaskExecutionOptions` for timeout and completion settings |
| 69 | +- `TaskExecutionResult` for standardized results |
| 70 | + |
| 71 | +#### Phase 2: API Integration |
| 72 | + |
| 73 | +**Story 2.1: Implement API Task Execution Handler** |
| 74 | + |
| 75 | +- Create `ApiTaskExecutionHandler` that implements `ITaskExecutionHandler` |
| 76 | +- Integrate with SSE streaming for real-time output |
| 77 | +- Handle user interactions via SSE question/response flow |
| 78 | +- Map Task events to SSE events |
| 79 | + |
| 80 | +**Story 2.2: Update API FastifyServer** |
| 81 | + |
| 82 | +- Replace current task promise waiting with `TaskExecutionOrchestrator` |
| 83 | +- Remove custom timeout/monitoring code (use orchestrator's) |
| 84 | +- Ensure proper resource cleanup on client disconnect |
| 85 | + |
| 86 | +#### Phase 3: CLI Migration |
| 87 | + |
| 88 | +**Story 3.1: Implement CLI Task Execution Handler** |
| 89 | + |
| 90 | +- Create `CliTaskExecutionHandler` that implements `ITaskExecutionHandler` |
| 91 | +- Maintain current console output behavior |
| 92 | +- Handle user interactions via CLI prompts |
| 93 | + |
| 94 | +**Story 3.2: Update CLI BatchProcessor** |
| 95 | + |
| 96 | +- Replace `executeTaskWithCompletionDetection` with `TaskExecutionOrchestrator` |
| 97 | +- Remove duplicated logic |
| 98 | +- Ensure backward compatibility |
| 99 | + |
| 100 | +#### Phase 4: Testing & Validation |
| 101 | + |
| 102 | +**Story 4.1: Integration Testing** |
| 103 | + |
| 104 | +- Verify API now properly executes tasks (shows task loop initiation) |
| 105 | +- Verify CLI maintains existing behavior |
| 106 | +- Test timeout and completion detection in both contexts |
| 107 | + |
| 108 | +**Story 4.2: Performance & Reliability** |
| 109 | + |
| 110 | +- Ensure no performance regression |
| 111 | +- Test resource cleanup under various scenarios |
| 112 | +- Validate event handling consistency |
| 113 | + |
| 114 | +### Key Components |
| 115 | + |
| 116 | +#### TaskExecutionOrchestrator |
| 117 | + |
| 118 | +```typescript |
| 119 | +class TaskExecutionOrchestrator { |
| 120 | + async executeTask( |
| 121 | + task: Task, |
| 122 | + taskPromise: Promise<void>, |
| 123 | + handler: ITaskExecutionHandler, |
| 124 | + options: TaskExecutionOptions, |
| 125 | + ): Promise<TaskExecutionResult> |
| 126 | + |
| 127 | + private setupEventHandlers(task: Task, handler: ITaskExecutionHandler) |
| 128 | + private setupTimeouts(options: TaskExecutionOptions) |
| 129 | + private setupCompletionDetection(task: Task, isInfoQuery: boolean) |
| 130 | + private cleanup(task: Task) |
| 131 | +} |
| 132 | +``` |
| 133 | + |
| 134 | +#### ITaskExecutionHandler |
| 135 | + |
| 136 | +```typescript |
| 137 | +interface ITaskExecutionHandler { |
| 138 | + onTaskStarted(taskId: string): Promise<void> |
| 139 | + onTaskCompleted(taskId: string, result: string): Promise<void> |
| 140 | + onTaskFailed(taskId: string, error: Error): Promise<void> |
| 141 | + onTaskMessage(message: any): Promise<void> |
| 142 | + onTaskProgress(progress: number, message: string): Promise<void> |
| 143 | +} |
| 144 | +``` |
| 145 | + |
| 146 | +### Benefits |
| 147 | + |
| 148 | +1. **Consistency**: Both API and CLI use identical task execution logic |
| 149 | +2. **Maintainability**: Single source of truth for task lifecycle management |
| 150 | +3. **Reliability**: Proven CLI logic extends to API |
| 151 | +4. **Feature Parity**: API gains all CLI completion detection and timeout features |
| 152 | +5. **Testability**: Shared logic can be thoroughly tested once |
| 153 | + |
| 154 | +### Migration Strategy |
| 155 | + |
| 156 | +1. **Phase 1**: Extract and test orchestrator in isolation |
| 157 | +2. **Phase 2**: Integrate with API first (high impact, easier to test) |
| 158 | +3. **Phase 3**: Migrate CLI (lower risk, maintains compatibility) |
| 159 | +4. **Phase 4**: Remove duplicate code and optimize |
| 160 | + |
| 161 | +### Success Criteria |
| 162 | + |
| 163 | +- API logs show same detailed task execution as CLI: `[Task] Initiating task loop`, event handling, completion detection |
| 164 | +- Both API and CLI can execute the test query "list your MCP servers" successfully |
| 165 | +- No performance regression in either interface |
| 166 | +- Consistent behavior between CLI batch mode and API streaming mode |
| 167 | +- Proper resource cleanup and timeout handling in both contexts |
| 168 | + |
| 169 | +## Technical Details |
| 170 | + |
| 171 | +### Current CLI Flow (Working) |
| 172 | + |
| 173 | +1. `Task.create()` → returns [instance, promise] |
| 174 | +2. `BatchProcessor.executeTaskWithCompletionDetection()` → sets up event handlers |
| 175 | +3. Task events (`taskCompleted`, `message`) → trigger completion logic |
| 176 | +4. `task.dispose()` → cleanup |
| 177 | + |
| 178 | +### Current API Flow (Broken) |
| 179 | + |
| 180 | +1. `Task.create()` → returns [instance, promise] |
| 181 | +2. Wait on promise directly → **missing event handling** |
| 182 | +3. No completion detection → **tasks appear to hang** |
| 183 | +4. No proper cleanup → **resource leaks** |
| 184 | + |
| 185 | +### Proposed Shared Flow |
| 186 | + |
| 187 | +1. `Task.create()` → returns [instance, promise] |
| 188 | +2. `TaskExecutionOrchestrator.executeTask()` → unified event handling |
| 189 | +3. Handler-specific output (console vs SSE) → **abstracted interface** |
| 190 | +4. Consistent completion detection and cleanup → **shared logic** |
| 191 | + |
| 192 | +This approach reuses the existing, proven task execution logic rather than creating duplicate implementations. |
0 commit comments