Skip to content

Commit 5306aa4

Browse files
committed
kafka: handle off-by-one read bug
1 parent 5f6b8aa commit 5306aa4

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

ingestr/src/kafka/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def kafka_consumer(
8383
# read messages up to the maximum offsets,
8484
# not waiting for new messages
8585
with closing(consumer):
86-
while tracker.has_unread:
86+
while True:
8787
messages = consumer.consume(batch_size, timeout=batch_timeout)
8888
if not messages:
8989
break
@@ -101,3 +101,6 @@ def kafka_consumer(
101101
tracker.renew(msg)
102102

103103
yield batch
104+
105+
if tracker.has_unread is False:
106+
return

0 commit comments

Comments
 (0)