diff --git a/extensions/saga/__init__.py b/extensions/saga/__init__.py new file mode 100644 index 00000000..7de976d6 --- /dev/null +++ b/extensions/saga/__init__.py @@ -0,0 +1,5 @@ +"""A2A-SAGA extension package. + +This package contains the Saga Orchestration extension for the +Agent2Agent (A2A) protocol. +""" diff --git a/extensions/saga/a2a-saga-extension-rfc.md b/extensions/saga/a2a-saga-extension-rfc.md new file mode 100644 index 00000000..27397a5b --- /dev/null +++ b/extensions/saga/a2a-saga-extension-rfc.md @@ -0,0 +1,848 @@ +--- +title: A2A-SAGA — Saga Orchestration Extension for Agent2Agent (A2A) +status: Draft +category: Standards Track (A2A Extension) +version: 1 +extension_uri: https://a2a.dev/extensions/saga/v1 +proposed_by: origo Labs (GitHub: @origo-labs) +last_updated: 2026-01-01 +--- + +## 1. Abstract + +This document specifies **A2A-SAGA**, an extension to the Agent2Agent (A2A) protocol that standardizes **saga-style orchestration** for multi-agent workflows. + +A2A-SAGA defines interoperable, on-the-wire semantics for: + +- executing multi-step workflows across agents, +- determining whether actions took effect (including uncertain outcomes), +- compensating previously completed actions when later steps fail, +- safe retries via idempotency, and +- parallel step execution with deterministic recovery ordering. + +This RFC is **fully self-contained**: all protocol rules, JSON Schemas, and normative test vectors required for conformance are included. + +## 2. Motivation + +As agents evolve from conversational assistants into autonomous actors, they increasingly perform **state-changing actions** across external systems: creating records, provisioning infrastructure, modifying configurations, and sending notifications. + +The reliability challenge is not merely issuing actions, but **knowing what actually happened** and recovering safely when actions fail or outcomes are uncertain. 
+ +### 2.1 The Need for Specialized Verbs + +While the A2A Core `Message` object handles conversational turns effectively, it lacks the semantic precision required for reliable distributed transactions. Overloading the generic `Message` with unstructured metadata creates brittle, unvalidatable contracts. + +A2A-SAGA introduces explicit JSON-RPC verbs (`saga.step.execute`, `saga.step.verify`, `saga.step.compensate`) to strictly define the lifecycle of a transaction. This ensures that: + +1. **Intent is Unambiguous:** A `compensate` request cannot be mistaken for a new task. +2. **Validation is Enforceable:** Schemas can strictly enforce `idempotency_key` presence for state-changing operations, which is optional in generic messages. +3. **Tooling is Robust:** Middleware can automatically route saga steps to specific queues or audit logs based on the method name alone, without parsing payload bodies. + + +### 2.2 Outcome Uncertainty & Partial Completion + +In real systems, outcomes such as _unknown_ are unavoidable due to timeouts and asynchronous processing. Multi-step workflows also encode invariants (e.g., “if X exists, Y must exist”). When a later step fails, earlier successful actions can leave systems in an unacceptable intermediate state. + +A2A-SAGA standardizes the **Compensation Pattern**: best-effort undo/mitigation to restore acceptable invariants, rather than perfect rollback. + +## 3. Goals and Non-Goals + +### 3.1 Goals + +- Interoperable saga orchestration across A2A agents. +- Explicit, observable outcomes (`succeeded`, `failed`, `unknown`). +- Safe retries via mandated idempotency. +- Deterministic compensation ordering (including parallel steps). +- **Discovery:** Standardized advertisement of saga roles via the Agent Card. + +### 3.2 Non-Goals + +- Atomic multi-party commit guarantees (2PC/XA). +- Distributed consensus or leader election. +- Standardization of agent internal reasoning/planning. +- Byzantine fault tolerance. + +## 4. 
Terminology + +The key words **MUST**, **MUST NOT**, **SHOULD**, **SHOULD NOT**, and **MAY** are to be interpreted as described in RFC 2119. + +- **Saga:** Ordered workflow of steps with compensation semantics. +- **Orchestrator:** A _role_ coordinating a saga. +- **Participant:** Agent executing steps. +- **Compensation:** Best-effort undo or mitigation. +- **Verification:** Operation to determine whether an effect occurred. +- **Protocol Error:** A failure in the RPC mechanics (e.g., malformed JSON, missing header). +- **Application Error:** A valid RPC call resulting in a logical failure (e.g., "Out of Stock"). + +## 5. Extension Identification and Activation + +**Extension URI:** `https://a2a.dev/extensions/saga/v1` + +### 5.1 Header Negotiation (Normative) + +Clients invoking any method defined in this specification (`saga.*`) MUST include the `A2A-Extensions` HTTP header (or equivalent transport metadata). The header value is a comma-separated list of extension URIs to activate for the request (see `extensions/extensions.md`). + +- **Header Name:** `A2A-Extensions` + - **Legacy Alias:** Implementations MAY also accept `X-A2A-Extensions` for backward compatibility only. The `X-` prefix is deprecated by RFC 6648. +- **Header Value:** Must contain the Extension URI as one item in the comma-separated list. + - _Example:_ `A2A-Extensions: https://a2a.dev/extensions/saga/v1` + +**Server Behavior:** + +1. If the header is present and the server supports the extension, it MUST process the request according to this RFC. + +2. If the header is **missing** or does not include this extension URI, and the client invokes a `saga.*` method, the server MUST reject the request with JSON-RPC error code `-32601` (Method not found) or a specific extension error `-32001` (Extension required). + - If the Agent Card declares the extension as `required: true`, the server SHOULD use `-32001` to make the requirement explicit. 
+ + +### 5.2 Discovery: Agent Card Declaration (Normative) + +Agents supporting this extension MUST advertise their capabilities in the `capabilities` section of their `agent.json` (or equivalent discovery document). + +**Location:** `capabilities.extensions["https://a2a.dev/extensions/saga/v1"]` + +**Schema:** + +```json +{ + "type": "object", + "required": ["roles"], + "properties": { + "roles": { + "type": "array", + "items": { "type": "string", "enum": ["orchestrator", "participant"] }, + "minItems": 1 + }, + "max_concurrent_sagas": { "type": "integer", "description": "Optional limit on concurrency" } + } +} +``` + +_Example `agent.json` snippet:_ + +```json +{ + "capabilities": { + "extensions": { + "https://a2a.dev/extensions/saga/v1": { + "roles": ["participant"], + "max_concurrent_sagas": 50 + } + } + } +} +``` + +## 6. Protocol Surface + +A2A-SAGA defines the following JSON-RPC methods: + +1. `saga.start` (Orchestrator entry point) +2. `saga.step.execute` (Participant action) +3. `saga.step.verify` (Participant check) +4. `saga.step.compensate` (Participant undo) +5. `saga.abort` (Orchestrator cancellation) +6. `saga.status` (Observability) + +### 6.1 Error Handling Strategy + +Implementations MUST distinguish between **Protocol Errors** and **Application Failures**. + +- **Protocol Errors:** Returned as JSON-RPC Error objects (non-200 semantic). + - `-32600`: Invalid Request (e.g., missing `idempotency_key`). + - `-32602`: Invalid Params (e.g., schema violation). +- **Application Failures:** Returned as a valid JSON-RPC `result` object with `status: "failed"` or `status: "unknown"`. + - This allows the orchestrator to inspect the `failure` payload (reason, criticality) and decide on retry vs. compensation. + +## 7. Parallel Group Semantics (Normative) + +### 7.1 Grouping + +Steps MAY include a `group` identifier. Steps with the same group MAY be executed concurrently. 
+ +### 7.2 Execution Ordering + +Let groups be ordered by first appearance in the saga’s `steps` array: `G1, G2, …, Gn`. + +An orchestrator that supports parallel groups (conformance O1) MUST NOT begin executing any step in `Gi+1` until all steps in `Gi` have reached a **terminal decision**. + +A **terminal decision** is one of: + +- `succeeded` +- `failed` +- `unknown` (only after verification has confirmed the outcome is truly unresolvable or negative). + + +### 7.3 Compensation Ordering + +When compensation is initiated: + +1. Only steps known to have applied effects (i.e., `succeeded` or `unknown` verified as applied) are eligible for compensation. +2. Compensation MUST proceed in reverse group order: `Gn … G1`. +3. Within a group, compensations MAY be run concurrently. + +## 8. Failure Semantics (Normative) + +### 8.1 Failure Classification (On Wire) + +If a step result is not successful (`failed`, `unknown`, or `pending_approval`), participants MUST include a `failure` object containing `failure_class` and `reason`. + +`failure_class` MUST be one of: + +- `transient`: (e.g., network timeout) - Retry likely to succeed. +- `deterministic`: (e.g., validation error) - Retry will fail. +- `policy`: (e.g., quota exceeded) - Retry depends on policy reset. +- `semantic`: (e.g., item out of stock) - Logic failure. +- `unknown`: Root cause undetermined. + +### 8.2 Unknown Outcomes & Verification + +If a participant returns `status = "unknown"`: + +1. If the step definition includes a `verify` action AND the participant supports `saga.step.verify`: +   - The orchestrator MUST invoke `saga.step.verify` before deciding to retry or compensate. +2. If verification confirms the action succeeded, the step is treated as `succeeded`. +3. If verification confirms the action did _not_ happen, the orchestrator MAY retry (if safe) or fail. +4. 
If verification returns `status = "not_supported"`, the orchestrator MUST treat the step as having an **unknown** outcome and decide by policy whether to retry `execute`, fail the saga, or compensate. + +## 9. Idempotency (Normative) + +### 9.1 Required Keys + +All `saga.step.execute` and `saga.step.compensate` requests MUST include `idempotency_key`. + +### 9.2 Participant Obligations + +Participants MUST ensure that repeated calls with the same `idempotency_key`: + +1. **Safety:** MUST NOT apply additional side effects. +2. **Consistency:** MUST return a **consistent** `status` and `evidence` structure that is stable across replays. + +## 10. Conformance + +### 10.1 Participant Conformance + +- **P-Min:** Implements `saga.step.execute` with strict idempotency. +- **P0:** P-Min + `saga.step.compensate`. +- **P1 (Recommended):** P0 + `saga.step.verify`. + +### 10.2 Orchestrator Conformance + +- **O0:** Serial execution, basic compensation on failure. +- **O1:** Supports parallel groups (Section 7) and verification logic (Section 8.2). + +## Appendix A — Normative JSON Schemas + +**Notes:** Schemas are JSON Schema Draft 2020-12. 
+ +## A.1 `common` schema + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "A2A-SAGA Common Types", + "type": "object", + "$defs": { + "SagaId": { + "type": "string", + "minLength": 1, + "maxLength": 256, + "pattern": "^[A-Za-z0-9._:-]+$" + }, + "StepId": { + "type": "string", + "minLength": 1, + "maxLength": 128, + "pattern": "^[A-Za-z0-9._:-]+$" + }, + "IdempotencyKey": { + "type": "string", + "minLength": 8, + "maxLength": 512 + }, + "FailureClass": { + "type": "string", + "enum": ["transient", "deterministic", "policy", "semantic", "unknown"] + }, + "StepStatus": { + "type": "string", + "enum": ["succeeded", "failed", "unknown", "pending_approval"] + }, + "VerifyStatus": { + "type": "string", + "enum": ["verified", "not_verified", "unknown", "not_supported"] + }, + "CompensateStatus": { + "type": "string", + "enum": ["compensated", "failed", "unknown", "pending_approval", "not_supported"] + }, + "Reversibility": { + "type": "string", + "enum": ["full", "partial", "none"] + }, + "Criticality": { + "type": "string", + "enum": ["low", "medium", "high"] + }, + "ActionRef": { + "type": "object", + "additionalProperties": false, + "required": ["action", "args"], + "properties": { + "action": { "type": "string", "minLength": 1, "maxLength": 256 }, + "args": { "type": "object" } + } + }, + "ExecuteSpec": { + "allOf": [ + { "$ref": "#/$defs/ActionRef" }, + { + "type": "object", + "additionalProperties": false, + "required": ["idempotency_key"], + "properties": { + "action": { "type": "string" }, + "args": { "type": "object" }, + "idempotency_key": { "$ref": "#/$defs/IdempotencyKey" } + } + } + ] + }, + "CompensateSpec": { + "allOf": [ + { "$ref": "#/$defs/ActionRef" }, + { + "type": "object", + "additionalProperties": false, + "required": ["idempotency_key"], + "properties": { + "action": { "type": "string" }, + "args": { "type": "object" }, + "idempotency_key": { "$ref": "#/$defs/IdempotencyKey" } + } + } + ] + }, + "VerifySpec": { 
"$ref": "#/$defs/ActionRef" }, + "FailureInfo": { + "type": "object", + "additionalProperties": false, + "required": ["failure_class", "reason"], + "properties": { + "failure_class": { "$ref": "#/$defs/FailureClass" }, + "reason": { "type": "string", "minLength": 1, "maxLength": 4096 }, + "retry_after_ms": { "type": "integer", "minimum": 0, "maximum": 86400000 }, + "details": { "type": "object" } + } + }, + "Evidence": { + "type": "object", + "description": "Opaque evidence payload; may include signed receipts, hashes, timestamps.", + "additionalProperties": true + }, + "StepDefinition": { + "type": "object", + "additionalProperties": false, + "required": ["step_id", "participant", "execute", "reversibility", "criticality"], + "properties": { + "step_id": { "$ref": "#/$defs/StepId" }, + "participant": { + "type": "string", + "description": "Participant identifier or URL.", + "minLength": 1, + "maxLength": 2048 + }, + "execute": { "$ref": "#/$defs/ExecuteSpec" }, + "verify": { "$ref": "#/$defs/VerifySpec" }, + "compensate": { "$ref": "#/$defs/CompensateSpec" }, + "reversibility": { "$ref": "#/$defs/Reversibility" }, + "criticality": { "$ref": "#/$defs/Criticality" }, + "group": { + "type": "string", + "description": "Optional parallel group identifier.", + "minLength": 1, + "maxLength": 128, + "pattern": "^[A-Za-z0-9._:-]+$" + }, + "metadata": { "type": "object" } + } + } + } +} +``` + +## A.2 `saga.start` schemas + +### A.2.1 `saga.start.params` + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "saga.start params", + "type": "object", + "additionalProperties": false, + "required": ["saga_id", "goal", "steps"], + "properties": { + "saga_id": { "$ref": "#/$defs/SagaId" }, + "goal": { "type": "string", "minLength": 1, "maxLength": 4096 }, + "steps": { + "type": "array", + "items": { "$ref": "#/$defs/StepDefinition" }, + "minItems": 1 + }, + "policy": { + "type": "object", + "additionalProperties": false, + "properties": { + 
"require_approval_for": { + "type": "array", + "items": { "$ref": "#/$defs/Criticality" } + }, + "max_irreversible_steps": { "type": "integer", "minimum": 0, "maximum": 1000 } + } + }, + "metadata": { "type": "object" } + }, + "$defs": {} +} +``` + +### A.2.2 `saga.start.result` + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "saga.start result", + "type": "object", + "additionalProperties": false, + "required": ["status", "state"], + "properties": { + "status": { "type": "string", "enum": ["accepted", "rejected"] }, + "state": { + "type": "string", + "enum": ["saga_running", "saga_waiting", "saga_failed", "saga_compensating", "saga_compensated", "saga_completed"] + }, + "failure": { "$ref": "#/$defs/FailureInfo" } + }, + "$defs": {} +} +``` + +## A.3 `saga.step.execute` schemas + +### A.3.1 `saga.step.execute.params` + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "saga.step.execute params", + "type": "object", + "additionalProperties": false, + "required": ["saga_id", "step_id", "execute"], + "properties": { + "saga_id": { "$ref": "#/$defs/SagaId" }, + "step_id": { "$ref": "#/$defs/StepId" }, + "execute": { "$ref": "#/$defs/ExecuteSpec" }, + "metadata": { "type": "object" } + }, + "$defs": {} +} +``` + +### A.3.2 `saga.step.execute.result` + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "saga.step.execute result", + "type": "object", + "additionalProperties": false, + "required": ["status"], + "properties": { + "status": { "$ref": "#/$defs/StepStatus" }, + "evidence": { "$ref": "#/$defs/Evidence" }, + "failure": { "$ref": "#/$defs/FailureInfo" } + }, + "allOf": [ + { + "if": { "properties": { "status": { "const": "succeeded" } } }, + "then": { "required": ["evidence"] } + }, + { + "if": { "properties": { "status": { "enum": ["failed", "unknown", "pending_approval"] } } }, + "then": { "required": ["failure"] } + } + ], + "$defs": {} +} +``` + +## A.4 
`saga.step.verify` schemas + +### A.4.1 `saga.step.verify.params` + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "saga.step.verify params", + "type": "object", + "additionalProperties": false, + "required": ["saga_id", "step_id", "verify"], + "properties": { + "saga_id": { "$ref": "#/$defs/SagaId" }, + "step_id": { "$ref": "#/$defs/StepId" }, + "verify": { "$ref": "#/$defs/VerifySpec" }, + "metadata": { "type": "object" } + }, + "$defs": {} +} +``` + +### A.4.2 `saga.step.verify.result` + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "saga.step.verify result", + "type": "object", + "additionalProperties": false, + "required": ["status"], + "properties": { + "status": { "$ref": "#/$defs/VerifyStatus" }, + "details": { "type": "object" }, + "failure": { "$ref": "#/$defs/FailureInfo" } + }, + "$defs": {} +} +``` + +## A.5 `saga.step.compensate` schemas + +### A.5.1 `saga.step.compensate.params` + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "saga.step.compensate params", + "type": "object", + "additionalProperties": false, + "required": ["saga_id", "step_id", "compensate"], + "properties": { + "saga_id": { "$ref": "#/$defs/SagaId" }, + "step_id": { "$ref": "#/$defs/StepId" }, + "compensate": { "$ref": "#/$defs/CompensateSpec" }, + "metadata": { "type": "object" } + }, + "$defs": {} +} +``` + +### A.5.2 `saga.step.compensate.result` + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "saga.step.compensate result", + "type": "object", + "additionalProperties": false, + "required": ["status"], + "properties": { + "status": { "$ref": "#/$defs/CompensateStatus" }, + "evidence": { "$ref": "#/$defs/Evidence" }, + "failure": { "$ref": "#/$defs/FailureInfo" } + }, + "allOf": [ + { + "if": { "properties": { "status": { "const": "compensated" } } }, + "then": { "required": ["evidence"] } + }, + { + "if": { "properties": { "status": 
{ "enum": ["failed", "unknown", "pending_approval"] } } }, + "then": { "required": ["failure"] } + } + ], + "$defs": {} +} +``` + +## Appendix B — Normative Test Vectors + +## B.1 Test Vector Rules (Normative) + +- **Normativity:** Implementations claiming conformance MUST be able to process these message sequences and produce results that conform to the schemas and match the expected status. +- **Evidence:** Evidence values (IDs, timestamps) MAY differ unless the vector explicitly requires stability (see B.3). + +## B.2 Test Vector Set A — Parallel Groups + Recovery + +### B.2.1 Scenario + +Goal: “Onboard customer c123” with parallel execution, failure, and compensation. + +1. **Group G1:** `reserve_username` (Identity), `create_customer_record` (CRM) -> Both Succeed. +2. **Group G2:** `provision_workspace` -> Returns `unknown`. +3. **Action:** Orchestrator verifies `provision_workspace` -> `not_verified` (Effect did not happen). +4. **Action:** Orchestrator retries `provision_workspace` -> Fails deterministically. +5. **Recovery:** Orchestrator aborts. Compensates G1 in parallel. 
+ +### B.2.2 `saga.start` + +**Request:** + +```json +{ + "jsonrpc": "2.0", + "id": "tvA-1", + "method": "saga.start", + "params": { + "saga_id": "tvA-saga-001", + "goal": "Onboard customer c123", + "steps": [ + { + "step_id": "reserve_username", + "group": "g1", + "participant": "a2a://agent/identity", + "execute": { + "action": "reserve_username", + "args": { "customer_id": "c123", "username": "acme" }, + "idempotency_key": "tvA-saga-001:reserve_username:exec" + }, + "compensate": { + "action": "release_username", + "args": { "username": "acme" }, + "idempotency_key": "tvA-saga-001:reserve_username:comp" + }, + "reversibility": "full", + "criticality": "medium" + }, + { + "step_id": "create_customer_record", + "group": "g1", + "participant": "a2a://agent/crm", + "execute": { + "action": "create_customer", + "args": { "customer_id": "c123", "name": "ACME" }, + "idempotency_key": "tvA-saga-001:create_customer_record:exec" + }, + "compensate": { + "action": "delete_customer", + "args": { "customer_id": "c123" }, + "idempotency_key": "tvA-saga-001:create_customer_record:comp" + }, + "reversibility": "full", + "criticality": "high" + }, + { + "step_id": "provision_workspace", + "group": "g2", + "participant": "a2a://agent/provision", + "execute": { + "action": "provision_workspace", + "args": { "customer_id": "c123", "plan": "pro" }, + "idempotency_key": "tvA-saga-001:provision_workspace:exec" + }, + "verify": { + "action": "get_workspace", + "args": { "customer_id": "c123" } + }, + "reversibility": "full", + "criticality": "high" + } + ] + } +} +``` + +**Result:** `{"status": "accepted", "state": "saga_running"}` + +### B.2.3 Execute Group G1 (Parallel) + +**Request (Identity):** + +```json +{ + "jsonrpc": "2.0", + "id": "tvA-2", + "method": "saga.step.execute", + "params": { + "saga_id": "tvA-saga-001", + "step_id": "reserve_username", + "execute": { ... 
} + } +} +``` + +**Result:** `{"status": "succeeded", "evidence": {"reservation_id": "resv-001"}}` + +**Request (CRM):** + +```json +{ + "jsonrpc": "2.0", + "id": "tvA-3", + "method": "saga.step.execute", + "params": { + "saga_id": "tvA-saga-001", + "step_id": "create_customer_record", + "execute": { ... } + } +} +``` + +**Result:** `{"status": "succeeded", "evidence": {"customer_row_id": "crm-777"}}` + +### B.2.4 Execute Group G2 (Unknown Outcome) + +**Request:** + +```json +{ + "jsonrpc": "2.0", + "id": "tvA-4", + "method": "saga.step.execute", + "params": { + "saga_id": "tvA-saga-001", + "step_id": "provision_workspace", + "execute": { ... } + } +} +``` + +**Result:** + +```json +{ + "status": "unknown", + "failure": { + "failure_class": "unknown", + "reason": "timeout after dispatch" + } +} +``` + +### B.2.5 Verification + +**Request:** + +```json +{ + "jsonrpc": "2.0", + "id": "tvA-5", + "method": "saga.step.verify", + "params": { + "saga_id": "tvA-saga-001", + "step_id": "provision_workspace", + "verify": { ... } + } +} +``` + +**Result:** + +```json +{ + "status": "not_verified", + "details": { "workspace": null } +} +``` + +### B.2.6 Retry & Failure + +Request: (Retry of tvA-4) + +Result: + +```json +{ + "status": "failed", + "failure": { + "failure_class": "deterministic", + "reason": "Plan not available" + } +} +``` + +### B.2.7 Abort & Compensation + +**Request:** + +```json +{ + "jsonrpc": "2.0", + "id": "tvA-7", + "method": "saga.abort", + "params": { + "saga_id": "tvA-saga-001", + "reason": "provision_workspace failed" + } +} +``` + +**Result:** `{"status": "aborting", "state": "saga_compensating"}` + +**Request (Compensate Identity):** + +```json +{ + "jsonrpc": "2.0", + "id": "tvA-8", + "method": "saga.step.compensate", + "params": { + "saga_id": "tvA-saga-001", + "step_id": "reserve_username", + "compensate": { ... 
} + } +} +``` + +**Result:** `{"status": "compensated", "evidence": {"released": true}}` + +**Request (Compensate CRM):** + +```json +{ + "jsonrpc": "2.0", + "id": "tvA-9", + "method": "saga.step.compensate", + "params": { + "saga_id": "tvA-saga-001", + "step_id": "create_customer_record", + "compensate": { ... } + } +} +``` + +**Result:** `{"status": "compensated", "evidence": {"deleted": true}}` + +## B.3 Test Vector Set B — Idempotent Replay + +### B.3.1 Scenario + +Replaying the same `idempotency_key` for execute MUST return consistent output. + +**Request:** (Identical to tvA-2) + +```json +{ + "jsonrpc": "2.0", + "id": "tvB-1", + "method": "saga.step.execute", + "params": { + "saga_id": "tvA-saga-001", + "step_id": "reserve_username", + "execute": { + "action": "reserve_username", + "args": { "customer_id": "c123", "username": "acme" }, + "idempotency_key": "tvA-saga-001:reserve_username:exec" + } + } +} +``` + +**Result:** (MUST match tvA-2) + +```json +{ + "status": "succeeded", + "evidence": { "reservation_id": "resv-001" } +} +``` diff --git a/extensions/saga/saga_orchestrator_demo.py b/extensions/saga/saga_orchestrator_demo.py new file mode 100644 index 00000000..fc9bfadf --- /dev/null +++ b/extensions/saga/saga_orchestrator_demo.py @@ -0,0 +1,541 @@ +"""Demo implementation of the A2A-SAGA extension orchestrator. + +This module provides a demonstration of the Saga Orchestrator pattern +as specified in the A2A-SAGA RFC for multi-agent workflow management. 
+""" + +import asyncio +import json +import logging + +from collections import OrderedDict +from dataclasses import dataclass, field +from enum import Enum +from typing import Any + + +# --- Configuration & Constants --- +SAGA_EXTENSION_URI = 'https://a2a.dev/extensions/saga/v1' +A2A_EXTENSION_HEADER_KEY = 'A2A-Extensions' + +# Setup Logging +logging.basicConfig( + level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger('SagaOrchestrator') + + +# --- Mock Response Lookup Table --- +_MOCK_RESPONSES: dict[tuple[str, str], dict] = { + ('reserve_username', 'saga.step.execute'): { + 'status': 'succeeded', + 'evidence': {'reservation_id': 'resv-001'}, + }, + ('create_customer_record', 'saga.step.execute'): { + 'status': 'succeeded', + 'evidence': {'customer_row_id': 'crm-777'}, + }, + ('provision_workspace', 'saga.step.verify'): { + 'status': 'not_verified', + 'details': {'workspace': None}, + }, + ('reserve_username', 'saga.step.compensate'): { + 'status': 'compensated', + 'evidence': {'released': True}, + }, + ('create_customer_record', 'saga.step.compensate'): { + 'status': 'compensated', + 'evidence': {'deleted': True}, + }, +} + + +def _make_default_response() -> dict: + """Create the default success response.""" + return {'status': 'succeeded', 'evidence': {}} + + +def _make_provision_response(idempotency_key: str | None) -> dict: + """Create the provision_workspace execute response based on retry state.""" + if idempotency_key and idempotency_key.endswith(':retry-1'): + return { + 'status': 'failed', + 'failure': { + 'failure_class': 'deterministic', + 'reason': 'Plan not available', + }, + } + return { + 'status': 'unknown', + 'failure': { + 'failure_class': 'unknown', + 'reason': 'timeout after dispatch', + }, + } + + +# --- Mocks for A2A SDK (Replace with actual imports in production) --- +class A2AClient: + """Mock client simulating JSON-RPC calls to remote agents. 
+ + In a real implementation, this would be `a2a.Client`. + """ + + async def call_method( + self, url: str, method: str, params: dict, headers: dict + ) -> dict: + """Simulates the network call. + + Replace this with actual HTTP/JSON-RPC transport logic. + """ + logger.info('--> OUTBOUND RPC: %s to %s', method, url) + logger.debug(' Params: %s', json.dumps(params)) + logger.debug(' Headers: %s', json.dumps(headers)) + + # SIMULATION LOGIC FOR TEST VECTOR A + B (RFC Appendix B) + idempotency_key = self._extract_idempotency_key(method, params) + + if idempotency_key in getattr(self, '_idempotency_store', {}): + return self._idempotency_store[idempotency_key] + + step_id = params.get('step_id', '') + response = self._get_mock_response(step_id, method, idempotency_key) + + if idempotency_key: + self._store_response(idempotency_key, response) + + return response + + def _extract_idempotency_key(self, method: str, params: dict) -> str | None: + """Extract the idempotency key from params based on method.""" + if method == 'saga.step.execute': + return params.get('execute', {}).get('idempotency_key') + if method == 'saga.step.compensate': + return params.get('compensate', {}).get('idempotency_key') + return None + + def _get_mock_response( + self, step_id: str, method: str, idempotency_key: str | None + ) -> dict: + """Get the mock response for the given step and method.""" + key = (step_id, method) + if key in _MOCK_RESPONSES: + return _MOCK_RESPONSES[key].copy() + + if key == ('provision_workspace', 'saga.step.execute'): + return _make_provision_response(idempotency_key) + + return _make_default_response() + + def _store_response(self, idempotency_key: str, response: dict) -> None: + """Store the response for idempotency.""" + if not hasattr(self, '_idempotency_store'): + self._idempotency_store = {} + self._idempotency_store[idempotency_key] = response + + +# --- Data Structures --- +@dataclass(frozen=True) +class ActionSpec: + """Specification for an action to be 
@dataclass(frozen=True)
class StepDefinition:
    """Definition of a single step in a saga.

    Immutable description of one unit of work, following the
    `StepDefinition` object of the RFC's common schema (Appendix A.1).
    The string-typed enum fields are not validated by this dataclass.
    """

    # Identifier sent as `step_id` in every RPC payload for this step.
    step_id: str
    # Participant identifier or URL; passed as the `url` of each RPC.
    participant: str
    # Action, args and idempotency key sent with `saga.step.execute`.
    execute: ExecuteSpec
    # RFC enum value: 'full', 'partial' or 'none'.
    reversibility: str
    # RFC enum value: 'low', 'medium' or 'high'.
    criticality: str
    # Optional parallel group identifier; steps sharing it are executed
    # concurrently. When None the orchestrator gives the step its own group.
    group: str | None = None
    # Optional verify action, used when execute returns status 'unknown'.
    verify: ActionSpec | None = None
    # Optional compensation action with its own idempotency key.
    compensate: CompensateSpec | None = None
    # Optional free-form metadata object.
    metadata: dict[str, Any] | None = None
entry point for executing a saga.""" + saga_id = saga_def.saga_id + steps = saga_def.steps + ctx = SagaContext(saga_id=saga_id) + + logger.info('Starting Saga %s', saga_id) + + # 1. Group steps by 'group' ID for parallel execution + # Logic: Sequential groups, parallel steps within groups. + grouped_steps = self._group_steps(steps) + ctx.applied_steps_by_group = OrderedDict( + (gid, []) for gid in grouped_steps + ) + + try: + for group_id, group_steps in grouped_steps.items(): + logger.info( + 'Executing Group: %s (%d steps)', group_id, len(group_steps) + ) + + # Execute group in parallel (O1 Conformance) + results = await asyncio.gather( + *[ + self._execute_step(ctx, step, group_id) + for step in group_steps + ] + ) + + # Check for failures in the group + failed_results = [ + r + for r in results + if r['status'] != StepResult.SUCCEEDED.value + ] + + if failed_results: + self._handle_group_failure(group_id, failed_results) + + except SagaFailedError as e: + logger.warning('Aborting Saga: %s', e) + await self._compensate_saga(ctx) + return {'status': 'compensated', 'error': str(e)} + + logger.info('Saga %s Completed Successfully!', saga_id) + return {'status': 'completed', 'ctx': ctx} + + def _handle_group_failure( + self, group_id: str, failed_results: list[dict] + ) -> None: + """Handle saga failure for a group.""" + logger.error('Group %s failed. 
Initiating Compensation.', group_id) + raise SagaFailedError( + 'Saga Failed at group %s: %s', group_id, failed_results[0] + ) + + def _group_steps( + self, steps: list[StepDefinition] + ) -> dict[str, list[StepDefinition]]: + """Helper to preserve order of groups but allow parallel steps within.""" + groups: dict[str, list[StepDefinition]] = OrderedDict() + for step in steps: + g_id = step.group or f'default_{step.step_id}' + if g_id not in groups: + groups[g_id] = [] + groups[g_id].append(step) + return groups + + async def _execute_step( + self, ctx: SagaContext, step: StepDefinition, group_id: str + ) -> dict: + """Executes a single step, handling Idempotency, Headers, and Unknown outcomes.""" + step_id = step.step_id + + # Normative Requirement: 9.1 Idempotency Keys + if not step.execute.idempotency_key: + raise ValueError( + 'Missing execute.idempotency_key for step %s', step_id + ) + + payload = { + 'saga_id': ctx.saga_id, + 'step_id': step_id, + 'execute': { + 'action': step.execute.action, + 'args': step.execute.args, + 'idempotency_key': step.execute.idempotency_key, + }, + } + + # RPC Call + response = await self.client.call_method( + url=step.participant, + method='saga.step.execute', + params=payload, + headers=self._get_headers(), + ) + + status = response.get('status') + + # Normative Requirement: 8.2 Verify Unknown Outcomes + if status == StepResult.UNKNOWN.value and step.verify is not None: + logger.info('Step %s returned UNKNOWN. Verifying...', step_id) + verification = await self._verify_step(ctx, step) + if verification['status'] == 'verified': + status = StepResult.SUCCEEDED.value + elif verification['status'] == 'not_verified': + logger.info( + 'Verification failed for %s. 
Retrying execute once.', + step_id, + ) + response = await self._retry_execute_step(ctx, step) + status = response.get('status') + + if status in ( + StepResult.FAILED.value, + StepResult.UNKNOWN.value, + 'pending_approval', + ): + response.setdefault( + 'failure', + {'failure_class': 'unknown', 'reason': 'unspecified failure'}, + ) + + # Record success for potential future compensation + if status == StepResult.SUCCEEDED.value: + ctx.applied_steps_by_group[group_id].append(step) + + return {'step_id': step_id, 'status': status, 'raw': response} + + async def _verify_step( + self, ctx: SagaContext, step: StepDefinition + ) -> dict: + """Verifies the outcome of a step.""" + if step.verify is None: + raise ValueError('Missing verify action for step %s', step.step_id) + + payload = { + 'saga_id': ctx.saga_id, + 'step_id': step.step_id, + 'verify': {'action': step.verify.action, 'args': step.verify.args}, + } + return await self.client.call_method( + url=step.participant, + method='saga.step.verify', + params=payload, + headers=self._get_headers(), + ) + + async def _retry_execute_step( + self, ctx: SagaContext, step: StepDefinition + ) -> dict: + """Retries a step execution with a new idempotency key.""" + step_id = step.step_id + retry_count = ctx.retry_counts.get(step_id, 0) + 1 + ctx.retry_counts[step_id] = retry_count + + retry_payload = { + 'saga_id': ctx.saga_id, + 'step_id': step_id, + 'execute': { + 'action': step.execute.action, + 'args': step.execute.args, + 'idempotency_key': f'{step.execute.idempotency_key}:retry-{retry_count}', + }, + } + + return await self.client.call_method( + url=step.participant, + method='saga.step.execute', + params=retry_payload, + headers=self._get_headers(), + ) + + async def _compensate_saga(self, ctx: SagaContext) -> None: + """Normative Requirement: 7.3 Compensation Ordering. + + Executes compensation in Reverse Order (LIFO). 
+ """ + logger.info('--- STARTING COMPENSATION PHASE ---') + + # Reverse group order; parallel within a group. + for group_id in reversed(list(ctx.applied_steps_by_group)): + group_steps = ctx.applied_steps_by_group[group_id] + if not group_steps: + continue + await asyncio.gather( + *[self._compensate_step(ctx, step) for step in group_steps] + ) + + logger.info('--- COMPENSATION COMPLETE ---') + + async def _compensate_step( + self, ctx: SagaContext, step: StepDefinition + ) -> None: + """Compensates a single step.""" + if step.compensate is None: + logger.warning( + 'Skipping step %s (No compensation defined)', step.step_id + ) + return + + step_id = step.step_id + if not step.compensate.idempotency_key: + raise ValueError( + 'Missing compensate.idempotency_key for step %s', step_id + ) + + payload = { + 'saga_id': ctx.saga_id, + 'step_id': step_id, + 'compensate': { + 'action': step.compensate.action, + 'args': step.compensate.args, + 'idempotency_key': step.compensate.idempotency_key, + }, + } + + logger.info('Compensating %s...', step_id) + await self.client.call_method( + url=step.participant, + method='saga.step.compensate', + params=payload, + headers=self._get_headers(), + ) + + +# --- Test Vector Runner --- +async def main() -> None: + """Runs the saga test vectors from the RFC.""" + client = A2AClient() + orchestrator = SagaOrchestrator(client) + + # Define the Saga from Test Vector A (Appendix B.2) + saga_definition = SagaDefinition( + saga_id='tvA-saga-001', + goal='Onboard customer c123', + steps=[ + # Group 1: Parallel + StepDefinition( + step_id='reserve_username', + group='g1', + participant='a2a://agent/identity', + execute=ExecuteSpec( + action='reserve_username', + args={'customer_id': 'c123', 'username': 'acme'}, + idempotency_key='tvA-saga-001:reserve_username:exec', + ), + compensate=CompensateSpec( + action='release_username', + args={'username': 'acme'}, + idempotency_key='tvA-saga-001:reserve_username:comp', + ), + verify=ActionSpec( + 
action='get_reservation', args={'username': 'acme'} + ), + reversibility='full', + criticality='medium', + ), + StepDefinition( + step_id='create_customer_record', + group='g1', + participant='a2a://agent/crm', + execute=ExecuteSpec( + action='create_customer', + args={'customer_id': 'c123', 'name': 'ACME'}, + idempotency_key='tvA-saga-001:create_customer_record:exec', + ), + compensate=CompensateSpec( + action='delete_customer', + args={'customer_id': 'c123'}, + idempotency_key='tvA-saga-001:create_customer_record:comp', + ), + reversibility='full', + criticality='high', + ), + # Group 2: Dependent (Will Fail) + StepDefinition( + step_id='provision_workspace', + group='g2', + participant='a2a://agent/provision', + execute=ExecuteSpec( + action='provision_workspace', + args={'customer_id': 'c123', 'plan': 'pro'}, + idempotency_key='tvA-saga-001:provision_workspace:exec', + ), + verify=ActionSpec( + action='get_workspace', args={'customer_id': 'c123'} + ), + reversibility='full', + criticality='high', + ), + ], + ) + + await orchestrator.run_saga(saga_definition) + + # Test Vector B: Idempotent replay of reserve_username execute. + tvb_payload = { + 'saga_id': 'tvA-saga-001', + 'step_id': 'reserve_username', + 'execute': { + 'action': 'reserve_username', + 'args': {'customer_id': 'c123', 'username': 'acme'}, + 'idempotency_key': 'tvA-saga-001:reserve_username:exec', + }, + } + tvb_response = await client.call_method( + url='a2a://agent/identity', + method='saga.step.execute', + params=tvb_payload, + headers=orchestrator._get_headers(), # noqa: SLF001 + ) + logger.info('Test Vector B replay response: %s', tvb_response) + + +if __name__ == '__main__': + asyncio.run(main())