Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 119 additions & 46 deletions docs/features/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ description: Utility
status: new
---

???+ info "Work in progress"
This documentation page is a work in progress for an upcoming feature in Powertools for AWS Lambda. If you're seeing this page, it means the release process is underway, but the feature is not yet available on npm. Please check back soon for the final version.

The Kafka Consumer utility transparently handles message deserialization, provides an intuitive developer experience, and integrates seamlessly with the rest of the Powertools for AWS Lambda ecosystem.

```mermaid
Expand Down Expand Up @@ -72,15 +69,15 @@ Depending on the schema types you want to use, install the library and the corre

Additionally, if you want to use output parsing with [Standard Schema](https://github.com/standard-schema/standard-schema), you can install [any of the supported libraries](https://standardschema.dev/#what-schema-libraries-implement-the-spec), for example: Zod, Valibot, or ArkType.

### Required resources
<!-- ### Required resources

To use the Kafka consumer utility, you need an AWS Lambda function configured with a Kafka event source. This can be Amazon MSK, MSK Serverless, or a self-hosted Kafka cluster.

=== "gettingStartedWithMsk.yaml"

```yaml
--8<-- "examples/snippets/kafka/templates/gettingStartedWithMsk.yaml"
```
``` -->

### Using ESM with Schema Registry

Expand All @@ -97,19 +94,19 @@ The Kafka consumer utility transforms raw Kafka events into an intuitive format

=== "Avro Messages"

```typescript
```typescript hl_lines="2-3 8-13 15 19"
--8<-- "examples/snippets/kafka/gettingStartedAvro.ts"
```

=== "Protocol Buffers"

```typescript
```typescript hl_lines="1-2 8-13 15 19"
--8<-- "examples/snippets/kafka/gettingStartedProtobuf.ts"
```

=== "JSON Messages"

```typescript
```typescript hl_lines="1-2 7-11 13 17"
--8<-- "examples/snippets/kafka/gettingStartedJson.ts"
```

Expand All @@ -119,7 +116,7 @@ The `kafkaConsumer` function can deserialize both keys and values independently

=== "index.ts"

```typescript
```typescript hl_lines="9 13 22 25-26"
--8<-- "examples/snippets/kafka/gettingStartedKeyValue.ts:func"
```

Expand Down Expand Up @@ -197,72 +194,142 @@ Each Kafka record contains important metadata that you can access alongside the

=== "Working with Record Metadata"

```typescript
```typescript hl_lines="10"
--8<-- "examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts"
```

For debugging purposes, you can also access the original key, value, and headers in their base64-encoded form, these are available in the `originalValue`, `originalKey`, and `originalHeaders` properties of the `record`.

#### Available metadata properties

| Property | Description | Example Use Case |
|-------------------|-----------------------------------------------------|---------------------------------------------|
| `topic` | Topic name the record was published to | Routing logic in multi-topic consumers |
| `partition` | Kafka partition number | Tracking message distribution |
| `offset` | Position in the partition | De-duplication, exactly-once processing |
| `timestamp` | Unix timestamp when record was created | Event timing analysis |
| `timestamp_type` | Timestamp type (`CREATE_TIME` or `LOG_APPEND_TIME`) | Data lineage verification |
| `headers` | Key-value pairs attached to the message | Cross-cutting concerns like correlation IDs |
| `key` | Deserialized message key | Customer ID or entity identifier |
| `value` | Deserialized message content | The actual business data |
| `originalValue` | Base64-encoded original message value | Debugging or custom deserialization |
| `originalKey` | Base64-encoded original message key | Debugging or custom deserialization |
| `originalHeaders` | Base64-encoded original message headers | Debugging or custom deserialization |

### Custom output serializers
| Property | Description | Example Use Case |
|-----------------------|------------------------------------------------------------------|---------------------------------------------------------------------|
| `topic` | Topic name the record was published to | Routing logic in multi-topic consumers |
| `partition` | Kafka partition number | Tracking message distribution |
| `offset` | Position in the partition | De-duplication, exactly-once processing |
| `timestamp` | Unix timestamp when record was created | Event timing analysis |
| `timestamp_type` | Timestamp type (`CREATE_TIME` or `LOG_APPEND_TIME`) | Data lineage verification |
| `headers` | Key-value pairs attached to the message | Cross-cutting concerns like correlation IDs |
| `key` | Deserialized message key | Customer ID or entity identifier |
| `value` | Deserialized message content | The actual business data |
| `originalValue` | Base64-encoded original message value | Debugging or custom deserialization |
| `originalKey` | Base64-encoded original message key | Debugging or custom deserialization |
| `originalHeaders` | Base64-encoded original message headers | Debugging or custom deserialization |
| `valueSchemaMetadata` | Metadata about the value schema like `schemaId` and `dataFormat` | Used by `kafkaConsumer` to process Protobuf, data format validation |
| `keySchemaMetadata` | Metadata about the key schema like `schemaId` and `dataFormat` | Used by `kafkaConsumer` to process Protobuf, data format validation |

### Additional Parsing

You can parse deserialized data using your preferred parsing library. This can help you integrate Kafka data with your domain schemas and application architecture, providing type hints, runtime parsing and validation, and advanced data transformations.

=== "Zod"

```typescript
```typescript hl_lines="25 29"
--8<-- "examples/snippets/kafka/advancedWorkingWithZod.ts"
```

=== "Valibot"

```typescript
```typescript hl_lines="28 32"
--8<-- "examples/snippets/kafka/advancedWorkingWithValibot.ts"
```

=== "ArkType"

```typescript
```typescript hl_lines="25 29"
--8<-- "examples/snippets/kafka/advancedWorkingWithArkType.ts"
```

#### Exception types
### Error handling

Handle errors gracefully when processing Kafka messages to ensure your application maintains resilience and provides clear diagnostic information. The Kafka consumer utility provides specific exception types to help you identify and handle deserialization issues effectively.

!!! tip
Fields like `value`, `key`, and `headers` are decoded lazily, meaning they are only deserialized when accessed. This allows you to handle deserialization errors at the point of access rather than when the record is first processed.

| Exception | Description | Common Causes |
|-----------|-------------|---------------|
| `KafkaConsumerDeserializationError` | Raised when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration |
| `KafkaConsumerAvroSchemaParserError` | Raised when parsing Avro schema definition fails | Syntax errors in schema JSON, invalid field types, or malformed schema |
| `KafkaConsumerMissingSchemaError` | Raised when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) |
| `KafkaConsumerOutputSerializerError` | Raised when output serializer fails | Error in custom serializer function, incompatible data, or validation failures in Pydantic models |
=== "Basic Error Handling"

```typescript hl_lines="29 36 45"
--8<-- "examples/snippets/kafka/advancedBasicErrorHandling.ts:3"
```

1. If you want to handle deserialization and parsing errors, you should destructure or access the `value`, `key`, or `headers` properties of the record within the `for...of` loop.

=== "Parser Error Handling"

```typescript hl_lines="41 44"
--8<-- "examples/snippets/kafka/advancedParserErrorHandling.ts:3"
```

1. The `cause` property of the error is populated with the original Standard Schema parsing error, allowing you to access detailed information about the parsing failure.

#### Error types

| Exception | Description | Common Causes |
|--------------------------------------|-----------------------------------------------|-----------------------------------------------------------------------------|
| `KafkaConsumerError`. | Base class for all Kafka consumer errors | General unhandled errors |
| `KafkaConsumerDeserializationError` | Thrown when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration |
| `KafkaConsumerMissingSchemaError` | Thrown when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) |
| `KafkaConsumerOutputSerializerError` | Thrown when additional schema parsing fails | Parsing failures in Standard Schema models |

### Integrating with Idempotency

When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The idempotency utility prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once.
When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The [Idempotency utility](./idempotency.md) prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once.

The Idempotency utility automatically stores the result of each successful operation, returning the cached result if the same message is processed again, which prevents potentially harmful duplicate operations like double-charging customers or double-counting metrics.

!!! tip
By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.

=== "Idempotent Kafka Processing"

```typescript
```typescript hl_lines="44 51"
--8<-- "examples/snippets/kafka/advancedWorkingWithIdempotency.ts"
```

TIP: By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.
### Best practices

#### Handling large messages

When processing large Kafka messages in Lambda, be mindful of memory limitations. Although the Kafka consumer utility optimizes memory usage, large deserialized messages can still exhaust the function resources.

=== "Handling Large Messages"

```typescript hl_lines="18-20"
--8<-- "examples/snippets/kafka/advancedHandlingLargeMessages.ts:6"
```

For large messages, consider these proven approaches:

* **Store the data:** use Amazon S3 and include only the S3 reference in your Kafka message
* **Split large payloads:** use multiple smaller messages with sequence identifiers
* **Increase memory:** Increase your Lambda function's memory allocation, which also increases CPU capacity

#### Batch size configuration

The number of Kafka records processed per Lambda invocation is controlled by your Event Source Mapping configuration. Properly sized batches optimize cost and performance.

=== "Handling Large Messages"

```yaml hl_lines="16"
--8<-- "examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml"
```

Different workloads benefit from different batch configurations:

* **High-volume, simple processing:** Use larger batches (100-500 records) with short timeout
* **Complex processing with database operations:** Use smaller batches (10-50 records)
* **Mixed message sizes:** Set appropriate batching window (1-5 seconds) to handle variability

#### Cross-language compatibility

When using binary serialization formats across multiple programming languages, ensure consistent schema handling to prevent deserialization failures.

Common cross-language challenges to address:

* **Field naming conventions:** camelCase in Java vs snake_case in Python
* **Date/time:** representation differences
* **Numeric precision handling:** especially decimals, doubles, and floats

### Troubleshooting

Expand All @@ -274,14 +341,14 @@ First, check that your schema definition exactly matches the message format. Eve

For binary messages that fail to deserialize, examine the raw encoded data:

```python
# DO NOT include this code in production handlers
# For troubleshooting purposes only
```javascript
// DO NOT include this code in production handlers
// For troubleshooting purposes only
import base64

raw_bytes = base64.b64decode(record.raw_value)
print(f"Message size: {len(raw_bytes)} bytes")
print(f"First 50 bytes (hex): {raw_bytes[:50].hex()}")
const rawBytes = Buffer.from(record.originalValue, 'base64');
console.log(`Message size: ${rawBytes.length} bytes`);
console.log(`First 50 bytes (hex): ${rawBytes.slice(0, 50).toString('hex')}`);
```

#### Schema compatibility issues
Expand Down Expand Up @@ -311,7 +378,7 @@ For timeout issues:
* Implement chunked or asynchronous processing patterns for time-consuming operations
* Monitor and optimize database operations, external API calls, or other I/O operations in your handler

???+ tip "Monitoring memory usage"
!!! tip "Monitoring memory usage"
Use CloudWatch metrics to track your function's memory utilization. If it consistently exceeds 80% of allocated memory, consider increasing the memory allocation or optimizing your code.

## Kafka consumer workflow
Expand Down Expand Up @@ -342,4 +409,10 @@ For timeout issues:

## Testing your code

TBD
Testing Kafka consumer code requires simulating Lambda events with Kafka messages. You can create simple test cases using local JSON files without needing a live Kafka cluster. Below an example of how to simulate a JSON message.

=== "Testing your code"

```typescript
--8<-- "examples/snippets/kafka/advancedTestingYourCode.ts"
```
52 changes: 52 additions & 0 deletions examples/snippets/kafka/advancedBasicErrorHandling.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
declare function processRecord(record: unknown): Promise<void>;

import { readFileSync } from 'node:fs';
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { KafkaConsumerDeserializationError } from '@aws-lambda-powertools/kafka/errors';
import type { ConsumerRecord } from '@aws-lambda-powertools/kafka/types';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const schemaConfig = {
value: {
type: SchemaType.AVRO,
schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'),
},
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
const results: {
successful: number;
failed: Array<ConsumerRecord<unknown, unknown>>;
} = {
successful: 0,
failed: [],
};
for (const record of event.records) {
try {
const { value, partition, offset, topic } = record; // (1)!
logger.setCorrelationId(`${topic}-${partition}-${offset}`);

await processRecord(value);

results.successful += 1;
} catch (error) {
if (error instanceof KafkaConsumerDeserializationError) {
results.failed.push(record);
logger.error('Error deserializing message', { error });
} else {
logger.error('Error processing message', { error });
}
}

if (results.failed.length > 0) {
// Handle failed records, e.g., send to a dead-letter queue
}

logger.info('Successfully processed records', {
successful: results.successful,
});
}
}, schemaConfig);
42 changes: 42 additions & 0 deletions examples/snippets/kafka/advancedHandlingLargeMessages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
declare function processRecordFromS3({
key,
bucket,
}: { key: string; bucket: string }): Promise<void>;

import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { Logger } from '@aws-lambda-powertools/logger';
import { object, safeParse, string } from 'valibot';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const LargeMessage = object({
key: string(),
bucket: string(),
});

export const handler = kafkaConsumer(async (event, _context) => {
for (const record of event.records) {
const { topic, value, originalValue } = record;
const valueSize = Buffer.byteLength(originalValue, 'utf8');
const parsedValue = safeParse(LargeMessage, value);
if (
topic === 'product-catalog' &&
valueSize > 3_000_000 &&
parsedValue.success
) {
logger.info('Large message detected, processing from S3', {
size: valueSize,
});

const { key, bucket } = parsedValue.output;
await processRecordFromS3({ key, bucket });

logger.info('Processed large message from S3', {
key,
bucket,
});
}

// regular processing of the record
}
});
Loading
Loading