Skip to content

Commit fc4f786

Browse files
selevitSergey Levitinods
authored
Support SASL authentication in AIOKafkaAdminClient (#890)
* Support SASL authentication in AIOKafkaAdminClient [#889] * Add missing deps into README * Added a foundation for admin sasl tests * Bring back gssapi_consumer_factory() method * Add forgotten await-s * Add more admin tests * Add CHANGES * Swap method back to minimize diff --------- Co-authored-by: Sergey Levitin <[email protected]> Co-authored-by: Denis Otkidach <[email protected]>
1 parent 34b2d19 commit fc4f786

File tree

4 files changed

+81
-3
lines changed

4 files changed

+81
-3
lines changed

CHANGES/889.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added SASL authentication support to `AIOKafkaAdminClient`.

README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ generate ssh keys for some tests.
8181

8282
Setting up tests requirements (assuming you're within virtualenv on ubuntu 14.04+)::
8383

84-
sudo apt-get install -y libsnappy-dev libzstd-dev
84+
sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev krb5-user
8585
make setup
8686

8787
Running tests with coverage::

aiokafka/admin.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,13 @@ def __init__(self, *, loop=None,
8181
metadata_max_age_ms: int = 300000,
8282
security_protocol: str = "PLAINTEXT",
8383
ssl_context: Optional[SSLContext] = None,
84-
api_version: str = "auto"):
84+
api_version: str = "auto",
85+
sasl_mechanism: str = 'PLAIN',
86+
sasl_plain_username: Optional[str] = None,
87+
sasl_plain_password: Optional[str] = None,
88+
sasl_kerberos_service_name: str = 'kafka',
89+
sasl_kerberos_domain_name: Optional[str] = None,
90+
sasl_oauth_token_provider: Optional[str] = None):
8591
self._closed = False
8692
self._started = False
8793
self._version_info = {}
@@ -94,7 +100,13 @@ def __init__(self, *, loop=None,
94100
api_version=api_version,
95101
ssl_context=ssl_context,
96102
security_protocol=security_protocol,
97-
connections_max_idle_ms=connections_max_idle_ms)
103+
connections_max_idle_ms=connections_max_idle_ms,
104+
sasl_mechanism=sasl_mechanism,
105+
sasl_plain_username=sasl_plain_username,
106+
sasl_plain_password=sasl_plain_password,
107+
sasl_kerberos_service_name=sasl_kerberos_service_name,
108+
sasl_kerberos_domain_name=sasl_kerberos_domain_name,
109+
sasl_oauth_token_provider=sasl_oauth_token_provider)
98110

99111
async def close(self):
100112
"""Close the KafkaAdminClient connection to the Kafka broker."""

tests/test_sasl.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from aiokafka.producer import AIOKafkaProducer
88
from aiokafka.consumer import AIOKafkaConsumer
9+
from aiokafka.admin import AIOKafkaAdminClient
910

1011
from aiokafka.errors import (
1112
TopicAuthorizationFailedError, GroupAuthorizationFailedError,
@@ -74,6 +75,19 @@ async def consumer_factory(self, user="test", **kw):
7475
await consumer.start()
7576
return consumer
7677

78+
async def admin_client_factory(self, user="test", **kw):
79+
admin_client = AIOKafkaAdminClient(
80+
bootstrap_servers=[self.sasl_hosts],
81+
security_protocol="SASL_PLAINTEXT",
82+
sasl_mechanism="PLAIN",
83+
sasl_plain_username=user,
84+
sasl_plain_password=user,
85+
**kw
86+
)
87+
self.add_cleanup(admin_client.close)
88+
await admin_client.start()
89+
return admin_client
90+
7791
async def gssapi_producer_factory(self, **kw):
7892
if self.kafka_version == "0.9.0.1":
7993
kw['api_version'] = "0.9"
@@ -110,6 +124,21 @@ async def gssapi_consumer_factory(self, **kw):
110124
await consumer.start()
111125
return consumer
112126

127+
async def gssapi_admin_client_factory(self, **kw):
128+
if self.kafka_version == "0.9.0.1":
129+
kw['api_version'] = "0.9"
130+
131+
admin_client = AIOKafkaAdminClient(
132+
bootstrap_servers=[self.sasl_hosts],
133+
security_protocol="SASL_PLAINTEXT",
134+
sasl_mechanism="GSSAPI",
135+
sasl_kerberos_domain_name="localhost",
136+
**kw
137+
)
138+
self.add_cleanup(admin_client.close)
139+
await admin_client.start()
140+
return admin_client
141+
113142
async def scram_producer_factory(self, user="test", **kw):
114143
producer = AIOKafkaProducer(
115144

@@ -142,6 +171,19 @@ async def scram_consumer_factory(self, user="test", **kw):
142171
await consumer.start()
143172
return consumer
144173

174+
async def scram_admin_client_factory(self, user="test", **kw):
175+
admin_client = AIOKafkaAdminClient(
176+
bootstrap_servers=[self.sasl_hosts],
177+
security_protocol="SASL_PLAINTEXT",
178+
sasl_mechanism="SCRAM-SHA-256",
179+
sasl_plain_username=user,
180+
sasl_plain_password=user,
181+
**kw
182+
)
183+
self.add_cleanup(admin_client.close)
184+
await admin_client.start()
185+
return admin_client
186+
145187
@kafka_versions('>=0.10.0')
146188
@run_until_complete
147189
async def test_sasl_plaintext_basic(self):
@@ -181,6 +223,29 @@ async def test_sasl_plaintext_scram(self):
181223
msg = await consumer.getone()
182224
self.assertEqual(msg.value, b"Super scram msg")
183225

226+
@kafka_versions('>=0.10.0')
227+
@run_until_complete
228+
async def test_admin_client_sasl_plaintext_basic(self):
229+
admin_client = await self.admin_client_factory()
230+
cluster_info = await admin_client.describe_cluster()
231+
self.assertGreaterEqual(len(cluster_info["brokers"]), 1)
232+
233+
@kafka_versions('>=0.10.0')
234+
@run_until_complete
235+
async def test_admin_client_sasl_plaintext_gssapi(self):
236+
self.kerberos_utils.kinit("client/localhost")
237+
admin_client = await self.gssapi_admin_client_factory()
238+
cluster_info = await admin_client.describe_cluster()
239+
self.assertGreaterEqual(len(cluster_info["brokers"]), 1)
240+
241+
@kafka_versions('>=0.10.0')
242+
@run_until_complete
243+
async def test_admin_client_sasl_plaintext_scrum(self):
244+
self.kafka_config.add_scram_user("test", "test")
245+
admin_client = await self.scram_admin_client_factory()
246+
cluster_info = await admin_client.describe_cluster()
247+
self.assertGreaterEqual(len(cluster_info["brokers"]), 1)
248+
184249
##########################################################################
185250
# Topic Resource
186251
##########################################################################

0 commit comments

Comments
 (0)