
Commit 2445889

Refactoring tests
1 parent 13284b3 commit 2445889

File tree

5 files changed: +48 additions, -35 deletions


aws_lambda_powertools/utilities/data_classes/kafka_event.py

Lines changed: 28 additions & 0 deletions

@@ -10,6 +10,18 @@
 from collections.abc import Iterator
 
 
+class ConsumerRecordSchemaMetadata(DictWrapper):
+    @property
+    def data_format(self) -> str | None:
+        """The data format of the Kafka record."""
+        return self.get("dataFormat", None)
+
+    @property
+    def schema_id(self) -> str | None:
+        """The schema id of the Kafka record."""
+        return self.get("schemaId", None)
+
+
 class KafkaEventRecordBase(DictWrapper):
     @property
     def topic(self) -> str:
@@ -36,6 +48,22 @@ def timestamp_type(self) -> str:
         """The Kafka record timestamp type."""
         return self["timestampType"]
 
+    @property
+    def key_schema_metadata(self) -> ConsumerRecordSchemaMetadata | None:
+        """The metadata of the Key Kafka record."""
+        return (
+            None if self.get("keySchemaMetadata") is None else ConsumerRecordSchemaMetadata(self["keySchemaMetadata"])
+        )
+
+    @property
+    def value_schema_metadata(self) -> ConsumerRecordSchemaMetadata | None:
+        """The metadata of the Value Kafka record."""
+        return (
+            None
+            if self.get("valueSchemaMetadata") is None
+            else ConsumerRecordSchemaMetadata(self["valueSchemaMetadata"])
+        )
+
 
 class KafkaEventRecord(KafkaEventRecordBase):
     @property
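Both new properties return None when the corresponding metadata key is absent. A minimal usage sketch, with a hypothetical record payload whose key names mirror what the diff reads ("keySchemaMetadata", "valueSchemaMetadata", "dataFormat", "schemaId"):

from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventRecordBase

# Hypothetical payload for illustration, not a captured event.
record = KafkaEventRecordBase(
    {
        "topic": "my-topic-1",
        "timestampType": "CREATE_TIME",
        "valueSchemaMetadata": {"dataFormat": "AVRO", "schemaId": "1234"},
        # "keySchemaMetadata" omitted: key_schema_metadata returns None
    }
)

meta = record.value_schema_metadata
assert meta is not None
print(meta.data_format, meta.schema_id)  # AVRO 1234
print(record.key_schema_metadata)        # None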

aws_lambda_powertools/utilities/kafka/consumer_records.py

Lines changed: 1 addition & 29 deletions

@@ -3,7 +3,7 @@
 from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
-from aws_lambda_powertools.utilities.data_classes.common import CaseInsensitiveDict, DictWrapper
+from aws_lambda_powertools.utilities.data_classes.common import CaseInsensitiveDict
 from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventBase, KafkaEventRecordBase
 from aws_lambda_powertools.utilities.kafka.deserializer.deserializer import get_deserializer
 from aws_lambda_powertools.utilities.kafka.serialization.serialization import serialize_to_output_type
@@ -14,18 +14,6 @@
     from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig
 
 
-class ConsumerRecordSchemaMetadata(DictWrapper):
-    @property
-    def data_format(self) -> str | None:
-        """The data format of the Kafka record."""
-        return self.get("dataFormat", None)
-
-    @property
-    def schema_id(self) -> str | None:
-        """The schema id of the Kafka record."""
-        return self.get("schemaId", None)
-
-
 class ConsumerRecordRecords(KafkaEventRecordBase):
     """
     A Kafka Consumer Record
@@ -114,22 +102,6 @@ def headers(self) -> dict[str, bytes]:
         """Decodes the headers as a single dictionary."""
         return CaseInsensitiveDict((k, bytes(v)) for chunk in self.original_headers for k, v in chunk.items())
 
-    @property
-    def key_schema_metadata(self) -> ConsumerRecordSchemaMetadata | None:
-        """The metadata of the Key Kafka record."""
-        return (
-            None if self.get("keySchemaMetadata") is None else ConsumerRecordSchemaMetadata(self["keySchemaMetadata"])
-        )
-
-    @property
-    def value_schema_metadata(self) -> ConsumerRecordSchemaMetadata | None:
-        """The metadata of the Value Kafka record."""
-        return (
-            None
-            if self.get("valueSchemaMetadata") is None
-            else ConsumerRecordSchemaMetadata(self["valueSchemaMetadata"])
-        )
-
 
 class ConsumerRecords(KafkaEventBase):
     """Self-managed or MSK Apache Kafka event trigger

aws_lambda_powertools/utilities/kafka/serialization/dataclass.py

Lines changed: 0 additions & 3 deletions

@@ -12,9 +12,6 @@
 
 class DataclassOutputSerializer(OutputSerializerBase):
     def serialize(self, data: dict[str, Any], output: type[T] | Callable | None = None) -> T | dict[str, Any]:
-        if output is None:
-            return data
-
         if not is_dataclass(output):
             raise ValueError("Output class must be a dataclass")
 
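With the None short-circuit gone, output=None now falls through to the is_dataclass check and raises instead of returning the raw dict; presumably that None case is now handled before this serializer is invoked. A sketch of the changed behavior, using a hypothetical Order dataclass:

from dataclasses import dataclass

from aws_lambda_powertools.utilities.kafka.serialization.dataclass import DataclassOutputSerializer

@dataclass
class Order:  # hypothetical output type, for illustration only
    id: str

serializer = DataclassOutputSerializer()
serializer.serialize({"id": "42"}, output=Order)  # validates into an Order instance

try:
    serializer.serialize({"id": "42"}, output=None)
except ValueError:
    # Before this commit the dict came back unchanged; now None fails
    # the is_dataclass(output) check and raises.
    pass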

aws_lambda_powertools/utilities/kafka/serialization/pydantic.py

Lines changed: 0 additions & 3 deletions

@@ -14,9 +14,6 @@
 
 class PydanticOutputSerializer(OutputSerializerBase):
     def serialize(self, data: dict[str, Any], output: type[T] | Callable | None = None) -> T | dict[str, Any]:
-        if output is None:
-            return data
-
         # Use TypeAdapter for better support of Union types and other complex types
         adapter: TypeAdapter = TypeAdapter(output)
         return adapter.validate_python(data)
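The Pydantic serializer gets the same treatment: with the early return gone, every call goes through TypeAdapter validation. A minimal sketch, assuming a hypothetical Order model:

from pydantic import BaseModel

from aws_lambda_powertools.utilities.kafka.serialization.pydantic import PydanticOutputSerializer

class Order(BaseModel):  # hypothetical output model, for illustration only
    id: str

serializer = PydanticOutputSerializer()
order = serializer.serialize({"id": "42"}, output=Order)
assert order == Order(id="42")  # validated via TypeAdapter(Order)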

tests/functional/kafka_consumer/required_dependencies/test_kafka_consumer.py

Lines changed: 19 additions & 0 deletions

@@ -272,3 +272,22 @@ def handler(event: ConsumerRecords, context):
 
     # Verify the results
     assert result is None
+
+
+def test_kafka_consumer_metadata_fields(kafka_event_with_json_data, lambda_context):
+    """Test Kafka consumer when no schema config is provided."""
+
+    kafka_event_with_json_data["records"]["my-topic-1"][0]["key"] = None
+
+    @kafka_consumer()
+    def handler(event: ConsumerRecords, context):
+        return event.record
+
+    # Call the handler
+    result = handler(kafka_event_with_json_data, lambda_context)
+
+    # Verify the results
+    assert result.original_value == kafka_event_with_json_data["records"]["my-topic-1"][0]["value"]
+    assert result.original_key == kafka_event_with_json_data["records"]["my-topic-1"][0]["key"]
+    assert result.original_headers == kafka_event_with_json_data["records"]["my-topic-1"][0]["headers"]
+    assert result.headers
