Skip to content

Commit f70b82f

Browse files
committed
kafkatest: VerifiableConsumer must send shutdown message even on failure
Needed for fenced static consumer test to correctly calculate active members.
1 parent cb5f882 commit f70b82f

File tree

1 file changed

+15
-4
lines changed

1 file changed

+15
-4
lines changed

confluent_kafka/kafkatest/verifiable_consumer.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,8 @@ def to_dict(self):
275275
vc.consumer.subscribe(args['topic'],
276276
on_assign=vc.on_assign, on_revoke=vc.on_revoke)
277277

278+
failed = False
279+
278280
try:
279281
while vc.run:
280282
msg = vc.consumer.poll(timeout=1.0)
@@ -295,13 +297,22 @@ def to_dict(self):
295297
vc.run = False
296298
pass
297299

300+
except Exception as e:
301+
vc.dbg('Terminating on exception: %s' % str(e))
302+
failed = True
303+
298304
vc.dbg('Closing consumer')
299305
vc.send_records_consumed(immediate=True)
300-
if not vc.use_auto_commit:
301-
vc.do_commit(immediate=True, asynchronous=False)
302306

303-
vc.consumer.close()
307+
if not failed:
308+
try:
309+
if not vc.use_auto_commit:
310+
vc.do_commit(immediate=True, asynchronous=False)
311+
vc.consumer.close()
312+
except Exception as e:
313+
vc.dbg('Ignoring exception while closing: %s' % str(e))
314+
failed = True
304315

305-
vc.send({'name': 'shutdown_complete'})
316+
vc.send({'name': 'shutdown_complete', 'failed': failed})
306317

307318
vc.dbg('All done')

0 commit comments

Comments
 (0)