Skip to content

Commit 942f76f

Browse files
committed
Handle unclosed producer and consumer warnings (#1130)
Signed-off-by: Callan Gray <[email protected]>
1 parent 87dd905 commit 942f76f

File tree

3 files changed

+120
-98
lines changed

3 files changed

+120
-98
lines changed

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ Bugfixes:
1616

1717
* Make KafkaStorageError retriable after metadata refresh like in other
1818
implementations (pr #1115 by @omerhadari)
19+
* Fix producer and consumer requiring closing after `start` or `__aenter__` raises an exception.
20+
(issue #1130, pr #1131 by @calgray)
1921

2022

2123
Misc:

aiokafka/consumer/consumer.py

Lines changed: 83 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sys
55
import traceback
66
import warnings
7+
from contextlib import AsyncExitStack
78

89
from aiokafka import __version__
910
from aiokafka.abc import ConsumerRebalanceListener
@@ -335,7 +336,6 @@ def __init__(
335336

336337
if loop.get_debug():
337338
self._source_traceback = traceback.extract_stack(sys._getframe(1))
338-
self._closed = False
339339

340340
if topics:
341341
topics = self._validate_topics(topics)
@@ -368,83 +368,98 @@ async def start(self):
368368
self._loop is get_running_loop()
369369
), "Please create objects with the same loop as running with"
370370
assert self._fetcher is None, "Did you call `start` twice?"
371-
await self._client.bootstrap()
372-
await self._wait_topics()
373371

374-
if self._client.api_version < (0, 9):
375-
raise ValueError(f"Unsupported Kafka version: {self._client.api_version}")
372+
self._exit_stack = AsyncExitStack()
373+
try:
374+
await self._client.bootstrap()
375+
self._exit_stack.push_async_callback(self._client.close)
376+
await self._wait_topics()
376377

377-
if (
378-
self._isolation_level == "read_committed"
379-
and self._client.api_version < (0, 11) # fmt: skip
380-
):
381-
raise UnsupportedVersionError(
382-
"`read_committed` isolation_level available only for Brokers "
383-
"0.11 and above"
384-
)
378+
if self._client.api_version < (0, 9):
379+
raise ValueError(
380+
f"Unsupported Kafka version: {self._client.api_version}"
381+
)
385382

386-
self._fetcher = Fetcher(
387-
self._client,
388-
self._subscription,
389-
key_deserializer=self._key_deserializer,
390-
value_deserializer=self._value_deserializer,
391-
fetch_min_bytes=self._fetch_min_bytes,
392-
fetch_max_bytes=self._fetch_max_bytes,
393-
fetch_max_wait_ms=self._fetch_max_wait_ms,
394-
max_partition_fetch_bytes=self._max_partition_fetch_bytes,
395-
check_crcs=self._check_crcs,
396-
fetcher_timeout=self._consumer_timeout,
397-
retry_backoff_ms=self._retry_backoff_ms,
398-
auto_offset_reset=self._auto_offset_reset,
399-
isolation_level=self._isolation_level,
400-
)
383+
if (
384+
self._isolation_level == "read_committed"
385+
and self._client.api_version < (0, 11) # fmt: skip
386+
):
387+
raise UnsupportedVersionError(
388+
"`read_committed` isolation_level available only for Brokers "
389+
"0.11 and above"
390+
)
401391

402-
if self._group_id is not None:
403-
# using group coordinator for automatic partitions assignment
404-
self._coordinator = GroupCoordinator(
392+
self._fetcher = Fetcher(
405393
self._client,
406394
self._subscription,
407-
group_id=self._group_id,
408-
group_instance_id=self._group_instance_id,
409-
heartbeat_interval_ms=self._heartbeat_interval_ms,
410-
session_timeout_ms=self._session_timeout_ms,
395+
key_deserializer=self._key_deserializer,
396+
value_deserializer=self._value_deserializer,
397+
fetch_min_bytes=self._fetch_min_bytes,
398+
fetch_max_bytes=self._fetch_max_bytes,
399+
fetch_max_wait_ms=self._fetch_max_wait_ms,
400+
max_partition_fetch_bytes=self._max_partition_fetch_bytes,
401+
check_crcs=self._check_crcs,
402+
fetcher_timeout=self._consumer_timeout,
411403
retry_backoff_ms=self._retry_backoff_ms,
412-
enable_auto_commit=self._enable_auto_commit,
413-
auto_commit_interval_ms=self._auto_commit_interval_ms,
414-
assignors=self._partition_assignment_strategy,
415-
exclude_internal_topics=self._exclude_internal_topics,
416-
rebalance_timeout_ms=self._rebalance_timeout_ms,
417-
max_poll_interval_ms=self._max_poll_interval_ms,
404+
auto_offset_reset=self._auto_offset_reset,
405+
isolation_level=self._isolation_level,
418406
)
419-
if self._subscription.subscription is not None:
420-
if self._subscription.partitions_auto_assigned():
407+
self._exit_stack.push_async_callback(self._fetcher.close)
408+
409+
if self._group_id is not None:
410+
# using group coordinator for automatic partitions assignment
411+
self._coordinator = GroupCoordinator(
412+
self._client,
413+
self._subscription,
414+
group_id=self._group_id,
415+
group_instance_id=self._group_instance_id,
416+
heartbeat_interval_ms=self._heartbeat_interval_ms,
417+
session_timeout_ms=self._session_timeout_ms,
418+
retry_backoff_ms=self._retry_backoff_ms,
419+
enable_auto_commit=self._enable_auto_commit,
420+
auto_commit_interval_ms=self._auto_commit_interval_ms,
421+
assignors=self._partition_assignment_strategy,
422+
exclude_internal_topics=self._exclude_internal_topics,
423+
rebalance_timeout_ms=self._rebalance_timeout_ms,
424+
max_poll_interval_ms=self._max_poll_interval_ms,
425+
)
426+
self._exit_stack.push_async_callback(self._coordinator.close)
427+
428+
if self._subscription.subscription is not None:
429+
if self._subscription.partitions_auto_assigned():
430+
# Either we passed `topics` to constructor or `subscribe`
431+
# was called before `start`
432+
await self._subscription.wait_for_assignment()
433+
else:
434+
# `assign` was called before `start`. We did not start
435+
# this task on that call, as coordinator was yet to be
436+
# created
437+
self._coordinator.start_commit_offsets_refresh_task(
438+
self._subscription.subscription.assignment
439+
)
440+
else:
441+
# Using a simple assignment coordinator for reassignment on
442+
# metadata changes
443+
self._coordinator = NoGroupCoordinator(
444+
self._client,
445+
self._subscription,
446+
exclude_internal_topics=self._exclude_internal_topics,
447+
)
448+
self._exit_stack.push_async_callback(self._coordinator.close)
449+
450+
if (
451+
self._subscription.subscription is not None
452+
and self._subscription.partitions_auto_assigned()
453+
):
421454
# Either we passed `topics` to constructor or `subscribe`
422455
# was called before `start`
423-
await self._subscription.wait_for_assignment()
424-
else:
425-
# `assign` was called before `start`. We did not start
426-
# this task on that call, as coordinator was yet to be
427-
# created
428-
self._coordinator.start_commit_offsets_refresh_task(
429-
self._subscription.subscription.assignment
430-
)
431-
else:
432-
# Using a simple assignment coordinator for reassignment on
433-
# metadata changes
434-
self._coordinator = NoGroupCoordinator(
435-
self._client,
436-
self._subscription,
437-
exclude_internal_topics=self._exclude_internal_topics,
438-
)
456+
await self._client.force_metadata_update()
457+
self._coordinator.assign_all_partitions(check_unknown=True)
439458

440-
if (
441-
self._subscription.subscription is not None
442-
and self._subscription.partitions_auto_assigned()
443-
):
444-
# Either we passed `topics` to constructor or `subscribe`
445-
# was called before `start`
446-
await self._client.force_metadata_update()
447-
self._coordinator.assign_all_partitions(check_unknown=True)
459+
self._closed = False
460+
except:
461+
await self._exit_stack.aclose()
462+
raise
448463

449464
async def _wait_topics(self):
450465
if self._subscription.subscription is not None:
@@ -514,11 +529,7 @@ async def stop(self):
514529
return
515530
log.debug("Closing the KafkaConsumer.")
516531
self._closed = True
517-
if self._coordinator:
518-
await self._coordinator.close()
519-
if self._fetcher:
520-
await self._fetcher.close()
521-
await self._client.close()
532+
await self._exit_stack.aclose()
522533
log.debug("The KafkaConsumer has closed.")
523534

524535
async def commit(self, offsets=None):

aiokafka/producer/producer.py

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import sys
44
import traceback
55
import warnings
6+
from contextlib import AsyncExitStack
67

78
from aiokafka.client import AIOKafkaClient
89
from aiokafka.codec import has_gzip, has_lz4, has_snappy, has_zstd
@@ -324,8 +325,6 @@ def __init__(
324325
request_timeout_ms=request_timeout_ms,
325326
)
326327

327-
self._closed = False
328-
329328
# Warn if producer was not closed properly
330329
# We don't attempt to close the Consumer, as __del__ is synchronous
331330
def __del__(self, _warnings=warnings):
@@ -348,28 +347,39 @@ async def start(self):
348347
assert (
349348
self._loop is get_running_loop()
350349
), "Please create objects with the same loop as running with"
351-
log.debug("Starting the Kafka producer") # trace
352-
await self.client.bootstrap()
353-
354-
if self._compression_type == "lz4":
355-
assert self.client.api_version >= (0, 8, 2), (
356-
"LZ4 Requires >= Kafka 0.8.2 Brokers"
357-
) # fmt: skip
358-
elif self._compression_type == "zstd":
359-
assert self.client.api_version >= (2, 1, 0), (
360-
"Zstd Requires >= Kafka 2.1.0 Brokers"
361-
) # fmt: skip
362-
363-
if self._txn_manager is not None and self.client.api_version < (0, 11):
364-
raise UnsupportedVersionError(
365-
"Idempotent producer available only for Broker version 0.11"
366-
" and above"
367-
)
368350

369-
await self._sender.start()
370-
self._message_accumulator.set_api_version(self.client.api_version)
371-
self._producer_magic = 0 if self.client.api_version < (0, 10) else 1
372-
log.debug("Kafka producer started")
351+
self._exit_stack = AsyncExitStack()
352+
try:
353+
log.debug("Starting the Kafka producer") # trace
354+
355+
await self.client.bootstrap()
356+
self._exit_stack.push_async_callback(self.client.close)
357+
358+
if self._compression_type == "lz4":
359+
assert self.client.api_version >= (0, 8, 2), (
360+
"LZ4 Requires >= Kafka 0.8.2 Brokers"
361+
) # fmt: skip
362+
elif self._compression_type == "zstd":
363+
assert self.client.api_version >= (2, 1, 0), (
364+
"Zstd Requires >= Kafka 2.1.0 Brokers"
365+
) # fmt: skip
366+
367+
if self._txn_manager is not None and self.client.api_version < (0, 11):
368+
raise UnsupportedVersionError(
369+
"Idempotent producer available only for Broker version 0.11"
370+
" and above"
371+
)
372+
373+
await self._sender.start()
374+
self._exit_stack.push_async_callback(self._sender.close)
375+
376+
self._message_accumulator.set_api_version(self.client.api_version)
377+
self._producer_magic = 0 if self.client.api_version < (0, 10) else 1
378+
self._closed = False
379+
log.debug("Kafka producer started")
380+
except:
381+
await self._exit_stack.aclose()
382+
raise
373383

374384
async def flush(self):
375385
"""Wait until all batches are Delivered and futures resolved"""
@@ -379,6 +389,7 @@ async def stop(self):
379389
"""Flush all pending data and close all connections to kafka cluster"""
380390
if self._closed:
381391
return
392+
log.debug("Closing the KafkaProducer.")
382393
self._closed = True
383394

384395
# If the sender task is down there is no way for accumulator to flush
@@ -391,9 +402,7 @@ async def stop(self):
391402
return_when=asyncio.FIRST_COMPLETED,
392403
)
393404

394-
await self._sender.close()
395-
396-
await self.client.close()
405+
await self._exit_stack.aclose()
397406
log.debug("The Kafka producer has closed.")
398407

399408
async def partitions_for(self, topic):

0 commit comments

Comments (0)