@@ -38,18 +38,18 @@ class KafkaMessage(typing.NamedTuple):
38
38
The dictionary of message headers key/values.
39
39
"""
40
40
41
- key : typing .Optional [typing .Union [ bytes , str ] ]
41
+ key : typing .Optional [typing .AnyStr ]
42
42
"""
43
43
The message key.
44
44
"""
45
45
46
- value : typing .Union [ bytes , str ]
46
+ value : typing .AnyStr
47
47
"""
48
48
The message value.
49
49
"""
50
50
51
51
52
- KeyMapper = typing .Callable [[AnyCloudEvent ], typing .Union [ bytes , str ] ]
52
+ KeyMapper = typing .Callable [[AnyCloudEvent ], typing .AnyStr ]
53
53
"""
54
54
A callable function that creates a Kafka message key, given a CloudEvent instance.
55
55
"""
@@ -174,7 +174,7 @@ def to_structured(
174
174
f"Failed to map message key with error: { type (e ).__name__ } ('{ e } ')"
175
175
)
176
176
177
- attrs = event .get_attributes (). copy ( )
177
+ attrs : dict [ str , typing . Any ] = dict ( event .get_attributes ())
178
178
179
179
try :
180
180
data = data_marshaller (event .data )
@@ -208,7 +208,7 @@ def from_structured(
208
208
message : KafkaMessage ,
209
209
event_type : typing .Optional [typing .Type [AnyCloudEvent ]] = None ,
210
210
data_unmarshaller : typing .Optional [types .MarshallerType ] = None ,
211
- envelope_unmarshaller : typing .Optional [types .MarshallerType ] = None ,
211
+ envelope_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
212
212
) -> AnyCloudEvent :
213
213
"""
214
214
Returns a CloudEvent from a KafkaMessage in structured format.
@@ -232,20 +232,20 @@ def from_structured(
232
232
"Failed to unmarshall message with error: " f"{ type (e ).__name__ } ('{ e } ')"
233
233
)
234
234
235
- attributes = {}
235
+ attributes : dict [ str , typing . Any ] = {}
236
236
if message .key is not None :
237
237
attributes ["partitionkey" ] = message .key
238
238
239
+ data : typing .Optional [typing .Any ] = None
239
240
for name , value in structure .items ():
240
- decoder = lambda x : x
241
- if name == "data" :
242
- decoder = lambda v : data_unmarshaller (v )
243
- if name == "data_base64" :
244
- decoder = lambda v : data_unmarshaller (base64 .b64decode (v ))
245
- name = "data"
246
-
247
241
try :
248
- decoded_value = decoder (value )
242
+ if name == "data" :
243
+ decoded_value = data_unmarshaller (value )
244
+ elif name == "data_base64" :
245
+ decoded_value = data_unmarshaller (base64 .b64decode (value ))
246
+ name = "data"
247
+ else :
248
+ decoded_value = value
249
249
except Exception as e :
250
250
raise cloud_exceptions .DataUnmarshallerError (
251
251
"Failed to unmarshall data with error: " f"{ type (e ).__name__ } ('{ e } ')"
0 commit comments