Skip to content

Commit ac11cb6

Browse files
committed
Handle unclosed producer and consumer warnings (#1130)
Signed-off-by: Callan Gray <[email protected]>
1 parent 87dd905 commit ac11cb6

File tree

3 files changed

+110
-97
lines changed

3 files changed

+110
-97
lines changed

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ Bugfixes:
1616

1717
* Make KafkaStorageError retriable after metadata refresh like in other
1818
implementations (pr #1115 by @omerhadari)
19+
* Fix producer and consumer requiring closing after `start` or `__aenter__` raises an exception.
20+
(issue #1130, pr #1131 by @calgray)
1921

2022

2123
Misc:

aiokafka/consumer/consumer.py

Lines changed: 80 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sys
55
import traceback
66
import warnings
7+
from contextlib import AsyncExitStack
78

89
from aiokafka import __version__
910
from aiokafka.abc import ConsumerRebalanceListener
@@ -335,7 +336,6 @@ def __init__(
335336

336337
if loop.get_debug():
337338
self._source_traceback = traceback.extract_stack(sys._getframe(1))
338-
self._closed = False
339339

340340
if topics:
341341
topics = self._validate_topics(topics)
@@ -368,83 +368,94 @@ async def start(self):
368368
self._loop is get_running_loop()
369369
), "Please create objects with the same loop as running with"
370370
assert self._fetcher is None, "Did you call `start` twice?"
371-
await self._client.bootstrap()
372-
await self._wait_topics()
373371

374-
if self._client.api_version < (0, 9):
375-
raise ValueError(f"Unsupported Kafka version: {self._client.api_version}")
372+
async with AsyncExitStack() as stack:
373+
await self._client.bootstrap()
374+
stack.push_async_callback(self._client.close)
375+
await self._wait_topics()
376376

377-
if (
378-
self._isolation_level == "read_committed"
379-
and self._client.api_version < (0, 11) # fmt: skip
380-
):
381-
raise UnsupportedVersionError(
382-
"`read_committed` isolation_level available only for Brokers "
383-
"0.11 and above"
384-
)
377+
if self._client.api_version < (0, 9):
378+
raise ValueError(
379+
f"Unsupported Kafka version: {self._client.api_version}"
380+
)
385381

386-
self._fetcher = Fetcher(
387-
self._client,
388-
self._subscription,
389-
key_deserializer=self._key_deserializer,
390-
value_deserializer=self._value_deserializer,
391-
fetch_min_bytes=self._fetch_min_bytes,
392-
fetch_max_bytes=self._fetch_max_bytes,
393-
fetch_max_wait_ms=self._fetch_max_wait_ms,
394-
max_partition_fetch_bytes=self._max_partition_fetch_bytes,
395-
check_crcs=self._check_crcs,
396-
fetcher_timeout=self._consumer_timeout,
397-
retry_backoff_ms=self._retry_backoff_ms,
398-
auto_offset_reset=self._auto_offset_reset,
399-
isolation_level=self._isolation_level,
400-
)
382+
if (
383+
self._isolation_level == "read_committed"
384+
and self._client.api_version < (0, 11) # fmt: skip
385+
):
386+
raise UnsupportedVersionError(
387+
"`read_committed` isolation_level available only for Brokers "
388+
"0.11 and above"
389+
)
401390

402-
if self._group_id is not None:
403-
# using group coordinator for automatic partitions assignment
404-
self._coordinator = GroupCoordinator(
391+
self._fetcher = Fetcher(
405392
self._client,
406393
self._subscription,
407-
group_id=self._group_id,
408-
group_instance_id=self._group_instance_id,
409-
heartbeat_interval_ms=self._heartbeat_interval_ms,
410-
session_timeout_ms=self._session_timeout_ms,
394+
key_deserializer=self._key_deserializer,
395+
value_deserializer=self._value_deserializer,
396+
fetch_min_bytes=self._fetch_min_bytes,
397+
fetch_max_bytes=self._fetch_max_bytes,
398+
fetch_max_wait_ms=self._fetch_max_wait_ms,
399+
max_partition_fetch_bytes=self._max_partition_fetch_bytes,
400+
check_crcs=self._check_crcs,
401+
fetcher_timeout=self._consumer_timeout,
411402
retry_backoff_ms=self._retry_backoff_ms,
412-
enable_auto_commit=self._enable_auto_commit,
413-
auto_commit_interval_ms=self._auto_commit_interval_ms,
414-
assignors=self._partition_assignment_strategy,
415-
exclude_internal_topics=self._exclude_internal_topics,
416-
rebalance_timeout_ms=self._rebalance_timeout_ms,
417-
max_poll_interval_ms=self._max_poll_interval_ms,
403+
auto_offset_reset=self._auto_offset_reset,
404+
isolation_level=self._isolation_level,
418405
)
419-
if self._subscription.subscription is not None:
420-
if self._subscription.partitions_auto_assigned():
406+
stack.push_async_callback(self._fetcher.close)
407+
408+
if self._group_id is not None:
409+
# using group coordinator for automatic partitions assignment
410+
self._coordinator = GroupCoordinator(
411+
self._client,
412+
self._subscription,
413+
group_id=self._group_id,
414+
group_instance_id=self._group_instance_id,
415+
heartbeat_interval_ms=self._heartbeat_interval_ms,
416+
session_timeout_ms=self._session_timeout_ms,
417+
retry_backoff_ms=self._retry_backoff_ms,
418+
enable_auto_commit=self._enable_auto_commit,
419+
auto_commit_interval_ms=self._auto_commit_interval_ms,
420+
assignors=self._partition_assignment_strategy,
421+
exclude_internal_topics=self._exclude_internal_topics,
422+
rebalance_timeout_ms=self._rebalance_timeout_ms,
423+
max_poll_interval_ms=self._max_poll_interval_ms,
424+
)
425+
stack.push_async_callback(self._coordinator.close)
426+
427+
if self._subscription.subscription is not None:
428+
if self._subscription.partitions_auto_assigned():
429+
# Either we passed `topics` to constructor or `subscribe`
430+
# was called before `start`
431+
await self._subscription.wait_for_assignment()
432+
else:
433+
# `assign` was called before `start`. We did not start
434+
# this task on that call, as coordinator was yet to be
435+
# created
436+
self._coordinator.start_commit_offsets_refresh_task(
437+
self._subscription.subscription.assignment
438+
)
439+
else:
440+
# Using a simple assignment coordinator for reassignment on
441+
# metadata changes
442+
self._coordinator = NoGroupCoordinator(
443+
self._client,
444+
self._subscription,
445+
exclude_internal_topics=self._exclude_internal_topics,
446+
)
447+
stack.push_async_callback(self._coordinator.close)
448+
449+
if (
450+
self._subscription.subscription is not None
451+
and self._subscription.partitions_auto_assigned()
452+
):
421453
# Either we passed `topics` to constructor or `subscribe`
422454
# was called before `start`
423-
await self._subscription.wait_for_assignment()
424-
else:
425-
# `assign` was called before `start`. We did not start
426-
# this task on that call, as coordinator was yet to be
427-
# created
428-
self._coordinator.start_commit_offsets_refresh_task(
429-
self._subscription.subscription.assignment
430-
)
431-
else:
432-
# Using a simple assignment coordinator for reassignment on
433-
# metadata changes
434-
self._coordinator = NoGroupCoordinator(
435-
self._client,
436-
self._subscription,
437-
exclude_internal_topics=self._exclude_internal_topics,
438-
)
439-
440-
if (
441-
self._subscription.subscription is not None
442-
and self._subscription.partitions_auto_assigned()
443-
):
444-
# Either we passed `topics` to constructor or `subscribe`
445-
# was called before `start`
446-
await self._client.force_metadata_update()
447-
self._coordinator.assign_all_partitions(check_unknown=True)
455+
await self._client.force_metadata_update()
456+
self._coordinator.assign_all_partitions(check_unknown=True)
457+
self._exit_stack = stack.pop_all()
458+
self._closed = False
448459

449460
async def _wait_topics(self):
450461
if self._subscription.subscription is not None:
@@ -514,11 +525,7 @@ async def stop(self):
514525
return
515526
log.debug("Closing the KafkaConsumer.")
516527
self._closed = True
517-
if self._coordinator:
518-
await self._coordinator.close()
519-
if self._fetcher:
520-
await self._fetcher.close()
521-
await self._client.close()
528+
await self._exit_stack.aclose()
522529
log.debug("The KafkaConsumer has closed.")
523530

524531
async def commit(self, offsets=None):

aiokafka/producer/producer.py

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import sys
44
import traceback
55
import warnings
6+
from contextlib import AsyncExitStack
67

78
from aiokafka.client import AIOKafkaClient
89
from aiokafka.codec import has_gzip, has_lz4, has_snappy, has_zstd
@@ -324,8 +325,6 @@ def __init__(
324325
request_timeout_ms=request_timeout_ms,
325326
)
326327

327-
self._closed = False
328-
329328
# Warn if producer was not closed properly
330329
# We don't attempt to close the Consumer, as __del__ is synchronous
331330
def __del__(self, _warnings=warnings):
@@ -349,26 +348,32 @@ async def start(self):
349348
self._loop is get_running_loop()
350349
), "Please create objects with the same loop as running with"
351350
log.debug("Starting the Kafka producer") # trace
352-
await self.client.bootstrap()
353-
354-
if self._compression_type == "lz4":
355-
assert self.client.api_version >= (0, 8, 2), (
356-
"LZ4 Requires >= Kafka 0.8.2 Brokers"
357-
) # fmt: skip
358-
elif self._compression_type == "zstd":
359-
assert self.client.api_version >= (2, 1, 0), (
360-
"Zstd Requires >= Kafka 2.1.0 Brokers"
361-
) # fmt: skip
362-
363-
if self._txn_manager is not None and self.client.api_version < (0, 11):
364-
raise UnsupportedVersionError(
365-
"Idempotent producer available only for Broker version 0.11"
366-
" and above"
367-
)
351+
async with AsyncExitStack() as stack:
352+
await self.client.bootstrap()
353+
stack.push_async_callback(self.client.close)
354+
355+
if self._compression_type == "lz4":
356+
assert self.client.api_version >= (0, 8, 2), (
357+
"LZ4 Requires >= Kafka 0.8.2 Brokers"
358+
) # fmt: skip
359+
elif self._compression_type == "zstd":
360+
assert self.client.api_version >= (2, 1, 0), (
361+
"Zstd Requires >= Kafka 2.1.0 Brokers"
362+
) # fmt: skip
363+
364+
if self._txn_manager is not None and self.client.api_version < (0, 11):
365+
raise UnsupportedVersionError(
366+
"Idempotent producer available only for Broker version 0.11"
367+
" and above"
368+
)
368369

369-
await self._sender.start()
370-
self._message_accumulator.set_api_version(self.client.api_version)
371-
self._producer_magic = 0 if self.client.api_version < (0, 10) else 1
370+
await self._sender.start()
371+
stack.push_async_callback(self._sender.close)
372+
373+
self._message_accumulator.set_api_version(self.client.api_version)
374+
self._producer_magic = 0 if self.client.api_version < (0, 10) else 1
375+
self._exit_stack = stack.pop_all()
376+
self._closed = False
372377
log.debug("Kafka producer started")
373378

374379
async def flush(self):
@@ -379,6 +384,7 @@ async def stop(self):
379384
"""Flush all pending data and close all connections to kafka cluster"""
380385
if self._closed:
381386
return
387+
log.debug("Closing the KafkaProducer.")
382388
self._closed = True
383389

384390
# If the sender task is down there is no way for accumulator to flush
@@ -391,9 +397,7 @@ async def stop(self):
391397
return_when=asyncio.FIRST_COMPLETED,
392398
)
393399

394-
await self._sender.close()
395-
396-
await self.client.close()
400+
await self._exit_stack.aclose()
397401
log.debug("The Kafka producer has closed.")
398402

399403
async def partitions_for(self, topic):

0 commit comments

Comments
 (0)