Skip to content

Commit ab09f28

Browse files
committed
Add SR serialization to the Azure IMDS producer example
1 parent 6df4cce commit ab09f28

File tree

2 files changed

+104
-17
lines changed

2 files changed

+104
-17
lines changed

examples/oauth_oidc_ccloud_azure_imds_producer.py

Lines changed: 103 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,52 @@
2222
import logging
2323
import argparse
2424
from confluent_kafka import Producer
25-
from confluent_kafka.serialization import StringSerializer
25+
from confluent_kafka.schema_registry.json_schema import JSONSerializer
26+
from confluent_kafka.serialization import (StringSerializer,
27+
SerializationContext, MessageField)
28+
from confluent_kafka.schema_registry import SchemaRegistryClient
29+
30+
31+
class User(object):
32+
"""
33+
User record
34+
35+
Args:
36+
name (str): User's name
37+
38+
favorite_number (int): User's favorite number
39+
40+
favorite_color (str): User's favorite color
41+
42+
address(str): User's address; confidential
43+
"""
44+
45+
def __init__(self, name, address, favorite_number, favorite_color):
46+
self.name = name
47+
self.favorite_number = favorite_number
48+
self.favorite_color = favorite_color
49+
# address should not be serialized, see user_to_dict()
50+
self._address = address
51+
52+
53+
def user_to_dict(user, ctx):
54+
"""
55+
Returns a dict representation of a User instance for serialization.
56+
57+
Args:
58+
user (User): User instance.
59+
60+
ctx (SerializationContext): Metadata pertaining to the serialization
61+
operation.
62+
63+
Returns:
64+
dict: Dict populated with user attributes to be serialized.
65+
"""
66+
67+
# User._address must not be serialized; omit from dict
68+
return dict(name=user.name,
69+
favorite_number=user.favorite_number,
70+
favorite_color=user.favorite_color)
2671

2772

2873
def producer_config(args):
@@ -45,6 +90,21 @@ def producer_config(args):
4590
return params
4691

4792

93+
def schema_registry_config(args):
94+
params = {
95+
'url': args.schema_registry,
96+
'bearer.auth.credentials.source': 'OAUTHBEARER_AZURE_IMDS',
97+
'bearer.auth.issuer.endpoint.query': args.query,
98+
}
99+
# These two parameters are only applicable when producing to
100+
# confluent cloud where some sasl extensions are required.
101+
if args.logical_cluster and args.identity_pool_id:
102+
params['bearer.auth.logical.cluster'] = args.logical_cluster
103+
params['bearer.auth.identity.pool.id'] = args.identity_pool_id
104+
105+
return params
106+
107+
48108
def delivery_report(err, msg):
49109
"""
50110
Reports the failure or success of a message delivery.
@@ -72,27 +132,54 @@ def delivery_report(err, msg):
72132

73133
def main(args):
74134
topic = args.topic
75-
delimiter = args.delimiter
76135
producer_conf = producer_config(args)
77136
producer = Producer(producer_conf)
78-
serializer = StringSerializer('utf_8')
137+
string_serializer = StringSerializer('utf_8')
138+
schema_str = """
139+
{
140+
"$schema": "http://json-schema.org/draft-07/schema#",
141+
"title": "User",
142+
"description": "A Confluent Kafka Python User",
143+
"type": "object",
144+
"properties": {
145+
"name": {
146+
"description": "User's name",
147+
"type": "string"
148+
},
149+
"favorite_number": {
150+
"description": "User's favorite number",
151+
"type": "number",
152+
"exclusiveMinimum": 0
153+
},
154+
"favorite_color": {
155+
"description": "User's favorite color",
156+
"type": "string"
157+
}
158+
},
159+
"required": [ "name", "favorite_number", "favorite_color" ]
160+
}
161+
"""
162+
schema_registry_conf = schema_registry_config(args)
163+
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
164+
165+
string_serializer = StringSerializer('utf_8')
166+
json_serializer = JSONSerializer(schema_str, schema_registry_client, user_to_dict)
79167

80168
print('Producing records to topic {}. ^C to exit.'.format(topic))
81169
while True:
82170
# Serve on_delivery callbacks from previous calls to produce()
83171
producer.poll(0.0)
84172
try:
85-
msg_data = input(">")
86-
msg = msg_data.split(delimiter)
87-
if len(msg) == 2:
88-
producer.produce(topic=topic,
89-
key=serializer(msg[0]),
90-
value=serializer(msg[1]),
91-
on_delivery=delivery_report)
92-
else:
93-
producer.produce(topic=topic,
94-
value=serializer(msg[0]),
95-
on_delivery=delivery_report)
173+
name = input(">")
174+
user = User(name=name,
175+
address="NA",
176+
favorite_color="blue",
177+
favorite_number=7)
178+
serialized_user = json_serializer(user, SerializationContext(topic, MessageField.VALUE))
179+
producer.produce(topic=topic,
180+
key=string_serializer(name),
181+
value=serialized_user,
182+
on_delivery=delivery_report)
96183
except KeyboardInterrupt:
97184
break
98185

@@ -106,8 +193,8 @@ def main(args):
106193
help="Bootstrap broker(s) (host[:port])")
107194
parser.add_argument('-t', dest="topic", default="example_producer_oauth",
108195
help="Topic name")
109-
parser.add_argument('-d', dest="delimiter", default="|",
110-
help="Key-Value delimiter. Defaults to '|'"),
196+
parser.add_argument('-s', dest="schema_registry", required=True,
197+
help="Schema Registry (http(s)://host[:port]")
111198
parser.add_argument('--query', dest="query", required=True,
112199
help="Query parameters for Azure IMDS token endpoint")
113200
parser.add_argument('--logical-cluster', dest="logical_cluster", required=False, help="Logical Cluster.")

src/confluent_kafka/schema_registry/common/_oauthbearer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def _validate(self):
141141
self.token_endpoint = urlunparse(parsed_token_endpoint)
142142
elif not token_endpoint_override:
143143
raise ValueError("bearer.auth.issuer.endpoint.query must be provided "
144-
"when bearer.auth.issuer.endpoint.url isn overridden")
144+
"when bearer.auth.issuer.endpoint.url isn't overridden")
145145

146146

147147
class _StaticFieldProvider(_BearerFieldProvider):

0 commit comments

Comments
 (0)