Skip to content

Commit edaef89

Browse files
authored
Producer/Consumer SASL auth examples (#824)
1 parent 65d8142 commit edaef89

File tree

2 files changed

+127
-0
lines changed

2 files changed

+127
-0
lines changed

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ The scripts in this directory provide code examples using Confluent's Python cli
1111
* [json_consumer.py](json_consumer.py): DeserializingConsumer with JsonDeserializer
1212
* [protobuf_producer.py](protobuf_producer.py): SerializingProducer with ProtobufSerializer
1313
* [protobuf_consumer.py](protobuf_consumer.py): DeserializingConsumer with ProtobufDeserializer
14+
* [sasl_producer.py](sasl_producer.py): SerializingProducer with SASL Authentication
1415

1516
Additional examples for [Confluent Cloud](https://www.confluent.io/confluent-cloud/):
1617

examples/sasl_producer.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
#
20+
# This is a simple example of the SerializingProducer using SASL authentication.
21+
#
22+
import argparse
23+
24+
from six.moves import input
25+
26+
from confluent_kafka import SerializingProducer
27+
from confluent_kafka.serialization import StringSerializer
28+
29+
30+
def delivery_report(err, msg):
31+
"""
32+
Reports the failure or success of a message delivery.
33+
34+
Args:
35+
err (KafkaError): The error that occurred on None on success.
36+
37+
msg (Message): The message that was produced or failed.
38+
39+
Note:
40+
In the delivery report callback the Message.key() and Message.value()
41+
will be the binary format as encoded by any configured Serializers and
42+
not the same object that was passed to produce().
43+
If you wish to pass the original object(s) for key and value to delivery
44+
report callback we recommend a bound callback or lambda where you pass
45+
the objects along.
46+
47+
"""
48+
if err is not None:
49+
print("Delivery failed for User record {}: {}".format(msg.key(), err))
50+
return
51+
print('User record {} successfully produced to {} [{}] at offset {}'.format(
52+
msg.key(), msg.topic(), msg.partition(), msg.offset()))
53+
54+
55+
def sasl_conf(args):
56+
sasl_mechanism = args.sasl_mechanism.upper()
57+
58+
sasl_conf = {'sasl.mechanism': sasl_mechanism,
59+
# Set to SASL_SSL to enable TLS support.
60+
'security.protocol': 'SASL_PLAINTEXT'}
61+
62+
if sasl_mechanism != 'GSSAPI':
63+
sasl_conf.update({'sasl.username': args.user_principal,
64+
'sasl.password': args.user_secret})
65+
66+
if sasl_mechanism == 'GSSAPI':
67+
sasl_conf.update({'sasl.kerberos.service.name', args.broker_principal,
68+
# Keytabs are not supported on Windows. Instead the
69+
# the logged on user's credentials are used to
70+
# authenticate.
71+
'sasl.kerberos.principal', args.user_principal,
72+
'sasl.kerberos.keytab', args.user_secret})
73+
return sasl_conf
74+
75+
76+
def main(args):
77+
topic = args.topic
78+
delimiter = args.delimiter
79+
producer_conf = {'bootstrap.servers': args.bootstrap_servers,
80+
'key.serializer': StringSerializer('utf_8'),
81+
'value.serializer': StringSerializer('utf_8')}
82+
83+
producer_conf.update(sasl_conf(args))
84+
85+
producer = SerializingProducer(producer_conf)
86+
87+
print("Producing records to topic {}. ^C to exit.".format(topic))
88+
while True:
89+
# Serve on_delivery callbacks from previous calls to produce()
90+
producer.poll(0.0)
91+
try:
92+
msg_data = input(">")
93+
msg = msg_data.split(delimiter)
94+
if len(msg) == 2:
95+
producer.produce(topic=topic, key=msg[0], value=msg[1],
96+
on_delivery=delivery_report)
97+
else:
98+
producer.produce(topic=topic, value=msg[0],
99+
on_delivery=delivery_report)
100+
except KeyboardInterrupt:
101+
break
102+
103+
print("\nFlushing {} records...".format(len(producer)))
104+
producer.flush()
105+
106+
107+
if __name__ == '__main__':
108+
parser = argparse.ArgumentParser(description="SerializingProducer"
109+
" SASL Example")
110+
parser.add_argument('-b', dest="bootstrap_servers", required=True,
111+
help="Bootstrap broker(s) (host[:port])")
112+
parser.add_argument('-t', dest="topic", default="example_producer_sasl",
113+
help="Topic name")
114+
parser.add_argument('-d', dest="delimiter", default="|",
115+
help="Key-Value delimiter. Defaults to '|'"),
116+
parser.add_argument('-m', dest="sasl_mechanism", default='PLAIN',
117+
help="SASL mechanism to use for authentication."
118+
"Defaults to PLAIN")
119+
parser.add_argument('--tls', dest="enab_tls", default=False)
120+
parser.add_argument('-u', dest="user_principal", required=True,
121+
help="Username")
122+
parser.add_argument('-s', dest="user_secret", required=True,
123+
help="Password(ignored on Windows) for PLAIN and SCRAM,"
124+
" or path to keytab if GSSAPI.")
125+
126+
main(parser.parse_args())

0 commit comments

Comments
 (0)