---
title: Claim Check Pattern with Temporal
description: Use the Claim Check pattern to efficiently handle large payloads by storing them externally and passing only keys through Temporal.
tags: [foundations, python, patterns]
source: https://github.com/temporalio/ai-cookbook/tree/main/foundations/claim_check_pattern_python
---

The Claim Check pattern enables efficient handling of large payloads by storing
them externally and passing only keys through Temporal workflows and activities.

This pattern is particularly useful when:
- Working with large files, images, or datasets
- Processing bulk data operations
- Handling payloads that exceed Temporal's payload size limits (2 MB per payload by default)
- Improving performance by reducing payload serialization overhead

## How the Claim Check Pattern Works

The Claim Check pattern implements a `PayloadCodec` that:

1. **Encode**: Replaces large payloads with unique keys and stores the original
data in external storage (Redis, S3, etc.)
2. **Decode**: Retrieves the original payload using the key when needed

This allows Temporal workflows to operate with small, lightweight keys instead
of large payloads, while maintaining transparent access to the full data through
automatic encoding/decoding.

## Claim Check Codec Implementation

The `ClaimCheckCodec` class implements the `PayloadCodec` interface to handle
the encoding and decoding of payloads.

```python
import uuid
from typing import Iterable, List

import redis.asyncio as redis
from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec


class ClaimCheckCodec(PayloadCodec):
    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Replace large payloads with keys and store original data in Redis."""
        out: List[Payload] = []
        for payload in payloads:
            encoded = await self.encode_payload(payload)
            out.append(encoded)
        return out

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Retrieve original payloads from Redis using stored keys."""
        out: List[Payload] = []
        for payload in payloads:
            if payload.metadata.get("temporal.io/claim-check-codec", b"").decode() != "v1":
                # Not a claim-checked payload, pass through unchanged
                out.append(payload)
                continue

            redis_key = payload.data.decode("utf-8")
            stored_data = await self.redis_client.get(redis_key)
            if stored_data is None:
                raise ValueError(f"Claim check key not found in Redis: {redis_key}")

            original_payload = Payload.FromString(stored_data)
            out.append(original_payload)
        return out

    async def encode_payload(self, payload: Payload) -> Payload:
        """Store payload in Redis and return a key-based payload."""
        key = str(uuid.uuid4())
        serialized_data = payload.SerializeToString()

        # Store the original payload data in Redis
        await self.redis_client.set(key, serialized_data)

        # Return a lightweight payload containing only the key
        return Payload(
            metadata={
                "encoding": b"claim-checked",
                "temporal.io/claim-check-codec": b"v1",
            },
            data=key.encode("utf-8"),
        )
```
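To see what the codec does before wiring it into a client, you can round-trip a
payload through it directly. A minimal sketch, assuming a local Redis on the
default port (the sample payload contents are illustrative):

```python
import asyncio

from temporalio.api.common.v1 import Payload


async def main() -> None:
    # Assumes Redis is running on localhost:6379
    codec = ClaimCheckCodec()

    original = Payload(
        metadata={"encoding": b"json/plain"},
        data=b'"imagine megabytes of JSON here"',
    )

    # encode() swaps the payload body for a UUID key stored in Redis
    [encoded] = await codec.encode([original])
    print(encoded.data)  # a UUID key, not the original data

    # decode() fetches the original payload back from Redis
    [decoded] = await codec.decode([encoded])
    assert decoded.data == original.data


asyncio.run(main())
```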

## Claim Check Plugin

The `ClaimCheckPlugin` integrates the codec with Temporal's client configuration.

```python
import os

from temporalio.client import Plugin, ClientConfig
from temporalio.converter import DataConverter

from claim_check_codec import ClaimCheckCodec


class ClaimCheckPlugin(Plugin):
    def __init__(self):
        self.redis_host = os.getenv("REDIS_HOST", "localhost")
        self.redis_port = int(os.getenv("REDIS_PORT", "6379"))

    def get_data_converter(self, config: ClientConfig) -> DataConverter:
        """Configure the data converter with claim check codec."""
        default_converter_class = config["data_converter"].payload_converter_class
        claim_check_codec = ClaimCheckCodec(self.redis_host, self.redis_port)

        return DataConverter(
            payload_converter_class=default_converter_class,
            payload_codec=claim_check_codec,
        )

    def configure_client(self, config: ClientConfig) -> ClientConfig:
        """Apply the claim check configuration to the client."""
        config["data_converter"] = self.get_data_converter(config)
        return super().configure_client(config)
```
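With the plugin defined, attaching it to a client is a single argument. A
minimal sketch, assuming an SDK version that supports client plugins (the
server address is the `temporal server start-dev` default and the module name
is an assumption):

```python
import asyncio

from temporalio.client import Client

from claim_check_plugin import ClaimCheckPlugin  # module name assumed


async def main() -> None:
    client = await Client.connect(
        "localhost:7233",
        plugins=[ClaimCheckPlugin()],
    )
    # Every payload sent or received through this client is now
    # claim-checked transparently by the codec.


asyncio.run(main())
```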

## Example: Large Data Processing Workflow

This example demonstrates processing large datasets using the Claim Check pattern.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import List

from temporalio import workflow, activity


@dataclass
class LargeDataset:
    """Represents a large dataset that would benefit from the claim check pattern."""
    data: List[dict]
    metadata: dict


@dataclass
class ProcessingResult:
    """Result of processing the large dataset."""
    processed_count: int
    summary: dict
    errors: List[str]


@activity.defn
async def process_large_dataset(dataset: LargeDataset) -> ProcessingResult:
    """Process a large dataset - this would normally cause payload size issues.

    This activity demonstrates how the claim check pattern allows processing
    of large datasets without hitting Temporal's payload size limits.

    Note: For production workloads processing very large datasets, consider adding
    activity heartbeats to prevent timeout issues during long-running processing.

    Args:
        dataset: Large dataset to process

    Returns:
        ProcessingResult with processing statistics and any errors
    """
    processed_count = 0
    errors = []
    summary = {"total_items": len(dataset.data)}

    for item in dataset.data:
        try:
            # Simulate processing each item
            if "value" in item:
                item["processed_value"] = item["value"] * 2
                processed_count += 1
            elif "text" in item:
                # Simulate text processing
                item["word_count"] = len(item["text"].split())
                processed_count += 1
        except Exception as e:
            errors.append(f"Error processing item {item.get('id', 'unknown')}: {str(e)}")

    summary["processed_items"] = processed_count
    summary["error_count"] = len(errors)

    return ProcessingResult(
        processed_count=processed_count,
        summary=summary,
        errors=errors,
    )


@workflow.defn
class LargeDataProcessingWorkflow:
    @workflow.run
    async def run(self, dataset: LargeDataset) -> ProcessingResult:
        """Process a large dataset using the claim check pattern."""
        result = await workflow.execute_activity(
            process_large_dataset,
            dataset,
            start_to_close_timeout=timedelta(minutes=10),
            summary="Process large dataset",
        )
        return result
```
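The `worker` and `start_workflow` modules referenced under *Running the
Example* below might look like the following sketches (the task queue name,
workflow ID, and module layout are assumptions):

```python
# worker.py - a minimal sketch
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from claim_check_plugin import ClaimCheckPlugin  # module layout assumed
from workflows import LargeDataProcessingWorkflow, process_large_dataset


async def main() -> None:
    # The plugin makes the worker's client use the claim check codec too
    client = await Client.connect("localhost:7233", plugins=[ClaimCheckPlugin()])
    worker = Worker(
        client,
        task_queue="claim-check-task-queue",
        workflows=[LargeDataProcessingWorkflow],
        activities=[process_large_dataset],
    )
    await worker.run()


asyncio.run(main())
```

```python
# start_workflow.py - a minimal sketch
import asyncio

from temporalio.client import Client

from claim_check_plugin import ClaimCheckPlugin  # module layout assumed
from workflows import LargeDataProcessingWorkflow, LargeDataset


async def main() -> None:
    client = await Client.connect("localhost:7233", plugins=[ClaimCheckPlugin()])

    # A dataset large enough that passing it inline would be costly
    dataset = LargeDataset(
        data=[{"id": i, "value": i} for i in range(100_000)],
        metadata={"source": "example"},
    )

    result = await client.execute_workflow(
        LargeDataProcessingWorkflow.run,
        dataset,
        id="claim-check-example",
        task_queue="claim-check-task-queue",
    )
    print(f"Processed {result.processed_count} items")


asyncio.run(main())
```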

## Configuration

Set environment variables to configure the Redis connection:

```bash
# Configure Redis connection (optional - defaults to localhost:6379)
export REDIS_HOST=localhost
export REDIS_PORT=6379
```

## Prerequisites

- **Redis Server**: Required for external storage of large payloads

## Running the Example

1. Start Redis server:
```bash
redis-server
```

2. Start the Temporal Dev Server:
```bash
temporal server start-dev
```

3. Run the worker:
```bash
uv run python -m worker
```

4. Start execution:
```bash
uv run python -m start_workflow
```

## Codec Server for Web UI

When using the Claim Check pattern, the Temporal Web UI will show encoded Redis
keys instead of the actual payload data. This makes debugging and monitoring
difficult since you can't see what data is being passed through your workflows.

### The Problem

Without a codec server, the Web UI displays raw claim check keys like:
```
abc123-def4-5678-9abc-def012345678
```

This provides no context about what data is stored or how to access it, making
workflow debugging and monitoring challenging.

### Our Solution: Lightweight Codec Server

We've designed a codec server that provides helpful information without the
risks of reading large payload data:

#### Design Principles

1. **No Data Reading**: The codec server never reads the actual payload data during decode operations
2. **On-Demand Access**: Full data is available via a separate endpoint when needed
3. **Simple & Safe**: Just provides the Redis key and a link - no assumptions about data content
4. **Performance First**: Zero impact on Web UI performance

#### What It Shows

Instead of raw keys, the Web UI displays:
```
"Claim check data (key: abc123-def4-5678-9abc-def012345678) - View at: http://localhost:8081/view/abc123-def4-5678-9abc-def012345678"
```

This gives you:
- **Clear identification**: You know this is claim check data
- **Redis key**: The actual key used for storage
- **Direct access**: Click the URL to view the full payload data

### Running the Codec Server

1. Start the codec server:
```bash
uv run python -m codec_server
```

2. Configure the Temporal Web UI to use the codec server. For `temporal server
start-dev`, see the [Temporal documentation on configuring codec
servers](https://docs.temporal.io/production-deployment/data-encryption#set-your-codec-server-endpoints-with-web-ui-and-cli)
for the appropriate configuration method; one option is sketched after this list.

3. Access the Temporal Web UI and you'll see helpful summaries instead of raw keys.
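As one concrete option, recent versions of the Temporal CLI let you point the
dev server's Web UI at a codec server with a flag (verify against the linked
docs and your CLI version):

```bash
# Flag availability depends on your temporal CLI version; check --help
temporal server start-dev --ui-codec-endpoint http://localhost:8081
```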

### Why This Approach?

#### Avoiding Common Pitfalls

**❌ What we DON'T do:**
- Parse or analyze payload data (could be huge or malformed)
- Attempt to summarize content (assumes data structure)
- Read data during decode operations (performance impact)

**✅ What we DO:**
- Provide the Redis key for manual inspection
- Offer a direct link to view full data when needed
- Keep the Web UI responsive with minimal information

#### Benefits

- **Performance**: No Redis calls during Web UI operations
- **Safety**: No risk of parsing problematic data
- **Flexibility**: Works with any data type or size
- **Debugging**: Full data accessible when needed via `/view/{key}` endpoint

### Configuration Details

The codec server implements the Temporal codec server protocol with two endpoints:

- **`/decode`**: Returns helpful text with Redis key and view URL
- **`/view/{key}`**: Serves the raw payload data for inspection

When you click the view URL, you'll see the complete payload data as stored in
Redis, formatted appropriately for text or binary content.
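A minimal sketch of such a server using FastAPI (the framework choice, port
8081, and the Web UI origin are assumptions; only the `/decode` route is part
of the Temporal codec server protocol, while `/view/{key}` is this example's
own addition):

```python
import base64

import redis.asyncio as redis
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, PlainTextResponse

app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6379)  # assumed defaults

# The Web UI calls the codec server cross-origin, so CORS headers are required
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8233"],  # assumed dev server Web UI origin
    allow_headers=["x-namespace", "content-type"],
)


@app.post("/decode")
async def decode(request: Request) -> JSONResponse:
    """Replace claim-checked payloads with a short summary for the Web UI.

    The UI posts {"payloads": [...]} with base64-encoded metadata and data
    fields and expects the same shape back. No Redis reads happen here.
    """
    body = await request.json()
    out = []
    for p in body.get("payloads", []):
        tag = base64.b64decode(
            p.get("metadata", {}).get("temporal.io/claim-check-codec", "")
        ).decode()
        if tag != "v1":
            out.append(p)  # not claim-checked, pass through unchanged
            continue
        key = base64.b64decode(p["data"]).decode("utf-8")
        summary = (
            f"Claim check data (key: {key}) - "
            f"View at: http://localhost:8081/view/{key}"
        )
        out.append(
            {
                "metadata": {"encoding": base64.b64encode(b"json/plain").decode()},
                # JSON-encode the summary string so the UI can render it
                "data": base64.b64encode(f'"{summary}"'.encode()).decode(),
            }
        )
    return JSONResponse({"payloads": out})


@app.get("/view/{key}")
async def view(key: str) -> PlainTextResponse:
    """Serve the raw stored payload for on-demand inspection."""
    stored = await redis_client.get(key)
    if stored is None:
        return PlainTextResponse(f"Key not found: {key}", status_code=404)
    return PlainTextResponse(stored)
```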

## Considerations

### Performance Trade-offs

- **Benefits**: Reduces payload size, improves workflow performance, enables handling of large data
- **Costs**: Additional network calls to external storage, potential latency increase

### Storage Backend Options

While this example uses Redis, production systems typically use:
- **AWS S3**: For AWS environments
- **Google Cloud Storage**: For GCP environments
- **Azure Blob Storage**: For Azure environments
- **Redis**: For development and testing

### Activity Heartbeats

For production workloads processing very large datasets, consider implementing
activity heartbeats to prevent timeout issues:

```python
@activity.defn
async def process_large_dataset(dataset: LargeDataset) -> ProcessingResult:
    total_items = len(dataset.data)

    for i, item in enumerate(dataset.data):
        # Send a heartbeat every 100 items to prevent timeouts.
        # activity.heartbeat() is synchronous in the Python SDK, so it is
        # not awaited.
        if i % 100 == 0:
            activity.heartbeat(f"Processed {i}/{total_items} items")

        # Process item...
```

This ensures Temporal knows the activity is still making progress during
long-running operations.

### Error Handling

The codec handles or surfaces the following error cases:
- Missing keys in storage (raised as a descriptive `ValueError`)
- Storage connection failures (propagated from the Redis client)
- Serialization/deserialization errors (propagated from the protobuf layer)

### Cleanup

Consider implementing cleanup strategies for stored data:
- TTL (Time To Live) for automatic expiration (sketched below)
- Manual cleanup workflows
- Lifecycle policies for cloud storage
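For example, Redis can expire claim check entries automatically via a TTL on
`set`. A hedged variant of `encode_payload` (the one-week TTL is an arbitrary
assumption; pick a value longer than your workflow retention period, since
histories may be decoded long after a run completes):

```python
CLAIM_CHECK_TTL_SECONDS = 7 * 24 * 60 * 60  # assumption: keep payloads one week


async def encode_payload(self, payload: Payload) -> Payload:
    key = str(uuid.uuid4())

    # ex= makes Redis delete the entry automatically after the TTL;
    # decode() will raise once the key has expired
    await self.redis_client.set(
        key, payload.SerializeToString(), ex=CLAIM_CHECK_TTL_SECONDS
    )

    return Payload(
        metadata={
            "encoding": b"claim-checked",
            "temporal.io/claim-check-codec": b"v1",
        },
        data=key.encode("utf-8"),
    )
```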

## Best Practices

1. **Enable globally**: The claim check pattern applies to all payloads when
enabled, so consider the performance impact across your entire system
2. **Monitor storage**: Track storage usage and costs since all payloads will be
stored externally
3. **Implement cleanup**: Prevent storage bloat with appropriate cleanup
strategies
4. **Test thoroughly**: Verify the pattern works correctly with your specific
data types and doesn't introduce unexpected latency
5. **Consider alternatives**: Evaluate if data compression or other
optimizations might be sufficient before implementing claim check