Skip to content

[Feat]: Amazon S3 TaskStore Implementation #495

@juliangrueber

Description

@juliangrueber

Is your feature request related to a problem? Please describe.

To run the A2A server on a containerised environment, such as Amazon ECS Fargate, a in-memory TaskStore is not sufficient.

Describe the solution you'd like

To begin with, we could start storing the tasks on Amazon S3.

s3_task_store.py in https://github.com/a2aproject/a2a-python/tree/main/src/a2a/server/tasks

"""A2A TaskStore implementation for Amazon S3."""

import logging

import boto3
from a2a.server.context import ServerCallContext
from a2a.server.tasks import TaskStore
from a2a.types import Task
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class S3TaskStore(TaskStore):
    """Amazon S3 implementation of TaskStore."""

    def __init__(self, bucket_name: str, boto_session: boto3.Session | None = None, prefix: str | None = None) -> None:
        """Initialize S3TaskStore with bucket name, optional boto session, and optional prefix.

        Args:
            bucket_name (str): Name of the S3 bucket to store tasks in.
            boto_session (boto3.Session | None): Optional boto3 session. If None, creates a new session.
            prefix (str | None): Optional prefix for S3 object keys.

        """
        if boto_session is None:
            boto_session = boto3.Session()

        self.s3 = boto_session.client("s3")
        self.bucket_name = bucket_name
        self.prefix = prefix

    def _get_object_key(self, task_id: str) -> str:
        """Generate S3 object key for a task.

        Args:
            task_id (str): The unique identifier for the task.

        Returns:
            str: The S3 object key for the task, including prefix if configured.

        """
        if self.prefix is not None:
            return f"{self.prefix}/{task_id}.json"
        return f"{task_id}.json"

    async def save(self, task: Task, context: ServerCallContext | None = None) -> None:  # noqa: ARG002 # context is unused but required by interface
        """Save task to S3 bucket.

        Args:
            task (Task): The task object to save to S3.
            context (ServerCallContext | None, optional): Server call context. Defaults to None.

        """
        try:
            # Convert task to JSON string
            task_json = task.model_dump_json(by_alias=True)

            # Upload to S3
            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=self._get_object_key(task.id),
                Body=task_json,
                ContentType="application/json",
            )
        except Exception:
            logger.exception("Failed to save task to S3")
            raise

    async def get(self, task_id: str, context: ServerCallContext | None = None) -> Task | None:  # noqa: ARG002 # context is unused but required by interface
        """Retrieve task from S3 bucket.

        Args:
            task_id (str): The unique identifier for the task to retrieve.
            context (ServerCallContext | None, optional): Server call context. Defaults to None.

        Returns:
            Task | None: The task object if found, None if not found.

        """
        try:
            # Get object from S3
            response = self.s3.get_object(Bucket=self.bucket_name, Key=self._get_object_key(task_id))

            # Read and parse JSON content
            task_json = response["Body"].read().decode("utf-8")
            return Task.model_validate_json(task_json)
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchKey":
                return None
            logger.exception("Failed to get task from S3")
            raise

    async def delete(self, task_id: str, context: ServerCallContext | None = None) -> None:  # noqa: ARG002 # context is unused but required by interface
        """Delete task from S3 bucket.

        Args:
            task_id (str): The unique identifier for the task to delete.
            context (ServerCallContext | None, optional): Server call context. Defaults to None.

        """
        try:
            self.s3.delete_object(Bucket=self.bucket_name, Key=self._get_object_key(task_id))
        except Exception:
            logger.exception("Failed to delete task from S3")
            raise

Describe alternatives you've considered

Amazon DynamoDB
Amazon AgentCore Memory
Valkey

All might be suitable solutions, depending on the use case.

Additional context

No response

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

Labels

No labels
No labels

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions