---
title: Claim Check Pattern with Temporal
description: Use the Claim Check pattern to efficiently handle large payloads by storing them externally and passing only keys through Temporal.
tags: [foundations, python, patterns]
source: https://github.com/temporalio/ai-cookbook/tree/main/foundations/claim_check_pattern_python
---

The Claim Check pattern enables efficient handling of large payloads by storing
them externally and passing only keys through Temporal workflows and activities.

This pattern is particularly useful when:
- Working with large files, images, or datasets
- Processing bulk data operations
- Handling payloads that exceed Temporal's payload size limit (2 MB by default)
- Improving performance by reducing payload serialization overhead

## How the Claim Check Pattern Works

The Claim Check pattern implements a `PayloadCodec` that:

1. **Encode**: Replaces large payloads with unique keys and stores the original
   data in external storage (Redis, S3, etc.)
2. **Decode**: Retrieves the original payload using the key when needed

This allows Temporal workflows to operate with small, lightweight keys instead
of large payloads, while maintaining transparent access to the full data through
automatic encoding/decoding.

## Claim Check Codec Implementation

The `ClaimCheckCodec` class implements the `PayloadCodec` interface to handle
the encoding and decoding of payloads. Note that the codec claim-checks every
payload that passes through it, not just large ones.

```python
import uuid
import redis.asyncio as redis
from typing import Iterable, List

from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec

class ClaimCheckCodec(PayloadCodec):
    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Replace each payload with a key and store the original data in Redis."""
        out: List[Payload] = []
        for payload in payloads:
            encoded = await self.encode_payload(payload)
            out.append(encoded)
        return out

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Retrieve original payloads from Redis using stored keys."""
        out: List[Payload] = []
        for payload in payloads:
            if payload.metadata.get("temporal.io/claim-check-codec", b"").decode() != "v1":
                # Not a claim-checked payload, pass through unchanged
                out.append(payload)
                continue

            redis_key = payload.data.decode("utf-8")
            stored_data = await self.redis_client.get(redis_key)
            if stored_data is None:
                raise ValueError(f"Claim check key not found in Redis: {redis_key}")

            original_payload = Payload.FromString(stored_data)
            out.append(original_payload)
        return out

    async def encode_payload(self, payload: Payload) -> Payload:
        """Store payload in Redis and return a key-based payload."""
        key = str(uuid.uuid4())
        serialized_data = payload.SerializeToString()

        # Store the original payload data in Redis
        await self.redis_client.set(key, serialized_data)

        # Return a lightweight payload containing only the key
        return Payload(
            metadata={
                "encoding": b"claim-checked",
                "temporal.io/claim-check-codec": b"v1",
            },
            data=key.encode("utf-8"),
        )
```

## Claim Check Plugin

The `ClaimCheckPlugin` integrates the codec with Temporal's client
configuration. + +```python +import os +from temporalio.client import Plugin, ClientConfig +from temporalio.converter import DataConverter + +from claim_check_codec import ClaimCheckCodec + +class ClaimCheckPlugin(Plugin): + def __init__(self): + self.redis_host = os.getenv("REDIS_HOST", "localhost") + self.redis_port = int(os.getenv("REDIS_PORT", "6379")) + + def get_data_converter(self, config: ClientConfig) -> DataConverter: + """Configure the data converter with claim check codec.""" + default_converter_class = config["data_converter"].payload_converter_class + claim_check_codec = ClaimCheckCodec(self.redis_host, self.redis_port) + + return DataConverter( + payload_converter_class=default_converter_class, + payload_codec=claim_check_codec + ) + + def configure_client(self, config: ClientConfig) -> ClientConfig: + """Apply the claim check configuration to the client.""" + config["data_converter"] = self.get_data_converter(config) + return super().configure_client(config) +``` + +## Example: Two-Stage Large Data Processing Workflow + +This example demonstrates the Claim Check pattern with a realistic data processing pipeline that shows how large payloads are handled at multiple stages. + +### Data Models + +```python +from dataclasses import dataclass +from typing import List, Dict, Any + +@dataclass +class LargeDataset: + """Represents a large dataset that would benefit from claim check pattern.""" + data: List[Dict[str, Any]] + metadata: Dict[str, Any] + +@dataclass +class TransformedDataset: + """Large dataset after transformation - still large but processed.""" + data: List[Dict[str, Any]] + metadata: Dict[str, Any] + transformation_stats: Dict[str, Any] + +@dataclass +class SummaryResult: + """Final summary result - small payload.""" + total_items: int + processed_items: int + transformation_stats: Dict[str, Any] + summary_stats: Dict[str, Any] + errors: List[str] +``` + +### Activities + +```python +from temporalio import activity + +@activity.defn +async def transform_large_dataset(dataset: LargeDataset) -> TransformedDataset: + """Transform a large dataset - produces another large dataset. + + This activity demonstrates how the claim check pattern allows processing + of large datasets without hitting Temporal's payload size limits. + The transformation produces another large dataset that gets passed to + the next activity. 
+ """ + processed_count = 0 + errors = [] + transformation_stats = { + "total_items": len(dataset.data), + "transformations_applied": [] + } + + # Create a copy of the data to transform + transformed_data = [] + + for item in dataset.data: + try: + # Create a new item with transformations + transformed_item = item.copy() + + # Apply various transformations + if "value" in item: + transformed_item["processed_value"] = item["value"] * 2 + transformed_item["value_category"] = "high" if item["value"] > 1000 else "low" + processed_count += 1 + + if "text" in item: + # Simulate text processing + words = item["text"].split() + transformed_item["word_count"] = len(words) + transformed_item["avg_word_length"] = sum(len(word) for word in words) / len(words) if words else 0 + transformed_item["text_sentiment"] = "positive" if "good" in item["text"].lower() else "neutral" + processed_count += 1 + + # Add additional computed fields + transformed_item["computed_score"] = ( + transformed_item.get("processed_value", 0) * 0.7 + + transformed_item.get("word_count", 0) * 0.3 + ) + + transformed_data.append(transformed_item) + + except Exception as e: + errors.append(f"Error transforming item {item.get('id', 'unknown')}: {str(e)}") + # Still add the original item even if transformation failed + transformed_data.append(item) + + transformation_stats["processed_items"] = processed_count + transformation_stats["error_count"] = len(errors) + transformation_stats["transformations_applied"] = [ + "value_doubling", "category_assignment", "text_analysis", + "sentiment_analysis", "score_computation" + ] + + # Update metadata + updated_metadata = dataset.metadata.copy() + updated_metadata["transformed_at"] = "2024-01-01T00:00:00Z" + updated_metadata["transformation_version"] = "1.0" + + return TransformedDataset( + data=transformed_data, + metadata=updated_metadata, + transformation_stats=transformation_stats + ) + +@activity.defn +async def generate_summary(transformed_dataset: TransformedDataset) -> SummaryResult: + """Generate a summary from the transformed dataset. + + This activity takes the large transformed dataset and produces a small + summary result, demonstrating the claim check pattern with large activity + input and small output. 
+ """ + data = transformed_dataset.data + total_items = len(data) + + # Calculate summary statistics + summary_stats = { + "total_items": total_items, + "value_stats": { + "min_value": min(item.get("value", 0) for item in data), + "max_value": max(item.get("value", 0) for item in data), + "avg_value": sum(item.get("value", 0) for item in data) / total_items if total_items > 0 else 0, + "high_value_count": sum(1 for item in data if item.get("value_category") == "high") + }, + "text_stats": { + "total_words": sum(item.get("word_count", 0) for item in data), + "avg_word_count": sum(item.get("word_count", 0) for item in data) / total_items if total_items > 0 else 0, + "avg_word_length": sum(item.get("avg_word_length", 0) for item in data) / total_items if total_items > 0 else 0, + "positive_sentiment_count": sum(1 for item in data if item.get("text_sentiment") == "positive") + }, + "score_stats": { + "min_score": min(item.get("computed_score", 0) for item in data), + "max_score": max(item.get("computed_score", 0) for item in data), + "avg_score": sum(item.get("computed_score", 0) for item in data) / total_items if total_items > 0 else 0 + } + } + + return SummaryResult( + total_items=total_items, + processed_items=transformed_dataset.transformation_stats.get("processed_items", 0), + transformation_stats=transformed_dataset.transformation_stats, + summary_stats=summary_stats, + errors=transformed_dataset.transformation_stats.get("error_count", 0) + ) +``` + +### Workflow + +```python +from temporalio import workflow +from datetime import timedelta + +@workflow.defn +class LargeDataProcessingWorkflow: + """Workflow that demonstrates the Claim Check pattern with large datasets. + + This workflow demonstrates the claim check pattern by: + 1. Taking a large dataset as input (large workflow input) + 2. Transforming it into another large dataset (large activity input/output) + 3. Generating a summary from the transformed data (large activity input, small output) + + This shows how the claim check pattern handles large payloads at multiple stages. + """ + + @workflow.run + async def run(self, dataset: LargeDataset) -> SummaryResult: + """Process large dataset using claim check pattern with two-stage processing.""" + # Step 1: Transform the large dataset (large input -> large output) + transformed_dataset = await workflow.execute_activity( + transform_large_dataset, + dataset, + start_to_close_timeout=timedelta(minutes=10), + summary="Transform large dataset" + ) + + # Step 2: Generate summary from transformed data (large input -> small output) + summary_result = await workflow.execute_activity( + generate_summary, + transformed_dataset, + start_to_close_timeout=timedelta(minutes=5), + summary="Generate summary from transformed data" + ) + + return summary_result +``` + +### How This Demonstrates the Claim Check Pattern + +This example shows the Claim Check pattern in action across multiple stages: + +1. **Large Workflow Input**: The workflow receives a large dataset from the client +2. **Large Activity Input/Output**: The first activity transforms the large dataset, producing another large dataset +3. **Large Activity Input, Small Output**: The second activity takes the transformed data and produces a compact summary + +This flow demonstrates how the claim check pattern handles large payloads at multiple stages of processing, making it transparent to your workflow logic while avoiding Temporal's payload size limits. 
+ +## Configuration + +Set environment variables to configure the Redis connection: + +```bash +# Configure Redis connection (optional - defaults to localhost:6379) +export REDIS_HOST=localhost +export REDIS_PORT=6379 +``` + +## Prerequisites + +- **Redis Server**: Required for external storage of large payloads + +## Running the Example + +1. Start Redis server: +```bash +redis-server +``` + +2. Start the Temporal Dev Server: +```bash +temporal server start-dev +``` + +3. Run the worker: +```bash +uv run python -m worker +``` + +4. Start execution: +```bash +uv run python -m start_workflow +``` + +## Codec Server for Web UI + +When using the Claim Check pattern, the Temporal Web UI will show encoded Redis +keys instead of the actual payload data. This makes debugging and monitoring +difficult since you can't see what data is being passed through your workflows. + +### The Problem + +Without a codec server, the Web UI displays raw claim check keys like: +``` +abc123-def4-5678-9abc-def012345678 +``` + +This provides no context about what data is stored or how to access it, making +workflow debugging and monitoring challenging. + +### Our Solution: Lightweight Codec Server + +We've designed a codec server that provides helpful information without the +risks of reading large payload data: + +#### Design Principles + +1. **No Data Reading**: The codec server never reads the actual payload data during decode operations +2. **On-Demand Access**: Full data is available via a separate endpoint when needed +3. **Simple & Safe**: Just provides the Redis key and a link - no assumptions about data content +4. **Performance First**: Zero impact on Web UI performance + +#### What It Shows + +Instead of raw keys, the Web UI displays: +``` +"Claim check data (key: abc123-def4-5678-9abc-def012345678) - View at: http://localhost:8081/view/abc123-def4-5678-9abc-def012345678" +``` + +This gives you: +- **Clear identification**: You know this is claim check data +- **Redis key**: The actual key used for storage +- **Direct access**: Click the URL to view the full payload data + +### Running the Codec Server + +1. Start the codec server: +```bash +uv run python -m codec_server +``` + +2. Configure the Temporal Web UI to use the codec server. For `temporal server + start-dev`, see the [Temporal documentation on configuring codec + servers](https://docs.temporal.io/production-deployment/data-encryption#set-your-codec-server-endpoints-with-web-ui-and-cli) + for the appropriate configuration method. + +3. Access the Temporal Web UI and you'll see helpful summaries instead of raw keys. + +### Why This Approach? 
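
To make the design concrete, here is a sketch of the kind of `/decode` handler
described above. It only rewrites claim-checked payloads into a short
descriptive string and never reads from Redis. FastAPI, the port, and the view
URL are illustrative choices, and CORS handling plus the `/view/{key}` endpoint
are omitted for brevity; the cookbook's actual `codec_server` module may be
organized differently.

```python
import base64
import json

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

VIEW_BASE_URL = "http://localhost:8081/view"  # assumed address of the /view endpoint


@app.post("/decode")
async def decode(request: Request) -> JSONResponse:
    body = await request.json()
    out = []
    for payload in body.get("payloads", []):
        metadata = payload.get("metadata", {})
        # Metadata values arrive base64-encoded in the codec server protocol
        marker = base64.b64decode(metadata.get("temporal.io/claim-check-codec", "")).decode()
        if marker != "v1":
            # Not claim-checked: return the payload unchanged
            out.append(payload)
            continue

        # The payload data is just the Redis key; describe it instead of
        # fetching the (potentially huge) stored value
        key = base64.b64decode(payload["data"]).decode("utf-8")
        summary = f"Claim check data (key: {key}) - View at: {VIEW_BASE_URL}/{key}"
        out.append({
            "metadata": {"encoding": base64.b64encode(b"json/plain").decode()},
            "data": base64.b64encode(json.dumps(summary).encode()).decode(),
        })
    return JSONResponse({"payloads": out})
```
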
+ +#### Avoiding Common Pitfalls + +**❌ What we DON'T do:** +- Parse or analyze payload data (could be huge or malformed) +- Attempt to summarize content (assumes data structure) +- Read data during decode operations (performance impact) + +**✅ What we DO:** +- Provide the Redis key for manual inspection +- Offer a direct link to view full data when needed +- Keep the Web UI responsive with minimal information + +#### Benefits + +- **Performance**: No Redis calls during Web UI operations +- **Safety**: No risk of parsing problematic data +- **Flexibility**: Works with any data type or size +- **Debugging**: Full data accessible when needed via `/view/{key}` endpoint + +### Configuration Details + +The codec server implements the Temporal codec server protocol with two endpoints: + +- **`/decode`**: Returns helpful text with Redis key and view URL +- **`/view/{key}`**: Serves the raw payload data for inspection + +When you click the view URL, you'll see the complete payload data as stored in +Redis, formatted appropriately for text or binary content. + +## Considerations + +### Performance Trade-offs + +- **Benefits**: Reduces payload size, improves workflow performance, enables handling of large data +- **Costs**: Additional network calls to external storage, potential latency increase + +### Storage Backend Options + +While this example uses Redis, production systems typically use: +- **AWS S3**: For AWS environments +- **Google Cloud Storage**: For GCP environments +- **Azure Blob Storage**: For Azure environments +- **Redis**: For development and testing + +### Activity Heartbeats + +For production workloads processing very large datasets, consider implementing +activity heartbeats to prevent timeout issues: + +```python +@activity.defn +async def process_large_dataset(dataset: LargeDataset) -> ProcessingResult: + total_items = len(dataset.data) + + for i, item in enumerate(dataset.data): + # Send heartbeat every 100 items to prevent timeout + if i % 100 == 0: + await activity.heartbeat(f"Processed {i}/{total_items} items") + + # Process item... +``` + +This ensures Temporal knows the activity is still making progress during +long-running operations. + +### Error Handling + +The codec includes error handling for: +- Missing keys in storage +- Storage connection failures +- Serialization/deserialization errors + +### Cleanup + +Consider implementing cleanup strategies for stored data: +- TTL (Time To Live) for automatic expiration +- Manual cleanup workflows +- Lifecycle policies for cloud storage + +## Best Practices + +1. **Enable globally**: The claim check pattern applies to all payloads when + enabled, so consider the performance impact across your entire system +2. **Monitor storage**: Track storage usage and costs since all payloads will be + stored externally +3. **Implement cleanup**: Prevent storage bloat with appropriate cleanup + strategies +4. **Test thoroughly**: Verify the pattern works correctly with your specific + data types and doesn't introduce unexpected latency +5. **Consider alternatives**: Evaluate if data compression or other + optimizations might be sufficient before implementing claim check