diff --git a/libs/langgraph-checkpoint-aws/docs/dynamodb_store.md b/libs/langgraph-checkpoint-aws/docs/dynamodb_store.md
new file mode 100644
index 000000000..73294eeda
--- /dev/null
+++ b/libs/langgraph-checkpoint-aws/docs/dynamodb_store.md
@@ -0,0 +1,265 @@
+# DynamoDB Store for LangGraph
+
+A DynamoDB-backed store implementation for LangGraph that provides persistent key-value storage with hierarchical namespaces.
+
+## Features
+
+- ✅ **Persistent Storage**: Durable storage using AWS DynamoDB
+- ✅ **Hierarchical Namespaces**: Organize data with multi-level namespaces
+- ✅ **TTL Support**: Automatic item expiration with configurable time-to-live
+- ✅ **Filtering**: Basic filtering capabilities for search operations
+- ✅ **Batch Operations**: Efficient batch processing of multiple operations
+- ✅ **Cost-Effective**: Pay-per-request billing for unpredictable workloads
+
+## Installation
+
+```bash
+pip install langgraph-checkpoint-aws
+```
+
+## Quick Start
+
+```python
+from langgraph_checkpoint_aws import DynamoDBStore
+
+# Create a store instance
+store = DynamoDBStore(table_name="my-store-table")
+
+# Set up the table (creates it if it doesn't exist)
+store.setup()
+
+# Store and retrieve data
+store.put(("users", "123"), "prefs", {"theme": "dark"})
+item = store.get(("users", "123"), "prefs")
+print(item.value)  # {"theme": "dark"}
+```
+
+## Basic Usage
+
+### Storing Documents
+
+```python
+# Store a document with hierarchical namespace
+store.put(
+    ("documents", "user123"),
+    "report_1",
+    {
+        "text": "Machine learning report on customer behavior analysis...",
+        "tags": ["ml", "analytics", "report"],
+        "author": "data_scientist"
+    }
+)
+```
+
+### Retrieving Documents
+
+```python
+# Get a specific document
+item = store.get(("documents", "user123"), "report_1")
+print(f"Text: {item.value['text']}")
+print(f"Created: {item.created_at}")
+print(f"Updated: {item.updated_at}")
+```
+
+### Searching
+
+```python
+# Search all documents in a namespace
+results = store.search(("documents", "user123"))
+
+# Search with filter
+results = store.search(
+    ("documents", "user123"),
+    filter={"author": "data_scientist"}
+)
+```
+
+### Deleting Items
+
+```python
+store.delete(("documents", "user123"), "report_1")
+```
+
+## Advanced Features
+
+### Time-To-Live (TTL)
+
+Configure automatic item expiration:
+
+```python
+store = DynamoDBStore(
+    table_name="my-store-table",
+    ttl={
+        "default_ttl": 60,  # 60 minutes default TTL
+        "refresh_on_read": True,  # Refresh TTL on reads
+    }
+)
+store.setup()
+
+# Item will expire after 60 minutes
+store.put(("temp", "session_123"), "data", {"value": "temporary data"})
+
+# Custom TTL for a specific item (30 minutes)
+store.put(
+    ("temp", "session_123"),
+    "short_lived",
+    {"value": "expires soon"},
+    ttl=30
+)
+```
+
+### Listing Namespaces
+
+```python
+# List all namespaces
+namespaces = store.list_namespaces()
+
+# List with prefix filter
+user_namespaces = store.list_namespaces(prefix=("users",))
+
+# Limit depth
+shallow_namespaces = store.list_namespaces(max_depth=2)
+```
+
+### Batch Operations
+
+```python
+from langgraph.store.base import PutOp, GetOp
+
+# Batch put operations
+ops = [
+    PutOp(("batch",), "item1", {"value": 1}, None, None),
+    PutOp(("batch",), "item2", {"value": 2}, None, None),
+    PutOp(("batch",), "item3", {"value": 3}, None, None),
+]
+results = store.batch(ops)
+
+# Batch get operations
+get_ops = [
+    GetOp(("batch",), "item1", False),
+    GetOp(("batch",), "item2", False),
+]
+items = store.batch(get_ops)
+```
+
+### Context Manager
+
+```python
+with DynamoDBStore.from_conn_string("my-store-table") as store:
+    store.setup()
+    store.put(("test",), "example", {"data": "value"})
+    item = store.get(("test",), "example")
+```
+
+## Configuration Options
+
+### Constructor Parameters
+
+- `table_name` (str): Name of the DynamoDB table
+- `region_name` (str, optional): AWS region name
+- `boto3_session` (boto3.Session, optional): Custom boto3 session
+- `ttl` (TTLConfig, optional): TTL configuration
+- `max_read_capacity_units` (int, optional): Max read capacity (default: 10)
+- `max_write_capacity_units` (int, optional): Max write capacity (default: 10)
+
+### TTL Configuration
+
+```python
+ttl = {
+    "default_ttl": 60,  # Default TTL in minutes
+    "refresh_on_read": True,  # Refresh TTL when items are read
+}
+```
+
+## DynamoDB Table Schema
+
+The store uses a single DynamoDB table with the following structure:
+
+- **PK** (Partition Key, String): Namespace joined with ':'
+- **SK** (Sort Key, String): Item key
+- **value** (Map): The stored dictionary
+- **created_at** (String): ISO format timestamp
+- **updated_at** (String): ISO format timestamp
+- **expires_at** (Number, optional): Unix timestamp for TTL
+
+## AWS Configuration
+
+Ensure you have proper AWS credentials configured through:
+
+- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
+- AWS credentials file (`~/.aws/credentials`)
+- IAM role when running on AWS services
+
+Required IAM permissions:
+
+```json
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "dynamodb:CreateTable",
+        "dynamodb:DescribeTable",
+        "dynamodb:PutItem",
+        "dynamodb:GetItem",
+        "dynamodb:Query",
+        "dynamodb:Scan",
+        "dynamodb:DeleteItem",
+        "dynamodb:UpdateItem",
+        "dynamodb:UpdateTimeToLive"
+      ],
+      "Resource": "arn:aws:dynamodb:*:*:table/your-table-name"
+    }
+  ]
+}
+```
+
+## Comparison with Other Stores
+
+### DynamoDB Store vs Valkey Store
+
+| Feature | DynamoDB Store | Valkey Store |
+|---------|---------------|--------------|
+| Vector Search | ❌ No | ✅ Yes |
+| Performance | ✅ Good | ✅ Excellent |
+| TTL Support | ✅ Yes | ✅ Yes |
+| Cost | Pay-per-request | Infrastructure cost |
+| Best For | Simple storage, managed infra | Vector search, high performance |
+
+Use **DynamoDB Store** when:
+- You need a fully managed solution
+- You don't require vector search capabilities
+- You want pay-per-request pricing
+- Your workload is unpredictable
+
+Use **Valkey Store** when:
+- You need vector search capabilities
+- You require ultra-low latency
+- You can manage your own infrastructure
+- You have consistent, predictable workloads
+
+## Limitations
+
+- **No Vector Search**: This store does not support semantic/vector search
+- **Scan Cost**: Listing namespaces uses a DynamoDB Scan, which can be expensive on large tables
+- **Filter Limitations**: Basic filtering only (equality checks)
+- **No Transactions**: Operations are not transactional across multiple items
+
+## Examples
+
+See the [example notebook](../../../samples/memory/dynamodb_store.ipynb) for comprehensive usage examples.
+
+## Contributing
+
+Contributions are welcome! Please see the main [CONTRIBUTING.md](../CONTRIBUTING.md) for guidelines.
+
+## License
+
+This package is part of the `langgraph-checkpoint-aws` project. See [LICENSE](../../../LICENSE) for details.
+ +## Related Resources + +- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/) +- [AWS DynamoDB Documentation](https://docs.aws.amazon.com/dynamodb/) +- [BaseStore Interface](https://langchain-ai.github.io/langgraph/reference/store/) diff --git a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/__init__.py b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/__init__.py index d663c274e..aa85c520e 100644 --- a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/__init__.py +++ b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/__init__.py @@ -45,6 +45,9 @@ ValkeyConnectionError, ValkeyStoreError, ) + from langgraph_checkpoint_aws.store.dynamodb import ( + DynamoDBStore, + ) valkey_available = True except ImportError as e: @@ -102,6 +105,7 @@ def _missing_dependencies_error(*args: Any, **kwargs: Any) -> Any: "ValkeySaver", "ValkeyCache", "DynamoDBSaver", + "DynamoDBStore", "SDK_USER_AGENT", "valkey_available", ] diff --git a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/__init__.py b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/__init__.py index 9a1634ee8..e0872055a 100644 --- a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/__init__.py +++ b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/__init__.py @@ -8,8 +8,9 @@ # Conditional imports for optional dependencies try: from .valkey import AsyncValkeyStore, ValkeyIndexConfig, ValkeyStore + from .dynamodb import DynamoDBStore - __all__ = ["AsyncValkeyStore", "ValkeyStore", "ValkeyIndexConfig"] + __all__ = ["AsyncValkeyStore", "ValkeyStore", "ValkeyIndexConfig", "DynamoDBStore"] except ImportError as e: # Store the error for later use _import_error = e @@ -26,5 +27,6 @@ def _missing_dependencies_error(*args: Any, **kwargs: Any) -> Any: AsyncValkeyStore: type[Any] = _missing_dependencies_error # type: ignore[assignment,no-redef] ValkeyIndexConfig: type[Any] = _missing_dependencies_error # type: ignore[assignment,no-redef] ValkeyStore: type[Any] = _missing_dependencies_error # type: ignore[assignment,no-redef] + DynamoDBStore: type[Any] = _missing_dependencies_error # type: ignore[assignment,no-redef] - __all__ = ["AsyncValkeyStore", "ValkeyStore", "ValkeyIndexConfig"] + __all__ = ["AsyncValkeyStore", "ValkeyStore", "ValkeyIndexConfig", "DynamoDBStore"] diff --git a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/dynamodb/__init__.py b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/dynamodb/__init__.py new file mode 100644 index 000000000..d6f3a3925 --- /dev/null +++ b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/dynamodb/__init__.py @@ -0,0 +1,5 @@ +"""DynamoDB store implementation for LangGraph checkpoint AWS.""" + +from langgraph_checkpoint_aws.store.dynamodb.base import DynamoDBStore + +__all__ = ["DynamoDBStore"] diff --git a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/dynamodb/base.py b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/dynamodb/base.py new file mode 100644 index 000000000..2c68b2858 --- /dev/null +++ b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/dynamodb/base.py @@ -0,0 +1,657 @@ +"""DynamoDB store implementation for LangGraph. + +This module provides a DynamoDB-backed store implementation that extends +the BaseStore class from LangGraph. It offers persistent storage with +hierarchical namespaces and key-value operations without vector search +capabilities. 
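+
+Note:
+    Values are stored as native DynamoDB maps via boto3, which does not
+    accept Python ``float`` values; convert floats to ``decimal.Decimal``
+    before storing them.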
+"""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import Iterable, Iterator
+from contextlib import contextmanager
+from datetime import datetime, timezone
+from typing import Any
+
+import boto3
+from botocore.exceptions import ClientError
+from langgraph.store.base import (
+    BaseStore,
+    GetOp,
+    Item,
+    ListNamespacesOp,
+    Op,
+    PutOp,
+    Result,
+    SearchItem,
+    SearchOp,
+    TTLConfig,
+)
+
+from .exceptions import DynamoDBConnectionError, TableCreationError
+
+logger = logging.getLogger(__name__)
+
+
+class DynamoDBStore(BaseStore):
+    """DynamoDB-backed store implementation for LangGraph.
+
+    This store provides persistent key-value storage using AWS DynamoDB.
+    It supports hierarchical namespaces, TTL (time-to-live) for automatic
+    item expiration, and basic filtering capabilities.
+
+    The store uses a single DynamoDB table with the following schema:
+    - PK (Partition Key): Namespace (joined with ':')
+    - SK (Sort Key): Item key
+    - value: The stored dictionary
+    - created_at: ISO format timestamp
+    - updated_at: ISO format timestamp
+    - expires_at: Unix timestamp for TTL (optional)
+
+    Examples:
+        Basic usage:
+        ```python
+        from langgraph_checkpoint_aws.store.dynamodb import DynamoDBStore
+
+        store = DynamoDBStore(table_name="my-store-table")
+        store.setup()  # Create table if it doesn't exist
+
+        # Store and retrieve data
+        store.put(("users", "123"), "prefs", {"theme": "dark"})
+        item = store.get(("users", "123"), "prefs")
+        print(item.value)  # {"theme": "dark"}
+        ```
+
+        Using context manager:
+        ```python
+        from langgraph_checkpoint_aws.store.dynamodb import DynamoDBStore
+
+        with DynamoDBStore.from_conn_string("my-store-table") as store:
+            store.setup()
+            store.put(("docs",), "doc1", {"text": "Hello"})
+            items = store.search(("docs",))
+        ```
+
+        With TTL configuration:
+        ```python
+        store = DynamoDBStore(
+            table_name="my-store-table",
+            ttl={
+                "default_ttl": 60,  # 60 minutes default TTL
+                "refresh_on_read": True,  # Refresh TTL on reads
+            }
+        )
+        store.setup()
+
+        # Item will expire after 60 minutes
+        store.put(("temp",), "data", {"value": 123})
+        ```
+
+    Note:
+        Make sure to call `setup()` before first use to create the necessary
+        DynamoDB table if it doesn't exist.
+
+    Warning:
+        DynamoDB charges are based on read/write capacity and storage.
+        Consider using on-demand billing for unpredictable workloads or
+        provisioned capacity for consistent traffic patterns.
+    """
+
+    MIGRATIONS: list[str] = []
+    supports_ttl = True
+
+    def __init__(
+        self,
+        table_name: str,
+        *,
+        region_name: str | None = None,
+        boto3_session: boto3.Session | None = None,
+        ttl: TTLConfig | None = None,
+        max_read_capacity_units: int | None = None,
+        max_write_capacity_units: int | None = None,
+    ) -> None:
+        """Initialize DynamoDB store.
+
+        Args:
+            table_name: Name of the DynamoDB table to use.
+            region_name: AWS region name. If not provided, uses default from AWS config.
+            boto3_session: Optional boto3 session to use. If not provided, creates a new one.
+            ttl: Optional TTL configuration for automatic item expiration.
+            max_read_capacity_units: Maximum read capacity units for on-demand mode.
+                Only used when creating a new table. Default is 10.
+            max_write_capacity_units: Maximum write capacity units for on-demand mode.
+                Only used when creating a new table. Default is 10.
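+
+        Raises:
+            DynamoDBConnectionError: If the boto3 DynamoDB resource or table
+                handle cannot be created.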
+ """ + super().__init__() + self.table_name = table_name + self.ttl_config = ttl + self.max_read_capacity_units = max_read_capacity_units or 10 + self.max_write_capacity_units = max_write_capacity_units or 10 + + # Initialize boto3 session and resources + if boto3_session: + self.session = boto3_session + else: + self.session = boto3.Session(region_name=region_name) + + try: + self.dynamodb = self.session.resource("dynamodb") + self.table = self.dynamodb.Table(table_name) + except Exception as e: + raise DynamoDBConnectionError( + f"Failed to initialize DynamoDB connection: {e}" + ) from e + + @classmethod + @contextmanager + def from_conn_string( + cls, + table_name: str, + *, + region_name: str | None = None, + ttl: TTLConfig | None = None, + max_read_capacity_units: int | None = None, + max_write_capacity_units: int | None = None, + ) -> Iterator[DynamoDBStore]: + """Create a DynamoDB store instance using a context manager. + + Args: + table_name: Name of the DynamoDB table to use. + region_name: AWS region name. If not provided, uses default from AWS config. + ttl: Optional TTL configuration for automatic item expiration. + max_read_capacity_units: Maximum read capacity units for on-demand mode. + max_write_capacity_units: Maximum write capacity units for on-demand mode. + + Yields: + DynamoDBStore: A DynamoDB store instance. + + Example: + ```python + with DynamoDBStore.from_conn_string("my-table") as store: + store.setup() + store.put(("docs",), "doc1", {"text": "Hello"}) + ``` + """ + store = cls( + table_name=table_name, + region_name=region_name, + ttl=ttl, + max_read_capacity_units=max_read_capacity_units, + max_write_capacity_units=max_write_capacity_units, + ) + try: + yield store + finally: + # No cleanup needed for DynamoDB client + pass + + def setup(self) -> None: + """Set up the DynamoDB table. + + This method creates the DynamoDB table if it doesn't already exist. + It configures the table with: + - On-demand billing mode + - Primary key: PK (partition key) and SK (sort key) + - TTL enabled on expires_at attribute (if TTL config provided) + + This should be called before first use of the store. + + Raises: + TableCreationError: If table creation fails. 
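+
+        Note:
+            This call is idempotent: when the table already exists, it only
+            attempts to enable TTL (if a TTL config was provided).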
+ """ + try: + # Try to load the table to check if it exists + self.table.load() + logger.info(f"DynamoDB table '{self.table_name}' already exists.") + + # Enable TTL if configured and not already enabled + if self.ttl_config: + self._enable_ttl() + + except ClientError as e: + if e.response["Error"]["Code"] == "ResourceNotFoundException": + # Table doesn't exist, create it + logger.info(f"Creating DynamoDB table '{self.table_name}'...") + self._create_table() + else: + raise TableCreationError( + f"Failed to check/create table '{self.table_name}': {e}" + ) from e + + def _create_table(self) -> None: + """Create the DynamoDB table with appropriate configuration.""" + try: + table = self.dynamodb.create_table( + TableName=self.table_name, + KeySchema=[ + {"AttributeName": "PK", "KeyType": "HASH"}, # Partition key + {"AttributeName": "SK", "KeyType": "RANGE"}, # Sort key + ], + AttributeDefinitions=[ + {"AttributeName": "PK", "AttributeType": "S"}, + {"AttributeName": "SK", "AttributeType": "S"}, + ], + BillingMode="PAY_PER_REQUEST", + OnDemandThroughput={ + "MaxReadRequestUnits": self.max_read_capacity_units, + "MaxWriteRequestUnits": self.max_write_capacity_units, + }, + ) + # Wait for table to be created + table.wait_until_exists() + self.table = table + logger.info(f"DynamoDB table '{self.table_name}' created successfully.") + + # Enable TTL if configured + if self.ttl_config: + self._enable_ttl() + + except Exception as e: + raise TableCreationError( + f"Failed to create table '{self.table_name}': {e}" + ) from e + + def _enable_ttl(self) -> None: + """Enable TTL on the DynamoDB table.""" + try: + client = self.session.client("dynamodb") + client.update_time_to_live( + TableName=self.table_name, + TimeToLiveSpecification={"Enabled": True, "AttributeName": "expires_at"}, + ) + logger.info(f"TTL enabled on table '{self.table_name}'.") + except ClientError as e: + # TTL might already be enabled or enabling, log but don't fail + logger.warning(f"Could not enable TTL on table '{self.table_name}': {e}") + + def _construct_composite_key( + self, namespace: tuple[str, ...], key: str + ) -> tuple[str, str]: + """Construct DynamoDB composite key from namespace and key. + + Args: + namespace: Hierarchical namespace tuple. + key: Item key. + + Returns: + Tuple of (PK, SK) for DynamoDB. + """ + namespace_str = ":".join(namespace) + return (namespace_str, key) + + def _deconstruct_namespace(self, namespace: str) -> tuple[str, ...]: + """Deconstruct namespace string back to tuple. + + Args: + namespace: Namespace string (e.g., "users:123"). + + Returns: + Namespace tuple (e.g., ("users", "123")). + """ + if not namespace: + return () + if ":" in namespace: + return tuple(namespace.split(":")) + return (namespace,) + + def _map_to_item(self, result_dict: dict[str, Any], item_type: type = Item) -> Item: + """Map DynamoDB item to store Item. + + Args: + result_dict: DynamoDB item dictionary. + item_type: Type of item to create (Item or SearchItem). + + Returns: + Item or SearchItem instance. + """ + namespace = self._deconstruct_namespace(result_dict["PK"]) + key = result_dict["SK"] + value = result_dict["value"] + + # Parse timestamps + created_at = datetime.fromisoformat(result_dict["created_at"]) + updated_at = datetime.fromisoformat(result_dict["updated_at"]) + + return item_type( + value=value, + key=key, + namespace=namespace, + created_at=created_at, + updated_at=updated_at, + ) + + def _calculate_expiry(self, ttl_minutes: float | None) -> int | None: + """Calculate Unix timestamp for TTL expiry. 
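+
+        DynamoDB expects TTL timestamps as Unix epoch seconds. Note that
+        expired items are deleted by a DynamoDB background process, so reads
+        may still return an item for some time after its nominal expiry.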
+
+        Args:
+            ttl_minutes: TTL in minutes.
+
+        Returns:
+            Unix timestamp for expiry, or None if no TTL.
+        """
+        if ttl_minutes is None:
+            return None
+        # DynamoDB TTL expects Unix timestamp in seconds
+        expiry_seconds = int(datetime.now(timezone.utc).timestamp() + (ttl_minutes * 60))
+        return expiry_seconds
+
+    def batch(self, ops: Iterable[Op]) -> list[Result]:
+        """Execute multiple operations in a batch.
+
+        Args:
+            ops: Iterable of operations (GetOp, PutOp, SearchOp, ListNamespacesOp).
+
+        Returns:
+            List of results corresponding to each operation.
+        """
+        results: list[Result] = []
+
+        for op in ops:
+            if isinstance(op, GetOp):
+                result = self._batch_get_op(op)
+            elif isinstance(op, PutOp):
+                result = self._batch_put_op(op)
+            elif isinstance(op, SearchOp):
+                result = self._batch_search_op(op)
+            elif isinstance(op, ListNamespacesOp):
+                result = self._batch_list_namespaces_op(op)
+            else:
+                raise NotImplementedError(f"Operation type {type(op)} not supported")
+            results.append(result)
+
+        return results
+
+    def _batch_get_op(self, op: GetOp) -> Item | None:
+        """Execute a GetOp operation.
+
+        Args:
+            op: GetOp operation.
+
+        Returns:
+            Item if found, None otherwise.
+        """
+        composite_key = self._construct_composite_key(op.namespace, op.key)
+        try:
+            response = self.table.get_item(Key={"PK": composite_key[0], "SK": composite_key[1]})
+            item = response.get("Item")
+            if item:
+                # Refresh TTL if configured
+                if op.refresh_ttl and self.ttl_config:
+                    self._refresh_ttl(composite_key[0], composite_key[1])
+                return self._map_to_item(item)
+            return None
+        except Exception as e:
+            logger.error(f"Error getting item {op.namespace}/{op.key}: {e}")
+            return None
+
+    def _batch_put_op(self, op: PutOp) -> None:
+        """Execute a PutOp operation.
+
+        Args:
+            op: PutOp operation.
+        """
+        if op.value is None:
+            # Delete operation
+            self._delete_item(op.namespace, op.key)
+        else:
+            # Put operation
+            self._put_item(op.namespace, op.key, op.value, op.ttl)
+        return None
+
+    def _batch_search_op(self, op: SearchOp) -> list[SearchItem]:
+        """Execute a SearchOp operation.
+
+        Note:
+            The underlying Query matches the partition key exactly, so only
+            items whose namespace equals ``op.namespace_prefix`` are
+            returned; items in deeper child namespaces are not included.
+
+        Args:
+            op: SearchOp operation.
+
+        Returns:
+            List of SearchItem instances.
+        """
+        namespace_str = ":".join(op.namespace_prefix)
+
+        try:
+            # Query items under the exact namespace. The Limit is applied by
+            # DynamoDB before the in-memory filter and offset below, so
+            # filtered searches may return fewer than `op.limit` items.
+            response = self.table.query(
+                KeyConditionExpression="PK = :pk",
+                ExpressionAttributeValues={":pk": namespace_str},
+                Limit=op.limit,
+            )
+
+            items = response.get("Items", [])
+
+            # Apply filter if provided
+            if op.filter:
+                items = self._apply_filter(items, op.filter)
+
+            # Apply offset
+            if op.offset > 0:
+                items = items[op.offset :]
+
+            # Convert to SearchItem instances
+            results = [self._map_to_item(item, SearchItem) for item in items]
+
+            # Refresh TTL if configured
+            if op.refresh_ttl and self.ttl_config:
+                for item in items:
+                    self._refresh_ttl(item["PK"], item["SK"])
+
+            return results
+
+        except Exception as e:
+            logger.error(f"Error searching namespace {op.namespace_prefix}: {e}")
+            return []
+
+    def _batch_list_namespaces_op(self, op: ListNamespacesOp) -> list[tuple[str, ...]]:
+        """Execute a ListNamespacesOp operation.
+
+        Args:
+            op: ListNamespacesOp operation.
+
+        Returns:
+            List of namespace tuples.
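+
+        Note:
+            This performs a full (paginated) table Scan, which can be slow
+            and expensive on large tables.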
+ """ + try: + # Scan the table to get all unique namespaces + response = self.table.scan( + ProjectionExpression="PK", + ) + + namespaces_set = set() + for item in response.get("Items", []): + namespace = self._deconstruct_namespace(item["PK"]) + namespaces_set.add(namespace) + + # Handle pagination if more items exist + while "LastEvaluatedKey" in response: + response = self.table.scan( + ProjectionExpression="PK", + ExclusiveStartKey=response["LastEvaluatedKey"], + ) + for item in response.get("Items", []): + namespace = self._deconstruct_namespace(item["PK"]) + namespaces_set.add(namespace) + + # Filter namespaces based on match conditions + namespaces = list(namespaces_set) + filtered = self._filter_namespaces(namespaces, op) + + # Apply limit and offset + start = op.offset + end = start + op.limit + return filtered[start:end] + + except Exception as e: + logger.error(f"Error listing namespaces: {e}") + return [] + + def _filter_namespaces( + self, namespaces: list[tuple[str, ...]], op: ListNamespacesOp + ) -> list[tuple[str, ...]]: + """Filter namespaces based on operation criteria. + + Args: + namespaces: List of namespace tuples. + op: ListNamespacesOp with filter criteria. + + Returns: + Filtered list of namespaces. + """ + filtered = namespaces + + # Apply match conditions (prefix/suffix) + for condition in op.match_conditions: + if condition.match_type == "prefix": + prefix = condition.path + filtered = [ns for ns in filtered if ns[: len(prefix)] == prefix] + elif condition.match_type == "suffix": + suffix = condition.path + filtered = [ns for ns in filtered if ns[-len(suffix) :] == suffix] + + # Apply max_depth + if op.max_depth is not None: + filtered = [ns[: op.max_depth] for ns in filtered] + # Remove duplicates after truncation + filtered = list(dict.fromkeys(filtered)) + + return sorted(filtered) + + def _apply_filter( + self, items: list[dict[str, Any]], filter_dict: dict[str, Any] + ) -> list[dict[str, Any]]: + """Apply filter to items. + + Args: + items: List of DynamoDB items. + filter_dict: Filter criteria. + + Returns: + Filtered list of items. + """ + filtered_items = [] + for item in items: + value = item.get("value", {}) + if self._matches_filter(value, filter_dict): + filtered_items.append(item) + return filtered_items + + def _matches_filter(self, value: dict[str, Any], filter_dict: dict[str, Any]) -> bool: + """Check if value matches filter criteria. + + Args: + value: Item value dictionary. + filter_dict: Filter criteria. + + Returns: + True if value matches filter, False otherwise. + """ + for key, expected in filter_dict.items(): + if key not in value: + return False + if value[key] != expected: + return False + return True + + def _put_item( + self, + namespace: tuple[str, ...], + key: str, + value: dict[str, Any], + ttl: float | None, + ) -> None: + """Put an item into DynamoDB. + + Args: + namespace: Namespace tuple. + key: Item key. + value: Item value dictionary. + ttl: TTL in minutes (optional). 
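+
+        Note:
+            The existing item is read first so that ``created_at`` is
+            preserved across updates; each put therefore costs one read
+            plus one write.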
+ """ + composite_key = self._construct_composite_key(namespace, key) + current_time = datetime.now(timezone.utc).isoformat() + + # Check if item exists to preserve created_at + existing_item = None + try: + response = self.table.get_item(Key={"PK": composite_key[0], "SK": composite_key[1]}) + existing_item = response.get("Item") + except Exception: + pass + + item: dict[str, Any] = { + "PK": composite_key[0], + "SK": composite_key[1], + "value": value, + "created_at": existing_item["created_at"] if existing_item else current_time, + "updated_at": current_time, + } + + # Add TTL if configured + if ttl is not None: + expires_at = self._calculate_expiry(ttl) + if expires_at: + item["expires_at"] = expires_at + + try: + self.table.put_item(Item=item) + except Exception as e: + logger.error(f"Error putting item {namespace}/{key}: {e}") + raise + + def _delete_item(self, namespace: tuple[str, ...], key: str) -> None: + """Delete an item from DynamoDB. + + Args: + namespace: Namespace tuple. + key: Item key. + """ + composite_key = self._construct_composite_key(namespace, key) + try: + self.table.delete_item(Key={"PK": composite_key[0], "SK": composite_key[1]}) + except Exception as e: + logger.error(f"Error deleting item {namespace}/{key}: {e}") + raise + + def _refresh_ttl(self, pk: str, sk: str) -> None: + """Refresh TTL for an item. + + Args: + pk: Partition key. + sk: Sort key. + """ + if not self.ttl_config or not self.ttl_config.get("refresh_on_read"): + return + + default_ttl = self.ttl_config.get("default_ttl") + if default_ttl is None: + return + + expires_at = self._calculate_expiry(default_ttl) + if expires_at is None: + return + + try: + self.table.update_item( + Key={"PK": pk, "SK": sk}, + UpdateExpression="SET expires_at = :expires_at, updated_at = :updated_at", + ExpressionAttributeValues={ + ":expires_at": expires_at, + ":updated_at": datetime.now(timezone.utc).isoformat(), + }, + ) + except Exception as e: + logger.warning(f"Error refreshing TTL for {pk}/{sk}: {e}") + + async def abatch(self, ops: Iterable[Op]) -> list[Result]: + """Async batch operations are not supported. + + DynamoDBStore only supports synchronous operations. + + Raises: + NotImplementedError: Always raised for this synchronous store. + """ + raise NotImplementedError( + "Async batch operations are not supported by DynamoDBStore. " + "This is a synchronous store implementation." 
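+            " Use the synchronous batch() method instead."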
+ ) diff --git a/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/dynamodb/exceptions.py b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/dynamodb/exceptions.py new file mode 100644 index 000000000..a39815999 --- /dev/null +++ b/libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/store/dynamodb/exceptions.py @@ -0,0 +1,25 @@ +"""Exceptions for DynamoDB store operations.""" + + +class DynamoDBStoreError(Exception): + """Base exception for DynamoDB store errors.""" + + pass + + +class DynamoDBConnectionError(DynamoDBStoreError): + """Exception raised when connection to DynamoDB fails.""" + + pass + + +class ValidationError(DynamoDBStoreError): + """Exception raised for validation errors.""" + + pass + + +class TableCreationError(DynamoDBStoreError): + """Exception raised when table creation fails.""" + + pass diff --git a/libs/langgraph-checkpoint-aws/tests/unit_tests/store/dynamodb/__init__.py b/libs/langgraph-checkpoint-aws/tests/unit_tests/store/dynamodb/__init__.py new file mode 100644 index 000000000..70fa9fea2 --- /dev/null +++ b/libs/langgraph-checkpoint-aws/tests/unit_tests/store/dynamodb/__init__.py @@ -0,0 +1 @@ +"""Test module initialization.""" diff --git a/libs/langgraph-checkpoint-aws/tests/unit_tests/store/dynamodb/test_dynamodb_store.py b/libs/langgraph-checkpoint-aws/tests/unit_tests/store/dynamodb/test_dynamodb_store.py new file mode 100644 index 000000000..22a39f33c --- /dev/null +++ b/libs/langgraph-checkpoint-aws/tests/unit_tests/store/dynamodb/test_dynamodb_store.py @@ -0,0 +1,415 @@ +"""Unit tests for DynamoDB store implementation.""" + +import pytest +from datetime import datetime, timezone +from unittest.mock import Mock, MagicMock, patch +from langgraph.store.base import ( + GetOp, + Item, + ListNamespacesOp, + MatchCondition, + PutOp, + SearchItem, + SearchOp, + TTLConfig, +) + +from langgraph_checkpoint_aws.store.dynamodb import DynamoDBStore +from langgraph_checkpoint_aws.store.dynamodb.exceptions import ( + DynamoDBConnectionError, + TableCreationError, +) + + +@pytest.fixture +def mock_boto3_session(): + """Mock boto3 session.""" + session = Mock() + return session + + +@pytest.fixture +def mock_dynamodb_resource(): + """Mock DynamoDB resource.""" + resource = Mock() + return resource + + +@pytest.fixture +def mock_dynamodb_table(): + """Mock DynamoDB table.""" + table = Mock() + table.table_name = "test_table" + return table + + +@pytest.fixture +def dynamodb_store(mock_boto3_session, mock_dynamodb_resource, mock_dynamodb_table): + """Create a DynamoDBStore instance with mocked dependencies.""" + with patch("langgraph_checkpoint_aws.store.dynamodb.base.boto3") as mock_boto3: + mock_boto3_session.resource.return_value = mock_dynamodb_resource + mock_dynamodb_resource.Table.return_value = mock_dynamodb_table + + with patch("boto3.Session", return_value=mock_boto3_session): + store = DynamoDBStore(table_name="test_table") + store.table = mock_dynamodb_table + return store + + +class TestDynamoDBStoreInit: + """Test DynamoDB store initialization.""" + + def test_init_basic(self, mock_boto3_session, mock_dynamodb_resource, mock_dynamodb_table): + """Test basic initialization.""" + with patch("boto3.Session", return_value=mock_boto3_session): + mock_boto3_session.resource.return_value = mock_dynamodb_resource + mock_dynamodb_resource.Table.return_value = mock_dynamodb_table + + store = DynamoDBStore(table_name="test_table") + + assert store.table_name == "test_table" + assert store.ttl_config is None + assert 
store.max_read_capacity_units == 10 + assert store.max_write_capacity_units == 10 + + def test_init_with_ttl(self, mock_boto3_session, mock_dynamodb_resource, mock_dynamodb_table): + """Test initialization with TTL config.""" + ttl_config = TTLConfig(default_ttl=60, refresh_on_read=True) + + with patch("boto3.Session", return_value=mock_boto3_session): + mock_boto3_session.resource.return_value = mock_dynamodb_resource + mock_dynamodb_resource.Table.return_value = mock_dynamodb_table + + store = DynamoDBStore(table_name="test_table", ttl=ttl_config) + + assert store.ttl_config == ttl_config + + def test_init_with_custom_capacity(self, mock_boto3_session, mock_dynamodb_resource, mock_dynamodb_table): + """Test initialization with custom capacity units.""" + with patch("boto3.Session", return_value=mock_boto3_session): + mock_boto3_session.resource.return_value = mock_dynamodb_resource + mock_dynamodb_resource.Table.return_value = mock_dynamodb_table + + store = DynamoDBStore( + table_name="test_table", + max_read_capacity_units=20, + max_write_capacity_units=30, + ) + + assert store.max_read_capacity_units == 20 + assert store.max_write_capacity_units == 30 + + def test_from_conn_string(self, mock_boto3_session, mock_dynamodb_resource, mock_dynamodb_table): + """Test creating store from connection string.""" + with patch("boto3.Session", return_value=mock_boto3_session): + mock_boto3_session.resource.return_value = mock_dynamodb_resource + mock_dynamodb_resource.Table.return_value = mock_dynamodb_table + + with DynamoDBStore.from_conn_string("test_table") as store: + assert store.table_name == "test_table" + + +class TestDynamoDBStoreSetup: + """Test store setup.""" + + def test_setup_table_exists(self, dynamodb_store): + """Test setup when table already exists.""" + dynamodb_store.table.load.return_value = None + + dynamodb_store.setup() + + dynamodb_store.table.load.assert_called_once() + + def test_setup_table_not_exists(self, dynamodb_store): + """Test setup when table doesn't exist.""" + from botocore.exceptions import ClientError + + # Mock table.load() to raise ResourceNotFoundException + error = ClientError( + {"Error": {"Code": "ResourceNotFoundException"}}, "DescribeTable" + ) + dynamodb_store.table.load.side_effect = error + + # Mock create_table + new_table = Mock() + new_table.wait_until_exists = Mock() + dynamodb_store.dynamodb.create_table = Mock(return_value=new_table) + + dynamodb_store.setup() + + dynamodb_store.dynamodb.create_table.assert_called_once() + new_table.wait_until_exists.assert_called_once() + + +class TestDynamoDBStoreOperations: + """Test store operations.""" + + def test_construct_composite_key(self, dynamodb_store): + """Test composite key construction.""" + pk, sk = dynamodb_store._construct_composite_key(("users", "123"), "prefs") + + assert pk == "users:123" + assert sk == "prefs" + + def test_deconstruct_namespace(self, dynamodb_store): + """Test namespace deconstruction.""" + namespace = dynamodb_store._deconstruct_namespace("users:123") + + assert namespace == ("users", "123") + + def test_deconstruct_namespace_empty(self, dynamodb_store): + """Test deconstruction of empty namespace.""" + namespace = dynamodb_store._deconstruct_namespace("") + + assert namespace == () + + def test_deconstruct_namespace_single(self, dynamodb_store): + """Test deconstruction of single-level namespace.""" + namespace = dynamodb_store._deconstruct_namespace("users") + + assert namespace == ("users",) + + def test_map_to_item(self, dynamodb_store): + """Test mapping 
DynamoDB item to Item.""" + now = datetime.now(timezone.utc) + result_dict = { + "PK": "users:123", + "SK": "prefs", + "value": {"theme": "dark"}, + "created_at": now.isoformat(), + "updated_at": now.isoformat(), + } + + item = dynamodb_store._map_to_item(result_dict) + + assert isinstance(item, Item) + assert item.namespace == ("users", "123") + assert item.key == "prefs" + assert item.value == {"theme": "dark"} + + def test_calculate_expiry(self, dynamodb_store): + """Test TTL expiry calculation.""" + expiry = dynamodb_store._calculate_expiry(60) + + assert expiry is not None + assert expiry > datetime.now(timezone.utc).timestamp() + + def test_calculate_expiry_none(self, dynamodb_store): + """Test TTL expiry calculation with None.""" + expiry = dynamodb_store._calculate_expiry(None) + + assert expiry is None + + +class TestDynamoDBStoreBatch: + """Test batch operations.""" + + def test_batch_get_op(self, dynamodb_store): + """Test batch GetOp.""" + now = datetime.now(timezone.utc) + dynamodb_store.table.get_item.return_value = { + "Item": { + "PK": "users:123", + "SK": "prefs", + "value": {"theme": "dark"}, + "created_at": now.isoformat(), + "updated_at": now.isoformat(), + } + } + + op = GetOp(namespace=("users", "123"), key="prefs", refresh_ttl=False) + result = dynamodb_store._batch_get_op(op) + + assert result is not None + assert result.key == "prefs" + assert result.value == {"theme": "dark"} + + def test_batch_get_op_not_found(self, dynamodb_store): + """Test batch GetOp when item not found.""" + dynamodb_store.table.get_item.return_value = {} + + op = GetOp(namespace=("users", "123"), key="prefs", refresh_ttl=False) + result = dynamodb_store._batch_get_op(op) + + assert result is None + + def test_batch_put_op(self, dynamodb_store): + """Test batch PutOp.""" + dynamodb_store.table.get_item.return_value = {} + dynamodb_store.table.put_item.return_value = {} + + op = PutOp( + namespace=("users", "123"), + key="prefs", + value={"theme": "dark"}, + index=None, + ttl=None, + ) + result = dynamodb_store._batch_put_op(op) + + assert result is None + dynamodb_store.table.put_item.assert_called_once() + + def test_batch_put_op_delete(self, dynamodb_store): + """Test batch PutOp for delete.""" + dynamodb_store.table.delete_item.return_value = {} + + op = PutOp( + namespace=("users", "123"), key="prefs", value=None, index=None, ttl=None + ) + result = dynamodb_store._batch_put_op(op) + + assert result is None + dynamodb_store.table.delete_item.assert_called_once() + + def test_batch_search_op(self, dynamodb_store): + """Test batch SearchOp.""" + now = datetime.now(timezone.utc) + dynamodb_store.table.query.return_value = { + "Items": [ + { + "PK": "docs", + "SK": "doc1", + "value": {"text": "Hello"}, + "created_at": now.isoformat(), + "updated_at": now.isoformat(), + } + ] + } + + op = SearchOp( + namespace_prefix=("docs",), + filter=None, + limit=10, + offset=0, + query=None, + refresh_ttl=False, + ) + results = dynamodb_store._batch_search_op(op) + + assert len(results) == 1 + assert isinstance(results[0], SearchItem) + assert results[0].key == "doc1" + + def test_batch_search_op_with_filter(self, dynamodb_store): + """Test batch SearchOp with filter.""" + now = datetime.now(timezone.utc) + dynamodb_store.table.query.return_value = { + "Items": [ + { + "PK": "docs", + "SK": "doc1", + "value": {"type": "article", "status": "published"}, + "created_at": now.isoformat(), + "updated_at": now.isoformat(), + }, + { + "PK": "docs", + "SK": "doc2", + "value": {"type": "article", "status": 
"draft"}, + "created_at": now.isoformat(), + "updated_at": now.isoformat(), + }, + ] + } + + op = SearchOp( + namespace_prefix=("docs",), + filter={"status": "published"}, + limit=10, + offset=0, + query=None, + refresh_ttl=False, + ) + results = dynamodb_store._batch_search_op(op) + + assert len(results) == 1 + assert results[0].key == "doc1" + assert results[0].value["status"] == "published" + + def test_batch_list_namespaces_op(self, dynamodb_store): + """Test batch ListNamespacesOp.""" + dynamodb_store.table.scan.return_value = { + "Items": [ + {"PK": "users:123"}, + {"PK": "users:456"}, + {"PK": "docs"}, + ] + } + + op = ListNamespacesOp( + match_conditions=tuple(), max_depth=None, limit=100, offset=0 + ) + results = dynamodb_store._batch_list_namespaces_op(op) + + assert len(results) == 3 + assert ("users", "123") in results + assert ("users", "456") in results + assert ("docs",) in results + + def test_batch_list_namespaces_op_with_prefix(self, dynamodb_store): + """Test batch ListNamespacesOp with prefix filter.""" + dynamodb_store.table.scan.return_value = { + "Items": [ + {"PK": "users:123"}, + {"PK": "users:456"}, + {"PK": "docs"}, + ] + } + + op = ListNamespacesOp( + match_conditions=(MatchCondition(match_type="prefix", path=("users",)),), + max_depth=None, + limit=100, + offset=0, + ) + results = dynamodb_store._batch_list_namespaces_op(op) + + assert len(results) == 2 + assert ("users", "123") in results + assert ("users", "456") in results + + +class TestDynamoDBStoreFiltering: + """Test filtering functionality.""" + + def test_matches_filter_true(self, dynamodb_store): + """Test filter matching returns True.""" + value = {"type": "article", "status": "published"} + filter_dict = {"type": "article"} + + result = dynamodb_store._matches_filter(value, filter_dict) + + assert result is True + + def test_matches_filter_false(self, dynamodb_store): + """Test filter matching returns False.""" + value = {"type": "article", "status": "draft"} + filter_dict = {"status": "published"} + + result = dynamodb_store._matches_filter(value, filter_dict) + + assert result is False + + def test_matches_filter_missing_key(self, dynamodb_store): + """Test filter matching with missing key.""" + value = {"type": "article"} + filter_dict = {"status": "published"} + + result = dynamodb_store._matches_filter(value, filter_dict) + + assert result is False + + def test_apply_filter(self, dynamodb_store): + """Test applying filter to items.""" + items = [ + {"value": {"type": "article", "status": "published"}}, + {"value": {"type": "article", "status": "draft"}}, + {"value": {"type": "blog", "status": "published"}}, + ] + filter_dict = {"type": "article", "status": "published"} + + filtered = dynamodb_store._apply_filter(items, filter_dict) + + assert len(filtered) == 1 + assert filtered[0]["value"]["status"] == "published" diff --git a/samples/memory/dynamodb_store.ipynb b/samples/memory/dynamodb_store.ipynb new file mode 100644 index 000000000..7aad1821c --- /dev/null +++ b/samples/memory/dynamodb_store.ipynb @@ -0,0 +1,383 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "95f8d3aa", + "metadata": {}, + "source": [ + "# DynamoDB Store for LangGraph\n", + "\n", + "This notebook demonstrates how to use the DynamoDB store implementation for LangGraph, providing persistent key-value storage with hierarchical namespaces." 
+ ] + }, + { + "cell_type": "markdown", + "id": "d33c5fcd", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "First, install the required dependencies and configure AWS credentials." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0dc8ee03", + "metadata": {}, + "outputs": [], + "source": [ + "# Install required packages\n", + "# %pip install langgraph-checkpoint-aws boto3" + ] + }, + { + "cell_type": "markdown", + "id": "9f4dd0eb", + "metadata": {}, + "source": [ + "## Basic Usage" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47f0a8b4", + "metadata": {}, + "outputs": [], + "source": [ + "from langgraph_checkpoint_aws import DynamoDBStore\n", + "\n", + "# Create a store instance\n", + "store = DynamoDBStore(table_name=\"my-langgraph-store\")\n", + "\n", + "# Setup the table (creates it if it doesn't exist)\n", + "store.setup()" + ] + }, + { + "cell_type": "markdown", + "id": "69b48b28", + "metadata": {}, + "source": [ + "## Storing and Retrieving Data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d9ebb885", + "metadata": {}, + "outputs": [], + "source": [ + "# Store a document in a hierarchical namespace\n", + "store.put(\n", + " (\"documents\", \"user123\"),\n", + " \"report_1\",\n", + " {\n", + " \"text\": \"Machine learning report on customer behavior analysis...\",\n", + " \"tags\": [\"ml\", \"analytics\", \"report\"],\n", + " \"author\": \"data_scientist\"\n", + " }\n", + ")\n", + "\n", + "# Retrieve the document\n", + "item = store.get((\"documents\", \"user123\"), \"report_1\")\n", + "print(f\"Retrieved item: {item.value}\")\n", + "print(f\"Created at: {item.created_at}\")\n", + "print(f\"Updated at: {item.updated_at}\")" + ] + }, + { + "cell_type": "markdown", + "id": "d25f1d15", + "metadata": {}, + "source": [ + "## Searching Documents" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "86180065", + "metadata": {}, + "outputs": [], + "source": [ + "# Store multiple documents\n", + "store.put(\n", + " (\"documents\", \"user123\"),\n", + " \"report_2\",\n", + " {\"text\": \"Sales report Q1...\", \"tags\": [\"sales\", \"report\"], \"author\": \"analyst\"}\n", + ")\n", + "\n", + "store.put(\n", + " (\"documents\", \"user123\"),\n", + " \"note_1\",\n", + " {\"text\": \"Quick note about meeting...\", \"tags\": [\"note\"], \"author\": \"data_scientist\"}\n", + ")\n", + "\n", + "# Search for all documents in the namespace\n", + "results = store.search((\"documents\", \"user123\"))\n", + "print(f\"Found {len(results)} documents\")\n", + "for item in results:\n", + " print(f\" - {item.key}: {item.value['text'][:50]}...\")" + ] + }, + { + "cell_type": "markdown", + "id": "66e0462e", + "metadata": {}, + "source": [ + "## Filtering Results" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2e0964a9", + "metadata": {}, + "outputs": [], + "source": [ + "# Search with filter\n", + "results = store.search(\n", + " (\"documents\", \"user123\"),\n", + " filter={\"author\": \"data_scientist\"}\n", + ")\n", + "\n", + "print(f\"Found {len(results)} documents by data_scientist\")\n", + "for item in results:\n", + " print(f\" - {item.key}: {item.value}\")" + ] + }, + { + "cell_type": "markdown", + "id": "95537c66", + "metadata": {}, + "source": [ + "## Using TTL (Time-To-Live)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8fd4d392", + "metadata": {}, + "outputs": [], + "source": [ + "from langgraph_checkpoint_aws import DynamoDBStore\n", + "\n", + "# 
Create a store with TTL configuration\n", + "store_with_ttl = DynamoDBStore(\n", + " table_name=\"my-langgraph-store-ttl\",\n", + " ttl={\n", + " \"default_ttl\": 60, # 60 minutes default TTL\n", + " \"refresh_on_read\": True, # Refresh TTL when items are read\n", + " }\n", + ")\n", + "store_with_ttl.setup()\n", + "\n", + "# Store a temporary item that will expire after 60 minutes\n", + "store_with_ttl.put(\n", + " (\"temp\", \"session_123\"),\n", + " \"data\",\n", + " {\"value\": \"temporary session data\"}\n", + ")\n", + "\n", + "# Store an item with custom TTL (30 minutes)\n", + "store_with_ttl.put(\n", + " (\"temp\", \"session_123\"),\n", + " \"short_lived\",\n", + " {\"value\": \"expires soon\"},\n", + " ttl=30\n", + ")\n", + "\n", + "print(\"Items stored with TTL. They will be automatically deleted by DynamoDB after expiration.\")" + ] + }, + { + "cell_type": "markdown", + "id": "28436554", + "metadata": {}, + "source": [ + "## Listing Namespaces" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b183a64f", + "metadata": {}, + "outputs": [], + "source": [ + "# Store items in different namespaces\n", + "store.put((\"users\", \"alice\"), \"prefs\", {\"theme\": \"dark\"})\n", + "store.put((\"users\", \"bob\"), \"prefs\", {\"theme\": \"light\"})\n", + "store.put((\"projects\", \"ml\"), \"config\", {\"model\": \"gpt-4\"})\n", + "\n", + "# List all namespaces\n", + "namespaces = store.list_namespaces()\n", + "print(\"All namespaces:\")\n", + "for ns in namespaces:\n", + " print(f\" - {ns}\")\n", + "\n", + "# List namespaces with prefix filter\n", + "user_namespaces = store.list_namespaces(prefix=(\"users\",))\n", + "print(\"\\nNamespaces starting with 'users':\")\n", + "for ns in user_namespaces:\n", + " print(f\" - {ns}\")" + ] + }, + { + "cell_type": "markdown", + "id": "cbb45215", + "metadata": {}, + "source": [ + "## Deleting Items" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8802efa4", + "metadata": {}, + "outputs": [], + "source": [ + "# Delete an item\n", + "store.delete((\"documents\", \"user123\"), \"note_1\")\n", + "\n", + "# Verify it's deleted\n", + "item = store.get((\"documents\", \"user123\"), \"note_1\")\n", + "print(f\"Item exists: {item is not None}\") # Should print False" + ] + }, + { + "cell_type": "markdown", + "id": "78606ee7", + "metadata": {}, + "source": [ + "## Using Context Manager" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "245ee9a0", + "metadata": {}, + "outputs": [], + "source": [ + "# Use context manager for automatic cleanup\n", + "with DynamoDBStore.from_conn_string(\"my-langgraph-store\") as store:\n", + " store.setup()\n", + " store.put((\"test\",), \"example\", {\"data\": \"value\"})\n", + " item = store.get((\"test\",), \"example\")\n", + " print(f\"Retrieved: {item.value}\")" + ] + }, + { + "cell_type": "markdown", + "id": "91b88857", + "metadata": {}, + "source": [ + "## Batch Operations" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "06540380", + "metadata": {}, + "outputs": [], + "source": [ + "from langgraph.store.base import PutOp, GetOp, SearchOp\n", + "\n", + "# Perform multiple operations in a batch\n", + "ops = [\n", + " PutOp((\"batch\",), \"item1\", {\"value\": 1}, None, None),\n", + " PutOp((\"batch\",), \"item2\", {\"value\": 2}, None, None),\n", + " PutOp((\"batch\",), \"item3\", {\"value\": 3}, None, None),\n", + "]\n", + "\n", + "results = store.batch(ops)\n", + "print(f\"Batch operation completed: {len(results)} operations\")\n", + 
"\n", + "# Get multiple items\n", + "get_ops = [\n", + " GetOp((\"batch\",), \"item1\", False),\n", + " GetOp((\"batch\",), \"item2\", False),\n", + " GetOp((\"batch\",), \"item3\", False),\n", + "]\n", + "\n", + "items = store.batch(get_ops)\n", + "for item in items:\n", + " if item:\n", + " print(f\" - {item.key}: {item.value}\")" + ] + }, + { + "cell_type": "markdown", + "id": "8389e3ad", + "metadata": {}, + "source": [ + "## Integration with LangGraph\n", + "\n", + "The DynamoDB store can be used with LangGraph for persistent memory:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7e2849d5", + "metadata": {}, + "outputs": [], + "source": [ + "# Example of using DynamoDB store with LangGraph\n", + "# (This requires LangGraph to be installed)\n", + "\n", + "# from langgraph.prebuilt import ToolNode\n", + "# from langgraph.graph import StateGraph, END\n", + "# from langgraph_checkpoint_aws import DynamoDBStore\n", + "\n", + "# store = DynamoDBStore(table_name=\"langgraph-memory\")\n", + "# store.setup()\n", + "\n", + "# # Use store for persistent memory across graph executions\n", + "# # Store user preferences, conversation history, etc.\n", + "\n", + "print(\"DynamoDB store can be used for LangGraph persistent memory!\")" + ] + }, + { + "cell_type": "markdown", + "id": "15c9776d", + "metadata": {}, + "source": [ + "## Cleanup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b9a38ac6", + "metadata": {}, + "outputs": [], + "source": [ + "# Note: To delete the DynamoDB table, use AWS Console or boto3 directly\n", + "# The store does not provide a method to delete tables to prevent accidental data loss\n", + "\n", + "# import boto3\n", + "# dynamodb = boto3.resource('dynamodb')\n", + "# table = dynamodb.Table('my-langgraph-store')\n", + "# table.delete()\n", + "\n", + "print(\"Remember to delete DynamoDB tables when no longer needed to avoid charges!\")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}