Skip to content

Commit 2cbeee6

Browse files
authored
Chore: close some debts after kafka-python merge (#962)
* Drop isinstance hack (not needed anymore) * Add 2.6 case to check_version
1 parent 72c1969 commit 2cbeee6

File tree

3 files changed

+7
-9
lines changed

3 files changed

+7
-9
lines changed

aiokafka/client.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
from aiokafka import __version__
88
from aiokafka.conn import collect_hosts, create_conn, CloseReason
99
from aiokafka.cluster import ClusterMetadata
10-
from aiokafka.protocol.admin import DescribeAclsRequest_v2
10+
from aiokafka.protocol.admin import (
11+
DescribeAclsRequest_v2, DescribeClientQuotasRequest_v0
12+
)
1113
from aiokafka.protocol.commit import OffsetFetchRequest
1214
from aiokafka.protocol.coordination import FindCoordinatorRequest
1315
from aiokafka.protocol.fetch import FetchRequest
@@ -421,7 +423,7 @@ async def _get_conn(
421423
# XXX: earlier we only did an assert here, but it seems it's
422424
# possible to get a leader that is for some reason not in
423425
# metadata.
424-
# I think requerying metadata should solve this problem
426+
# I think requiring metadata should solve this problem
425427
if broker is None:
426428
raise StaleMetadata(
427429
'Broker id %s not in current metadata' % node_id)
@@ -581,8 +583,7 @@ def _check_api_version_response(self, response):
581583
# in descending order. As soon as we find one that works, return it
582584
test_cases = [
583585
# format (<broker version>, <needed struct>)
584-
# TODO Requires unreleased version of python-kafka
585-
# ((2, 6, 0), DescribeClientQuotasRequest[0]),
586+
((2, 6, 0), DescribeClientQuotasRequest_v0),
586587
((2, 5, 0), DescribeAclsRequest_v2),
587588
((2, 4, 0), ProduceRequest[8]),
588589
((2, 3, 0), FetchRequest[11]),

aiokafka/producer/sender.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -807,9 +807,6 @@ def _can_retry(self, error, batch):
807807
# as long as we set proper sequence, pid and epoch.
808808
if self._sender._txn_manager is None and batch.expired():
809809
return False
810-
# XXX: remove unknown topic check as we fix
811-
# https://github.com/dpkp/kafka-python/issues/1155
812-
if error.retriable or isinstance(error, UnknownTopicOrPartitionError)\
813-
or error is UnknownTopicOrPartitionError:
810+
if error.retriable:
814811
return True
815812
return False

tests/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ async def test_send_request(self):
289289
self.assertTrue(isinstance(resp, MetadataResponse))
290290
await client.close()
291291

292-
@kafka_versions('<2.6') # FIXME Not implemented yet
292+
@kafka_versions('<2.7') # FIXME Not implemented yet
293293
@run_until_complete
294294
async def test_check_version(self):
295295
kafka_version = tuple(int(x) for x in self.kafka_version.split("."))

0 commit comments

Comments
 (0)