@@ -100,42 +100,28 @@ class UserKeyClass:
100
100
101
101
102
102
def test_kafka_consumer_with_avro (kafka_event_with_avro_data , avro_value_schema , lambda_context ):
103
- """Test Kafka consumer with Avro deserialization without output serialization."""
104
-
105
- # Create dict to capture results
106
- result_data = {}
107
-
103
+ # GIVEN A Kafka consumer configured with Avro schema deserialization
108
104
schema_config = SchemaConfig (value_schema_type = "AVRO" , value_schema = avro_value_schema )
109
105
110
106
@kafka_consumer (schema_config = schema_config )
111
107
def handler (event : ConsumerRecords , context ):
112
- # Capture the results to verify
113
- record = next (event .records )
114
- result_data ["value_type" ] = type (record .value ).__name__
115
- result_data ["name" ] = record .value ["name" ]
116
- result_data ["age" ] = record .value ["age" ]
117
- return {"processed" : True }
108
+ return event .record .value
118
109
119
- # Call the handler
110
+ # WHEN The handler processes the Kafka event containing Avro-encoded data
120
111
result = handler (kafka_event_with_avro_data , lambda_context )
121
112
122
- # Verify the results
123
- assert result == {"processed" : True }
124
- assert result_data ["value_type" ] == "dict"
125
- assert result_data ["name" ] == "John Doe"
126
- assert result_data ["age" ] == 30
113
+ # THEN The Avro data should be correctly deserialized into a Python dictionary
114
+ assert result ["name" ] == "John Doe"
115
+ assert result ["age" ] == 30
127
116
128
117
129
118
def test_kafka_consumer_with_avro_and_dataclass (
130
119
kafka_event_with_avro_data ,
131
120
avro_value_schema ,
132
121
lambda_context ,
133
122
):
134
- """Test Kafka consumer with Avro deserialization and dataclass output serialization."""
135
-
136
- # Create dict to capture results
137
- result_data = {}
138
-
123
+ # GIVEN A Kafka consumer configured with Avro schema deserialization
124
+ # and a dataclass for output serialization
139
125
schema_config = SchemaConfig (
140
126
value_schema_type = "AVRO" ,
141
127
value_schema = avro_value_schema ,
@@ -145,35 +131,33 @@ def test_kafka_consumer_with_avro_and_dataclass(
145
131
@kafka_consumer (schema_config = schema_config )
146
132
def handler (event : ConsumerRecords , context ):
147
133
# Capture the results to verify
148
- record = next (event .records )
149
- result_data ["value_type" ] = type (record .value ).__name__
150
- result_data ["name" ] = record .value .name
151
- result_data ["age" ] = record .value .age
152
- return {"processed" : True }
134
+ value : UserValueDataClass = event .record .value
135
+ return value
153
136
154
- # Call the handler
137
+ # WHEN The handler processes the Kafka event containing Avro-encoded data
138
+ # and serializes the output as a UserValueDataClass instance
155
139
result = handler (kafka_event_with_avro_data , lambda_context )
156
140
157
- # Verify the results
158
- assert result == { "processed" : True }
159
- assert result_data [ "value_type" ] == "UserValueDataClass "
160
- assert result_data [ "name" ] == "John Doe"
161
- assert result_data [ "age" ] == 30
141
+ # THEN The Avro data should be correctly deserialized and converted to a dataclass instance
142
+ # with the expected property values
143
+ assert result . name == "John Doe "
144
+ assert result . age == 30
145
+ assert isinstance ( result , UserValueDataClass )
162
146
163
147
164
- def test_kafka_consumer_with_avro_and_custom_object (
148
+ def test_kafka_consumer_with_avro_and_custom_function (
165
149
kafka_event_with_avro_data ,
166
150
avro_value_schema ,
167
151
lambda_context ,
168
152
):
169
- """Test Kafka consumer with Avro deserialization and custom object serialization."""
170
-
153
+ # GIVEN A custom serialization function that removes the age field from the dictionary
171
154
def dict_output (data : dict ) -> dict :
155
+ # removing age key
156
+ del data ["age" ]
172
157
return data
173
158
174
- # Create dict to capture results
175
- result_data = {}
176
-
159
+ # A Kafka consumer configured with Avro schema deserialization
160
+ # and a custom function for output transformation
177
161
schema_config = SchemaConfig (
178
162
value_schema_type = "AVRO" ,
179
163
value_schema = avro_value_schema ,
@@ -183,23 +167,20 @@ def dict_output(data: dict) -> dict:
183
167
@kafka_consumer (schema_config = schema_config )
184
168
def handler (event : ConsumerRecords , context ):
185
169
# Capture the results to verify
186
- record = next (event .records )
187
- result_data ["name" ] = record .value .get ("name" )
188
- result_data ["age" ] = record .value .get ("age" )
189
- return {"processed" : True }
170
+ return event .record .value
190
171
191
- # Call the handler
172
+ # WHEN The handler processes the Kafka event containing Avro-encoded data
173
+ # and applies the custom transformation function to the output
192
174
result = handler (kafka_event_with_avro_data , lambda_context )
193
175
194
- # Verify the results
195
- assert result == { "processed" : True }
196
- assert result_data ["name" ] == "John Doe"
197
- assert result_data [ "age" ] == 30
176
+ # THEN The Avro data should be correctly deserialized and transformed
177
+ # with the name field intact but the age field removed
178
+ assert result ["name" ] == "John Doe"
179
+ assert "age" not in result
198
180
199
181
200
182
def test_kafka_consumer_with_invalid_avro_data (kafka_event_with_avro_data , lambda_context , avro_value_schema ):
201
- """Test error handling when Avro data is invalid."""
202
- # Create invalid avro data
183
+ # GIVEN A Kafka event with deliberately corrupted Avro data
203
184
invalid_data = base64 .b64encode (b"invalid avro data" ).decode ("utf-8" )
204
185
kafka_event_with_avro_data_temp = deepcopy (kafka_event_with_avro_data )
205
186
kafka_event_with_avro_data_temp ["records" ]["my-topic-1" ][0 ]["value" ] = invalid_data
@@ -209,11 +190,11 @@ def test_kafka_consumer_with_invalid_avro_data(kafka_event_with_avro_data, lambd
209
190
@kafka_consumer (schema_config = schema_config )
210
191
def lambda_handler (event : ConsumerRecords , context ):
211
192
# This should never be reached if deserializer fails
212
- record = next (event .records )
213
- assert record .value
214
- return {"processed" : True }
193
+ return event .record .value
215
194
216
- # This should raise a deserialization error
195
+ # WHEN/THEN
196
+ # The handler should fail to process the invalid Avro data
197
+ # and raise a specific deserialization error
217
198
with pytest .raises (KafkaConsumerDeserializationError ) as excinfo :
218
199
lambda_handler (kafka_event_with_avro_data_temp , lambda_context )
219
200
@@ -223,8 +204,8 @@ def lambda_handler(event: ConsumerRecords, context):
223
204
224
205
225
206
def test_kafka_consumer_with_invalid_avro_schema (kafka_event_with_avro_data , lambda_context ):
226
- """Test error handling when Avro data is invalid."""
227
-
207
+ # GIVEN
208
+ # An intentionally malformed Avro schema with syntax errors
228
209
avro_schema = """
229
210
{
230
211
"type": "record",
@@ -234,16 +215,17 @@ def test_kafka_consumer_with_invalid_avro_schema(kafka_event_with_avro_data, lam
234
215
}
235
216
"""
236
217
218
+ # A Kafka consumer configured with the invalid schema
237
219
schema_config = SchemaConfig (value_schema_type = "AVRO" , value_schema = avro_schema )
238
220
239
221
@kafka_consumer (schema_config = schema_config )
240
222
def lambda_handler (event : ConsumerRecords , context ):
241
223
# This should never be reached if deserializer fails
242
- record = next (event .records )
243
- assert record .value
244
- return {"processed" : True }
224
+ return event .record .value
245
225
246
- # This should raise a deserialization error
226
+ # WHEN/THEN
227
+ # The handler should fail during initialization when it tries to parse the schema
228
+ # and raise a specific schema parser error
247
229
with pytest .raises (KafkaConsumerAvroSchemaParserError ) as excinfo :
248
230
lambda_handler (kafka_event_with_avro_data , lambda_context )
249
231
@@ -260,10 +242,10 @@ def test_kafka_consumer_with_key_deserialization(
260
242
):
261
243
"""Test deserializing both key and value with different schemas and serializers."""
262
244
263
- # Create dict to capture results
264
245
key_value_result = {}
265
246
266
- # Create schema config with both key and value
247
+ # GIVEN A Kafka consumer configured with Avro schemas for both key and value
248
+ # with different output serializers for each
267
249
schema_config = SchemaConfig (
268
250
value_schema_type = "AVRO" ,
269
251
value_schema = avro_value_schema ,
@@ -283,27 +265,47 @@ def lambda_handler(event: ConsumerRecords, context):
283
265
key_value_result ["value_age" ] = record .value .age
284
266
return {"processed" : True }
285
267
286
- # Call the handler
268
+ # WHEN
269
+ # The handler processes the Kafka event, deserializing both key and value
287
270
result = lambda_handler (kafka_event_with_avro_data , lambda_context )
288
271
289
- # Verify the results
272
+ # THEN
273
+ # The handler should return success and the captured properties should match expectations
290
274
assert result == {"processed" : True }
275
+
276
+ # Key should be correctly deserialized into a UserKeyClass instance
291
277
assert key_value_result ["key_type" ] == "UserKeyClass"
292
278
assert key_value_result ["key_id" ] == "user-123"
279
+
280
+ # Value should be correctly deserialized into a UserValueDataClass instance
293
281
assert key_value_result ["value_type" ] == "UserValueDataClass"
294
282
assert key_value_result ["value_name" ] == "John Doe"
295
283
assert key_value_result ["value_age" ] == 30
296
284
297
285
298
286
def test_kafka_consumer_without_avro_value_schema ():
299
- """Test error handling when Avro data is invalid."""
287
+ # GIVEN
288
+ # A scenario where AVRO schema type is specified for value
289
+ # but no actual schema is provided
300
290
301
- with pytest .raises (KafkaConsumerMissingSchemaError ):
291
+ # WHEN/THEN
292
+ # SchemaConfig initialization should fail with an appropriate error
293
+ with pytest .raises (KafkaConsumerMissingSchemaError ) as excinfo :
302
294
SchemaConfig (value_schema_type = "AVRO" , value_schema = None )
303
295
296
+ # Verify the error message mentions 'value_schema'
297
+ assert "value_schema" in str (excinfo .value )
298
+
304
299
305
300
def test_kafka_consumer_without_avro_key_schema ():
306
- """Test error handling when Avro data is invalid."""
301
+ # GIVEN
302
+ # A scenario where AVRO schema type is specified for key
303
+ # but no actual schema is provided
307
304
308
- with pytest .raises (KafkaConsumerMissingSchemaError ):
305
+ # WHEN/THEN
306
+ # SchemaConfig initialization should fail with an appropriate error
307
+ with pytest .raises (KafkaConsumerMissingSchemaError ) as excinfo :
309
308
SchemaConfig (key_schema_type = "AVRO" , key_schema = None )
309
+
310
+ # Verify the error message mentions 'key_schema'
311
+ assert "key_schema" in str (excinfo .value )
0 commit comments