|
1 | 1 | import base64
|
2 | 2 | import json
|
| 3 | +from typing import Literal, Union |
3 | 4 |
|
4 | 5 | import pytest
|
5 |
| -from pydantic import BaseModel |
| 6 | +from pydantic import BaseModel, Field |
6 | 7 |
|
7 | 8 | from aws_lambda_powertools.utilities.kafka.consumer_records import ConsumerRecords
|
8 | 9 | from aws_lambda_powertools.utilities.kafka.kafka_consumer import kafka_consumer
|
@@ -79,6 +80,44 @@ def handler(event: ConsumerRecords, context):
|
79 | 80 | assert result_data["age"] == 30
|
80 | 81 |
|
81 | 82 |
|
| 83 | +def test_kafka_consumer_with_json_value_and_union_tag(kafka_event_with_json_data, lambda_context): |
| 84 | + """Test Kafka consumer with JSON deserialization and dataclass output serialization.""" |
| 85 | + |
| 86 | + # Create dict to capture results |
| 87 | + result_data = {} |
| 88 | + |
| 89 | + class UserValueModel(BaseModel): |
| 90 | + name: Literal["John Doe"] |
| 91 | + age: int |
| 92 | + |
| 93 | + class UserValueModel2(BaseModel): |
| 94 | + name: Literal["Not using"] |
| 95 | + email: str |
| 96 | + |
| 97 | + class Model(BaseModel): |
| 98 | + name: Union[UserValueModel, UserValueModel2] = Field(discriminator="name") |
| 99 | + |
| 100 | + schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueModel) |
| 101 | + |
| 102 | + @kafka_consumer(schema_config=schema_config) |
| 103 | + def handler(event: ConsumerRecords, context): |
| 104 | + # Capture the results to verify |
| 105 | + record = next(event.records) |
| 106 | + result_data["value_type"] = type(record.value).__name__ |
| 107 | + result_data["name"] = record.value.name |
| 108 | + result_data["age"] = record.value.age |
| 109 | + return {"processed": True} |
| 110 | + |
| 111 | + # Call the handler |
| 112 | + result = handler(kafka_event_with_json_data, lambda_context) |
| 113 | + |
| 114 | + # Verify the results |
| 115 | + assert result == {"processed": True} |
| 116 | + assert result_data["value_type"] == "UserValueModel" |
| 117 | + assert result_data["name"] == "John Doe" |
| 118 | + assert result_data["age"] == 30 |
| 119 | + |
| 120 | + |
82 | 121 | def test_kafka_consumer_with_json_key_and_pydantic(kafka_event_with_json_data, lambda_context):
|
83 | 122 | """Test Kafka consumer with JSON deserialization and dataclass output serialization."""
|
84 | 123 |
|
|
0 commit comments