Skip to content

Commit 69b5e7e

Browse files
author
Vincent Maurin
committed
Include "start" in the try/finally in docs
Both producer and consumer classes have a "start" and "stop" method. `start` initializes background co-routine, open connections and execute various tasks that need to be cleaned up with the `stop` method. But `stop` also hold a logic based on a boolean `_closed` that is initialized to `False` at the end of the `__init__` method. Thus, if one create a consumer or a producer and doesn't call `stop` even if `start` was never call, the finalizer in `__del__` will send a warning. A correct pattern of calling `stop` in a try/finally block seems to be then to do the `try` just after the `__init__` and not just after the `start` as an error or a task cancellation in between would leave the producer/consumer "open" and trigger the warning fixes #938
1 parent 5ec91e7 commit 69b5e7e

14 files changed

+28
-27
lines changed

README.rst

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ Example of AIOKafkaProducer usage:
2727
2828
async def send_one():
2929
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
30-
# Get cluster layout and initial topic/partition leadership information
31-
await producer.start()
3230
try:
31+
# Get cluster layout and initial topic/partition leadership information
32+
await producer.start()
3333
# Produce message
3434
await producer.send_and_wait("my_topic", b"Super message")
3535
finally:
@@ -58,9 +58,9 @@ Example of AIOKafkaConsumer usage:
5858
'my_topic', 'my_other_topic',
5959
bootstrap_servers='localhost:9092',
6060
group_id="my-group")
61-
# Get cluster layout and join group `my-group`
62-
await consumer.start()
6361
try:
62+
# Get cluster layout and join group `my-group`
63+
await consumer.start()
6464
# Consume messages
6565
async for msg in consumer:
6666
print("consumed: ", msg.topic, msg.partition, msg.offset,

aiokafka/consumer/consumer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,7 @@ async def stop(self):
509509
510510
* Commit last consumed message if autocommit enabled
511511
* Leave group if used Consumer Groups
512+
* Close the underlying Kafka client
512513
"""
513514
if self._closed:
514515
return

docs/consumer.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ from a Kafka cluster. Most simple usage would be::
1414
"my_topic",
1515
bootstrap_servers='localhost:9092'
1616
)
17-
await consumer.start()
1817
try:
18+
await consumer.start()
1919
async for msg in consumer:
2020
print(
2121
"{}:{:d}:{:d}: key={} value={} timestamp_ms={}".format(

docs/examples/local_state_consumer.rst

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,16 +113,15 @@ Local State consumer:
113113
auto_offset_reset="none",
114114
key_deserializer=lambda key: key.decode("utf-8") if key else "",
115115
)
116-
await consumer.start()
117116
118117
local_state = LocalState()
119118
listener = RebalanceListener(consumer, local_state)
120-
consumer.subscribe(topics=["test"], listener=listener)
121119
122120
save_task = asyncio.create_task(save_state_every_second(local_state))
123121
124122
try:
125-
123+
await consumer.start()
124+
consumer.subscribe(topics=["test"], listener=listener)
126125
while True:
127126
try:
128127
msg_set = await consumer.getmany(timeout_ms=1000)

docs/examples/ssl_consume_produce.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ information.
2727
bootstrap_servers='localhost:9093',
2828
security_protocol="SSL", ssl_context=context)
2929
30-
await producer.start()
3130
try:
31+
await producer.start()
3232
msg = await producer.send_and_wait(
3333
'my_topic', b"Super Message", partition=0)
3434
finally:
@@ -37,8 +37,8 @@ information.
3737
consumer = AIOKafkaConsumer(
3838
"my_topic", bootstrap_servers='localhost:9093',
3939
security_protocol="SSL", ssl_context=context)
40-
await consumer.start()
4140
try:
41+
await consumer.start()
4242
consumer.seek(TopicPartition('my_topic', 0), msg.offset)
4343
fetch_msg = await consumer.getone()
4444
finally:

docs/examples/transaction_example.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,16 @@ process data and produce the resut to ``OUT_TOPIC`` in a transactional manner.
5050
group_id=GROUP_ID,
5151
isolation_level="read_committed" # <-- This will filter aborted txn's
5252
)
53-
await consumer.start()
5453
5554
producer = AIOKafkaProducer(
5655
bootstrap_servers=BOOTSTRAP_SERVERS,
5756
transactional_id=TRANSACTIONAL_ID
5857
)
59-
await producer.start()
6058
6159
try:
60+
await consumer.start()
61+
await producer.start()
62+
6263
while True:
6364
msg_batch = await consumer.getmany(timeout_ms=POLL_TIMEOUT)
6465

docs/index.rst

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ Here's a consumer example:
4141
'my_topic', 'my_other_topic',
4242
bootstrap_servers='localhost:9092',
4343
group_id="my-group")
44-
# Get cluster layout and join group `my-group`
45-
await consumer.start()
4644
try:
45+
# Get cluster layout and join group `my-group`
46+
await consumer.start()
4747
# Consume messages
4848
async for msg in consumer:
4949
print("consumed: ", msg.topic, msg.partition, msg.offset,
@@ -71,9 +71,9 @@ Here's a producer example:
7171
async def send_one():
7272
producer = AIOKafkaProducer(
7373
bootstrap_servers='localhost:9092')
74-
# Get cluster layout and initial topic/partition leadership information
75-
await producer.start()
7674
try:
75+
# Get cluster layout and initial topic/partition leadership information
76+
await producer.start()
7777
# Produce message
7878
await producer.send_and_wait("my_topic", b"Super message")
7979
finally:

docs/producer.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ Producer client
99
to the Kafka cluster. Most simple usage would be::
1010

1111
producer = aiokafka.AIOKafkaProducer(bootstrap_servers="localhost:9092")
12-
await producer.start()
1312
try:
13+
await producer.start()
1414
await producer.send_and_wait("my_topic", b"Super message")
1515
finally:
1616
await producer.stop()
@@ -103,8 +103,8 @@ by passing the parameter ``enable_idempotence=True`` to :class:`~.AIOKafkaProduc
103103
producer = aiokafka.AIOKafkaProducer(
104104
bootstrap_servers='localhost:9092',
105105
enable_idempotence=True)
106-
await producer.start()
107106
try:
107+
await producer.start()
108108
await producer.send_and_wait("my_topic", b"Super message")
109109
finally:
110110
await producer.stop()
@@ -134,8 +134,8 @@ attendant APIs, you must set the ``transactional_id`` configuration property::
134134
producer = aiokafka.AIOKafkaProducer(
135135
bootstrap_servers='localhost:9092',
136136
transactional_id="transactional_test")
137-
await producer.start()
138137
try:
138+
await producer.start()
139139
async with producer.transaction():
140140
res = await producer.send_and_wait(
141141
"test-topic", b"Super transactional message")

examples/local_state_consumer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,15 @@ async def consume():
9494
auto_offset_reset="none",
9595
key_deserializer=lambda key: key.decode("utf-8") if key else "",
9696
)
97-
await consumer.start()
9897

9998
local_state = LocalState()
10099
listener = RebalanceListener(consumer, local_state)
101-
consumer.subscribe(topics=["test"], listener=listener)
102100

103101
save_task = asyncio.create_task(save_state_every_second(local_state))
104102

105103
try:
104+
await consumer.start()
105+
consumer.subscribe(topics=["test"], listener=listener)
106106

107107
while True:
108108
try:

examples/ssl_consume_produce.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ async def produce_and_consume():
1717
bootstrap_servers='localhost:9093',
1818
security_protocol="SSL", ssl_context=context)
1919

20-
await producer.start()
2120
try:
21+
await producer.start()
2222
msg = await producer.send_and_wait(
2323
'my_topic', b"Super Message", partition=0)
2424
finally:
@@ -27,8 +27,8 @@ async def produce_and_consume():
2727
consumer = AIOKafkaConsumer(
2828
"my_topic", bootstrap_servers='localhost:9093',
2929
security_protocol="SSL", ssl_context=context)
30-
await consumer.start()
3130
try:
31+
await consumer.start()
3232
consumer.seek(TopicPartition('my_topic', 0), msg.offset)
3333
fetch_msg = await consumer.getone()
3434
finally:

0 commit comments

Comments
 (0)