Skip to content

Commit 501e053

Browse files
authored
Merge pull request #308 from bruin-data/kafka/fix-off-by-one-read
kafka: handle off-by-one read bug
2 parents 5f6b8aa + 5306aa4 commit 501e053

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

ingestr/src/kafka/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def kafka_consumer(
8383
# read messages up to the maximum offsets,
8484
# not waiting for new messages
8585
with closing(consumer):
86-
while tracker.has_unread:
86+
while True:
8787
messages = consumer.consume(batch_size, timeout=batch_timeout)
8888
if not messages:
8989
break
@@ -101,3 +101,6 @@ def kafka_consumer(
101101
tracker.renew(msg)
102102

103103
yield batch
104+
105+
if tracker.has_unread is False:
106+
return

0 commit comments

Comments
 (0)