Skip to content

Commit 4e19415

Browse files
committed
Update checkpointer to commit periodically when there's no data
#12
1 parent 9b7ab41 commit 4e19415

File tree

4 files changed

+157
-40
lines changed

4 files changed

+157
-40
lines changed

checkpoints.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type checkpointer struct {
3232
finalSequenceNumber string
3333
updateSequencer chan struct{}
3434
lastUpdate int64
35+
commitIntervalCounter time.Duration
36+
lastRecordPassed time.Time
3537
}
3638

3739
type checkpointRecord struct {
@@ -59,7 +61,6 @@ func capture(
5961
maxAgeForClientRecord time.Duration,
6062
stats StatReceiver) (*checkpointer, error) {
6163

62-
// TODO: solve duplicate problem: If capture can grab shards as soon as maxAgeForClientRecord is reached, there are more chances for duplicates.
6364
cutoff := time.Now().Add(-maxAgeForClientRecord).UnixNano()
6465

6566
// Grab the entry from dynamo assuming there is one
@@ -146,12 +147,20 @@ func capture(
146147
// commit writes the latest SequenceNumber consumed to dynamo and updates LastUpdate.
147148
// Returns true if we set Finished in dynamo because the library user finished consuming the shard.
148149
// Once that has happened, the checkpointer should be released and never grabbed again.
149-
func (cp *checkpointer) commit() (bool, error) {
150+
func (cp *checkpointer) commit(commitFrequency time.Duration) (bool, error) {
150151
cp.mutex.Lock()
151152
defer cp.mutex.Unlock()
153+
152154
if !cp.dirty && !cp.finished {
153-
return false, nil
155+
cp.commitIntervalCounter += commitFrequency
156+
157+
// If we have recently passed a record to the user, don't update the table when we don't have a new sequence number
158+
// If we haven't, update at a rate of maxAgeForClientRecord/2
159+
if (time.Now().Sub(cp.lastRecordPassed) < cp.maxAgeForClientRecord/2) || (cp.commitIntervalCounter < cp.maxAgeForClientRecord/2) {
160+
return false, nil
161+
}
154162
}
163+
cp.commitIntervalCounter = 0 // Reset the counter if we're registering a commit
155164
now := time.Now()
156165

157166
sn := &cp.sequenceNumber
@@ -193,12 +202,7 @@ func (cp *checkpointer) commit() (bool, error) {
193202
ConditionExpression: aws.String("OwnerID = :ownerID"),
194203
ExpressionAttributeValues: attrVals,
195204
}); err != nil {
196-
// TODO: Add logging for these cases
197205
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == conditionalFail && cp.lastUpdate < time.Now().Add(-cp.maxAgeForClientRecord).UnixNano() {
198-
199-
// TODO: investigate if not marking cp.dirty = false here causes duplicates
200-
201-
// If we failed conditional check, and the record has expired, assume ownership has legitimately changed, and don't return the error.
202206
return false, nil
203207
}
204208

@@ -238,12 +242,8 @@ func (cp *checkpointer) release() error {
238242
ConditionExpression: aws.String("OwnerID = :ownerID"),
239243
ExpressionAttributeValues: attrVals,
240244
}); err != nil {
241-
// TODO: Add logging for these cases
242245
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == conditionalFail && cp.lastUpdate < time.Now().Add(-cp.maxAgeForClientRecord).UnixNano() {
243-
244-
// TODO: Investigate if not marking cp.captured = false here causes duplicates
245-
246-
// If we failed conditional check, and the record has expired, assume ownership has legitimately changed, and don't return the error.
246+
// If we failed conditional check, and the record has expired, assume that another client has legitimately seized the shard.
247247
return nil
248248
}
249249
return fmt.Errorf("error releasing checkpoint: %s", err)

checkpoints_test.go

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestCheckpointer(t *testing.T) {
3434

3535
// Now actually commit.
3636
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq1)", func() {
37-
if _, err = cp.commit(); err != nil {
37+
if _, err = cp.commit(50 * time.Millisecond); err != nil {
3838
t.Errorf("commit seq1 err=%q", err)
3939
}
4040
})
@@ -44,7 +44,7 @@ func TestCheckpointer(t *testing.T) {
4444

4545
// Since the sequence number hasn't changed, committing shouldn't make a request.
4646
mocks.AssertNoRequestsMade(t, mock.(*mocks.MockDynamo), "commit unchanged sequence number", func() {
47-
if _, err = cp.commit(); err != nil {
47+
if _, err = cp.commit(50 * time.Millisecond); err != nil {
4848
t.Errorf("commit unchanged err=%q", err)
4949
}
5050
})
@@ -54,7 +54,7 @@ func TestCheckpointer(t *testing.T) {
5454

5555
// committing should trigger a request
5656
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq2)", func() {
57-
if _, err = cp.commit(); err != nil {
57+
if _, err = cp.commit(50 * time.Millisecond); err != nil {
5858
t.Errorf("commit seq2 err=%q", err)
5959
}
6060
})
@@ -65,7 +65,7 @@ func TestCheckpointer(t *testing.T) {
6565

6666
// This should still trigger an update
6767
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq3)", func() {
68-
if _, err = cp.commit(); err != nil {
68+
if _, err = cp.commit(50 * time.Millisecond); err != nil {
6969
t.Errorf("commit seq3 err=%q", err)
7070
}
7171
})
@@ -104,3 +104,55 @@ func TestCheckpointer(t *testing.T) {
104104
}
105105
*/
106106
}
107+
108+
func TestCheckpointer2(t *testing.T) {
109+
table := "checkpoints"
110+
mock := mocks.NewMockDynamo([]string{table})
111+
stats := &NoopStatReceiver{}
112+
113+
cp, err := capture("shard", table, mock, "ownerName", "ownerId", 3*time.Minute, stats)
114+
115+
// Initially, we expect that there is no record, so our new record should have no sequence number
116+
if err != nil {
117+
t.Errorf("current 1 err=%q", err)
118+
}
119+
if cp == nil {
120+
t.Errorf("Should always be able to capture the shard if there is no entry in dynamo")
121+
}
122+
if cp.sequenceNumber != "" {
123+
t.Errorf("sequence number should initially be an empty string")
124+
}
125+
126+
// Update the sequence number. This shouldn't cause any external request.
127+
mocks.AssertNoRequestsMade(t, mock.(*mocks.MockDynamo), "update(seq1)", func() {
128+
cp.update("seq1")
129+
})
130+
131+
// Now actually commit.
132+
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq1)", func() {
133+
if _, err = cp.commit(50 * time.Millisecond); err != nil {
134+
t.Errorf("commit seq1 err=%q", err)
135+
}
136+
})
137+
138+
// Call update, but keep the same sequence number
139+
cp.update("seq1")
140+
141+
// Since the sequence number hasn't changed, committing shouldn't make a request.
142+
mocks.AssertNoRequestsMade(t, mock.(*mocks.MockDynamo), "commit unchanged sequence number", func() {
143+
if _, err = cp.commit(50 * time.Millisecond); err != nil {
144+
t.Errorf("commit unchanged err=%q", err)
145+
}
146+
})
147+
148+
// Set cp information to mimic conditions to retain the shard with no data.
149+
cp.maxAgeForClientRecord = 1 * time.Second
150+
cp.lastRecordPassed = time.Now().Add(-501 * time.Millisecond)
151+
152+
// committing should trigger a request
153+
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq2)", func() {
154+
if _, err = cp.commit(501 * time.Millisecond); err != nil {
155+
t.Errorf("commit seq2 err=%q", err)
156+
}
157+
})
158+
}

shard_consumer.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ func (k *Kinsumer) consume(shardID string) {
156156

157157
retryCount := 0
158158

159+
// lastSeqToCheckp is used to check if we have more data to checkpoint before we exit
160+
var lastSeqToCheckp string
161+
// lastSeqNum is used to check if a batch of data is the last in the stream
159162
var lastSeqNum string
160163
mainloop:
161164
for {
@@ -170,7 +173,7 @@ mainloop:
170173
case <-k.stop:
171174
break mainloop
172175
case <-commitTicker.C:
173-
finishCommitted, err := checkpointer.commit()
176+
finishCommitted, err := checkpointer.commit(k.config.commitFrequency)
174177
if err != nil {
175178
k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.commit", err: err}
176179
return
@@ -226,7 +229,7 @@ mainloop:
226229
for {
227230
select {
228231
case <-commitTicker.C:
229-
finishCommitted, err := checkpointer.commit()
232+
finishCommitted, err := checkpointer.commit(k.config.commitFrequency)
230233
if err != nil {
231234
k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.commit", err: err}
232235
return
@@ -241,6 +244,8 @@ mainloop:
241244
checkpointer: checkpointer,
242245
retrievedAt: retrievedAt,
243246
}:
247+
checkpointer.lastRecordPassed = time.Now() // Mark the time so we don't retain shards when we're too slow to do so
248+
lastSeqToCheckp = aws.StringValue(record.SequenceNumber)
244249
break RecordLoop
245250
}
246251
}
@@ -255,24 +260,37 @@ mainloop:
255260

256261
// commit first in case the checkpointer has been updated since the last commit.
257262
checkpointer.commitIntervalCounter = 0 // Reset commitIntervalCounter to avoid retaining ownership if there's no new sequence number
258-
checkpointer.commit()
259-
263+
_, err1 := checkpointer.commit(0 * time.Millisecond)
264+
if err1 != nil {
265+
k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.commit", err: err1}
266+
return
267+
}
260268
// Resume commit loop for some time, ensuring that we don't retain ownership unless there's a new sequence number.
261269
timeoutCounter := 0
262270

271+
// If we have committed the last sequence number returned to the user, just return. Otherwise, keep committing until we reach that state
272+
if !checkpointer.dirty && checkpointer.sequenceNumber == lastSeqToCheckp {
273+
return
274+
}
275+
263276
for {
264277
select {
265278
case <-commitTicker.C:
266279
timeoutCounter += int(k.config.commitFrequency)
267280
checkpointer.commitIntervalCounter = 0
268-
finishCommitted, err := checkpointer.commit()
281+
// passing 0 to commit ensures we no longer retain the shard.
282+
finishCommitted, err := checkpointer.commit(0 * time.Millisecond)
269283
if err != nil {
270284
k.shardErrors <- shardConsumerError{shardID: shardID, action: "checkpointer.commit", err: err}
271285
return
272286
}
273287
if finishCommitted {
274288
return
275289
}
290+
// Once we have committed the last sequence number we passed to the user, return.
291+
if !checkpointer.dirty && checkpointer.sequenceNumber == lastSeqToCheckp {
292+
return
293+
}
276294
if timeoutCounter >= int(k.maxAgeForClientRecord/2) {
277295
return
278296
}

shard_consumer_test.go

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,10 @@ func TestForcefulOwnershipChange(t *testing.T) {
8585
maxAge2 := 500 * time.Millisecond
8686
config2 := config.WithClientRecordMaxAge(&maxAge2)
8787

88-
kinsumer1, err := NewWithInterfaces(k, dynamo, streamName, *applicationName, "client_1", config1)
89-
kinsumer2, err := NewWithInterfaces(k, dynamo, streamName, *applicationName, "client_2", config2)
88+
kinsumer1, err1 := NewWithInterfaces(k, dynamo, streamName, *applicationName, "client_1", config1)
89+
kinsumer2, err2 := NewWithInterfaces(k, dynamo, streamName, *applicationName, "client_2", config2)
90+
require.NoError(t, err1)
91+
require.NoError(t, err2)
9092

9193
desc, err := k.DescribeStream(&kinesis.DescribeStreamInput{
9294
StreamName: &streamName,
@@ -103,33 +105,83 @@ func TestForcefulOwnershipChange(t *testing.T) {
103105

104106
kinsumer1ResultBeforeOwnerChange := readEventsToSlice(kinsumer1.records, 5*time.Second)
105107

106-
assert.Equal(t, 2000, len(kinsumer1ResultBeforeOwnerChange)) // Assert that we get 100 results
108+
assert.Equal(t, 2000, len(kinsumer1ResultBeforeOwnerChange))
107109

108110
kinsumer2.waitGroup.Add(1) // consume will mark waitgroup as done on exit, so we add to it to avoid a panic
109111
go kinsumer2.consume(shard)
110112

111-
time.Sleep(1500 * time.Millisecond) // Sleep for long enough that an ownership change will happen
113+
// Because we retain the shard if no data is coming through, we mimic a stale client scenario by sending data but not acking
114+
lastK1Record := &consumedRecord{
115+
checkpointer: &checkpointer{},
116+
}
117+
OwnerChangeLoop:
118+
for {
119+
spamStreamModified(t, k, 1, streamName, 9999)
120+
getEventLoop:
121+
select {
122+
case k1record := <-kinsumer1.records: // if kinsumer1 gets it, don't ack
123+
lastK1Record = k1record
124+
break getEventLoop
125+
case k2record := <-kinsumer2.records: // if kinsumer2 gets it, ownership has changed. Ack then move on to the test.
126+
k2record.checkpointer.update(aws.StringValue(k2record.record.SequenceNumber))
127+
// because this may be called with no genuine record to k1, we use the k2 sequence number.
128+
// this shouldn't make a difference since this commit will fail.
129+
lastK1Record.checkpointer.update(aws.StringValue(k2record.record.SequenceNumber)) // Ack the last k1 record we have, to instigate behaviour we would see for that client
130+
break OwnerChangeLoop
131+
}
132+
time.Sleep(120 * time.Millisecond)
133+
}
112134

113-
go spamStreamModified(t, k, 2000, streamName, 4000)
135+
time.Sleep(300 * time.Millisecond)
136+
137+
go spamStreamModified(t, k, 1000, streamName, 5000)
114138

115139
resultsAfterOwnerChange := readMultipleToSlice([]chan *consumedRecord{kinsumer1.records, kinsumer2.records}, 5*time.Second)
116-
kinsumer1ResultAfterOwnerChange := resultsAfterOwnerChange[0]
117-
kinsumer2ResultAfterOwnerChange := resultsAfterOwnerChange[1]
140+
kinsumer1ResultAfterOwnerChangePreClean := resultsAfterOwnerChange[0]
141+
kinsumer2ResultAfterOwnerChangePreClean := resultsAfterOwnerChange[1]
142+
143+
// clean out the records we just used to instigate a change in ownership
144+
kinsumer1ResultAfterOwnerChange := make([]*consumedRecord, 0)
145+
for _, val := range kinsumer1ResultAfterOwnerChangePreClean {
146+
if string(val.record.Data) != "9999" {
147+
kinsumer1ResultAfterOwnerChange = append(kinsumer1ResultAfterOwnerChange, val)
148+
}
149+
}
150+
151+
kinsumer2ResultAfterOwnerChange := make([]*consumedRecord, 0)
152+
for _, val := range kinsumer2ResultAfterOwnerChangePreClean {
153+
if string(val.record.Data) != "9999" {
154+
kinsumer2ResultAfterOwnerChange = append(kinsumer2ResultAfterOwnerChange, val)
155+
}
156+
}
118157

119158
/*
120-
// Leaving this here but commented out since it's useful in inspecting the behaviour when something does look off.
121-
investigationSlice := make([]string, 0)
122-
for _, record := range kinsumer1ResultAfterOwnerChange {
123-
investigationSlice = append(investigationSlice, string(record.record.Data))
159+
// Leaving this here but commented out since it's useful in inspecting the behaviour when something does look off.
160+
if len(resultsAfterOwnerChange) > 0 {
161+
investigationSlice := make([]string, 0)
162+
for _, record := range kinsumer1ResultAfterOwnerChange {
163+
investigationSlice = append(investigationSlice, string(record.record.Data))
164+
}
165+
fmt.Println(investigationSlice)
124166
}
125167
*/
126168

127169
assert.Equal(t, 0, len(kinsumer1ResultAfterOwnerChange))
128-
assert.Equal(t, 2000, len(kinsumer2ResultAfterOwnerChange))
170+
assert.Equal(t, 1000, len(kinsumer2ResultAfterOwnerChange))
171+
172+
dupes := make([]string, 0)
173+
174+
for _, val1 := range kinsumer1ResultAfterOwnerChange {
175+
for _, val2 := range kinsumer2ResultAfterOwnerChange {
176+
if string(val1.record.Data) == string(val2.record.Data) {
177+
dupes = append(dupes, string(val1.record.Data))
178+
}
179+
}
180+
}
129181

130182
// Check that every expected value is present in the results
131183
missingIntegers := make([]int, 0)
132-
for i := 4000; i < 6000; i++ {
184+
for i := 5000; i < 6000; i++ {
133185
present := false
134186
for _, val := range kinsumer2ResultAfterOwnerChange {
135187
if string(val.record.Data) == fmt.Sprint(i) {
@@ -519,8 +571,6 @@ func TestConsumerStopStart(t *testing.T) {
519571

520572
}
521573

522-
// TODO: For some reason, using the readMultipleToSlice function in this test causes us to receive no data for the first two consumers. Figure out why and resolve.
523-
// (ideally we start to read the data first then we start to stop and consumers)
524574
// TestMultipleConsumerStopStart tests the same thing as TestConsumerStopStart, but for the scenario where there are multiple clients vying for control of the same shard.
525575
// This is a common scenario when shards are merged, because the reported shard count will change relatively slowly over time (seconds), and for a period different clients will report different shard counts
526576
// The aim of this test is to give us some more robust assurance that there are no additional issues for multiple consumers on a shard which aren't caught when we only have one consumer at a time.
@@ -609,7 +659,6 @@ func TestMultipleConsumerStopStart(t *testing.T) {
609659
}
610660
}()
611661

612-
// TODO: Investigate why starting this process earlier results in an empty result set for one of the consumers.
613662
results := readMultipleToSlice([]chan *consumedRecord{kinsumer1.records, kinsumer2.records, kinsumer3.records}, 10*time.Second)
614663

615664
kinsumer1Result := results[0]
@@ -621,8 +670,6 @@ func TestMultipleConsumerStopStart(t *testing.T) {
621670
assert.NotEqual(t, 0, len(kinsumer2Result))
622671
assert.NotEqual(t, 0, len(kinsumer3Result))
623672

624-
fmt.Println("Lengths: ", len(kinsumer1Result), len(kinsumer2Result), len(kinsumer3Result)) // TODO: Remove this
625-
626673
// Check for dupes within each client's results
627674
kinsumer1Dupes := getDupesFromSlice(kinsumer1Result)
628675
kinsumer2Dupes := getDupesFromSlice(kinsumer2Result)

0 commit comments

Comments
 (0)