Skip to content

Commit 0e4115b

Browse files
committed
Sanity checking offset commits.
1 parent 8660d05 commit 0e4115b

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

lib/kafka/offset_manager.rb

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,20 @@ def set_default_offset(topic, default_offset)
5050
# @param offset [Integer] the offset of the message that should be marked as processed.
5151
# @return [nil]
5252
def mark_as_processed(topic, partition, offset)
53-
@uncommitted_offsets += 1
53+
unless @group.assigned_to?(topic, partition)
54+
@logger.debug "Not marking #{topic}/#{partition}:#{offset} as processed for partition not assigned to this consumer."
55+
return
56+
end
5457
@processed_offsets[topic] ||= {}
5558

59+
last_processed_offset = @processed_offsets[topic][partition] || -1
60+
if last_processed_offset > offset + 1
61+
@logger.debug "Not overwriting newer offset #{topic}/#{partition}:#{last_processed_offset-1} with older #{offset}"
62+
return
63+
end
64+
65+
@uncommitted_offsets += 1
66+
5667
# The committed offset should always be the offset of the next message that the
5768
# application will read, thus adding one to the last message processed.
5869
@processed_offsets[topic][partition] = offset + 1

0 commit comments

Comments
 (0)