diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 8490fdb46..c5d8f8636 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -2,6 +2,7 @@ from collections import defaultdict import copy +import itertools import logging import socket import time @@ -255,12 +256,7 @@ def _refresh_controller_id(self, timeout_ms=30000): if 1 <= version <= 6: timeout_at = time.time() + timeout_ms / 1000 while time.time() < timeout_at: - request = MetadataRequest[version]() - future = self._send_request_to_node(self._client.least_loaded_node(), request) - - self._wait_for_futures([future]) - - response = future.value + response = self.send_request(MetadataRequest[version]()) controller_id = response.controller_id if controller_id == -1: log.warning("Controller ID not available, got -1") @@ -281,7 +277,7 @@ def _refresh_controller_id(self, timeout_ms=30000): "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}." .format(version)) - def _find_coordinator_id_send_request(self, group_id): + def _find_coordinator_id_request(self, group_id): """Send a FindCoordinatorRequest to a broker. Arguments: @@ -289,18 +285,14 @@ def _find_coordinator_id_send_request(self, group_id): name as a string. Returns: - A message future + FindCoordinatorRequest """ version = self._client.api_version(FindCoordinatorRequest, max_version=2) if version <= 0: request = FindCoordinatorRequest[version](group_id) elif version <= 2: request = FindCoordinatorRequest[version](group_id, 0) - else: - raise NotImplementedError( - "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." - .format(version)) - return self._send_request_to_node(self._client.least_loaded_node(), request) + return request def _find_coordinator_id_process_response(self, response): """Process a FindCoordinatorResponse. @@ -335,16 +327,9 @@ def _find_coordinator_ids(self, group_ids): A dict of {group_id: node_id} where node_id is the id of the broker that is the coordinator for the corresponding group. """ - groups_futures = { - group_id: self._find_coordinator_id_send_request(group_id) - for group_id in group_ids - } - self._wait_for_futures(groups_futures.values()) - groups_coordinators = { - group_id: self._find_coordinator_id_process_response(future.value) - for group_id, future in groups_futures.items() - } - return groups_coordinators + requests = [(self._find_coordinator_id_request(group_id), None) for group_id in group_ids] + coordinator_ids = self.send_requests(requests, response_fn=self._find_coordinator_id_process_response) + return dict(zip(group_ids, coordinator_ids)) def _send_request_to_node(self, node_id, request, wakeup=True): """Send a Kafka protocol message to a specific broker. @@ -366,6 +351,40 @@ def _send_request_to_node(self, node_id, request, wakeup=True): return Future().failure(e) return self._client.send(node_id, request, wakeup) + def _wait_for_futures(self, futures): + """Block until all futures complete. If any fail, raise the encountered exception. + + Arguments: + futures: A list of Future objects awaiting results. + + Raises: + The first encountered exception if a future fails. + """ + while not all(future.succeeded() for future in futures): + for future in futures: + self._client.poll(future=future) + + if future.failed(): + raise future.exception # pylint: disable-msg=raising-bad-type + + def send_request(self, request, node_id=None): + if node_id is None: + node_id = self._client.least_loaded_node() + self._client.await_ready(node_id) + future = self._client.send(node_id, request) + self._wait_for_futures([future]) # raises exception on failure + return future.value + + def send_requests(self, requests_and_node_ids, response_fn=lambda x: x): + futures = [] + for request, node_id in requests_and_node_ids: + if node_id is None: + node_id = self._client.least_loaded_node() + self._client.await_ready(node_id) + futures.append(self._client.send(node_id, request)) + self._wait_for_futures(futures) + return [response_fn(future.value) for future in futures] + def _send_request_to_controller(self, request): """Send a Kafka protocol message to the cluster controller. @@ -380,11 +399,7 @@ def _send_request_to_controller(self, request): tries = 2 # in case our cached self._controller_id is outdated while tries: tries -= 1 - future = self._send_request_to_node(self._controller_id, request) - - self._wait_for_futures([future]) - - response = future.value + response = self.send_request(request, node_id=self._controller_id) # In Java, the error field name is inconsistent: # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors # - DeleteTopicsResponse uses topic_error_codes @@ -499,10 +514,6 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False): timeout=timeout_ms, validate_only=validate_only ) - else: - raise NotImplementedError( - "Support for CreateTopics v{} has not yet been added to KafkaAdminClient." - .format(version)) # TODO convert structs to a more pythonic interface # TODO raise exceptions if errors return self._send_request_to_controller(request) @@ -522,18 +533,12 @@ def delete_topics(self, topics, timeout_ms=None): """ version = self._client.api_version(DeleteTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) - if version <= 3: - request = DeleteTopicsRequest[version]( + return self._send_request_to_controller( + DeleteTopicsRequest[version]( topics=topics, timeout=timeout_ms ) - response = self._send_request_to_controller(request) - else: - raise NotImplementedError( - "Support for DeleteTopics v{} has not yet been added to KafkaAdminClient." - .format(version)) - return response - + ) def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): """ @@ -554,12 +559,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): allow_auto_topic_creation=auto_topic_creation ) - future = self._send_request_to_node( - self._client.least_loaded_node(), - request - ) - self._wait_for_futures([future]) - return future.value + return self.send_request(request) def list_topics(self): """Retrieve a list of all topic names in the cluster. @@ -677,16 +677,7 @@ def describe_acls(self, acl_filter): permission_type=acl_filter.permission_type ) - else: - raise NotImplementedError( - "Support for DescribeAcls v{} has not yet been added to KafkaAdmin." - .format(version) - ) - - future = self._send_request_to_node(self._client.least_loaded_node(), request) - self._wait_for_futures([future]) - response = future.value - + response = self.send_request(request) error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: # optionally we could retry if error_type.retriable @@ -799,16 +790,7 @@ def create_acls(self, acls): request = CreateAclsRequest[version]( creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls] ) - else: - raise NotImplementedError( - "Support for CreateAcls v{} has not yet been added to KafkaAdmin." - .format(version) - ) - - future = self._send_request_to_node(self._client.least_loaded_node(), request) - self._wait_for_futures([future]) - response = future.value - + response = self.send_request(request) return self._convert_create_acls_response_to_acls(acls, response) @staticmethod @@ -922,16 +904,7 @@ def delete_acls(self, acl_filters): request = DeleteAclsRequest[version]( filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters] ) - else: - raise NotImplementedError( - "Support for DeleteAcls v{} has not yet been added to KafkaAdmin." - .format(version) - ) - - future = self._send_request_to_node(self._client.least_loaded_node(), request) - self._wait_for_futures([future]) - response = future.value - + response = self.send_request(request) return self._convert_delete_acls_response_to_matching_acls(acl_filters, response) @staticmethod @@ -966,7 +939,7 @@ def describe_configs(self, config_resources, include_synonyms=False): supported by all versions. Default: False. Returns: - Appropriate version of DescribeConfigsResponse class. + List of DescribeConfigsResponses. """ # Break up requests by type - a broker config request must be sent to the specific broker. @@ -980,58 +953,36 @@ def describe_configs(self, config_resources, include_synonyms=False): else: topic_resources.append(self._convert_describe_config_resource_request(config_resource)) - futures = [] version = self._client.api_version(DescribeConfigsRequest, max_version=2) - if version == 0: - if include_synonyms: - raise IncompatibleBrokerVersion( - "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." - .format(self.config['api_version'])) - - if len(broker_resources) > 0: - for broker_resource in broker_resources: - try: - broker_id = int(broker_resource[1]) - except ValueError: - raise ValueError("Broker resource names must be an integer or a string represented integer") - - futures.append(self._send_request_to_node( - broker_id, - DescribeConfigsRequest[version](resources=[broker_resource]) - )) - - if len(topic_resources) > 0: - futures.append(self._send_request_to_node( - self._client.least_loaded_node(), - DescribeConfigsRequest[version](resources=topic_resources) - )) + if include_synonyms and version == 0: + raise IncompatibleBrokerVersion( + "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." + .format(self.config['api_version'])) - elif version <= 2: - if len(broker_resources) > 0: - for broker_resource in broker_resources: - try: - broker_id = int(broker_resource[1]) - except ValueError: - raise ValueError("Broker resource names must be an integer or a string represented integer") - - futures.append(self._send_request_to_node( - broker_id, - DescribeConfigsRequest[version]( - resources=[broker_resource], - include_synonyms=include_synonyms) - )) - - if len(topic_resources) > 0: - futures.append(self._send_request_to_node( - self._client.least_loaded_node(), - DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms) - )) - else: - raise NotImplementedError( - "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version)) + requests = [] + if len(broker_resources) > 0: + for broker_resource in broker_resources: + try: + broker_id = int(broker_resource[1]) + except ValueError: + raise ValueError("Broker resource names must be an integer or a string represented integer") - self._wait_for_futures(futures) - return [f.value for f in futures] + if version == 0: + request = DescribeConfigsRequest[version](resources=[broker_resource]) + else: + request = DescribeConfigsRequest[version]( + resources=[broker_resource], + include_synonyms=include_synonyms) + requests.append((request, broker_id)) + + if len(topic_resources) > 0: + if version == 0: + request = DescribeConfigsRequest[version](resources=topic_resources) + else: + request = DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms) + requests.append((request, None)) + + return self.send_requests(requests) @staticmethod def _convert_alter_config_resource_request(config_resource): @@ -1067,25 +1018,16 @@ def alter_configs(self, config_resources): Appropriate version of AlterConfigsResponse class. """ version = self._client.api_version(AlterConfigsRequest, max_version=1) - if version <= 1: - request = AlterConfigsRequest[version]( - resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] - ) - else: - raise NotImplementedError( - "Support for AlterConfigs v{} has not yet been added to KafkaAdminClient." - .format(version)) + request = AlterConfigsRequest[version]( + resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] + ) # TODO the Java client has the note: # // We must make a separate AlterConfigs request for every BROKER resource we want to alter # // and send the request to that specific broker. Other resources are grouped together into # // a single request that may be sent to any broker. # # So this is currently broken as it always sends to the least_loaded_node() - future = self._send_request_to_node(self._client.least_loaded_node(), request) - - self._wait_for_futures([future]) - response = future.value - return response + return self.send_request(request) # alter replica logs dir protocol not yet implemented # Note: have to lookup the broker with the replica assignment and send the request to that broker @@ -1129,16 +1071,11 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal """ version = self._client.api_version(CreatePartitionsRequest, max_version=1) timeout_ms = self._validate_timeout(timeout_ms) - if version <= 1: - request = CreatePartitionsRequest[version]( - topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], - timeout=timeout_ms, - validate_only=validate_only - ) - else: - raise NotImplementedError( - "Support for CreatePartitions v{} has not yet been added to KafkaAdminClient." - .format(version)) + request = CreatePartitionsRequest[version]( + topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], + timeout=timeout_ms, + validate_only=validate_only + ) return self._send_request_to_controller(request) def _get_leader_for_partitions(self, partitions, timeout_ms=None): @@ -1195,8 +1132,6 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id timeout_ms = self._validate_timeout(timeout_ms) responses = [] version = self._client.api_version(DeleteRecordsRequest, max_version=0) - if version is None: - raise IncompatibleBrokerVersion("Broker does not support DeleteGroupsRequest") # We want to make as few requests as possible # If a single node serves as a partition leader for multiple partitions (and/or @@ -1221,10 +1156,8 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id ], timeout_ms=timeout_ms ) - future = self._send_request_to_node(leader, request) - self._wait_for_futures([future]) - - responses.append(future.value.to_object()) + response = self.send_request(request, node_id=leader) + responses.append(response.to_object()) partition2result = {} partition2error = {} @@ -1266,15 +1199,14 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id # describe delegation_token protocol not yet implemented # Note: send the request to the least_loaded_node() - def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, include_authorized_operations=False): + def _describe_consumer_groups_request(self, group_id, include_authorized_operations=False): """Send a DescribeGroupsRequest to the group's coordinator. Arguments: group_id: The group name as a string - group_coordinator_id: The node_id of the groups' coordinator broker. Returns: - A message future. + DescribeGroupsRequest object """ version = self._client.api_version(DescribeGroupsRequest, max_version=3) if version <= 2: @@ -1295,11 +1227,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, groups=(group_id,), include_authorized_operations=include_authorized_operations ) - else: - raise NotImplementedError( - "Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient." - .format(version)) - return self._send_request_to_node(group_coordinator_id, request) + return request def _describe_consumer_groups_process_response(self, response): """Process a DescribeGroupsResponse into a group description.""" @@ -1378,46 +1306,25 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include plan to change this to return namedtuples as well as decoding the partition assignments. """ - group_descriptions = [] - if group_coordinator_id is not None: groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids} else: groups_coordinators = self._find_coordinator_ids(group_ids) - futures = [ - self._describe_consumer_groups_send_request( - group_id, - coordinator_id, - include_authorized_operations) + requests = [ + (self._describe_consumer_groups_request(group_id, include_authorized_operations), coordinator_id) for group_id, coordinator_id in groups_coordinators.items() ] - self._wait_for_futures(futures) - - for future in futures: - response = future.value - group_description = self._describe_consumer_groups_process_response(response) - group_descriptions.append(group_description) + return self.send_requests(requests, response_fn=self._describe_consumer_groups_process_response) - return group_descriptions - - def _list_consumer_groups_send_request(self, broker_id): + def _list_consumer_groups_request(self): """Send a ListGroupsRequest to a broker. - Arguments: - broker_id (int): The broker's node_id. - Returns: - A message future + ListGroupsRequest object """ version = self._client.api_version(ListGroupsRequest, max_version=2) - if version <= 2: - request = ListGroupsRequest[version]() - else: - raise NotImplementedError( - "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient." - .format(version)) - return self._send_request_to_node(broker_id, request) + return ListGroupsRequest[version]() def _list_consumer_groups_process_response(self, response): """Process a ListGroupsResponse into a list of groups.""" @@ -1467,23 +1374,20 @@ def list_consumer_groups(self, broker_ids=None): # because if a group coordinator fails after being queried, and its # consumer groups move to new brokers that haven't yet been queried, # then the same group could be returned by multiple brokers. - consumer_groups = set() if broker_ids is None: broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()] - futures = [self._list_consumer_groups_send_request(b) for b in broker_ids] - self._wait_for_futures(futures) - for f in futures: - response = f.value - consumer_groups.update(self._list_consumer_groups_process_response(response)) - return list(consumer_groups) + requests = [ + (self._list_consumer_groups_request(), broker_id) + for broker_id in broker_ids + ] + consumer_groups = self.send_requests(requests, response_fn=self._list_consumer_groups_process_response) + return list(set().union(*consumer_groups)) - def _list_consumer_group_offsets_send_request(self, group_id, - group_coordinator_id, partitions=None): + def _list_consumer_group_offsets_request(self, group_id, partitions=None): """Send an OffsetFetchRequest to a broker. Arguments: group_id (str): The consumer group id name for which to fetch offsets. - group_coordinator_id (int): The node_id of the group's coordinator broker. Keyword Arguments: partitions: A list of TopicPartitions for which to fetch @@ -1491,30 +1395,24 @@ def _list_consumer_group_offsets_send_request(self, group_id, known offsets for the consumer group. Default: None. Returns: - A message future + OffsetFetchRequest object """ version = self._client.api_version(OffsetFetchRequest, max_version=5) - if version <= 5: - if partitions is None: - if version <= 1: - raise ValueError( - """OffsetFetchRequest_v{} requires specifying the - partitions for which to fetch offsets. Omitting the - partitions is only supported on brokers >= 0.10.2. - For details, see KIP-88.""".format(version)) - topics_partitions = None - else: - # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])] - topics_partitions_dict = defaultdict(set) - for topic, partition in partitions: - topics_partitions_dict[topic].add(partition) - topics_partitions = list(six.iteritems(topics_partitions_dict)) - request = OffsetFetchRequest[version](group_id, topics_partitions) + if partitions is None: + if version <= 1: + raise ValueError( + """OffsetFetchRequest_v{} requires specifying the + partitions for which to fetch offsets. Omitting the + partitions is only supported on brokers >= 0.10.2. + For details, see KIP-88.""".format(version)) + topics_partitions = None else: - raise NotImplementedError( - "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient." - .format(version)) - return self._send_request_to_node(group_coordinator_id, request) + # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])] + topics_partitions_dict = defaultdict(set) + for topic, partition in partitions: + topics_partitions_dict[topic].add(partition) + topics_partitions = list(six.iteritems(topics_partitions_dict)) + return OffsetFetchRequest[version](group_id, topics_partitions) def _list_consumer_group_offsets_process_response(self, response): """Process an OffsetFetchResponse. @@ -1592,10 +1490,8 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, """ if group_coordinator_id is None: group_coordinator_id = self._find_coordinator_ids([group_id])[group_id] - future = self._list_consumer_group_offsets_send_request( - group_id, group_coordinator_id, partitions) - self._wait_for_futures([future]) - response = future.value + request = self._list_consumer_group_offsets_request(group_id, partitions) + response = self.send_request(request, node_id=group_coordinator_id) return self._list_consumer_group_offsets_process_response(response) def delete_consumer_groups(self, group_ids, group_coordinator_id=None): @@ -1621,23 +1517,20 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None): Returns: A list of tuples (group_id, KafkaError) """ + coordinators_groups = defaultdict(list) if group_coordinator_id is not None: - futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)] + coordinators_groups[group_coordinator_id] = group_ids else: - coordinators_groups = defaultdict(list) for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items(): coordinators_groups[coordinator_id].append(group_id) - futures = [ - self._delete_consumer_groups_send_request(group_ids, coordinator_id) - for coordinator_id, group_ids in coordinators_groups.items() - ] - self._wait_for_futures(futures) + requests = [ + (self._delete_consumer_groups_request(group_ids), coordinator_id) + for coordinator_id, group_ids in coordinators_groups.items() + ] - results = [] - for f in futures: - results.extend(self._convert_delete_groups_response(f.value)) - return results + results = self.send_requests(requests, response_fn=self._convert_delete_groups_response) + return list(itertools.chain(*results)) def _convert_delete_groups_response(self, response): """Parse the DeleteGroupsResponse, mapping group IDs to their respective errors. @@ -1658,24 +1551,17 @@ def _convert_delete_groups_response(self, response): "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient." .format(response.API_VERSION)) - def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id): - """Send a DeleteGroupsRequest to the specified broker (the group coordinator). + def _delete_consumer_groups_request(self, group_ids): + """Build a DeleteGroupsRequest to send to a broker (the group coordinator). Arguments: group_ids ([str]): A list of consumer group IDs to be deleted. - group_coordinator_id (int): The node_id of the broker coordinating these groups. Returns: - A future representing the in-flight DeleteGroupsRequest. + A DeleteGroupsRequest object. """ version = self._client.api_version(DeleteGroupsRequest, max_version=1) - if version <= 1: - request = DeleteGroupsRequest[version](group_ids) - else: - raise NotImplementedError( - "Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient." - .format(version)) - return self._send_request_to_node(group_coordinator_id, request) + return DeleteGroupsRequest[version](group_ids) @staticmethod def _convert_topic_partitions(topic_partitions): @@ -1722,35 +1608,11 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_ # TODO convert structs to a more pythonic interface return self._send_request_to_controller(request) - def _wait_for_futures(self, futures): - """Block until all futures complete. If any fail, raise the encountered exception. - - Arguments: - futures: A list of Future objects awaiting results. - - Raises: - The first encountered exception if a future fails. - """ - while not all(future.succeeded() for future in futures): - for future in futures: - self._client.poll(future=future) - - if future.failed(): - raise future.exception # pylint: disable-msg=raising-bad-type - def describe_log_dirs(self): """Send a DescribeLogDirsRequest request to a broker. Returns: - A message future + DescribeLogDirsResponse object """ version = self._client.api_version(DescribeLogDirsRequest, max_version=0) - if version <= 0: - request = DescribeLogDirsRequest[version]() - future = self._send_request_to_node(self._client.least_loaded_node(), request) - self._wait_for_futures([future]) - else: - raise NotImplementedError( - "Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient." - .format(version)) - return future.value + return self.send_request(DescribeLogDirsRequest[version]()) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index f95f367e8..1c38400e7 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -383,6 +383,3 @@ def test_delete_records_with_errors(kafka_admin_client, topic, send_messages): kafka_admin_client.delete_records({p0: 1000}) with pytest.raises(BrokerResponseError): kafka_admin_client.delete_records({p0: 1000, p1: 1000}) - - -