Skip to content

Commit 9166c96

Browse files
authored
Merge pull request #972 from ods/fix-tagged-fields
Fix tagged fields handling
2 parents e8383ea + c54f257 commit 9166c96

File tree

6 files changed

+49
-32
lines changed

6 files changed

+49
-32
lines changed

aiokafka/admin/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -649,8 +649,9 @@ async def delete_records(
649649
timeout_ms or self._request_timeout_ms,
650650
)
651651
response = await self._client.send(leader, request)
652-
for topic, partitions in response.topics:
653-
for partition_index, low_watermark, error_code in partitions:
652+
# Starting with v2, DeleteRecordsResponse contains extra field with tags
653+
for topic, partitions, *_ in response.topics:
654+
for partition_index, low_watermark, error_code, *_ in partitions:
654655
if error_code:
655656
err = for_code(error_code)
656657
raise err

aiokafka/conn.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import functools
55
import hashlib
66
import hmac
7+
import io
78
import logging
89
import random
910
import socket
@@ -24,7 +25,6 @@
2425
SaslAuthenticateRequest,
2526
SaslHandShakeRequest,
2627
)
27-
from aiokafka.protocol.api import RequestHeader
2828
from aiokafka.protocol.commit import (
2929
GroupCoordinatorResponse_v0 as GroupCoordinatorResponse,
3030
)
@@ -459,10 +459,8 @@ def send(self, request, expect_response=True):
459459
)
460460

461461
correlation_id = self._next_correlation_id()
462-
header = RequestHeader(
463-
request,
464-
correlation_id=correlation_id,
465-
client_id=self._client_id,
462+
header = request.build_request_header(
463+
correlation_id=correlation_id, client_id=self._client_id
466464
)
467465
message = header.encode() + request.encode()
468466
size = struct.pack(">i", len(message))
@@ -480,7 +478,7 @@ def send(self, request, expect_response=True):
480478
return self._writer.drain()
481479
fut = self._loop.create_future()
482480
self._requests.append(
483-
(correlation_id, request.RESPONSE_TYPE, fut),
481+
(correlation_id, request, fut),
484482
)
485483
return wait_for(fut, self._request_timeout)
486484

@@ -569,39 +567,41 @@ async def _read(self_ref):
569567
del self
570568

571569
def _handle_frame(self, resp):
572-
correlation_id, resp_type, fut = self._requests[0]
570+
correlation_id, request, fut = self._requests[0]
573571

574572
if correlation_id is None: # Is a SASL packet, just pass it though
575573
if not fut.done():
576574
fut.set_result(resp)
577575
else:
578-
(recv_correlation_id,) = struct.unpack_from(">i", resp, 0)
576+
resp = io.BytesIO(resp)
577+
response_header = request.parse_response_header(resp)
578+
resp_type = request.RESPONSE_TYPE
579579

580580
if (
581581
self._api_version == (0, 8, 2)
582582
and resp_type is GroupCoordinatorResponse
583583
and correlation_id != 0
584-
and recv_correlation_id == 0
584+
and response_header.correlation_id == 0
585585
):
586586
log.warning(
587587
"Kafka 0.8.2 quirk -- GroupCoordinatorResponse"
588-
" coorelation id does not match request. This"
588+
" correlation id does not match request. This"
589589
" should go away once at least one topic has been"
590590
" initialized on the broker"
591591
)
592592

593-
elif correlation_id != recv_correlation_id:
593+
elif response_header.correlation_id != correlation_id:
594594
error = Errors.CorrelationIdError(
595595
f"Correlation ids do not match: sent {correlation_id},"
596-
f" recv {recv_correlation_id}"
596+
f" recv {response_header.correlation_id}"
597597
)
598598
if not fut.done():
599599
fut.set_exception(error)
600600
self.close(reason=CloseReason.OUT_OF_SYNC)
601601
return
602602

603603
if not fut.done():
604-
response = resp_type.decode(resp[4:])
604+
response = resp_type.decode(resp)
605605
log.debug("%s Response %d: %s", self, correlation_id, response)
606606
fut.set_result(response)
607607

aiokafka/protocol/admin.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,17 +1385,32 @@ class DeleteRecordsRequest_v2(Request):
13851385
("tags", TaggedFields),
13861386
)
13871387

1388+
def __init__(self, topics, timeout_ms, tags=None):
1389+
super().__init__(
1390+
[
1391+
(
1392+
topic,
1393+
[
1394+
(partition, before_offset, {})
1395+
for partition, before_offset in partitions
1396+
],
1397+
{},
1398+
)
1399+
for (topic, partitions) in topics
1400+
],
1401+
timeout_ms,
1402+
tags or {},
1403+
)
1404+
13881405

13891406
DeleteRecordsRequest = [
13901407
DeleteRecordsRequest_v0,
13911408
DeleteRecordsRequest_v1,
1392-
# FIXME: We have some problems with `TaggedFields`
1393-
# DeleteRecordsRequest_v2,
1409+
DeleteRecordsRequest_v2,
13941410
]
13951411

13961412
DeleteRecordsResponse = [
13971413
DeleteRecordsResponse_v0,
13981414
DeleteRecordsResponse_v1,
1399-
# FIXME: We have some problems with `TaggedFields`
1400-
# DeleteRecordsResponse_v2,
1415+
DeleteRecordsResponse_v2,
14011416
]

aiokafka/protocol/api.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from .types import Array, Int16, Int32, Schema, String, TaggedFields
55

66

7-
class RequestHeader(Struct):
7+
class RequestHeader_v0(Struct):
88
SCHEMA = Schema(
99
("api_key", Int16),
1010
("api_version", Int16),
@@ -13,12 +13,12 @@ class RequestHeader(Struct):
1313
)
1414

1515
def __init__(self, request, correlation_id=0, client_id="aiokafka"):
16-
super(RequestHeader, self).__init__(
16+
super().__init__(
1717
request.API_KEY, request.API_VERSION, correlation_id, client_id
1818
)
1919

2020

21-
class RequestHeaderV2(Struct):
21+
class RequestHeader_v1(Struct):
2222
# Flexible response / request headers end in field buffer
2323
SCHEMA = Schema(
2424
("api_key", Int16),
@@ -29,18 +29,18 @@ class RequestHeaderV2(Struct):
2929
)
3030

3131
def __init__(self, request, correlation_id=0, client_id="aiokafka", tags=None):
32-
super(RequestHeaderV2, self).__init__(
32+
super().__init__(
3333
request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {}
3434
)
3535

3636

37-
class ResponseHeader(Struct):
37+
class ResponseHeader_v0(Struct):
3838
SCHEMA = Schema(
3939
("correlation_id", Int32),
4040
)
4141

4242

43-
class ResponseHeaderV2(Struct):
43+
class ResponseHeader_v1(Struct):
4444
SCHEMA = Schema(
4545
("correlation_id", Int32),
4646
("tags", TaggedFields),
@@ -81,15 +81,17 @@ def to_object(self):
8181

8282
def build_request_header(self, correlation_id, client_id):
8383
if self.FLEXIBLE_VERSION:
84-
return RequestHeaderV2(
84+
return RequestHeader_v1(
8585
self, correlation_id=correlation_id, client_id=client_id
8686
)
87-
return RequestHeader(self, correlation_id=correlation_id, client_id=client_id)
87+
return RequestHeader_v0(
88+
self, correlation_id=correlation_id, client_id=client_id
89+
)
8890

8991
def parse_response_header(self, read_buffer):
9092
if self.FLEXIBLE_VERSION:
91-
return ResponseHeaderV2.decode(read_buffer)
92-
return ResponseHeader.decode(read_buffer)
93+
return ResponseHeader_v1.decode(read_buffer)
94+
return ResponseHeader_v0.decode(read_buffer)
9395

9496

9597
class Response(Struct):

aiokafka/protocol/types.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,6 @@ def encode(self, value):
313313
return UnsignedVarInt32.encode(len(value) + 1) + value
314314

315315

316-
# FIXME: TaggedFields doesn't seem to work properly so they should be avoided
317316
class TaggedFields(AbstractType):
318317
@classmethod
319318
def decode(cls, data):

tests/test_protocol.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import pytest
66

7-
from aiokafka.protocol.api import Request, RequestHeader, Response
7+
from aiokafka.protocol.api import Request, RequestHeader_v0, Response
88
from aiokafka.protocol.commit import GroupCoordinatorRequest
99
from aiokafka.protocol.fetch import FetchRequest, FetchResponse
1010
from aiokafka.protocol.message import Message, MessageSet, PartialMessage
@@ -188,7 +188,7 @@ def test_encode_message_header():
188188
)
189189

190190
req = GroupCoordinatorRequest[0]("foo")
191-
header = RequestHeader(req, correlation_id=4, client_id="client3")
191+
header = RequestHeader_v0(req, correlation_id=4, client_id="client3")
192192
assert header.encode() == expect
193193

194194

0 commit comments

Comments
 (0)