Skip to content

Commit 82695b0

Browse files
vmaurinVincent Maurinods
authored
Implement KIP-204 : DeleteRecords API (#969)
* Implement KIP-202 : DeleteRecords API When doing stream processing, it is convinient to use "transient" topic : * retention time is infinite * records get deleted when consumed The java kafka streams client is using the deleteRecords of the admin client to perform this operation. It is lacking in aiokafka The KIP reference https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API refs #967 * Use common method to get metadata * Explain the unpacking catch all * Remove usage of TaggedFields TaggedFields doesn't seem to work properly at the moment. Maybe they should be replaced by an implementation closer to the java client with their "flexibleVersions" * Fix linting errors (format) * Add change log --------- Co-authored-by: Vincent Maurin <[email protected]> Co-authored-by: Denis Otkidach <[email protected]>
1 parent f8d0d15 commit 82695b0

File tree

8 files changed

+238
-4
lines changed

8 files changed

+238
-4
lines changed

CHANGES/969.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implement DeleteRecords API (KIP-204) (pr #969 by @vmaurin)

aiokafka/admin/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from .client import AIOKafkaAdminClient
22
from .new_partitions import NewPartitions
33
from .new_topic import NewTopic
4+
from .records_to_delete import RecordsToDelete
45

5-
__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"]
6+
__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"]

aiokafka/admin/client.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@
66

77
from aiokafka import __version__
88
from aiokafka.client import AIOKafkaClient
9-
from aiokafka.errors import IncompatibleBrokerVersion, for_code
9+
from aiokafka.errors import (
10+
IncompatibleBrokerVersion,
11+
LeaderNotAvailableError,
12+
NotLeaderForPartitionError,
13+
for_code,
14+
)
1015
from aiokafka.protocol.admin import (
1116
AlterConfigsRequest,
1217
ApiVersionRequest_v0,
1318
CreatePartitionsRequest,
1419
CreateTopicsRequest,
20+
DeleteRecordsRequest,
1521
DeleteTopicsRequest,
1622
DescribeConfigsRequest,
1723
DescribeGroupsRequest,
@@ -24,6 +30,7 @@
2430

2531
from .config_resource import ConfigResource, ConfigResourceType
2632
from .new_topic import NewTopic
33+
from .records_to_delete import RecordsToDelete
2734

2835
log = logging.getLogger(__name__)
2936

@@ -605,3 +612,56 @@ async def list_consumer_group_offsets(
605612
offset_plus_meta = OffsetAndMetadata(offset, metadata)
606613
response_dict[tp] = offset_plus_meta
607614
return response_dict
615+
616+
async def delete_records(
617+
self,
618+
records_to_delete: Dict[TopicPartition, RecordsToDelete],
619+
timeout_ms: Optional[int] = None,
620+
) -> Dict[TopicPartition, int]:
621+
"""Delete records from partitions.
622+
623+
:param records_to_delete: A map of RecordsToDelete for each TopicPartition
624+
:param timeout_ms: Milliseconds to wait for the deletion to complete.
625+
:return: Appropriate version of DeleteRecordsResponse class.
626+
"""
627+
version = self._matching_api_version(DeleteRecordsRequest)
628+
629+
metadata = await self._get_cluster_metadata()
630+
631+
self._client.cluster.update_metadata(metadata)
632+
633+
requests = defaultdict(lambda: defaultdict(list))
634+
responses = {}
635+
636+
for tp, records in records_to_delete.items():
637+
leader = self._client.cluster.leader_for_partition(tp)
638+
if leader is None:
639+
raise NotLeaderForPartitionError()
640+
elif leader == -1:
641+
raise LeaderNotAvailableError()
642+
requests[leader][tp.topic].append((tp.partition, records))
643+
644+
req_cls = DeleteRecordsRequest[version]
645+
646+
for leader, delete_request in requests.items():
647+
request = req_cls(
648+
self._convert_records_to_delete(delete_request),
649+
timeout_ms or self._request_timeout_ms,
650+
)
651+
response = await self._client.send(leader, request)
652+
for topic, partitions in response.topics:
653+
for partition_index, low_watermark, error_code in partitions:
654+
if error_code:
655+
err = for_code(error_code)
656+
raise err
657+
responses[TopicPartition(topic, partition_index)] = low_watermark
658+
return responses
659+
660+
@staticmethod
661+
def _convert_records_to_delete(
662+
records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]],
663+
):
664+
return [
665+
(topic, [(partition, rec.before_offset) for partition, rec in records])
666+
for topic, records in records_to_delete.items()
667+
]
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
class RecordsToDelete:
2+
"""A class for deleting records on existing topics.
3+
Arguments:
4+
before_offset (int):
5+
delete all the records before the given offset
6+
"""
7+
8+
def __init__(
9+
self,
10+
before_offset,
11+
):
12+
self.before_offset = before_offset

aiokafka/cluster.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ def update_metadata(self, metadata):
254254
error_type = Errors.for_code(error_code)
255255
if error_type is Errors.NoError:
256256
_new_partitions[topic] = {}
257-
for p_error, partition, leader, replicas, isr in partitions:
257+
# Starting with v5, MetadataResponse contains more than 5 fields
258+
for p_error, partition, leader, replicas, isr, *_ in partitions:
258259
_new_partitions[topic][partition] = PartitionMetadata(
259260
topic=topic,
260261
partition=partition,

aiokafka/protocol/admin.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,3 +1276,126 @@ class ListPartitionReassignmentsRequest_v0(Request):
12761276
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]
12771277

12781278
ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]
1279+
1280+
1281+
class DeleteRecordsResponse_v0(Response):
1282+
API_KEY = 21
1283+
API_VERSION = 0
1284+
SCHEMA = Schema(
1285+
("throttle_time_ms", Int32),
1286+
(
1287+
"topics",
1288+
Array(
1289+
("name", String("utf-8")),
1290+
(
1291+
"partitions",
1292+
Array(
1293+
("partition_index", Int32),
1294+
("low_watermark", Int64),
1295+
("error_code", Int16),
1296+
),
1297+
),
1298+
),
1299+
),
1300+
)
1301+
1302+
1303+
class DeleteRecordsResponse_v1(Response):
1304+
API_KEY = 21
1305+
API_VERSION = 1
1306+
SCHEMA = DeleteRecordsResponse_v0.SCHEMA
1307+
1308+
1309+
class DeleteRecordsResponse_v2(Response):
1310+
API_KEY = 21
1311+
API_VERSION = 2
1312+
SCHEMA = Schema(
1313+
("throttle_time_ms", Int32),
1314+
(
1315+
"topics",
1316+
CompactArray(
1317+
("name", CompactString("utf-8")),
1318+
(
1319+
"partitions",
1320+
CompactArray(
1321+
("partition_index", Int32),
1322+
("low_watermark", Int64),
1323+
("error_code", Int16),
1324+
("tags", TaggedFields),
1325+
),
1326+
),
1327+
("tags", TaggedFields),
1328+
),
1329+
),
1330+
("tags", TaggedFields),
1331+
)
1332+
1333+
1334+
class DeleteRecordsRequest_v0(Request):
1335+
API_KEY = 21
1336+
API_VERSION = 0
1337+
RESPONSE_TYPE = DeleteRecordsResponse_v0
1338+
SCHEMA = Schema(
1339+
(
1340+
"topics",
1341+
Array(
1342+
("name", String("utf-8")),
1343+
(
1344+
"partitions",
1345+
Array(
1346+
("partition_index", Int32),
1347+
("offset", Int64),
1348+
),
1349+
),
1350+
),
1351+
),
1352+
("timeout_ms", Int32),
1353+
)
1354+
1355+
1356+
class DeleteRecordsRequest_v1(Request):
1357+
API_KEY = 21
1358+
API_VERSION = 1
1359+
RESPONSE_TYPE = DeleteRecordsResponse_v1
1360+
SCHEMA = DeleteRecordsRequest_v0.SCHEMA
1361+
1362+
1363+
class DeleteRecordsRequest_v2(Request):
1364+
API_KEY = 21
1365+
API_VERSION = 2
1366+
FLEXIBLE_VERSION = True
1367+
RESPONSE_TYPE = DeleteRecordsResponse_v2
1368+
SCHEMA = Schema(
1369+
(
1370+
"topics",
1371+
CompactArray(
1372+
("name", CompactString("utf-8")),
1373+
(
1374+
"partitions",
1375+
CompactArray(
1376+
("partition_index", Int32),
1377+
("offset", Int64),
1378+
("tags", TaggedFields),
1379+
),
1380+
),
1381+
("tags", TaggedFields),
1382+
),
1383+
),
1384+
("timeout_ms", Int32),
1385+
("tags", TaggedFields),
1386+
)
1387+
1388+
1389+
DeleteRecordsRequest = [
1390+
DeleteRecordsRequest_v0,
1391+
DeleteRecordsRequest_v1,
1392+
# FIXME: We have some problems with `TaggedFields`
1393+
# DeleteRecordsRequest_v2,
1394+
]
1395+
1396+
DeleteRecordsResponse = [
1397+
DeleteRecordsResponse_v0,
1398+
DeleteRecordsResponse_v1,
1399+
# FIXME: We have some problems with `TaggedFields`
1400+
# DeleteRecordsResponse_v2,
1401+
]

aiokafka/protocol/types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ def encode(self, value):
313313
return UnsignedVarInt32.encode(len(value) + 1) + value
314314

315315

316+
# FIXME: TaggedFields doesn't seem to work properly so they should be avoided
316317
class TaggedFields(AbstractType):
317318
@classmethod
318319
def decode(cls, data):

tests/test_admin.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22

3-
from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic
3+
from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete
44
from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType
55
from aiokafka.consumer import AIOKafkaConsumer
66
from aiokafka.producer import AIOKafkaProducer
@@ -201,3 +201,38 @@ async def test_list_consumer_group_offsets(self):
201201
assert resp[tp].offset == msg.offset + 1
202202
resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp])
203203
assert resp[tp].offset == msg.offset + 1
204+
205+
@kafka_versions(">=1.1.0")
206+
@run_until_complete
207+
async def test_delete_records(self):
208+
admin = await self.create_admin()
209+
210+
await admin.create_topics([NewTopic(self.topic, 1, 1)])
211+
212+
async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer:
213+
first_message = await producer.send_and_wait(
214+
self.topic, partition=0, value=b"some-message"
215+
)
216+
await producer.send_and_wait(
217+
self.topic, partition=0, value=b"other-message"
218+
)
219+
220+
await admin.delete_records(
221+
{
222+
TopicPartition(self.topic, 0): RecordsToDelete(
223+
before_offset=first_message.offset + 1
224+
)
225+
}
226+
)
227+
228+
consumer = AIOKafkaConsumer(
229+
self.topic,
230+
bootstrap_servers=self.hosts,
231+
enable_auto_commit=False,
232+
auto_offset_reset="earliest",
233+
)
234+
await consumer.start()
235+
self.add_cleanup(consumer.stop)
236+
237+
msg = await consumer.getone()
238+
assert msg.value == b"other-message"

0 commit comments

Comments
 (0)