2121from cloudevents_v1 .kafka .exceptions import KeyMapperError
2222from cloudevents_v1 .sdk import types
2323
24- DEFAULT_MARSHALLER : types .MarshallerType = json .dumps
25- DEFAULT_UNMARSHALLER : types .MarshallerType = json .loads
26- DEFAULT_EMBEDDED_DATA_MARSHALLER : types .MarshallerType = lambda x : x
24+ JSON_MARSHALLER : types .MarshallerType = json .dumps
25+ JSON_UNMARSHALLER : types .UnmarshallerType = json .loads
26+ IDENTITY_MARSHALLER = IDENTITY_UNMARSHALLER = lambda x : x
27+
28+ DEFAULT_MARSHALLER : types .MarshallerType = JSON_MARSHALLER
29+ DEFAULT_UNMARSHALLER : types .UnmarshallerType = JSON_UNMARSHALLER
30+ DEFAULT_EMBEDDED_DATA_MARSHALLER : types .MarshallerType = IDENTITY_MARSHALLER
31+ DEFAULT_EMBEDDED_DATA_UNMARSHALLER : types .UnmarshallerType = IDENTITY_UNMARSHALLER
2732
2833
2934class KafkaMessage (typing .NamedTuple ):
@@ -106,11 +111,29 @@ def to_binary(
106111 return KafkaMessage (headers , message_key , data )
107112
108113
114+ @typing .overload
109115def from_binary (
110116 message : KafkaMessage ,
111- event_type : typing .Optional [typing .Type [AnyCloudEvent ]] = None ,
112- data_unmarshaller : typing .Optional [types .MarshallerType ] = None ,
117+ event_type : None = None ,
118+ data_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
119+ ) -> http .CloudEvent :
120+ pass
121+
122+
123+ @typing .overload
124+ def from_binary (
125+ message : KafkaMessage ,
126+ event_type : typing .Type [AnyCloudEvent ],
127+ data_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
113128) -> AnyCloudEvent :
129+ pass
130+
131+
132+ def from_binary (
133+ message : KafkaMessage ,
134+ event_type : typing .Optional [typing .Type [AnyCloudEvent ]] = None ,
135+ data_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
136+ ) -> typing .Union [http .CloudEvent , AnyCloudEvent ]:
114137 """
115138 Returns a CloudEvent from a KafkaMessage in binary format.
116139
@@ -139,10 +162,11 @@ def from_binary(
139162 raise cloud_exceptions .DataUnmarshallerError (
140163 f"Failed to unmarshall data with error: { type (e ).__name__ } ('{ e } ')"
141164 )
165+ result : typing .Union [http .CloudEvent , AnyCloudEvent ]
142166 if event_type :
143167 result = event_type .create (attributes , data )
144168 else :
145- result = http .CloudEvent .create (attributes , data ) # type: ignore
169+ result = http .CloudEvent .create (attributes , data )
146170 return result
147171
148172
@@ -205,12 +229,32 @@ def to_structured(
205229 return KafkaMessage (headers , message_key , value )
206230
207231
232+ @typing .overload
208233def from_structured (
209234 message : KafkaMessage ,
210- event_type : typing .Optional [typing .Type [AnyCloudEvent ]] = None ,
211- data_unmarshaller : typing .Optional [types .MarshallerType ] = None ,
235+ event_type : None = None ,
236+ data_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
237+ envelope_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
238+ ) -> http .CloudEvent :
239+ pass
240+
241+
242+ @typing .overload
243+ def from_structured (
244+ message : KafkaMessage ,
245+ event_type : typing .Type [AnyCloudEvent ],
246+ data_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
212247 envelope_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
213248) -> AnyCloudEvent :
249+ pass
250+
251+
252+ def from_structured (
253+ message : KafkaMessage ,
254+ event_type : typing .Optional [typing .Type [AnyCloudEvent ]] = None ,
255+ data_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
256+ envelope_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
257+ ) -> typing .Union [http .CloudEvent , AnyCloudEvent ]:
214258 """
215259 Returns a CloudEvent from a KafkaMessage in structured format.
216260
@@ -222,7 +266,7 @@ def from_structured(
222266 :returns: CloudEvent
223267 """
224268
225- data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER
269+ data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_UNMARSHALLER
226270 envelope_unmarshaller = envelope_unmarshaller or DEFAULT_UNMARSHALLER
227271 try :
228272 structure = envelope_unmarshaller (message .value )
@@ -259,8 +303,9 @@ def from_structured(
259303 attributes ["datacontenttype" ] = val .decode ()
260304 else :
261305 attributes [header .lower ()] = val .decode ()
306+ result : typing .Union [AnyCloudEvent , http .CloudEvent ]
262307 if event_type :
263308 result = event_type .create (attributes , data )
264309 else :
265- result = http .CloudEvent .create (attributes , data ) # type: ignore
310+ result = http .CloudEvent .create (attributes , data )
266311 return result
0 commit comments