Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ def __init__(
exclude_internal_topics=True,
connections_max_idle_ms=540000,
isolation_level="read_uncommitted",
rack_id="",
sasl_mechanism="PLAIN",
sasl_plain_password=None,
sasl_plain_username=None,
Expand Down Expand Up @@ -324,6 +325,7 @@ def __init__(
self._max_poll_records = max_poll_records
self._consumer_timeout = consumer_timeout_ms / 1000
self._isolation_level = isolation_level
self._rack_id = rack_id
self._rebalance_timeout_ms = rebalance_timeout_ms
self._max_poll_interval_ms = max_poll_interval_ms

Expand Down Expand Up @@ -397,6 +399,7 @@ async def start(self):
retry_backoff_ms=self._retry_backoff_ms,
auto_offset_reset=self._auto_offset_reset,
isolation_level=self._isolation_level,
rack_id=self._rack_id,
)

if self._group_id is not None:
Expand Down
64 changes: 60 additions & 4 deletions aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,14 @@ class Fetcher:
OffsetOutOfRange errors: 'earliest' will move to the oldest
available message, 'latest' will move to the most recent. Any
ofther value will raise the exception. Default: 'latest'.
session_id (int): The fetch session ID.
session_epoch (int): The fetch session epoch, which is used for
ordering requests in a session.
isolation_level (str): Controls how to read messages written
transactionally. See consumer description.
forgotten_topics_data ([str, [int]]): In an incremental fetch
request, the partitions to remove.
rack_id (str): Rack ID of the consumer making this request.
"""

def __init__(
Expand All @@ -393,6 +399,10 @@ def __init__(
retry_backoff_ms=100,
auto_offset_reset="latest",
isolation_level="read_uncommitted",
session_id=0,
session_epoch=-1,
forgotten_topics_data=None,
rack_id="",
Comment on lines +402 to +405
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

):
self._client = client
self._loop = client._loop
Expand All @@ -416,6 +426,10 @@ def __init__(
else:
raise ValueError(f"Incorrect isolation level {isolation_level}")

self._session_id = session_id
self._session_epoch = session_epoch
self._forgotten_topics_data = forgotten_topics_data or []
self._rack_id = rack_id
self._records = collections.OrderedDict()
self._in_flight = set()
self._pending_tasks = set()
Expand All @@ -427,7 +441,9 @@ def __init__(
# waiters directly
self._subscriptions.register_fetch_waiters(self._fetch_waiters)

if client.api_version >= (0, 11):
if client.api_version >= (2, 4, 0):
req_version = 11
elif client.api_version >= (0, 11):
req_version = 4
elif client.api_version >= (0, 10, 1):
req_version = 3
Expand Down Expand Up @@ -645,7 +661,43 @@ def _get_actions_per_node(self, assignment):
(tp.partition, position, self._max_partition_fetch_bytes)
)
klass = self._fetch_request_class
if klass.API_VERSION > 3:
if klass.API_VERSION > 10:
topics = collections.defaultdict(list)
for k, v in by_topics.items():
for partition_info in v:
topics[k].append(
(
partition_info[0],
-1,
partition_info[1],
-1,
partition_info[2],
)
)

req = klass(
-1, # replica_id
self._fetch_max_wait_ms,
self._fetch_min_bytes,
self._fetch_max_bytes,
self._isolation_level,
self._session_id,
self._session_epoch,
list(topics.items()),
self._forgotten_topics_data,
self._rack_id,
)
elif klass.API_VERSION > 6:
req = klass(
-1, # replica_id
self._fetch_max_wait_ms,
self._fetch_min_bytes,
self._fetch_max_bytes,
self._isolation_level,
list(by_topics.items()),
self._forgotten_topics_data,
)
elif klass.API_VERSION > 3:
req = klass(
-1, # replica_id
self._fetch_max_wait_ms,
Expand Down Expand Up @@ -706,8 +758,12 @@ async def _proc_fetch_request(self, assignment, node_id, request):

fetch_offsets = {}
for topic, partitions in request.topics:
for partition, offset, _ in partitions:
fetch_offsets[TopicPartition(topic, partition)] = offset
if self._client.api_version >= (2, 4, 0):
for partition, _, offset, _, _ in partitions:
fetch_offsets[TopicPartition(topic, partition)] = offset
else:
for partition, offset, _ in partitions:
fetch_offsets[TopicPartition(topic, partition)] = offset

now_ms = int(1000 * time.time())
for topic, partitions in response.topics:
Expand Down
41 changes: 41 additions & 0 deletions tests/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
UnknownTopicOrPartitionError,
)
from aiokafka.protocol.fetch import FetchRequest_v0 as FetchRequest
from aiokafka.protocol.fetch import FetchRequest_v3, FetchRequest_v11
from aiokafka.protocol.fetch import FetchResponse_v0 as FetchResponse
from aiokafka.protocol.offset import OffsetResponse
from aiokafka.record.default_records import (
Expand Down Expand Up @@ -599,3 +600,43 @@ async def test_solitary_abort_marker(self):
# Since isolation_level is READ_COMMITTED, no consumer records are
# expected to be returned here.
self.assertEqual(len(list(partition_recs)), 0)

@run_until_complete
async def test_fetcher_request_class_and_session_params(self):
class DummyClient:
def __init__(self, api_version):
self.api_version = api_version
self._loop = get_running_loop()

subs = SubscriptionState()

# Test new threshold for rack_id support (API_VERSION 11)
client = DummyClient((2, 4, 0))
fetcher = Fetcher(
client,
subs,
session_id=10,
session_epoch=20,
forgotten_topics_data=[("t", [1])],
rack_id="rack1",
)
# Cancel background fetch task
fetcher._fetch_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await fetcher._fetch_task
# Parameters stored correctly
self.assertEqual(fetcher._session_id, 10)
self.assertEqual(fetcher._session_epoch, 20)
self.assertEqual(fetcher._forgotten_topics_data, [("t", [1])])
self.assertEqual(fetcher._rack_id, "rack1")
# Request class selected correctly
self.assertIs(fetcher._fetch_request_class, FetchRequest_v11)

# Test legacy threshold (API_VERSION 3)
client2 = DummyClient((0, 10, 1))
fetcher2 = Fetcher(client2, subs)
fetcher2._fetch_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await fetcher2._fetch_task
self.assertEqual(fetcher2._fetch_request_class.API_VERSION, 3)
self.assertIs(fetcher2._fetch_request_class, FetchRequest_v3)