From 5eeeedfe3852b08f8ed4e13916a7aa631dc45f2b Mon Sep 17 00:00:00 2001 From: Vishal Bhavsar Date: Mon, 12 May 2025 17:37:44 +0000 Subject: [PATCH 1/4] Add support for rack id in the consumer --- aiokafka/consumer/consumer.py | 3 ++ aiokafka/consumer/fetcher.py | 64 ++++++++++++++++++++++++++++++++--- 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py index 559fa35a..299b1aaf 100644 --- a/aiokafka/consumer/consumer.py +++ b/aiokafka/consumer/consumer.py @@ -260,6 +260,7 @@ def __init__( exclude_internal_topics=True, connections_max_idle_ms=540000, isolation_level="read_uncommitted", + rack_id="", sasl_mechanism="PLAIN", sasl_plain_password=None, sasl_plain_username=None, @@ -324,6 +325,7 @@ def __init__( self._max_poll_records = max_poll_records self._consumer_timeout = consumer_timeout_ms / 1000 self._isolation_level = isolation_level + self._rack_id = rack_id self._rebalance_timeout_ms = rebalance_timeout_ms self._max_poll_interval_ms = max_poll_interval_ms @@ -397,6 +399,7 @@ async def start(self): retry_backoff_ms=self._retry_backoff_ms, auto_offset_reset=self._auto_offset_reset, isolation_level=self._isolation_level, + rack_id=self._rack_id, ) if self._group_id is not None: diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index b7a419a2..2823a4f3 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -372,8 +372,14 @@ class Fetcher: OffsetOutOfRange errors: 'earliest' will move to the oldest available message, 'latest' will move to the most recent. Any ofther value will raise the exception. Default: 'latest'. + session_id (int): The fetch session ID. + session_epoch (int): The fetch session epoch, which is used for + ordering requests in a session. isolation_level (str): Controls how to read messages written transactionally. See consumer description. + forgotten_topics_data ([str, [int]]): In an incremental fetch + request, the partitions to remove. + rack_id (str): Rack ID of the consumer making this request. """ def __init__( @@ -393,6 +399,10 @@ def __init__( retry_backoff_ms=100, auto_offset_reset="latest", isolation_level="read_uncommitted", + session_id=0, + session_epoch=-1, + forgotten_topics_data=None, + rack_id="", ): self._client = client self._loop = client._loop @@ -416,6 +426,10 @@ def __init__( else: raise ValueError(f"Incorrect isolation level {isolation_level}") + self._session_id = session_id + self._session_epoch = session_epoch + self._forgotten_topics_data = forgotten_topics_data or [] + self._rack_id = rack_id self._records = collections.OrderedDict() self._in_flight = set() self._pending_tasks = set() @@ -427,8 +441,8 @@ def __init__( # waiters directly self._subscriptions.register_fetch_waiters(self._fetch_waiters) - if client.api_version >= (0, 11): - req_version = 4 + if client.api_version >= (2, 4, 0): + req_version = 11 elif client.api_version >= (0, 10, 1): req_version = 3 elif client.api_version >= (0, 10): @@ -645,7 +659,43 @@ def _get_actions_per_node(self, assignment): (tp.partition, position, self._max_partition_fetch_bytes) ) klass = self._fetch_request_class - if klass.API_VERSION > 3: + if klass.API_VERSION > 10: + topics = collections.defaultdict(list) + for k, v in by_topics.items(): + for partition_info in v: + topics[k].append( + ( + partition_info[0], + -1, + partition_info[1], + -1, + partition_info[2], + ) + ) + + req = klass( + -1, # replica_id + self._fetch_max_wait_ms, + self._fetch_min_bytes, + self._fetch_max_bytes, + self._isolation_level, + self._session_id, + self._session_epoch, + list(topics.items()), + self._forgotten_topics_data, + self._rack_id, + ) + elif klass.API_VERSION > 6: + req = klass( + -1, # replica_id + self._fetch_max_wait_ms, + self._fetch_min_bytes, + self._fetch_max_bytes, + self._isolation_level, + list(by_topics.items()), + self._forgotten_topics_data, + ) + elif klass.API_VERSION > 3: req = klass( -1, # replica_id self._fetch_max_wait_ms, @@ -706,8 +756,12 @@ async def _proc_fetch_request(self, assignment, node_id, request): fetch_offsets = {} for topic, partitions in request.topics: - for partition, offset, _ in partitions: - fetch_offsets[TopicPartition(topic, partition)] = offset + if self._client.api_version >= (2, 4, 0): + for partition, _, offset, _, _ in partitions: + fetch_offsets[TopicPartition(topic, partition)] = offset + else: + for partition, offset, _ in partitions: + fetch_offsets[TopicPartition(topic, partition)] = offset now_ms = int(1000 * time.time()) for topic, partitions in response.topics: From d0a20d60164a3de21f58e2d51b72709f5e9535d1 Mon Sep 17 00:00:00 2001 From: Vishal Bhavsar Date: Mon, 12 May 2025 17:59:02 +0000 Subject: [PATCH 2/4] Add unit test --- tests/test_fetcher.py | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index 96fd2532..b2d202f9 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -26,6 +26,7 @@ UnknownTopicOrPartitionError, ) from aiokafka.protocol.fetch import FetchRequest_v0 as FetchRequest +from aiokafka.protocol.fetch import FetchRequest_v3, FetchRequest_v11 from aiokafka.protocol.fetch import FetchResponse_v0 as FetchResponse from aiokafka.protocol.offset import OffsetResponse from aiokafka.record.default_records import ( @@ -599,3 +600,43 @@ async def test_solitary_abort_marker(self): # Since isolation_level is READ_COMMITTED, no consumer records are # expected to be returned here. self.assertEqual(len(list(partition_recs)), 0) + + @run_until_complete + async def test_fetcher_request_class_and_session_params(self): + class DummyClient: + def __init__(self, api_version): + self.api_version = api_version + self._loop = get_running_loop() + + subs = SubscriptionState() + + # Test new threshold for rack_id support (API_VERSION 11) + client = DummyClient((2, 4, 0)) + fetcher = Fetcher( + client, + subs, + session_id=10, + session_epoch=20, + forgotten_topics_data=[("t", [1])], + rack_id="rack1", + ) + # Cancel background fetch task + fetcher._fetch_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await fetcher._fetch_task + # Parameters stored correctly + self.assertEqual(fetcher._session_id, 10) + self.assertEqual(fetcher._session_epoch, 20) + self.assertEqual(fetcher._forgotten_topics_data, [("t", [1])]) + self.assertEqual(fetcher._rack_id, "rack1") + # Request class selected correctly + self.assertIs(fetcher._fetch_request_class, FetchRequest_v11) + + # Test legacy threshold (API_VERSION 3) + client2 = DummyClient((0, 10, 1)) + fetcher2 = Fetcher(client2, subs) + fetcher2._fetch_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await fetcher2._fetch_task + self.assertEqual(fetcher2._fetch_request_class.API_VERSION, 3) + self.assertIs(fetcher2._fetch_request_class, FetchRequest_v3) From f0e80d25ee69fcc4095bf95e06622df03a69c602 Mon Sep 17 00:00:00 2001 From: Vishal Bhavsar Date: Mon, 12 May 2025 18:13:19 +0000 Subject: [PATCH 3/4] Fix fallback --- aiokafka/consumer/fetcher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index 2823a4f3..103bd74e 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -443,6 +443,8 @@ def __init__( if client.api_version >= (2, 4, 0): req_version = 11 + elif client.api_version >= (0, 11): + req_version = 4 elif client.api_version >= (0, 10, 1): req_version = 3 elif client.api_version >= (0, 10): From 3b6fb2267a81fd95aedac5901928b3d0c3c1c59d Mon Sep 17 00:00:00 2001 From: Vishal Bhavsar Date: Wed, 14 May 2025 23:03:17 +0000 Subject: [PATCH 4/4] Fix response error processing --- aiokafka/consumer/fetcher.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index 103bd74e..fe505eb9 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -783,7 +783,13 @@ async def _proc_fetch_request(self, assignment, node_id, request): continue if error_type is Errors.NoError: - if request.API_VERSION >= 4: + if request.API_VERSION >= 11: + aborted_transactions = part_data[-3] + lso = part_data[-5] + elif request.API_VERSION >= 5: + aborted_transactions = part_data[-2] + lso = part_data[-4] + elif request.API_VERSION >= 4: aborted_transactions = part_data[-2] lso = part_data[-3] else: