diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 16b3fbb68..e7757e7b3 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -418,7 +418,7 @@ def _reset_offsets_async(self, timestamps): expire_at = time.time() + self.config['request_timeout_ms'] / 1000 self._subscriptions.set_reset_pending(partitions, expire_at) - def on_success(result): + def on_success(timestamps_and_epochs, result): fetched_offsets, partitions_to_retry = result if partitions_to_retry: self._subscriptions.reset_failed(partitions_to_retry, time.time() + self.config['retry_backoff_ms'] / 1000) @@ -428,7 +428,7 @@ def on_success(result): ts, _epoch = timestamps_and_epochs[partition] self._reset_offset_if_needed(partition, ts, offset.offset) - def on_failure(error): + def on_failure(partitions, error): self._subscriptions.reset_failed(partitions, time.time() + self.config['retry_backoff_ms'] / 1000) self._client.cluster.request_update() @@ -439,8 +439,8 @@ def on_failure(error): log.error("Discarding error in ListOffsetResponse because another error is pending: %s", error) future = self._send_list_offsets_request(node_id, timestamps_and_epochs) - future.add_callback(on_success) - future.add_errback(on_failure) + future.add_callback(on_success, timestamps_and_epochs) + future.add_errback(on_failure, partitions) def _send_list_offsets_requests(self, timestamps): """Fetch offsets for each partition in timestamps dict. This may send diff --git a/test/test_fetcher.py b/test/test_fetcher.py index f4e1f3f73..0ef349500 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -134,18 +134,27 @@ def test_reset_offsets_if_needed(fetcher, topic, mocker): def test__reset_offsets_async(fetcher, mocker): - tp = TopicPartition("topic", 0) + tp0 = TopicPartition("topic", 0) + tp1 = TopicPartition("topic", 1) fetcher._subscriptions.subscribe(topics=["topic"]) - fetcher._subscriptions.assign_from_subscribed([tp]) - fetcher._subscriptions.request_offset_reset(tp) - fetched_offsets = {tp: OffsetAndTimestamp(1001, None, -1)} + fetcher._subscriptions.assign_from_subscribed([tp0, tp1]) + fetcher._subscriptions.request_offset_reset(tp0) + fetcher._subscriptions.request_offset_reset(tp1) + mocker.patch.object(fetcher._client.cluster, "leader_for_partition", side_effect=[0, 1]) mocker.patch.object(fetcher._client, 'ready', return_value=True) - mocker.patch.object(fetcher, '_send_list_offsets_request', - return_value=Future().success((fetched_offsets, set()))) - mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) - fetcher._reset_offsets_async({tp: OffsetResetStrategy.EARLIEST}) - assert not fetcher._subscriptions.assignment[tp].awaiting_reset - assert fetcher._subscriptions.assignment[tp].position.offset == 1001 + future1 = Future() + future2 = Future() + mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=[future1, future2]) + fetcher._reset_offsets_async({ + tp0: OffsetResetStrategy.EARLIEST, + tp1: OffsetResetStrategy.EARLIEST, + }) + future1.success(({tp0: OffsetAndTimestamp(1001, None, -1)}, set())), + future2.success(({tp1: OffsetAndTimestamp(1002, None, -1)}, set())), + assert not fetcher._subscriptions.assignment[tp0].awaiting_reset + assert not fetcher._subscriptions.assignment[tp1].awaiting_reset + assert fetcher._subscriptions.assignment[tp0].position.offset == 1001 + assert fetcher._subscriptions.assignment[tp1].position.offset == 1002 def test__send_list_offsets_requests(fetcher, mocker):