Skip to content

[FEATURE] Async reading from S3 Session Manager List Message #1416

@Unshure

Description

@Unshure

Problem Statement

I would like Strands to list messages asynchronously from S3 to improve its performance.

Proposed Solution

We will spawn a thread and start an event_loop in that thread. This threaded event loop will call s3 asynchronously for reading the objects it needs to from s3.

Use Case

Call aws boto s3 get object asynchronously

Alternatives Solutions

N/A


Implementation Requirements

Based on clarification discussion and repository analysis:

Technical Approach

  • Target File: src/strands/session/s3_session_manager.py
  • Target Method: list_messages() (lines 259-300)
  • Pattern: Use threaded async event loop (similar to existing run_async() utility)
  • Performance Bottleneck: Current sequential S3 reads (lines 292-295)

Dependencies

Scope

  • ONLY convert list_messages() to async
  • DO NOT convert other S3 operations (read_message, write_s3_object, delete_session, etc.)
  • Maintain both async and sync S3 clients in S3SessionManager

Implementation Details

1. Add aiobotocore Client

  • Create an async S3 client using aiobotocore alongside existing sync boto3 client
  • Initialize in __init__() method
  • Ensure proper cleanup/resource management

2. Implement Async list_messages()

  • Create async version of message reading logic
  • Use asyncio.gather() or similar to read multiple messages concurrently
  • Maintain message ordering (sort by message index)
  • Use existing run_async() utility from src/strands/_async.py to execute async code

3. Integration Pattern

# Use run_async() to spawn thread and run async event loop
from strands._async import run_async
...
# Pseudocode example
def list_messages(self, session_id, agent_id, limit=None, offset=0, **kwargs):

    
    async def async_list_messages():
        # 1. List message keys (can stay sync or convert to async)
        # 2. Sort and paginate keys
        # 3. Read messages concurrently using asyncio.gather()
        # 4. Return ordered list
        pass
    
    return run_async(async_list_messages)

Configuration

  • Default Behavior: Async reading is the DEFAULT behavior
  • No configuration parameters needed: All list_messages calls will use async
  • Backward Compatible: External API remains synchronous (async is internal implementation)

Error Handling

  • Fail Fast: If ANY S3 message read fails, raise an error
  • Stop the entire operation on first failure
  • Propagate errors to caller with appropriate error messages
  • Use existing SessionException for error reporting

Testing Strategy

  • Unit Tests: Existing tests in tests/strands/session/test_s3_session_manager.py should work
    • Tests use moto for AWS mocking
    • Tests should be agnostic to underlying sync vs async client
    • Verify tests still pass with async implementation
  • Integration Tests: Already exist, should continue to work
  • No new tests required: Existing test coverage should be sufficient

Files to Modify

  1. src/strands/session/s3_session_manager.py

    • Add aiobotocore client initialization
    • Modify list_messages() method
    • Add async helper method for concurrent message reading
  2. pyproject.toml

    • Add aiobotocore to dependencies

Acceptance Criteria

  • aiobotocore added to project dependencies
  • Async S3 client created and managed in S3SessionManager
  • list_messages() uses async reading with threaded event loop via run_async()
  • Messages are read concurrently from S3
  • Message ordering is preserved (sorted by message index)
  • Error handling: fail fast on any read error
  • All existing unit tests pass (tests/strands/session/test_s3_session_manager.py)
  • All existing integration tests pass
  • External API remains synchronous (no breaking changes)
  • Code follows existing patterns and style guidelines

Related Work

Implementation Notes

  • Reuse existing run_async() utility as-is (no modifications needed)
  • Don't worry about performance benchmarks or metrics
  • Focus on correctness and maintaining existing test coverage
  • Single cohesive implementation (no sub-task breakdown)

Metadata

Metadata

Assignees

Labels

enhancementNew feature or request

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions