Skip to content

Commit a244a5f

Browse files
committed
Fix tagged fields handling
* Use RequestHeader_v1/ResponseHeader_v1 for flexible version * Fix DeleteRecords for version 2+
1 parent e8383ea commit a244a5f

File tree

4 files changed

+35
-20
lines changed

4 files changed

+35
-20
lines changed

aiokafka/admin/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -649,8 +649,9 @@ async def delete_records(
649649
timeout_ms or self._request_timeout_ms,
650650
)
651651
response = await self._client.send(leader, request)
652-
for topic, partitions in response.topics:
653-
for partition_index, low_watermark, error_code in partitions:
652+
# Starting with v2, DeleteRecordsResponse contains extra field with tags
653+
for topic, partitions, *_ in response.topics:
654+
for partition_index, low_watermark, error_code, *_ in partitions:
654655
if error_code:
655656
err = for_code(error_code)
656657
raise err

aiokafka/conn.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import functools
55
import hashlib
66
import hmac
7+
import io
78
import logging
89
import random
910
import socket
@@ -24,7 +25,6 @@
2425
SaslAuthenticateRequest,
2526
SaslHandShakeRequest,
2627
)
27-
from aiokafka.protocol.api import RequestHeader
2828
from aiokafka.protocol.commit import (
2929
GroupCoordinatorResponse_v0 as GroupCoordinatorResponse,
3030
)
@@ -459,10 +459,8 @@ def send(self, request, expect_response=True):
459459
)
460460

461461
correlation_id = self._next_correlation_id()
462-
header = RequestHeader(
463-
request,
464-
correlation_id=correlation_id,
465-
client_id=self._client_id,
462+
header = request.build_request_header(
463+
correlation_id=correlation_id, client_id=self._client_id
466464
)
467465
message = header.encode() + request.encode()
468466
size = struct.pack(">i", len(message))
@@ -480,7 +478,7 @@ def send(self, request, expect_response=True):
480478
return self._writer.drain()
481479
fut = self._loop.create_future()
482480
self._requests.append(
483-
(correlation_id, request.RESPONSE_TYPE, fut),
481+
(correlation_id, request, fut),
484482
)
485483
return wait_for(fut, self._request_timeout)
486484

@@ -569,39 +567,41 @@ async def _read(self_ref):
569567
del self
570568

571569
def _handle_frame(self, resp):
572-
correlation_id, resp_type, fut = self._requests[0]
570+
correlation_id, request, fut = self._requests[0]
573571

574572
if correlation_id is None: # Is a SASL packet, just pass it though
575573
if not fut.done():
576574
fut.set_result(resp)
577575
else:
578-
(recv_correlation_id,) = struct.unpack_from(">i", resp, 0)
576+
resp = io.BytesIO(resp)
577+
response_header = request.parse_response_header(resp)
578+
resp_type = request.RESPONSE_TYPE
579579

580580
if (
581581
self._api_version == (0, 8, 2)
582582
and resp_type is GroupCoordinatorResponse
583583
and correlation_id != 0
584-
and recv_correlation_id == 0
584+
and response_header.correlation_id == 0
585585
):
586586
log.warning(
587587
"Kafka 0.8.2 quirk -- GroupCoordinatorResponse"
588-
" coorelation id does not match request. This"
588+
" correlation id does not match request. This"
589589
" should go away once at least one topic has been"
590590
" initialized on the broker"
591591
)
592592

593-
elif correlation_id != recv_correlation_id:
593+
elif response_header.correlation_id != correlation_id:
594594
error = Errors.CorrelationIdError(
595595
f"Correlation ids do not match: sent {correlation_id},"
596-
f" recv {recv_correlation_id}"
596+
f" recv {response_header.correlation_id}"
597597
)
598598
if not fut.done():
599599
fut.set_exception(error)
600600
self.close(reason=CloseReason.OUT_OF_SYNC)
601601
return
602602

603603
if not fut.done():
604-
response = resp_type.decode(resp[4:])
604+
response = resp_type.decode(resp)
605605
log.debug("%s Response %d: %s", self, correlation_id, response)
606606
fut.set_result(response)
607607

aiokafka/protocol/admin.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,17 +1385,32 @@ class DeleteRecordsRequest_v2(Request):
13851385
("tags", TaggedFields),
13861386
)
13871387

1388+
def __init__(self, topics, timeout_ms, tags=None):
1389+
super().__init__(
1390+
[
1391+
(
1392+
topic,
1393+
[
1394+
(partition, before_offset, {})
1395+
for partition, before_offset in partitions
1396+
],
1397+
{},
1398+
)
1399+
for (topic, partitions) in topics
1400+
],
1401+
timeout_ms,
1402+
tags or {},
1403+
)
1404+
13881405

13891406
DeleteRecordsRequest = [
13901407
DeleteRecordsRequest_v0,
13911408
DeleteRecordsRequest_v1,
1392-
# FIXME: We have some problems with `TaggedFields`
1393-
# DeleteRecordsRequest_v2,
1409+
DeleteRecordsRequest_v2,
13941410
]
13951411

13961412
DeleteRecordsResponse = [
13971413
DeleteRecordsResponse_v0,
13981414
DeleteRecordsResponse_v1,
1399-
# FIXME: We have some problems with `TaggedFields`
1400-
# DeleteRecordsResponse_v2,
1415+
DeleteRecordsResponse_v2,
14011416
]

aiokafka/protocol/types.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,6 @@ def encode(self, value):
313313
return UnsignedVarInt32.encode(len(value) + 1) + value
314314

315315

316-
# FIXME: TaggedFields doesn't seem to work properly so they should be avoided
317316
class TaggedFields(AbstractType):
318317
@classmethod
319318
def decode(cls, data):

0 commit comments

Comments
 (0)