2222import logging
2323import argparse
2424from confluent_kafka import Producer
25- from confluent_kafka .serialization import StringSerializer
25+ from confluent_kafka .schema_registry .json_schema import JSONSerializer
26+ from confluent_kafka .serialization import (StringSerializer ,
27+ SerializationContext , MessageField )
28+ from confluent_kafka .schema_registry import SchemaRegistryClient
29+
30+
31+ class User (object ):
32+ """
33+ User record
34+
35+ Args:
36+ name (str): User's name
37+
38+ favorite_number (int): User's favorite number
39+
40+ favorite_color (str): User's favorite color
41+
42+ address(str): User's address; confidential
43+ """
44+
45+ def __init__ (self , name , address , favorite_number , favorite_color ):
46+ self .name = name
47+ self .favorite_number = favorite_number
48+ self .favorite_color = favorite_color
49+ # address should not be serialized, see user_to_dict()
50+ self ._address = address
51+
52+
53+ def user_to_dict (user , ctx ):
54+ """
55+ Returns a dict representation of a User instance for serialization.
56+
57+ Args:
58+ user (User): User instance.
59+
60+ ctx (SerializationContext): Metadata pertaining to the serialization
61+ operation.
62+
63+ Returns:
64+ dict: Dict populated with user attributes to be serialized.
65+ """
66+
67+ # User._address must not be serialized; omit from dict
68+ return dict (name = user .name ,
69+ favorite_number = user .favorite_number ,
70+ favorite_color = user .favorite_color )
2671
2772
2873def producer_config (args ):
@@ -45,6 +90,21 @@ def producer_config(args):
4590 return params
4691
4792
93+ def schema_registry_config (args ):
94+ params = {
95+ 'url' : args .schema_registry ,
96+ 'bearer.auth.credentials.source' : 'OAUTHBEARER_AZURE_IMDS' ,
97+ 'bearer.auth.issuer.endpoint.query' : 'resource=&api-version=&client_id=' ,
98+ }
99+ # These two parameters are only applicable when producing to
100+ # confluent cloud where some sasl extensions are required.
101+ if args .logical_cluster and args .identity_pool_id :
102+ params ['bearer.auth.logical.cluster' ] = args .logical_cluster
103+ params ['bearer.auth.identity.pool.id' ] = args .identity_pool_id
104+
105+ return params
106+
107+
48108def delivery_report (err , msg ):
49109 """
50110 Reports the failure or success of a message delivery.
@@ -72,27 +132,54 @@ def delivery_report(err, msg):
72132
73133def main (args ):
74134 topic = args .topic
75- delimiter = args .delimiter
76135 producer_conf = producer_config (args )
77136 producer = Producer (producer_conf )
78- serializer = StringSerializer ('utf_8' )
137+ string_serializer = StringSerializer ('utf_8' )
138+ schema_str = """
139+ {
140+ "$schema": "http://json-schema.org/draft-07/schema#",
141+ "title": "User",
142+ "description": "A Confluent Kafka Python User",
143+ "type": "object",
144+ "properties": {
145+ "name": {
146+ "description": "User's name",
147+ "type": "string"
148+ },
149+ "favorite_number": {
150+ "description": "User's favorite number",
151+ "type": "number",
152+ "exclusiveMinimum": 0
153+ },
154+ "favorite_color": {
155+ "description": "User's favorite color",
156+ "type": "string"
157+ }
158+ },
159+ "required": [ "name", "favorite_number", "favorite_color" ]
160+ }
161+ """
162+ schema_registry_conf = schema_registry_config (args )
163+ schema_registry_client = SchemaRegistryClient (schema_registry_conf )
164+
165+ string_serializer = StringSerializer ('utf_8' )
166+ json_serializer = JSONSerializer (schema_str , schema_registry_client , user_to_dict )
79167
80168 print ('Producing records to topic {}. ^C to exit.' .format (topic ))
81169 while True :
82170 # Serve on_delivery callbacks from previous calls to produce()
83171 producer .poll (0.0 )
84172 try :
85- msg_data = input (">" )
86- msg = msg_data .split (delimiter )
87- if len (msg ) == 2 :
88- producer .produce (topic = topic ,
89- key = serializer (msg [0 ]),
90- value = serializer (msg [1 ]),
91- on_delivery = delivery_report )
92- else :
93- producer .produce (topic = topic ,
94- value = serializer (msg [0 ]),
95- on_delivery = delivery_report )
173+ name = input (">" )
174+ user = User (name = name ,
175+ address = "NA" ,
176+ favorite_color = "blue" ,
177+ favorite_number = 7 )
178+ serialized_user = json_serializer (user , SerializationContext (topic , MessageField .VALUE ))
179+ producer .produce (topic = topic ,
180+ key = string_serializer (name ),
181+ value = serialized_user ,
182+ on_delivery = delivery_report )
96183 except KeyboardInterrupt :
97184 break
98185
@@ -106,8 +193,8 @@ def main(args):
106193 help = "Bootstrap broker(s) (host[:port])" )
107194 parser .add_argument ('-t' , dest = "topic" , default = "example_producer_oauth" ,
108195 help = "Topic name" )
109- parser .add_argument ('-d ' , dest = "delimiter " , default = "|" ,
110- help = "Key-Value delimiter. Defaults to '|'" ),
196+ parser .add_argument ('-s ' , dest = "schema_registry " , required = True ,
197+ help = "Schema Registry (http(s)://host[:port]" )
111198 parser .add_argument ('--query' , dest = "query" , required = True ,
112199 help = "Query parameters for Azure IMDS token endpoint" )
113200 parser .add_argument ('--logical-cluster' , dest = "logical_cluster" , required = False , help = "Logical Cluster." )
0 commit comments