@@ -59,63 +59,56 @@ class UserKeyClass:
 
 
 def test_kafka_consumer_with_json(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer with JSON deserialization without output serialization."""
-
-    # Create dict to capture results
-    result_data = {}
-
+    # GIVEN
+    # A Kafka consumer configured to deserialize JSON data
+    # without any additional output serialization
     schema_config = SchemaConfig(value_schema_type="JSON")
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value["name"]
-        result_data["age"] = record.value["age"]
-        return {"processed": True}
+        # Return the deserialized JSON value for verification
+        return event.record.value
 
-    # Call the handler
+    # WHEN
+    # The handler processes a Kafka event containing JSON-encoded data
     result = handler(kafka_event_with_json_data, lambda_context)
 
-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "dict"
-    assert result_data["name"] == "John Doe"
-    assert result_data["age"] == 30
+    # THEN
+    # The JSON should be correctly deserialized into a Python dictionary
+    # with the expected field values
+    assert result["name"] == "John Doe"
+    assert result["age"] == 30
 
 
 def test_kafka_consumer_with_json_and_dataclass(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer with JSON deserialization and dataclass output serialization."""
-
-    # Create dict to capture results
-    result_data = {}
-
+    # GIVEN
+    # A Kafka consumer configured to deserialize JSON data
+    # and convert it to a UserValueDataClass instance
     schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueDataClass)
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value.name
-        result_data["age"] = record.value.age
-        return {"processed": True}
-
-    # Call the handler
+        # Extract the deserialized and serialized value
+        # which should be a UserValueDataClass instance
+        value: UserValueDataClass = event.record.value
+        return value
+
+    # WHEN
+    # The handler processes a Kafka event containing JSON-encoded data
+    # which is deserialized into a dictionary and then converted to a dataclass
     result = handler(kafka_event_with_json_data, lambda_context)
 
-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "UserValueDataClass"
-    assert result_data["name"] == "John Doe"
-    assert result_data["age"] == 30
+    # THEN
+    # The result should be a UserValueDataClass instance
+    # with the correct property values from the original JSON
+    assert isinstance(result, UserValueDataClass)
+    assert result.name == "John Doe"
+    assert result.age == 30
 
 
 def test_kafka_consumer_with_invalid_json_data(kafka_event_with_json_data, lambda_context):
-    """Test error handling when JSON data is invalid."""
-
-    # Create invalid JSON data
+    # GIVEN
+    # A Kafka event with raw string data that is not valid base64-encoded JSON
     invalid_data = "invalid json data"
     kafka_event_with_json_data = deepcopy(kafka_event_with_json_data)
     kafka_event_with_json_data["records"]["my-topic-1"][0]["value"] = invalid_data
@@ -124,33 +117,34 @@ def test_kafka_consumer_with_invalid_json_data(kafka_event_with_json_data, lambd
 
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # This should never be reached if deserializer fails
-        record = next(event.records)
-        assert record.value
-        return {"processed": True}
+        return event.record.value
 
-    # This should raise a deserialization error
+    # WHEN/THEN
+    # The handler should fail to process the invalid JSON data
+    # and raise a specific deserialization error
     with pytest.raises(KafkaConsumerDeserializationError) as excinfo:
         handler(kafka_event_with_json_data, lambda_context)
 
+    # Ensure the error contains useful diagnostic information
     assert "Error trying to deserialize json data" in str(excinfo.value)
 
 
-# Tests for Complex Types with Pydantic TypeAdapter
-def test_kafka_consumer_with_multiple_records(lambda_context):
-    """Test processing multiple records in a single event."""
-
-    # Create data for multiple records
+def test_kafka_consumer_with_multiple_records_json(lambda_context):
+    # GIVEN
+    # Three different user records to process
+    # First user: John Doe, age 30
     data1 = {"name": "John Doe", "age": 30}
+    # Second user: Jane Smith, age 25
     data2 = {"name": "Jane Smith", "age": 25}
+    # Third user: Bob Johnson, age 40
     data3 = {"name": "Bob Johnson", "age": 40}
 
-    # Encode the data
+    # Base64-encoded JSON data for each record
     encoded1 = base64.b64encode(json.dumps(data1).encode("utf-8")).decode("utf-8")
     encoded2 = base64.b64encode(json.dumps(data2).encode("utf-8")).decode("utf-8")
     encoded3 = base64.b64encode(json.dumps(data3).encode("utf-8")).decode("utf-8")
 
-    # Create a kafka event with multiple records
+    # A Kafka event containing multiple records across different offsets
     multi_record_event = {
         "eventSource": "aws:kafka",
         "records": {
@@ -189,105 +183,149 @@ def test_kafka_consumer_with_multiple_records(lambda_context):
         },
     }
 
-    # Create schema config
-    schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueDataClass)
-
-    # Create list to store processed records
+    # A list to capture processed record details
     processed_records = []
 
+    # A Kafka consumer configured to deserialize JSON and convert to dataclass instances
+    schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueDataClass)
+
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Process all records
+        # Process each record and collect its properties
         for record in event.records:
             processed_records.append({"name": record.value.name, "age": record.value.age})
         return {"processed": len(processed_records)}
 
-    # Call the handler
+    # WHEN
+    # The handler processes the Kafka event containing multiple JSON records
     result = handler(multi_record_event, lambda_context)
 
-    # Verify the results
+    # THEN
+    # The handler should successfully process all three records
+    # and return the correct count
     assert result == {"processed": 3}
     assert len(processed_records) == 3
+
+    # All three users should be correctly deserialized into dataclass instances
+    # and their properties should be accessible
     assert any(r["name"] == "John Doe" and r["age"] == 30 for r in processed_records)
     assert any(r["name"] == "Jane Smith" and r["age"] == 25 for r in processed_records)
     assert any(r["name"] == "Bob Johnson" and r["age"] == 40 for r in processed_records)
 
 
 def test_kafka_consumer_default_deserializer_value(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer when no schema config is provided."""
+    # GIVEN
+    # A simple string message encoded in base64
+    raw_data = b"data"
+    base64_data = base64.b64encode(raw_data).decode("utf-8")
 
-    base64_data = base64.b64encode(b"data")
-    kafka_event_with_json_data = deepcopy(kafka_event_with_json_data)
-    kafka_event_with_json_data["records"]["my-topic-1"][0]["value"] = base64_data
+    # A Kafka event with the base64-encoded data as value
+    basic_kafka_event = deepcopy(kafka_event_with_json_data)
+    basic_kafka_event["records"]["my-topic-1"][0]["value"] = base64_data
 
+    # A Kafka consumer with no schema configuration specified
+    # which should default to base64 decoding only
     @kafka_consumer()
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
+        # Get the first record's value
         record = next(event.records)
-        # Should get raw base64-encoded data with no deserialization
+        # Should receive UTF-8 decoded data with no further processing
         return record.value
 
-    # Call the handler
-    result = handler(kafka_event_with_json_data, lambda_context)
+    # WHEN
+    # The handler processes the Kafka event with the default deserializer
+    result = handler(basic_kafka_event, lambda_context)
 
-    # Verify the results
+    # THEN
+    # The result should be the UTF-8 decoded string from the base64 data
+    # with no additional deserialization applied
     assert result == "data"
+    assert isinstance(result, str)
 
 
 def test_kafka_consumer_default_deserializer_key(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer when no schema config is provided."""
+    # GIVEN
+    # A simple string message encoded in base64 for the key
+    raw_key_data = b"data"
+    base64_key = base64.b64encode(raw_key_data).decode("utf-8")
 
-    base64_data = base64.b64encode(b"data")
-    kafka_event_with_json_data = deepcopy(kafka_event_with_json_data)
-    kafka_event_with_json_data["records"]["my-topic-1"][0]["key"] = base64_data
+    # A Kafka event with the base64-encoded data as key
+    kafka_event_with_key = deepcopy(kafka_event_with_json_data)
+    kafka_event_with_key["records"]["my-topic-1"][0]["key"] = base64_key
 
+    # A Kafka consumer with no schema configuration specified
+    # which should default to base64 decoding only
     @kafka_consumer()
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
+        # Get the first record's key
         record = next(event.records)
-        # Should get raw base64-encoded data with no deserialization
+        # Should receive UTF-8 decoded key with no further processing
         return record.key
 
-    # Call the handler
-    result = handler(kafka_event_with_json_data, lambda_context)
+    # WHEN
+    # The handler processes the Kafka event with the default key deserializer
+    result = handler(kafka_event_with_key, lambda_context)
 
-    # Verify the results
+    # THEN
+    # The key should be the UTF-8 decoded string from the base64 data
+    # with no additional deserialization or transformation applied
     assert result == "data"
+    assert isinstance(result, str)
 
 
 def test_kafka_consumer_default_deserializer_key_is_none(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer when no schema config is provided."""
-
-    kafka_event_with_json_data["records"]["my-topic-1"][0]["key"] = None
+    # GIVEN
+    # A Kafka event with a null key in the record
+    kafka_event_with_null_key = deepcopy(kafka_event_with_json_data)
+    kafka_event_with_null_key["records"]["my-topic-1"][0]["key"] = None
 
+    # A Kafka consumer with no schema configuration specified
     @kafka_consumer()
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
+        # Get the first record's key, which should be None
        record = next(event.records)
-        # Should get raw base64-encoded data with no deserialization
         return record.key
 
-    # Call the handler
-    result = handler(kafka_event_with_json_data, lambda_context)
+    # WHEN
+    # The handler processes the Kafka event with a null key
+    result = handler(kafka_event_with_null_key, lambda_context)
 
-    # Verify the results
+    # THEN
+    # The key should be preserved as None without any attempt at deserialization
     assert result is None
 
 
 def test_kafka_consumer_metadata_fields(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer when no schema config is provided."""
-
-    kafka_event_with_json_data["records"]["my-topic-1"][0]["key"] = None
+    # GIVEN
+    # A Kafka event with specific metadata we want to verify is preserved
+    kafka_event = deepcopy(kafka_event_with_json_data)
+    kafka_event["records"]["my-topic-1"][0]["key"] = None
 
+    # A Kafka consumer with no schema configuration
+    # that returns the full record object for inspection
     @kafka_consumer()
     def handler(event: ConsumerRecords, context):
         return event.record
 
-    # Call the handler
-    result = handler(kafka_event_with_json_data, lambda_context)
+    # WHEN
+    # The handler processes the Kafka event and returns the record object
+    result = handler(kafka_event, lambda_context)
+
+    # THEN
+    # The record should preserve all original metadata fields
 
-    # Verify the results
-    assert result.original_value == kafka_event_with_json_data["records"]["my-topic-1"][0]["value"]
-    assert result.original_key == kafka_event_with_json_data["records"]["my-topic-1"][0]["key"]
-    assert result.original_headers == kafka_event_with_json_data["records"]["my-topic-1"][0]["headers"]
+    # Original encoded values should be preserved
+    assert result.original_value == kafka_event["records"]["my-topic-1"][0]["value"]
+    assert result.original_key == kafka_event["records"]["my-topic-1"][0]["key"]
+
+    # Original headers array should be preserved
+    assert result.original_headers == kafka_event["records"]["my-topic-1"][0]["headers"]
+
+    # Headers should be parsed into a dictionary for easy access
     assert result.headers == {"headerKey": b"headerValue"}
+
+    # Standard Kafka metadata fields should also be preserved
+    assert result.topic == kafka_event["records"]["my-topic-1"][0]["topic"]
+    assert result.partition == kafka_event["records"]["my-topic-1"][0]["partition"]
+    assert result.offset == kafka_event["records"]["my-topic-1"][0]["offset"]
+    assert result.timestamp == kafka_event["records"]["my-topic-1"][0]["timestamp"]
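For reference, a minimal sketch of the scaffolding these tests depend on: the UserValueDataClass output serializer and the kafka_event_with_json_data and lambda_context fixtures. The bodies below are assumptions inferred from how the tests use them (field names, topic name, header assertions), not the repository's actual conftest.py:

import base64
import json
from dataclasses import dataclass

import pytest


@dataclass
class UserValueDataClass:
    # Fields match the assertions in the tests above
    name: str
    age: int


@pytest.fixture
def kafka_event_with_json_data():
    # Single-record, MSK-style event; the value is base64-encoded JSON
    # {"name": "John Doe", "age": 30}, matching the tests' expectations
    value = base64.b64encode(json.dumps({"name": "John Doe", "age": 30}).encode("utf-8")).decode("utf-8")
    return {
        "eventSource": "aws:kafka",
        "records": {
            "my-topic-1": [
                {
                    "topic": "my-topic-1",
                    "partition": 0,  # illustrative metadata values
                    "offset": 15,
                    "timestamp": 1545084650987,
                    "timestampType": "CREATE_TIME",
                    "key": base64.b64encode(b"user-key").decode("utf-8"),  # illustrative key
                    "value": value,
                    # MSK encodes each header value as a list of byte values
                    "headers": [{"headerKey": list(b"headerValue")}],
                },
            ],
        },
    }


@pytest.fixture
def lambda_context():
    # Bare-bones stand-in for the Lambda context object
    class LambdaContext:
        function_name = "test-func"
        memory_limit_in_mb = 128
        invoked_function_arn = "arn:aws:lambda:eu-west-1:123456789012:function:test-func"
        aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72"

    return LambdaContext()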