@@ -54,38 +54,33 @@ class UserKeyModel(BaseModel):
 def test_kafka_consumer_with_json_value_and_pydantic(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer with JSON deserialization and dataclass output serialization."""
-
-    # Create dict to capture results
-    result_data = {}
-
+    # GIVEN
+    # A Kafka consumer configured to deserialize JSON data
+    # and convert it to a Pydantic model instance
     schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueModel)

     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value.name
-        result_data["age"] = record.value.age
-        return {"processed": True}
-
-    # Call the handler
+        # Extract the deserialized and serialized value
+        # which should be a UserValueModel instance
+        value: UserValueModel = event.record.value
+        return value
+
+    # WHEN
+    # The handler processes a Kafka event containing JSON-encoded data
+    # which is deserialized into a dictionary and then converted to a Pydantic model
     result = handler(kafka_event_with_json_data, lambda_context)

-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "UserValueModel"
-    assert result_data["name"] == "John Doe"
-    assert result_data["age"] == 30
+    # THEN
+    # The result should be a UserValueModel instance with the correct properties
+    assert isinstance(result, UserValueModel)
+    assert result.name == "John Doe"
+    assert result.age == 30
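Note: UserValueModel and UserKeyModel are defined earlier in the file; the hunk header above shows only the tail of UserKeyModel. A minimal sketch consistent with the assertions in these tests — the field types are inferred from the asserted values, not taken from the diff:

from pydantic import BaseModel

class UserValueModel(BaseModel):
    name: str  # inferred: asserted against "John Doe"
    age: int   # inferred: asserted against 30

class UserKeyModel(BaseModel):
    user_id: str  # inferred: asserted against "123"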

 def test_kafka_consumer_with_json_value_and_union_tag(kafka_event_with_json_data, lambda_context):
     """Test Kafka consumer with JSON deserialization and Pydantic discriminated-union output serialization."""

-    # Create dict to capture results
-    result_data = {}
-
     class UserValueModel(BaseModel):
         name: Literal["John Doe"]
         age: int
@@ -96,67 +91,73 @@ class UserValueModel2(BaseModel):
     UnionModel = Annotated[Union[UserValueModel, UserValueModel2], Field(discriminator="name")]

+    # GIVEN
+    # A Kafka consumer configured to deserialize JSON data
+    # and convert it to a Pydantic model instance with Union Tags
     schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UnionModel)

     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.value).__name__
-        result_data["name"] = record.value.name
-        result_data["age"] = record.value.age
-        return {"processed": True}
-
-    # Call the handler
+        # Extract the deserialized and serialized value
+        # which should be a UserValueModel instance
+        value: UserValueModel = event.record.value
+        return value
+
+    # WHEN
+    # The handler processes a Kafka event containing JSON-encoded data
+    # which is deserialized into a dictionary and then converted to a Pydantic model
     result = handler(kafka_event_with_json_data, lambda_context)

-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "UserValueModel"
-    assert result_data["name"] == "John Doe"
-    assert result_data["age"] == 30
+    # THEN
+    # The result should be a UserValueModel instance with the correct properties
+    assert isinstance(result, UserValueModel)
+    assert result.name == "John Doe"
+    assert result.age == 30
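The discriminator is what makes this test work: both union members declare name as a Literal, so Pydantic routes the incoming dict to the member whose literal matches, instead of trying each in turn. A standalone sketch of the same mechanism using Pydantic v2's TypeAdapter (illustrative only — the Literal value for UserValueModel2 is an assumption, since its body falls outside this hunk, and the serializer wiring inside kafka_consumer is not shown in this diff):

from typing import Annotated, Literal, Union
from pydantic import BaseModel, Field, TypeAdapter

class UserValueModel(BaseModel):
    name: Literal["John Doe"]
    age: int

class UserValueModel2(BaseModel):
    name: Literal["Jane Smith"]  # assumed literal; the real value is elided from the diff
    age: int

UnionModel = Annotated[Union[UserValueModel, UserValueModel2], Field(discriminator="name")]

# The "name" field selects UserValueModel directly, without attempting UserValueModel2
parsed = TypeAdapter(UnionModel).validate_python({"name": "John Doe", "age": 30})
assert isinstance(parsed, UserValueModel)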

 def test_kafka_consumer_with_json_key_and_pydantic(kafka_event_with_json_data, lambda_context):
-    """Test Kafka consumer with JSON deserialization and dataclass output serialization."""
-
-    # Create dict to capture results
-    result_data = {}
-
-    schema_config = SchemaConfig(key_schema_type="JSON", key_output_serializer=UserKeyModel)
+    # GIVEN
+    # A Kafka consumer configured to deserialize only the key using JSON
+    # and convert it to a Pydantic UserKeyModel instance
+    schema_config = SchemaConfig(
+        key_schema_type="JSON",
+        key_output_serializer=UserKeyModel,
+    )

     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Capture the results to verify
-        record = next(event.records)
-        result_data["value_type"] = type(record.key).__name__
-        result_data["user_id"] = record.key.user_id
-        return {"processed": True}
+        # Extract the deserialized key to verify
+        key: UserKeyModel = event.record.key
+        return key

-    # Call the handler
+    # WHEN
+    # The handler processes a Kafka event, deserializing only the key portion as JSON
+    # while leaving the value in its original format
     result = handler(kafka_event_with_json_data, lambda_context)

-    # Verify the results
-    assert result == {"processed": True}
-    assert result_data["value_type"] == "UserKeyModel"
-    assert result_data["user_id"] == "123"
+    # THEN
+    # The key should be properly deserialized from JSON and converted to a UserKeyModel
+    # with the expected user_id value
+    assert isinstance(result, UserKeyModel)
+    assert result.user_id == "123"
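The kafka_event_with_json_data fixture used by these tests is not part of this diff. A plausible shape, inferred from the assertions above and from the event built by hand in test_kafka_consumer_with_multiple_records below (pytest fixture decorator omitted; topic name, partition, and metadata fields are assumptions):

import base64
import json

def kafka_event_with_json_data():
    # Key and value are JSON-serialized, then base64-encoded, as in MSK event payloads
    key = base64.b64encode(json.dumps({"user_id": "123"}).encode("utf-8")).decode("utf-8")
    value = base64.b64encode(json.dumps({"name": "John Doe", "age": 30}).encode("utf-8")).decode("utf-8")
    return {
        "eventSource": "aws:kafka",
        "records": {
            "my-topic-0": [  # hypothetical "<topic>-<partition>" key
                {
                    "topic": "my-topic",
                    "partition": 0,
                    "offset": 0,
                    "key": key,
                    "value": value,
                    "headers": [],
                },
            ],
        },
    }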

-# Tests for Complex Types with Pydantic TypeAdapter
 def test_kafka_consumer_with_multiple_records(lambda_context):
-    """Test processing multiple records in a single event."""
-
-    # Create data for multiple records
+    # GIVEN
+    # Three different user records to process
+    # First user: John Doe, age 30
     data1 = {"name": "John Doe", "age": 30}
+    # Second user: Jane Smith, age 25
     data2 = {"name": "Jane Smith", "age": 25}
+    # Third user: Bob Johnson, age 40
     data3 = {"name": "Bob Johnson", "age": 40}

-    # Encode the data
+    # Base64-encoded JSON data for each record
     encoded1 = base64.b64encode(json.dumps(data1).encode("utf-8")).decode("utf-8")
     encoded2 = base64.b64encode(json.dumps(data2).encode("utf-8")).decode("utf-8")
     encoded3 = base64.b64encode(json.dumps(data3).encode("utf-8")).decode("utf-8")

-    # Create a kafka event with multiple records
+    # A Kafka event containing multiple records across different offsets
     multi_record_event = {
         "eventSource": "aws:kafka",
         "records": {
@@ -195,25 +196,31 @@ def test_kafka_consumer_with_multiple_records(lambda_context):
         },
     }

-    # Create schema config
-    schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueModel)
-
-    # Create list to store processed records
+    # A list to capture processed record details
     processed_records = []

+    # A Kafka consumer configured to deserialize JSON and convert to Pydantic models
+    schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=UserValueModel)
+
     @kafka_consumer(schema_config=schema_config)
     def handler(event: ConsumerRecords, context):
-        # Process all records
+        # Process each record and collect its properties
         for record in event.records:
             processed_records.append({"name": record.value.name, "age": record.value.age})
         return {"processed": len(processed_records)}

-    # Call the handler
+    # WHEN
+    # The handler processes the Kafka event containing multiple JSON records
     result = handler(multi_record_event, lambda_context)

-    # Verify the results
+    # THEN
+    # The handler should successfully process all three records
+    # and return the correct count
     assert result == {"processed": 3}
     assert len(processed_records) == 3
+
+    # All three users should be correctly deserialized and processed
+    # regardless of their order in the event
     assert any(r["name"] == "John Doe" and r["age"] == 30 for r in processed_records)
     assert any(r["name"] == "Jane Smith" and r["age"] == 25 for r in processed_records)
     assert any(r["name"] == "Bob Johnson" and r["age"] == 40 for r in processed_records)
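The entries of the multi_record_event "records" mapping fall between the two hunks and are elided from this diff. A plausible reconstruction, consistent with the three encoded payloads and the "different offsets" comment (topic name, partition, and the absent keys are assumptions):

        "my-topic-0": [  # hypothetical topic-partition key
            {"topic": "my-topic", "partition": 0, "offset": 0, "key": None, "value": encoded1, "headers": []},
            {"topic": "my-topic", "partition": 0, "offset": 1, "key": None, "value": encoded2, "headers": []},
            {"topic": "my-topic", "partition": 0, "offset": 2, "key": None, "value": encoded3, "headers": []},
        ],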