Skip to content

Commit 96d48e2

Browse files
mahajanadhityaemasabpranavrth
authored
[KIP-554] User SCRAM credentials API (#1575)
requires broker version >= 2.7.0 --------- Co-authored-by: Emanuele Sabellico <[email protected]> Co-authored-by: Pranav Rathi <[email protected]>
1 parent 367eb4b commit 96d48e2

File tree

9 files changed

+1205
-9
lines changed

9 files changed

+1205
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ v2.2.0 is a feature release with the following features, fixes and enhancements:
66

77
- [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API)
88
IncrementalAlterConfigs API (#1517).
9+
- [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API):
10+
User SASL/SCRAM credentials alteration and description (#1575).
911

1012
confluent-kafka-python is based on librdkafka v2.2.0, see the
1113
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.2.0)

docs/index.rst

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ Supporting classes
5050
- :ref:`AclPermissionType <pythonclient_acl_permission_type>`
5151
- :ref:`AclBinding <pythonclient_acl_binding>`
5252
- :ref:`AclBindingFilter <pythonclient_acl_binding_filter>`
53+
- :ref:`ScramCredentialInfo <pythonclient_scram_credential_info>`
54+
- :ref:`UserScramCredentialsDescription <pythonclient_user_scram_credentials_description>`
55+
- :ref:`UserScramCredentialAlteration <pythonclient_user_scram_credential_alteration>`
56+
- :ref:`UserScramCredentialUpsertion <pythonclient_user_scram_credential_upsertion>`
57+
- :ref:`UserScramCredentialDeletion <pythonclient_user_scram_credential_deletion>`
5358

5459
Experimental
5560
These classes are experimental and are likely to be removed, or subject to incompatible
@@ -188,6 +193,60 @@ AclBindingFilter
188193
.. autoclass:: confluent_kafka.admin.AclBindingFilter
189194
:members:
190195

196+
.. _pythonclient_scram_mechanism:
197+
198+
**************
199+
ScramMechanism
200+
**************
201+
202+
.. autoclass:: confluent_kafka.admin.ScramMechanism
203+
:members:
204+
205+
.. _pythonclient_scram_credential_info:
206+
207+
*******************
208+
ScramCredentialInfo
209+
*******************
210+
211+
.. autoclass:: confluent_kafka.admin.ScramCredentialInfo
212+
:members:
213+
214+
.. _pythonclient_user_scram_credentials_description:
215+
216+
*******************************
217+
UserScramCredentialsDescription
218+
*******************************
219+
220+
.. autoclass:: confluent_kafka.admin.UserScramCredentialsDescription
221+
:members:
222+
223+
.. _pythonclient_user_scram_credential_alteration:
224+
225+
*****************************
226+
UserScramCredentialAlteration
227+
*****************************
228+
229+
.. autoclass:: confluent_kafka.admin.UserScramCredentialAlteration
230+
:members:
231+
232+
.. _pythonclient_user_scram_credential_upsertion:
233+
234+
****************************
235+
UserScramCredentialUpsertion
236+
****************************
237+
238+
.. autoclass:: confluent_kafka.admin.UserScramCredentialUpsertion
239+
:members:
240+
241+
.. _pythonclient_user_scram_credential_deletion:
242+
243+
***************************
244+
UserScramCredentialDeletion
245+
***************************
246+
247+
.. autoclass:: confluent_kafka.admin.UserScramCredentialDeletion
248+
:members:
249+
191250
.. _pythonclient_consumer:
192251

193252
********

examples/adminapi.py

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
2323
ConfigEntry, ConfigSource, AclBinding,
2424
AclBindingFilter, ResourceType, ResourcePatternType,
25-
AclOperation, AclPermissionType, AlterConfigOpType)
25+
AclOperation, AclPermissionType, AlterConfigOpType,
26+
ScramMechanism, ScramCredentialInfo,
27+
UserScramCredentialUpsertion, UserScramCredentialDeletion)
2628
import sys
2729
import threading
2830
import logging
@@ -596,6 +598,89 @@ def example_alter_consumer_group_offsets(a, args):
596598
raise
597599

598600

601+
def example_describe_user_scram_credentials(a, args):
602+
"""
603+
Describe User Scram Credentials
604+
"""
605+
futmap = a.describe_user_scram_credentials(args)
606+
607+
for username, fut in futmap.items():
608+
print("Username: {}".format(username))
609+
try:
610+
response = fut.result()
611+
for scram_credential_info in response.scram_credential_infos:
612+
print(f" Mechanism: {scram_credential_info.mechanism} " +
613+
f"Iterations: {scram_credential_info.iterations}")
614+
except KafkaException as e:
615+
print(" Error: {}".format(e))
616+
except Exception as e:
617+
print(f" Unexpected exception: {e}")
618+
619+
620+
def example_alter_user_scram_credentials(a, args):
621+
"""
622+
AlterUserScramCredentials
623+
"""
624+
alterations_args = []
625+
alterations = []
626+
i = 0
627+
op_cnt = 0
628+
629+
while i < len(args):
630+
op = args[i]
631+
if op == "UPSERT":
632+
if i + 5 >= len(args):
633+
raise ValueError(
634+
f"Invalid number of arguments for alteration {op_cnt}, expected 5, got {len(args) - i - 1}")
635+
user = args[i + 1]
636+
mechanism = ScramMechanism[args[i + 2]]
637+
iterations = int(args[i + 3])
638+
password = bytes(args[i + 4], 'utf8')
639+
# if salt is an empty string,
640+
# set it to None to generate it randomly.
641+
salt = args[i + 5]
642+
if not salt:
643+
salt = None
644+
else:
645+
salt = bytes(salt, 'utf8')
646+
alterations_args.append([op, user, mechanism, iterations,
647+
iterations, password, salt])
648+
i += 6
649+
elif op == "DELETE":
650+
if i + 2 >= len(args):
651+
raise ValueError(
652+
f"Invalid number of arguments for alteration {op_cnt}, expected 2, got {len(args) - i - 1}")
653+
user = args[i + 1]
654+
mechanism = ScramMechanism[args[i + 2]]
655+
alterations_args.append([op, user, mechanism])
656+
i += 3
657+
else:
658+
raise ValueError(f"Invalid alteration {op}, must be UPSERT or DELETE")
659+
op_cnt += 1
660+
661+
for alteration_arg in alterations_args:
662+
op = alteration_arg[0]
663+
if op == "UPSERT":
664+
[_, user, mechanism, iterations,
665+
iterations, password, salt] = alteration_arg
666+
scram_credential_info = ScramCredentialInfo(mechanism, iterations)
667+
upsertion = UserScramCredentialUpsertion(user, scram_credential_info,
668+
password, salt)
669+
alterations.append(upsertion)
670+
elif op == "DELETE":
671+
[_, user, mechanism] = alteration_arg
672+
deletion = UserScramCredentialDeletion(user, mechanism)
673+
alterations.append(deletion)
674+
675+
futmap = a.alter_user_scram_credentials(alterations)
676+
for username, fut in futmap.items():
677+
try:
678+
fut.result()
679+
print("{}: Success".format(username))
680+
except KafkaException as e:
681+
print("{}: Error: {}".format(username, e))
682+
683+
599684
if __name__ == '__main__':
600685
if len(sys.argv) < 3:
601686
sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
@@ -625,7 +710,11 @@ def example_alter_consumer_group_offsets(a, args):
625710
sys.stderr.write(
626711
' alter_consumer_group_offsets <group> <topic1> <partition1> <offset1> ' +
627712
'<topic2> <partition2> <offset2> ..\n')
628-
713+
sys.stderr.write(' describe_user_scram_credentials [<user1> <user2> ..]\n')
714+
sys.stderr.write(' alter_user_scram_credentials UPSERT <user1> <mechanism1> ' +
715+
'<iterations1> <password1> <salt1> ' +
716+
'[UPSERT <user2> <mechanism2> <iterations2> ' +
717+
' <password2> <salt2> DELETE <user3> <mechanism3> ..]\n')
629718
sys.exit(1)
630719

631720
broker = sys.argv[1]
@@ -650,7 +739,9 @@ def example_alter_consumer_group_offsets(a, args):
650739
'describe_consumer_groups': example_describe_consumer_groups,
651740
'delete_consumer_groups': example_delete_consumer_groups,
652741
'list_consumer_group_offsets': example_list_consumer_group_offsets,
653-
'alter_consumer_group_offsets': example_alter_consumer_group_offsets}
742+
'alter_consumer_group_offsets': example_alter_consumer_group_offsets,
743+
'describe_user_scram_credentials': example_describe_user_scram_credentials,
744+
'alter_user_scram_credentials': example_alter_user_scram_credentials}
654745

655746
if operation not in opsmap:
656747
sys.stderr.write('Unknown operation: %s\n' % operation)

src/confluent_kafka/admin/__init__.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@
4040
ConsumerGroupDescription,
4141
MemberAssignment,
4242
MemberDescription)
43+
from ._scram import (UserScramCredentialAlteration, # noqa: F401
44+
UserScramCredentialUpsertion,
45+
UserScramCredentialDeletion,
46+
ScramCredentialInfo,
47+
ScramMechanism,
48+
UserScramCredentialsDescription)
49+
4350
from ..cimpl import (KafkaException, # noqa: F401
4451
KafkaError,
4552
_AdminClientImpl,
@@ -65,6 +72,7 @@
6572
from confluent_kafka import ConsumerGroupState \
6673
as _ConsumerGroupState
6774

75+
6876
try:
6977
string_type = basestring
7078
except NameError:
@@ -235,6 +243,28 @@ def _make_acls_result(f, futmap):
235243
for resource, fut in futmap.items():
236244
fut.set_exception(e)
237245

246+
@staticmethod
247+
def _make_user_scram_credentials_result(f, futmap):
248+
try:
249+
results = f.result()
250+
len_results = len(results)
251+
len_futures = len(futmap)
252+
if len(results) != len_futures:
253+
raise RuntimeError(
254+
f"Results length {len_results} is different from future-map length {len_futures}")
255+
for username, value in results.items():
256+
fut = futmap.get(username, None)
257+
if fut is None:
258+
raise RuntimeError(
259+
f"username {username} not found in future-map: {futmap}")
260+
if isinstance(value, KafkaError):
261+
fut.set_exception(KafkaException(value))
262+
else:
263+
fut.set_result(value)
264+
except Exception as e:
265+
for _, fut in futmap.items():
266+
fut.set_exception(e)
267+
238268
@staticmethod
239269
def _create_future():
240270
f = concurrent.futures.Future()
@@ -366,6 +396,59 @@ def _check_alter_consumer_group_offsets_request(request):
366396
raise ValueError(
367397
"Element of 'topic_partitions' must not have negative value for 'offset' field")
368398

399+
@staticmethod
400+
def _check_describe_user_scram_credentials_request(users):
401+
if not isinstance(users, list):
402+
raise TypeError("Expected input to be list of String")
403+
for user in users:
404+
if not isinstance(user, string_type):
405+
raise TypeError("Each value should be a string")
406+
if not user:
407+
raise ValueError("'user' cannot be empty")
408+
409+
@staticmethod
410+
def _check_alter_user_scram_credentials_request(alterations):
411+
if not isinstance(alterations, list):
412+
raise TypeError("Expected input to be list")
413+
if len(alterations) == 0:
414+
raise ValueError("Expected at least one alteration")
415+
for alteration in alterations:
416+
if not isinstance(alteration, UserScramCredentialAlteration):
417+
raise TypeError("Expected each element of list to be subclass of UserScramCredentialAlteration")
418+
if alteration.user is None:
419+
raise TypeError("'user' cannot be None")
420+
if not isinstance(alteration.user, string_type):
421+
raise TypeError("'user' must be a string")
422+
if not alteration.user:
423+
raise ValueError("'user' cannot be empty")
424+
425+
if isinstance(alteration, UserScramCredentialUpsertion):
426+
if alteration.password is None:
427+
raise TypeError("'password' cannot be None")
428+
if not isinstance(alteration.password, bytes):
429+
raise TypeError("'password' must be bytes")
430+
if not alteration.password:
431+
raise ValueError("'password' cannot be empty")
432+
433+
if alteration.salt is not None and not alteration.salt:
434+
raise ValueError("'salt' can be None but cannot be empty")
435+
if alteration.salt and not isinstance(alteration.salt, bytes):
436+
raise TypeError("'salt' must be bytes")
437+
438+
if not isinstance(alteration.scram_credential_info, ScramCredentialInfo):
439+
raise TypeError("Expected credential_info to be ScramCredentialInfo Type")
440+
if alteration.scram_credential_info.iterations < 1:
441+
raise ValueError("Iterations should be positive")
442+
if not isinstance(alteration.scram_credential_info.mechanism, ScramMechanism):
443+
raise TypeError("Expected the mechanism to be ScramMechanism Type")
444+
elif isinstance(alteration, UserScramCredentialDeletion):
445+
if not isinstance(alteration.mechanism, ScramMechanism):
446+
raise TypeError("Expected the mechanism to be ScramMechanism Type")
447+
else:
448+
raise TypeError("Expected each element of list 'alterations' " +
449+
"to be either a UserScramCredentialUpsertion or a " +
450+
"UserScramCredentialDeletion")
451+
369452
def create_topics(self, new_topics, **kwargs):
370453
"""
371454
Create one or more new topics.
@@ -871,3 +954,61 @@ def set_sasl_credentials(self, username, password):
871954
:raises TypeException: Invalid input.
872955
"""
873956
super(AdminClient, self).set_sasl_credentials(username, password)
957+
958+
def describe_user_scram_credentials(self, users, **kwargs):
959+
"""
960+
Describe user SASL/SCRAM credentials.
961+
962+
:param list(str) users: List of user names to describe.
963+
Duplicate users aren't allowed.
964+
:param float request_timeout: The overall request timeout in seconds,
965+
including broker lookup, request transmission, operation time
966+
on broker, and response. Default: `socket.timeout.ms*1000.0`
967+
968+
:returns: A dict of futures keyed by user name.
969+
The future result() method returns the
970+
:class:`UserScramCredentialsDescription` or
971+
raises KafkaException
972+
973+
:rtype: dict[str, future]
974+
975+
:raises TypeError: Invalid input type.
976+
:raises ValueError: Invalid input value.
977+
"""
978+
AdminClient._check_describe_user_scram_credentials_request(users)
979+
980+
f, futmap = AdminClient._make_futures_v2(users, None,
981+
AdminClient._make_user_scram_credentials_result)
982+
983+
super(AdminClient, self).describe_user_scram_credentials(users, f, **kwargs)
984+
985+
return futmap
986+
987+
def alter_user_scram_credentials(self, alterations, **kwargs):
988+
"""
989+
Alter user SASL/SCRAM credentials.
990+
991+
:param list(UserScramCredentialAlteration) alterations: List of
992+
:class:`UserScramCredentialAlteration` to apply.
993+
The pair (user, mechanism) must be unique among alterations.
994+
:param float request_timeout: The overall request timeout in seconds,
995+
including broker lookup, request transmission, operation time
996+
on broker, and response. Default: `socket.timeout.ms*1000.0`
997+
998+
:returns: A dict of futures keyed by user name.
999+
The future result() method returns None or
1000+
raises KafkaException
1001+
1002+
:rtype: dict[str, future]
1003+
1004+
:raises TypeError: Invalid input type.
1005+
:raises ValueError: Invalid input value.
1006+
"""
1007+
AdminClient._check_alter_user_scram_credentials_request(alterations)
1008+
1009+
f, futmap = AdminClient._make_futures_v2(set([alteration.user for alteration in alterations]), None,
1010+
AdminClient._make_user_scram_credentials_result)
1011+
1012+
super(AdminClient, self).alter_user_scram_credentials(alterations, f, **kwargs)
1013+
1014+
return futmap

0 commit comments

Comments
 (0)