Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions kinesumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,10 @@ func (k *Kinesumer) listShards(stream string) (Shards, error) {
}
var shards []*Shard
for _, shard := range output.Shards {
// TODO(mingrammer): handle CLOSED shards.
// Closed shards will be handled while consuming the records.
if shard.SequenceNumberRange.EndingSequenceNumber == nil {
shards = append(shards, &Shard{
ID: *shard.ShardId,
Closed: shard.SequenceNumberRange.EndingSequenceNumber != nil,
ID: *shard.ShardId,
})
}
}
Expand All @@ -330,11 +329,10 @@ func (k *Kinesumer) listShards(stream string) (Shards, error) {
return nil, errors.WithStack(err)
}
for _, shard := range output.Shards {
// Skip CLOSED shards.
// Closed shards will be handled while consuming the records.
if shard.SequenceNumberRange.EndingSequenceNumber == nil {
shards = append(shards, &Shard{
ID: *shard.ShardId,
Closed: shard.SequenceNumberRange.EndingSequenceNumber != nil,
ID: *shard.ShardId,
})
}
}
Expand All @@ -345,7 +343,8 @@ func (k *Kinesumer) listShards(stream string) (Shards, error) {

// Consume consumes messages from Kinesis.
func (k *Kinesumer) Consume(
streams []string) (<-chan *Record, error) {
streams []string,
) (<-chan *Record, error) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my "gofmt" tool's change, will change if not needed

k.streams = streams

ctx := context.Background()
Expand Down Expand Up @@ -635,15 +634,8 @@ func (k *Kinesumer) consumeLoop(stream string, shard *Shard) {
default:
time.Sleep(k.scanInterval)
records, closed := k.consumeOnce(stream, shard)
if closed {
k.cleanupOffsets(stream, shard)
return // Close consume loop if shard is CLOSED and has no data.
}

n := len(records)
if n == 0 {
continue
}

for i, record := range records {
r := &Record{
Expand All @@ -656,6 +648,13 @@ func (k *Kinesumer) consumeLoop(stream string, shard *Shard) {
if k.autoCommit && i == n-1 {
k.MarkRecord(r)
}

// Closed shard may have remaining data,
// so clean up is executed, once the records are pushed to the channel.
if closed {
k.cleanupOffsets(stream, shard)
return // Close consume loop if shard is CLOSED and has no data.
}
}
}
}
Expand Down Expand Up @@ -694,17 +693,24 @@ func (k *Kinesumer) consumeOnce(stream string, shard *Shard) ([]*kinesis.Record,
}
defer k.nextIters[stream].Store(shard.ID, output.NextShardIterator) // Update iter.

n := len(output.Records)
// We no longer care about shards that have no records left and are in the "CLOSED" state.
if n == 0 {
return nil, shard.Closed
}
shard.Closed = getShardStatus(output)

// outer function has the for loop that takes care of the empty records case
// so not needed to check it here.
return output.Records, shard.Closed
}

return output.Records, false
// getShardStatus returns whether the shard is closed or not.
func getShardStatus(output *kinesis.GetRecordsOutput) bool {
// set the shard closed state.
// If shard has no data, NextShardIterator will be nil.
// Reference: https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html#output
return output.NextShardIterator == nil
}

func (k *Kinesumer) getNextShardIterator(
ctx context.Context, stream, shardID string) (*string, error) {
ctx context.Context, stream, shardID string,
) (*string, error) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my "gofmt" tool's change, will change if not needed

if iter, ok := k.nextIters[stream].Load(shardID); ok {
return iter.(*string), nil
}
Expand All @@ -729,7 +735,7 @@ func (k *Kinesumer) getNextShardIterator(
}

func (k *Kinesumer) commitPeriodically() {
var checkPointTicker = time.NewTicker(k.commitInterval)
checkPointTicker := time.NewTicker(k.commitInterval)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my "gofmt" tool's change, will change if not needed


for {
select {
Expand Down
40 changes: 40 additions & 0 deletions kinesumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,3 +682,43 @@ func TestKinesumer_cleanupOffsetsWorksFine(t *testing.T) {
})
}
}

func Test_getShardStatus(t *testing.T) {
tests := []struct {
name string // description of this test case
input struct {
output *kinesis.GetRecordsOutput
}
want bool
}{
{
name: "when shard is closed",
input: struct {
output *kinesis.GetRecordsOutput
}{
output: &kinesis.GetRecordsOutput{
NextShardIterator: nil,
},
},
want: true,
},

{
name: "when shard is open",
input: struct {
output *kinesis.GetRecordsOutput
}{
output: &kinesis.GetRecordsOutput{
NextShardIterator: aws.String("shardIterator"),
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getShardStatus(tt.input.output)
assert.Equal(t, tt.want, got, "Should be equal")
})
}
}