Skip to content

Commit e3f85e4

Browse files
committed
added unit test
1 parent 9e067bc commit e3f85e4

File tree

2 files changed

+49
-4
lines changed

2 files changed

+49
-4
lines changed

kinesumer.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -693,16 +693,21 @@ func (k *Kinesumer) consumeOnce(stream string, shard *Shard) ([]*kinesis.Record,
693693
}
694694
defer k.nextIters[stream].Store(shard.ID, output.NextShardIterator) // Update iter.
695695

696-
// set the shard closed state.
697-
// If shard has no data, NextShardIterator will be nil.
698-
// Reference: https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html#output
699-
shard.Closed = output.NextShardIterator == nil
696+
shard.Closed = getShardStatus(output)
700697

701698
// outer function has the for loop that takes care of the empty records case
702699
// so not needed to check it here.
703700
return output.Records, shard.Closed
704701
}
705702

703+
// getShardStatus returns whether the shard is closed or not.
704+
func getShardStatus(output *kinesis.GetRecordsOutput) bool {
705+
// set the shard closed state.
706+
// If shard has no data, NextShardIterator will be nil.
707+
// Reference: https://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html#output
708+
return output.NextShardIterator == nil
709+
}
710+
706711
func (k *Kinesumer) getNextShardIterator(
707712
ctx context.Context, stream, shardID string,
708713
) (*string, error) {

kinesumer_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,3 +682,43 @@ func TestKinesumer_cleanupOffsetsWorksFine(t *testing.T) {
682682
})
683683
}
684684
}
685+
686+
func Test_getShardStatus(t *testing.T) {
687+
tests := []struct {
688+
name string // description of this test case
689+
input struct {
690+
output *kinesis.GetRecordsOutput
691+
}
692+
want bool
693+
}{
694+
{
695+
name: "when shard is closed",
696+
input: struct {
697+
output *kinesis.GetRecordsOutput
698+
}{
699+
output: &kinesis.GetRecordsOutput{
700+
NextShardIterator: nil,
701+
},
702+
},
703+
want: true,
704+
},
705+
706+
{
707+
name: "when shard is open",
708+
input: struct {
709+
output *kinesis.GetRecordsOutput
710+
}{
711+
output: &kinesis.GetRecordsOutput{
712+
NextShardIterator: aws.String("shardIterator"),
713+
},
714+
},
715+
want: false,
716+
},
717+
}
718+
for _, tt := range tests {
719+
t.Run(tt.name, func(t *testing.T) {
720+
got := getShardStatus(tt.input.output)
721+
assert.Equal(t, tt.want, got, "Should be equal")
722+
})
723+
}
724+
}

0 commit comments

Comments
 (0)