|
| 1 | +# A2A Federation: Finishing Touches |
| 2 | + |
| 3 | +Three runtime features close the gap between the implemented abstractions and a working end-to-end federation pipeline. Each is small (one service/handler), builds on existing patterns, and requires no new abstractions. |
| 4 | + |
| 5 | +## 1. Convergence Polling Daemon |
| 6 | + |
| 7 | +**Problem**: The parent swarm creates `FederatedGoal` records when it delegates goals to child swarms, but nothing periodically checks whether those goals have converged. The `SwarmOverseer` can measure child state, and `ConvergenceContract::is_satisfied()` can evaluate signals, but no background loop connects them. |
| 8 | + |
| 9 | +**What to build**: A `ConvergencePollingDaemon` service that periodically polls each active `FederatedGoal`, measures child swarm state, evaluates the convergence contract, transitions the goal state, and emits lifecycle events. |
| 10 | + |
| 11 | +### Design |
| 12 | + |
| 13 | +``` |
| 14 | +src/services/federation/convergence_poller.rs |
| 15 | +``` |
| 16 | + |
| 17 | +```rust |
| 18 | +pub struct ConvergencePollingDaemon { |
| 19 | + federation_service: Arc<FederationService>, |
| 20 | + a2a_client: Arc<dyn A2AClient>, |
| 21 | + federated_goal_repo: Arc<dyn FederatedGoalRepository>, |
| 22 | + event_bus: Arc<EventBus>, |
| 23 | + poll_interval: Duration, |
| 24 | +} |
| 25 | +``` |
| 26 | + |
| 27 | +**Lifecycle**: Spawned as a `tokio::spawn` background task when the swarm starts with federation enabled. Stopped via a broadcast shutdown channel (same pattern as `MemoryDecayDaemon`). |
| 28 | + |
| 29 | +**Each tick**: |
| 30 | +1. Query `federated_goal_repo.get_active()` for all non-terminal goals |
| 31 | +2. For each goal in `Delegated` or `Active` or `Converging` state: |
| 32 | + a. Build a `SwarmOverseer` from the goal's `cerebrate_id`, `remote_task_id`, and the cerebrate's URL |
| 33 | + b. Call `overseer.measure()` to get `OverseerSignals` |
| 34 | + c. Build a `ConvergenceSignalSnapshot` from the signals |
| 35 | + d. Call `goal.convergence_contract.is_satisfied(&snapshot)` |
| 36 | + e. Update goal state: `Delegated → Active` on first signal, `Active → Converging` on positive delta, `Converging → Converged` when contract satisfied |
| 37 | + f. Call `federated_goal_repo.update_state()` and `update_signals()` |
| 38 | + g. Emit `FederatedGoalProgress` on each measurement |
| 39 | + h. Emit `FederatedGoalConverged` or `FederatedGoalFailed` on terminal transitions |
| 40 | +3. Sleep for `poll_interval` (default: per-goal `contract.poll_interval_secs`, minimum 30s) |
| 41 | + |
| 42 | +**Error handling**: If `SwarmOverseer::measure()` fails (child unreachable), increment a miss counter on the goal. After N consecutive misses (configurable, default 5), transition to `Failed` with reason "child swarm unreachable". |
| 43 | + |
| 44 | +### Files to create/modify |
| 45 | + |
| 46 | +| Action | File | |
| 47 | +|--------|------| |
| 48 | +| Create | `src/services/federation/convergence_poller.rs` | |
| 49 | +| Modify | `src/services/federation/mod.rs` — add module + re-export | |
| 50 | +| Modify | `src/services/swarm_orchestrator/infrastructure.rs` — spawn daemon on start if federation enabled | |
| 51 | + |
| 52 | +### Existing code to reuse |
| 53 | + |
| 54 | +- `SwarmOverseer::new()` and `measure()` — already handles A2A polling and signal extraction |
| 55 | +- `ConvergenceContract::is_satisfied()` — already evaluates all signal types |
| 56 | +- `FederatedGoalRepository::update_state()` / `update_signals()` — atomic SQLite updates |
| 57 | +- `MemoryDecayDaemon` in `src/services/memory_decay_daemon.rs` — follow the same spawn/shutdown/tick pattern |
| 58 | +- `event_factory::federation_event()` — for event emission |
| 59 | + |
| 60 | +### Tests |
| 61 | + |
| 62 | +- Unit: mock A2AClient returns task with convergence artifacts → daemon transitions goal to Converged |
| 63 | +- Unit: mock A2AClient returns error N times → daemon transitions goal to Failed |
| 64 | +- Unit: contract not yet satisfied → goal stays in Active, signals updated |
| 65 | +- Integration: full cycle with SQLite repo |
| 66 | + |
| 67 | +--- |
| 68 | + |
| 69 | +## 2. Child Convergence Signal Publishing |
| 70 | + |
| 71 | +**Problem**: The child swarm handles `goal_delegate` by creating an `InMemoryTask` in `Working` state, but never updates it with convergence artifacts. The parent's `SwarmOverseer` polls the child's A2A task expecting structured convergence data in artifacts, but finds nothing. |
| 72 | + |
| 73 | +**What to build**: A background task on the child swarm that periodically snapshots local convergence state and attaches it as an A2A artifact on the federated task. |
| 74 | + |
| 75 | +### Design |
| 76 | + |
| 77 | +``` |
| 78 | +src/services/federation/convergence_publisher.rs |
| 79 | +``` |
| 80 | + |
| 81 | +```rust |
| 82 | +pub struct ConvergencePublisher { |
| 83 | + tasks: Arc<RwLock<HashMap<String, InMemoryTask>>>, |
| 84 | + goal_repo: Arc<dyn GoalRepository>, |
| 85 | + task_repo: Arc<dyn TaskRepository>, |
| 86 | + event_bus: Arc<EventBus>, |
| 87 | + poll_interval: Duration, |
| 88 | +} |
| 89 | +``` |
| 90 | + |
| 91 | +**Each tick**: |
| 92 | +1. Scan `tasks` for any InMemoryTask with `metadata.abathur:federation.intent == "goal_delegate"` and state `Working` |
| 93 | +2. For each such task: |
| 94 | + a. Read the convergence contract from `metadata.abathur:federation.convergence_contract` |
| 95 | + b. Query local state to build signal values: |
| 96 | + - `build_passing`: run the configured build check command (from `abathur.toml [checks].build`) |
| 97 | + - `test_pass_rate`: run the configured test command, parse results |
| 98 | + - `convergence_level`: compute from overseer signals if convergence engine is active, else from task completion ratio |
| 99 | + - `tasks_completed` / `tasks_total`: query `task_repo` for tasks under the local goal |
| 100 | + c. Build an `A2AProtocolArtifact` with a `Data` part containing the signal snapshot: |
| 101 | + ```json |
| 102 | + { |
| 103 | + "convergence_level": 0.73, |
| 104 | + "build_passing": true, |
| 105 | + "test_pass_rate": 0.95, |
| 106 | + "type_check_clean": true, |
| 107 | + "tasks_completed": 12, |
| 108 | + "tasks_total": 15 |
| 109 | + } |
| 110 | + ``` |
| 111 | + d. Append the artifact to the InMemoryTask's artifact list |
| 112 | + e. If the convergence contract is self-satisfied (child can evaluate locally), transition the task to `Completed` |
| 113 | + |
| 114 | +**Artifact format**: Must match what `SwarmOverseer::measure()` expects — it looks for `A2APart::Data` with keys `convergence_level`, `build_passing`, `test_pass_rate`, `type_check_clean`, `security_issues`. |
| 115 | + |
| 116 | +### Simpler alternative |
| 117 | + |
| 118 | +Instead of a separate daemon, hook into the existing event reactor. When the child's convergence engine produces `ConvergenceIteration` events, translate them into artifact updates on the federated task. This is more reactive and avoids polling overhead. |
| 119 | + |
| 120 | +```rust |
| 121 | +// In a new EventHandler registered on the child: |
| 122 | +// Listen for ConvergenceIteration { task_id, convergence_level, ... } |
| 123 | +// Find the InMemoryTask for the federated goal that owns this task |
| 124 | +// Append a convergence artifact |
| 125 | +``` |
| 126 | + |
| 127 | +### Files to create/modify |
| 128 | + |
| 129 | +| Action | File | |
| 130 | +|--------|------| |
| 131 | +| Create | `src/services/federation/convergence_publisher.rs` | |
| 132 | +| Modify | `src/services/federation/mod.rs` — add module | |
| 133 | +| Modify | `src/adapters/mcp/a2a_http.rs` — spawn publisher for goal_delegate tasks | |
| 134 | + |
| 135 | +### Existing code to reuse |
| 136 | + |
| 137 | +- `InMemoryTask` and `A2AProtocolArtifact` types in `a2a_http.rs` |
| 138 | +- `OverseerClusterService::measure()` — if the child has overseers configured, reuse their signals |
| 139 | +- Convergence check commands from `abathur.toml` `[checks]` section |
| 140 | + |
| 141 | +### Tests |
| 142 | + |
| 143 | +- Unit: publisher builds correct artifact format from mock goal/task state |
| 144 | +- Unit: artifact format matches what SwarmOverseer::measure() parses |
| 145 | +- Integration: child publishes, parent SwarmOverseer reads — signal values round-trip correctly |
| 146 | + |
| 147 | +--- |
| 148 | + |
| 149 | +## 3. DAG Execution from CLI |
| 150 | + |
| 151 | +**Problem**: `abathur swarm dag create --file pipeline.yaml` validates and displays the DAG but doesn't start execution. There's no way to kick off the pipeline. |
| 152 | + |
| 153 | +**What to build**: A `dag start` CLI command that creates a parent goal, wires up the `SwarmDagExecutor`, and begins delegation. |
| 154 | + |
| 155 | +### Design |
| 156 | + |
| 157 | +Add to `DagCommand` enum: |
| 158 | +```rust |
| 159 | +/// Start executing a DAG (delegates root nodes immediately) |
| 160 | +Start { |
| 161 | + /// Path to the YAML DAG specification file |
| 162 | + #[arg(long)] |
| 163 | + file: String, |
| 164 | + /// Goal name for the parent goal that owns this pipeline |
| 165 | + #[arg(long, default_value = "Pipeline execution")] |
| 166 | + goal_name: String, |
| 167 | +}, |
| 168 | +``` |
| 169 | + |
| 170 | +**`dag_start()` implementation**: |
| 171 | +1. Parse the YAML file into a `SwarmDag` (reuse `dag_create` parsing logic — extract into shared helper) |
| 172 | +2. Connect to the running swarm's database and federation service |
| 173 | +3. Create a parent goal via `GoalService::create()` with the pipeline name |
| 174 | +4. Create a `SwarmDagExecutor` with the running `FederationService` and `EventBus` |
| 175 | +5. Call `executor.start(&mut dag, &goal)` to delegate root nodes |
| 176 | +6. Store the DAG in the `SwarmDagEventHandler`'s active DAGs map (so convergence events drive it forward) |
| 177 | +7. Print the initial DAG state and delegated node IDs |
| 178 | + |
| 179 | +**Runtime integration**: The DAG needs to be stored where the `SwarmDagEventHandler` can find it. Options: |
| 180 | +- Pass it via the event bus (emit a `SwarmDagCreated` event that the handler picks up) |
| 181 | +- Store in a shared `Arc<RwLock<HashMap<Uuid, SwarmDag>>>` accessible from both CLI and handler |
| 182 | +- Persist to the database and load on startup |
| 183 | + |
| 184 | +The database approach is the most robust — add a `swarm_dags` table and a `SwarmDagRepository`. |
| 185 | + |
| 186 | +### Additional commands |
| 187 | + |
| 188 | +``` |
| 189 | +abathur swarm dag stop <dag-id> # Cancel all non-terminal nodes |
| 190 | +abathur swarm dag retry <dag-id> # Retry failed nodes |
| 191 | +``` |
| 192 | + |
| 193 | +### Files to create/modify |
| 194 | + |
| 195 | +| Action | File | |
| 196 | +|--------|------| |
| 197 | +| Modify | `src/cli/commands/swarm.rs` — add `Start` variant, `dag_start()` handler, extract YAML parsing | |
| 198 | +| Create | `src/domain/ports/swarm_dag_repository.rs` — persistence trait (optional, could defer) | |
| 199 | +| Create | `src/adapters/sqlite/swarm_dag_repository.rs` — SQLite impl (optional) | |
| 200 | +| Modify | `src/services/swarm_orchestrator/infrastructure.rs` — register `SwarmDagEventHandler` on startup | |
| 201 | + |
| 202 | +### Existing code to reuse |
| 203 | + |
| 204 | +- `dag_create()` YAML parsing in `swarm.rs` — extract the `DagYamlSpec::into_swarm_dag()` into a shared function |
| 205 | +- `SwarmDagExecutor::start()` — already implemented |
| 206 | +- `GoalService::create_goal()` — for creating the parent goal |
| 207 | +- `FederationService` — already has `delegate_goal()` wired |
| 208 | + |
| 209 | +### Tests |
| 210 | + |
| 211 | +- Integration: `dag start` with mock federation service → root nodes delegated |
| 212 | +- Integration: full pipeline with 3 mock child swarms → all nodes converge in order |
| 213 | + |
| 214 | +--- |
| 215 | + |
| 216 | +## Implementation Order |
| 217 | + |
| 218 | +``` |
| 219 | +1. Child convergence publisher (unblocks parent polling) |
| 220 | +2. Convergence polling daemon (unblocks DAG progression) |
| 221 | +3. DAG start CLI command (unblocks user interaction) |
| 222 | +``` |
| 223 | + |
| 224 | +Item 1 must come first because without it, the parent has nothing to measure. Item 2 depends on 1 because it polls what 1 publishes. Item 3 depends on 2 because DAG execution requires convergence events to drive node transitions. |
| 225 | + |
| 226 | +Each item is independently testable — you can verify 1 works by checking the child's A2A task artifacts, verify 2 works by watching FederatedGoalConverged events, and verify 3 works by running the full pipeline. |
| 227 | + |
| 228 | +## Estimated Scope |
| 229 | + |
| 230 | +| Item | New Files | Modified Files | Complexity | |
| 231 | +|------|-----------|---------------|------------| |
| 232 | +| Convergence publisher | 1 | 2 | Small — follows existing daemon patterns | |
| 233 | +| Convergence poller | 1 | 2 | Small — reuses SwarmOverseer + contract eval | |
| 234 | +| DAG start CLI | 0-2 | 2 | Medium — needs runtime integration with handler | |
| 235 | + |
| 236 | +All three items together are roughly the same scope as Phase 2 was. |
0 commit comments