Commit cf5bd4b

Adding protobuf tests

1 parent: d371657

9 files changed: +412 −94 lines changed

aws_lambda_powertools/utilities/kafka_consumer/deserializer/avro.py

Lines changed: 1 addition & 2 deletions
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import io
-from typing import Any
 
 from avro.io import BinaryDecoder, DatumReader
 from avro.schema import parse as parse_schema
@@ -68,5 +67,5 @@ def deserialize(self, data: bytes | str) -> object:
             return self.reader.read(decoder)
         except Exception as e:
             raise KafkaConsumerDeserializationError(
-                f"Error trying to deserializer avro data - {type(e).__name__}: {str(e)}",
+                f"Error trying to deserialize avro data - {type(e).__name__}: {str(e)}",
             ) from e
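For context, the try block this message lives in is plain avro-library binary decoding. Below is a minimal, self-contained sketch of that round trip, assuming the standard avro package; the schema and record are illustrative, not taken from the repo's fixtures:

import io

from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from avro.schema import parse as parse_schema

# Hypothetical schema matching the shape the tests assert on.
schema = parse_schema(
    '{"type": "record", "name": "User", "fields": ['
    '{"name": "name", "type": "string"}, {"name": "age", "type": "int"}]}',
)

# Encode a record to Avro binary, as a Kafka producer would.
buffer = io.BytesIO()
DatumWriter(schema).write({"name": "John Doe", "age": 30}, BinaryEncoder(buffer))
payload = buffer.getvalue()

# Decode it back, mirroring the deserializer's happy path: wrap the bytes
# in a BinaryDecoder and read with a DatumReader.
record = DatumReader(schema).read(BinaryDecoder(io.BytesIO(payload)))
assert record == {"name": "John Doe", "age": 30}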

aws_lambda_powertools/utilities/kafka_consumer/deserializer/base.py

Lines changed: 9 additions & 14 deletions
@@ -2,7 +2,7 @@
 
 import base64
 from abc import ABC, abstractmethod
-from typing import Any, overload
+from typing import Any
 
 
 class DeserializerBase(ABC):
@@ -29,7 +29,7 @@ class DeserializerBase(ABC):
     """
 
     @abstractmethod
-    def deserialize(self, data: bytes | str) -> dict[str, Any] | str | object:
+    def deserialize(self, data: str) -> dict[str, Any] | str | object:
         """
         Deserialize input data to a Python dictionary.
 
@@ -38,8 +38,8 @@ def deserialize(self, data: bytes | str) -> dict[str, Any] | str | object:
 
         Parameters
        ----------
-        data : bytes or str
-            The data to deserialize, either as bytes or as a string.
+        data : str
+            The data to deserialize, it's always a base64 encoded string
 
         Returns
         -------
@@ -49,14 +49,9 @@ def deserialize(self, data: bytes | str) -> dict[str, Any] | str | object:
         raise NotImplementedError("Subclasses must implement the deserialize method")
 
     def _decode_input(self, data: bytes | str) -> bytes:
-        if isinstance(data, str):
+        try:
             return base64.b64decode(data)
-        elif isinstance(data, bytes):
-            return data
-        else:
-            try:
-                return base64.b64decode(data)
-            except Exception as e:
-                raise TypeError(
-                    f"Expected bytes or base64-encoded string, got {type(data).__name__}. Error: {str(e)}",
-                ) from e
+        except Exception as e:
+            raise TypeError(
+                f"Expected bytes or base64-encoded string, got {type(data).__name__}. Error: {str(e)}",
+            ) from e
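The simplified `_decode_input` leans on the fact that the Lambda Kafka event delivers keys and values as base64-encoded strings, and that `base64.b64decode` accepts both `str` and `bytes`. A standalone sketch of the new control flow; the free function is a stand-in for the method, not the repo's API:

import base64


def decode_input(data):
    # Mirror the simplified helper: always attempt a base64 decode and
    # surface anything unusable as a TypeError.
    try:
        return base64.b64decode(data)
    except Exception as e:
        raise TypeError(
            f"Expected bytes or base64-encoded string, got {type(data).__name__}. Error: {str(e)}",
        ) from e


assert decode_input("aGVsbG8=") == b"hello"   # base64 str from the Lambda event
assert decode_input(b"aGVsbG8=") == b"hello"  # b64decode accepts bytes too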

aws_lambda_powertools/utilities/kafka_consumer/deserializer/default.py

Lines changed: 0 additions & 1 deletion
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import base64
-from typing import Any
 
 from aws_lambda_powertools.utilities.kafka_consumer.deserializer.base import DeserializerBase
 

aws_lambda_powertools/utilities/kafka_consumer/deserializer/protobuf.py

Lines changed: 1 addition & 1 deletion
@@ -61,5 +61,5 @@ def deserialize(self, data: bytes | str) -> dict:
             return MessageToDict(message, preserving_proto_field_name=True)
         except Exception as e:
             raise KafkaConsumerDeserializationError(
-                f"Protocol Buffer deserialization error: {type(e).__name__}: {str(e)}",
+                f"Error trying to deserialize protobuf data - {type(e).__name__}: {str(e)}",
             ) from e
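The protobuf branch follows the same shape as the Avro one: parse the raw bytes with a message class, then flatten to a dict via google.protobuf.json_format.MessageToDict. A self-contained sketch using the well-known Struct type as a stand-in for a repo-generated message class:

from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct

# Serialize a message to bytes, standing in for a producer that uses
# compiled .proto classes.
message = Struct()
message.update({"name": "John Doe"})
payload = message.SerializeToString()

# Parse the bytes back and convert to a dict, preserving the original
# field names, as the deserializer's try block does.
parsed = Struct()
parsed.ParseFromString(payload)
result = MessageToDict(parsed, preserving_proto_field_name=True)
assert result == {"name": "John Doe"}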

tests/functional/kafka_consumer/_avro/test_kafka_consumer_with_avro.py

Lines changed: 40 additions & 76 deletions
@@ -1,6 +1,7 @@
 import base64
 import io
 from copy import deepcopy
+from dataclasses import dataclass
 
 import pytest
 from avro.io import BinaryEncoder, DatumWriter
@@ -86,57 +87,58 @@ def kafka_event_with_avro_data(avro_encoded_value, avro_encoded_key):
     }
 
 
-def test_kafka_consumer_with_avro_and_dataclass(
-    kafka_event_with_avro_data,
-    avro_value_schema,
-    lambda_context,
-    user_value_dataclass,
-):
-    """Test Kafka consumer with Avro deserialization and dataclass output serialization."""
+@dataclass
+class UserValueDataClass:
+    name: str
+    age: int
+
+
+@dataclass
+class UserKeyClass:
+    user_id: str
+
+
+def test_kafka_consumer_with_avro(kafka_event_with_avro_data, avro_value_schema, lambda_context):
+    """Test Kafka consumer with Avro deserialization without output serialization."""
 
     # Create dict to capture results
     result_data = {}
 
-    schema_config = SchemaConfig(
-        value_schema_type="AVRO",
-        value_schema=avro_value_schema,
-        value_output_serializer=user_value_dataclass,
-    )
+    schema_config = SchemaConfig(value_schema_type="AVRO", value_schema=avro_value_schema)
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
         # Capture the results to verify
         record = next(event.records)
         result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value.name
-        result_data["age"] = record.value.age
+        result_data["name"] = record.value["name"]
+        result_data["age"] = record.value["age"]
         return {"processed": True}
 
     # Call the handler
     result = handler(kafka_event_with_avro_data, lambda_context)
 
     # Verify the results
     assert result == {"processed": True}
-    assert result_data["value_type"] == "UserValueDataClass"
+    assert result_data["value_type"] == "dict"
     assert result_data["name"] == "John Doe"
     assert result_data["age"] == 30
 
 
-def test_kafka_consumer_with_avro_and_custom_object(
+def test_kafka_consumer_with_avro_and_dataclass(
     kafka_event_with_avro_data,
     avro_value_schema,
     lambda_context,
-    user_value_dict,
 ):
-    """Test Kafka consumer with Avro deserialization and custom object serialization."""
+    """Test Kafka consumer with Avro deserialization and dataclass output serialization."""
 
     # Create dict to capture results
     result_data = {}
 
     schema_config = SchemaConfig(
         value_schema_type="AVRO",
         value_schema=avro_value_schema,
-        value_output_serializer=user_value_dict,
+        value_output_serializer=UserValueDataClass,
     )
 
     @kafka_consumer(schema_config=schema_config)
@@ -153,34 +155,43 @@ def handler(event: ConsumerRecords, context):
 
     # Verify the results
     assert result == {"processed": True}
-    assert result_data["value_type"] == "UserValueDict"
+    assert result_data["value_type"] == "UserValueDataClass"
     assert result_data["name"] == "John Doe"
     assert result_data["age"] == 30
 
 
-def test_kafka_consumer_with_avro_raw(kafka_event_with_avro_data, avro_value_schema, lambda_context):
-    """Test Kafka consumer with Avro deserialization without output serialization."""
+def test_kafka_consumer_with_avro_and_custom_object(
+    kafka_event_with_avro_data,
+    avro_value_schema,
+    lambda_context,
+    user_value_dict,
+):
+    """Test Kafka consumer with Avro deserialization and custom object serialization."""
 
     # Create dict to capture results
     result_data = {}
 
-    schema_config = SchemaConfig(value_schema_type="AVRO", value_schema=avro_value_schema)
+    schema_config = SchemaConfig(
+        value_schema_type="AVRO",
+        value_schema=avro_value_schema,
+        value_output_serializer=user_value_dict,
+    )
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
         # Capture the results to verify
         record = next(event.records)
         result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value["name"]
-        result_data["age"] = record.value["age"]
+        result_data["name"] = record.value.name
+        result_data["age"] = record.value.age
         return {"processed": True}
 
     # Call the handler
     result = handler(kafka_event_with_avro_data, lambda_context)
 
     # Verify the results
     assert result == {"processed": True}
-    assert result_data["value_type"] == "dict"
+    assert result_data["value_type"] == "UserValueDict"
    assert result_data["name"] == "John Doe"
     assert result_data["age"] == 30
 
@@ -207,7 +218,7 @@ def lambda_handler(event: ConsumerRecords, context):
 
     # The exact error message may vary depending on the Avro library's internals,
     # but should indicate a deserialization problem
-    assert "Error trying to deserializer avro data" in str(excinfo.value)
+    assert "Error trying to deserialize avro data" in str(excinfo.value)
 
 
 def test_kafka_consumer_with_invalid_avro_schema(kafka_event_with_avro_data, lambda_context):
@@ -245,8 +256,6 @@ def test_kafka_consumer_with_key_deserialization(
     lambda_context,
     avro_value_schema,
     avro_key_schema,
-    user_value_dataclass,
-    user_key_dataclass,
 ):
     """Test deserializing both key and value with different schemas and serializers."""
 
@@ -257,10 +266,10 @@ def test_kafka_consumer_with_key_deserialization(
     schema_config = SchemaConfig(
         value_schema_type="AVRO",
         value_schema=avro_value_schema,
-        value_output_serializer=user_value_dataclass,
+        value_output_serializer=UserValueDataClass,
         key_schema_type="AVRO",
         key_schema=avro_key_schema,
-        key_output_serializer=user_key_dataclass,
+        key_output_serializer=UserKeyClass,
     )
 
     @kafka_consumer(schema_config=schema_config)
@@ -283,48 +292,3 @@ def lambda_handler(event: ConsumerRecords, context):
     assert key_value_result["value_type"] == "UserValueDataClass"
     assert key_value_result["value_name"] == "John Doe"
     assert key_value_result["value_age"] == 30
-
-
-def test_kafka_consumer_with_different_serializers_for_key_and_value(
-    kafka_event_with_avro_data,
-    lambda_context,
-    avro_value_schema,
-    avro_key_schema,
-    user_key_dataclass,
-    user_value_dict,
-):
-    """Test using different serializer types for key and value."""
-
-    # Create dict to capture results
-    results = {}
-
-    # Create schema config with different serializers
-    schema_config = SchemaConfig(
-        value_schema_type="AVRO",
-        value_schema=avro_value_schema,
-        value_output_serializer=user_value_dict,
-        key_schema_type="AVRO",
-        key_schema=avro_key_schema,
-        key_output_serializer=user_key_dataclass,
-    )
-
-    @kafka_consumer(schema_config=schema_config)
-    def handler(event: ConsumerRecords, context):
-        record = next(event.records)
-        results["key_type"] = type(record.key).__name__
-        results["key_id"] = record.key.user_id
-        results["value_type"] = type(record.value).__name__
-        results["value_name"] = record.value.name
-        results["value_age"] = record.value.age
-        return {"processed": True}
-
-    # Call the handler
-    result = handler(kafka_event_with_avro_data, lambda_context)
-
-    # Verify the results
-    assert result == {"processed": True}
-    assert results["key_type"] == "UserKeyClass"
-    assert results["key_id"] == "user-123"
-    assert results["value_type"] == "UserValueDict"
-    assert results["value_name"] == "John Doe"
-    assert results["value_age"] == 30
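For readers tracing the fixtures, the event these tests feed the handler is the standard Lambda Kafka event shape: Avro-encoded bytes, base64-encoded into each record's key and value. A minimal sketch of what a fixture like kafka_event_with_avro_data plausibly builds; the topic name, offsets, and record fields are illustrative:

import base64


def make_kafka_event(encoded_value: bytes, encoded_key: bytes) -> dict:
    # Wrap Avro-encoded payloads the way the Lambda Kafka event source does:
    # base64 strings inside a records map keyed by topic-partition.
    return {
        "eventSource": "aws:kafka",
        "records": {
            "my-topic-0": [
                {
                    "topic": "my-topic",
                    "partition": 0,
                    "offset": 15,
                    "key": base64.b64encode(encoded_key).decode(),
                    "value": base64.b64encode(encoded_value).decode(),
                    "headers": [],
                },
            ],
        },
    }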

tests/functional/kafka_consumer/_protobuf/__init__.py

Whitespace-only changes.
