From 66069b91ef32d3dd4e2e29fe033c0b8901d748af Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 3 May 2025 17:45:17 -0700 Subject: [PATCH 1/3] Test _reset_offsets_async with multiple partitions => KeyError --- test/test_fetcher.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index f4e1f3f73..0ef349500 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -134,18 +134,27 @@ def test_reset_offsets_if_needed(fetcher, topic, mocker): def test__reset_offsets_async(fetcher, mocker): - tp = TopicPartition("topic", 0) + tp0 = TopicPartition("topic", 0) + tp1 = TopicPartition("topic", 1) fetcher._subscriptions.subscribe(topics=["topic"]) - fetcher._subscriptions.assign_from_subscribed([tp]) - fetcher._subscriptions.request_offset_reset(tp) - fetched_offsets = {tp: OffsetAndTimestamp(1001, None, -1)} + fetcher._subscriptions.assign_from_subscribed([tp0, tp1]) + fetcher._subscriptions.request_offset_reset(tp0) + fetcher._subscriptions.request_offset_reset(tp1) + mocker.patch.object(fetcher._client.cluster, "leader_for_partition", side_effect=[0, 1]) mocker.patch.object(fetcher._client, 'ready', return_value=True) - mocker.patch.object(fetcher, '_send_list_offsets_request', - return_value=Future().success((fetched_offsets, set()))) - mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) - fetcher._reset_offsets_async({tp: OffsetResetStrategy.EARLIEST}) - assert not fetcher._subscriptions.assignment[tp].awaiting_reset - assert fetcher._subscriptions.assignment[tp].position.offset == 1001 + future1 = Future() + future2 = Future() + mocker.patch.object(fetcher, '_send_list_offsets_request', side_effect=[future1, future2]) + fetcher._reset_offsets_async({ + tp0: OffsetResetStrategy.EARLIEST, + tp1: OffsetResetStrategy.EARLIEST, + }) + future1.success(({tp0: OffsetAndTimestamp(1001, None, -1)}, set())), + future2.success(({tp1: OffsetAndTimestamp(1002, None, -1)}, set())), + assert not fetcher._subscriptions.assignment[tp0].awaiting_reset + assert not fetcher._subscriptions.assignment[tp1].awaiting_reset + assert fetcher._subscriptions.assignment[tp0].position.offset == 1001 + assert fetcher._subscriptions.assignment[tp1].position.offset == 1002 def test__send_list_offsets_requests(fetcher, mocker): From 30a608e8228c3124c31b796b35c238fdaa3059b9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 3 May 2025 17:52:43 -0700 Subject: [PATCH 2/3] Fix _reset_offsets_async => do not use loop var closures --- kafka/consumer/fetcher.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 16b3fbb68..96e62a3e7 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -414,11 +414,11 @@ def _reset_offsets_async(self, timestamps): for node_id, timestamps_and_epochs in six.iteritems(timestamps_by_node): if not self._client.ready(node_id): continue - partitions = set(timestamps_and_epochs.keys()) expire_at = time.time() + self.config['request_timeout_ms'] / 1000 + partitions = set(timestamps_and_epochs.keys()) self._subscriptions.set_reset_pending(partitions, expire_at) - def on_success(result): + def on_success(timestamps_and_epochs, result): fetched_offsets, partitions_to_retry = result if partitions_to_retry: self._subscriptions.reset_failed(partitions_to_retry, time.time() + self.config['retry_backoff_ms'] / 1000) @@ -428,7 +428,7 @@ def on_success(result): ts, _epoch = timestamps_and_epochs[partition] self._reset_offset_if_needed(partition, ts, offset.offset) - def on_failure(error): + def on_failure(partitions, error): self._subscriptions.reset_failed(partitions, time.time() + self.config['retry_backoff_ms'] / 1000) self._client.cluster.request_update() @@ -439,8 +439,8 @@ def on_failure(error): log.error("Discarding error in ListOffsetResponse because another error is pending: %s", error) future = self._send_list_offsets_request(node_id, timestamps_and_epochs) - future.add_callback(on_success) - future.add_errback(on_failure) + future.add_callback(on_success, timestamps_and_epochs) + future.add_errback(on_failure, partitions) def _send_list_offsets_requests(self, timestamps): """Fetch offsets for each partition in timestamps dict. This may send From 868d975c94d233f18991e2e527f771980e216942 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 3 May 2025 18:41:09 -0700 Subject: [PATCH 3/3] revert unnecessary ordering change --- kafka/consumer/fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 96e62a3e7..e7757e7b3 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -414,8 +414,8 @@ def _reset_offsets_async(self, timestamps): for node_id, timestamps_and_epochs in six.iteritems(timestamps_by_node): if not self._client.ready(node_id): continue - expire_at = time.time() + self.config['request_timeout_ms'] / 1000 partitions = set(timestamps_and_epochs.keys()) + expire_at = time.time() + self.config['request_timeout_ms'] / 1000 self._subscriptions.set_reset_pending(partitions, expire_at) def on_success(timestamps_and_epochs, result):