2020# This is a simple example of the SerializingProducer using Avro.
2121#
2222import argparse
23+ import os
2324
2425from confluent_kafka import DeserializingConsumer
2526from confluent_kafka .schema_registry import SchemaRegistryClient
@@ -66,19 +67,16 @@ def dict_to_user(obj, ctx):
6667
6768def main (args ):
6869 topic = args .topic
70+ is_specific = args .specific == "true"
6971
70- schema_str = """
71- {
72- "namespace": "confluent.io.examples.serialization.avro",
73- "name": "User",
74- "type": "record",
75- "fields": [
76- {"name": "name", "type": "string"},
77- {"name": "favorite_number", "type": "int"},
78- {"name": "favorite_color", "type": "string"}
79- ]
80- }
81- """
72+ if is_specific :
73+ schema = "user_specific.avsc"
74+ else :
75+ schema = "user_generic.avsc"
76+
77+ path = os .path .realpath (os .path .dirname (__file__ ))
78+ with open (f"{ path } /avro/{ schema } " ) as f :
79+ schema_str = f .read ()
8280
8381 sr_conf = {'url' : args .schema_registry }
8482 schema_registry_client = SchemaRegistryClient (sr_conf )
@@ -110,8 +108,8 @@ def main(args):
110108 "\t favorite_number: {}\n "
111109 "\t favorite_color: {}\n "
112110 .format (msg .key (), user .name ,
113- user .favorite_color ,
114- user .favorite_number ))
111+ user .favorite_number ,
112+ user .favorite_color ))
115113 except KeyboardInterrupt :
116114 break
117115
@@ -129,5 +127,7 @@ def main(args):
129127 help = "Topic name" )
130128 parser .add_argument ('-g' , dest = "group" , default = "example_serde_avro" ,
131129 help = "Consumer group" )
130+ parser .add_argument ('-p' , dest = "specific" , default = "true" ,
131+ help = "Avro specific record" )
132132
133133 main (parser .parse_args ())
0 commit comments