Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/1129.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement asynchronous context manager protocol on ``AIOKafkaAdminClient`` (PR #1129 by @PeterJCLaw)
14 changes: 14 additions & 0 deletions aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from collections import defaultdict
from collections.abc import Sequence
from ssl import SSLContext
from types import TracebackType
from typing import Any

import async_timeout
from typing_extensions import Self

from aiokafka import __version__
from aiokafka.client import AIOKafkaClient
Expand Down Expand Up @@ -160,6 +162,18 @@ async def start(self):
log.debug("AIOKafkaAdminClient started")
self._started = True

async def __aenter__(self) -> Self:
await self.start()
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
await self.close()

def _matching_api_version(self, operation: Sequence[type[Request]]) -> int:
"""Find the latest version of the protocol operation
supported by both this library and the broker.
Expand Down
32 changes: 32 additions & 0 deletions tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,38 @@ async def test_metadata(self):
assert metadata.topics is not None
assert len(metadata.brokers) == 1

@kafka_versions(">=0.10.0.0")
@run_until_complete
async def test_context_manager(self):
async with AIOKafkaAdminClient(bootstrap_servers=self.hosts) as admin:
assert admin._started

# Arbitrary testing
metadata = await admin._get_cluster_metadata()
assert metadata.brokers is not None
assert metadata.topics is not None
assert len(metadata.brokers) == 1

assert admin._closed

# Test error case too
class FakeError:
pass

with pytest.raises(FakeError):
async with AIOKafkaAdminClient(bootstrap_servers=self.hosts) as admin:
assert admin._started

# Arbitrary testing
metadata = await admin._get_cluster_metadata()
assert metadata.brokers is not None
assert metadata.topics is not None
assert len(metadata.brokers) == 1

raise FakeError

assert admin._closed

@kafka_versions(">=0.10.1.0")
@run_until_complete
async def test_create_topics(self):
Expand Down