Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aiokafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .client import AIOKafkaAdminClient
from .new_partitions import NewPartitions
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic"]
__all__ = ["AIOKafkaAdminClient", "NewPartitions", "NewTopic", "RecordsToDelete"]
72 changes: 71 additions & 1 deletion aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
from aiokafka.errors import IncompatibleBrokerVersion, for_code
from aiokafka.errors import (
IncompatibleBrokerVersion,
LeaderNotAvailableError,
NotLeaderForPartitionError,
for_code,
)
from aiokafka.protocol.admin import (
AlterConfigsRequest,
ApiVersionRequest_v0,
CreatePartitionsRequest,
CreateTopicsRequest,
DeleteRecordsRequest,
DeleteTopicsRequest,
DescribeConfigsRequest,
DescribeGroupsRequest,
Expand All @@ -24,6 +30,7 @@

from .config_resource import ConfigResource, ConfigResourceType
from .new_topic import NewTopic
from .records_to_delete import RecordsToDelete

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -605,3 +612,66 @@ async def list_consumer_group_offsets(
offset_plus_meta = OffsetAndMetadata(offset, metadata)
response_dict[tp] = offset_plus_meta
return response_dict

async def delete_records(
self,
records_to_delete: Dict[TopicPartition, RecordsToDelete],
timeout_ms: Optional[int] = None,
) -> Dict[TopicPartition, int]:
"""Delete records from partitions.

:param records_to_delete: A map of RecordsToDelete for each TopicPartition
:param timeout_ms: Milliseconds to wait for the deletion to complete.
:return: Appropriate version of DeleteRecordsResponse class.
"""
version = self._matching_api_version(DeleteRecordsRequest)

if self._version_info[MetadataRequest[0].API_KEY] < (0, 10):
metadata_request = MetadataRequest[0]([])
else:
metadata_request = MetadataRequest[1](None)

metadata = await self._send_request(metadata_request)

self._client.cluster.update_metadata(metadata)

requests = defaultdict(lambda: defaultdict(list))
responses = {}

for tp, records in records_to_delete.items():
leader = self._client.cluster.leader_for_partition(tp)
if leader is None:
raise NotLeaderForPartitionError()
elif leader == -1:
raise LeaderNotAvailableError()
requests[leader][tp.topic].append((tp.partition, records))

req_cls = DeleteRecordsRequest[version]

for leader, delete_request in requests.items():
request = req_cls(
self._convert_records_to_delete(delete_request),
timeout_ms or self._request_timeout_ms,
{},
)
response = await self._client.send(leader, request)
for topic, partitions in response.topics:
for partition_index, low_watermark, error_code in partitions:
if error_code:
err = for_code(error_code)
raise err
responses[TopicPartition(topic, partition_index)] = low_watermark
return responses

@staticmethod
def _convert_records_to_delete(
records_to_delete: Dict[str, List[Tuple[int, RecordsToDelete]]],
):
return [
(
topic,
[(partition, rec.before_offset, {}) for partition, rec in records],
{},
)
for topic, records in records_to_delete.items()
]
12 changes: 12 additions & 0 deletions aiokafka/admin/records_to_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class RecordsToDelete:
"""A class for deleting records on existing topics.
Arguments:
before_offset (int):
delete all the records before the given offset
"""

def __init__(
self,
before_offset,
):
self.before_offset = before_offset
121 changes: 121 additions & 0 deletions aiokafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,3 +1276,124 @@ class ListPartitionReassignmentsRequest_v0(Request):
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]


class DeleteRecordsResponse_v0(Response):
API_KEY = 21
API_VERSION = 0
SCHEMA = Schema(
("throttle_time_ms", Int32),
(
"topics",
Array(
("name", String("utf-8")),
(
"partitions",
Array(
("partition_index", Int32),
("low_watermark", Int64),
("error_code", Int16),
),
),
),
),
)


class DeleteRecordsResponse_v1(Response):
API_KEY = 21
API_VERSION = 1
SCHEMA = DeleteRecordsResponse_v0.SCHEMA


class DeleteRecordsResponse_v2(Response):
API_KEY = 21
API_VERSION = 2
SCHEMA = Schema(
("throttle_time_ms", Int32),
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("low_watermark", Int64),
("error_code", Int16),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
)


class DeleteRecordsRequest_v0(Request):
API_KEY = 21
API_VERSION = 0
RESPONSE_TYPE = DeleteRecordsResponse_v0
SCHEMA = Schema(
(
"topics",
Array(
("name", String("utf-8")),
(
"partitions",
Array(
("partition_index", Int32),
("offset", Int64),
),
),
),
),
("timeout_ms", Int32),
)


class DeleteRecordsRequest_v1(Request):
API_KEY = 21
API_VERSION = 1
RESPONSE_TYPE = DeleteRecordsResponse_v1
SCHEMA = DeleteRecordsRequest_v0.SCHEMA


class DeleteRecordsRequest_v2(Request):
API_KEY = 21
API_VERSION = 2
FLEXIBLE_VERSION = True
RESPONSE_TYPE = DeleteRecordsResponse_v2
SCHEMA = Schema(
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("offset", Int64),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("timeout_ms", Int32),
("tags", TaggedFields),
)


DeleteRecordsRequest = [
DeleteRecordsRequest_v0,
DeleteRecordsRequest_v1,
DeleteRecordsRequest_v2,
]

DeleteRecordsResponse = [
DeleteRecordsResponse_v0,
DeleteRecordsResponse_v1,
DeleteRecordsResponse_v2,
]
37 changes: 36 additions & 1 deletion tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio

from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic
from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic, RecordsToDelete
from aiokafka.admin.config_resource import ConfigResource, ConfigResourceType
from aiokafka.consumer import AIOKafkaConsumer
from aiokafka.producer import AIOKafkaProducer
Expand Down Expand Up @@ -201,3 +201,38 @@ async def test_list_consumer_group_offsets(self):
assert resp[tp].offset == msg.offset + 1
resp = await admin.list_consumer_group_offsets(group_id, partitions=[tp])
assert resp[tp].offset == msg.offset + 1

@kafka_versions(">=1.1.0")
@run_until_complete
async def test_delete_records(self):
admin = await self.create_admin()

await admin.create_topics([NewTopic(self.topic, 1, 1)])

async with AIOKafkaProducer(bootstrap_servers=self.hosts) as producer:
first_message = await producer.send_and_wait(
self.topic, partition=0, value=b"some-message"
)
await producer.send_and_wait(
self.topic, partition=0, value=b"other-message"
)

await admin.delete_records(
{
TopicPartition(self.topic, 0): RecordsToDelete(
before_offset=first_message.offset + 1
)
}
)

consumer = AIOKafkaConsumer(
self.topic,
bootstrap_servers=self.hosts,
enable_auto_commit=False,
auto_offset_reset="earliest",
)
await consumer.start()
self.add_cleanup(consumer.stop)

msg = await consumer.getone()
assert msg.value == b"other-message"