File tree Expand file tree Collapse file tree 2 files changed +10
-7
lines changed Expand file tree Collapse file tree 2 files changed +10
-7
lines changed Original file line number Diff line number Diff line change @@ -86,8 +86,8 @@ data = {
8686 ' name' : ' John Doe' ,
8787 ' favorite_number' : 6
8888}
89- producer.send(' my-topic' , value = DataAndSchema (data, schema))
90- # the value MUST be an instance of DataAndSchema when we're using the SchemaRegistrySerializer
89+ producer.send(' my-topic' , value = (data, schema))
90+ # the value MUST be a tuple when we're using the SchemaRegistrySerializer
9191```
9292
9393Read Kafka messages with ` SchemaRegistryDeserializer ` :
@@ -115,8 +115,11 @@ consumer = KafkaConsumer('my-topic', value_deserializer=deserializer)
115115for message in consumer:
116116 # The deserializer produces DataAndSchema instances
117117 value: DataAndSchema = message.value
118- value.data
119- value.schema
118+ # which are NamedTuples with a `data` and `schema` property
119+ value.data == value[0 ]
120+ value.schema == value[1 ]
121+ # and can be deconstructed
122+ data, schema = value
120123```
121124
122125## Contributing
Original file line number Diff line number Diff line change 2626class DataAndSchema (NamedTuple ):
2727 """Data and its schema.
2828
29- Should be used to wrap the data and schema together before calling the
29+ Can be used to wrap the data and schema together before calling the
3030 producer's producing methods.
3131 """
3232 data : Any
@@ -74,9 +74,9 @@ def __init__(
7474 def serialize (self , topic : str , data_and_schema : DataAndSchema ):
7575 if data_and_schema is None :
7676 return None
77- if not isinstance (data_and_schema , DataAndSchema ):
77+ if not isinstance (data_and_schema , tuple ):
7878 raise TypeError ('AvroSerializer can only serialize' ,
79- f' { DataAndSchema } , got { type (data_and_schema )} ' )
79+ f' { tuple } , got { type (data_and_schema )} ' )
8080 data , schema = data_and_schema
8181 schema_version = self ._get_schema_version (topic , schema )
8282 serialized = schema .write (data )
You can’t perform that action at this time.
0 commit comments