Skip to content

Commit 9b7ab41

Browse files
committed
Add fix for duplicates caused by consume exiting too early
#11
1 parent 727d23d commit 9b7ab41

File tree

1 file changed

+30
-2
lines changed

1 file changed

+30
-2
lines changed

shard_consumer.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ func (k *Kinsumer) consume(shardID string) {
134134

135135
// finished means we have reached the end of the shard but haven't necessarily processed/committed everything
136136
finished := false
137+
137138
// Make sure we release the shard when we are done.
138139
defer func() {
139140
innerErr := checkpointer.release()
@@ -167,7 +168,7 @@ mainloop:
167168
// Handle async actions, and throttle requests to keep kinesis happy
168169
select {
169170
case <-k.stop:
170-
return
171+
break mainloop
171172
case <-commitTicker.C:
172173
finishCommitted, err := checkpointer.commit()
173174
if err != nil {
@@ -234,7 +235,7 @@ mainloop:
234235
return
235236
}
236237
case <-k.stop:
237-
return
238+
break mainloop
238239
case k.records <- &consumedRecord{
239240
record: record,
240241
checkpointer: checkpointer,
@@ -250,4 +251,31 @@ mainloop:
250251
}
251252
iterator = next
252253
}
254+
// Handle checkpointer updates which occur after a stop request comes in (whose originating records were before)
255+
256+
// commit first in case the checkpointer has been updates since the last commit.
257+
checkpointer.commitIntervalCounter = 0 // Reset commitIntervalCounter to avoid retaining ownership if there's no new sequence number
258+
checkpointer.commit()
259+
260+
// Resume commit loop for some time, ensuring that we don't retain ownership unless there's a new sequence number.
261+
timeoutCounter := 0
262+
263+
for {
264+
select {
265+
case <-commitTicker.C:
266+
timeoutCounter += int(k.config.commitFrequency)
267+
checkpointer.commitIntervalCounter = 0
268+
finishCommitted, err := checkpointer.commit()
269+
if err != nil {
270+
k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.commit", err: err}
271+
return
272+
}
273+
if finishCommitted {
274+
return
275+
}
276+
if timeoutCounter >= int(k.maxAgeForClientRecord/2) {
277+
return
278+
}
279+
}
280+
}
253281
}

0 commit comments

Comments
 (0)