Skip to content

Commit fdafe31

Browse files
authored
Merge pull request #22 from snowplow-devops/add-merge-upstream
Add merge upstream
2 parents 8cc8e5a + dd87bce commit fdafe31

File tree

3 files changed

+21
-2
lines changed

3 files changed

+21
-2
lines changed

config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ type Config struct {
4949
dynamoWriteCapacity int64
5050
// Time to wait between attempts to verify tables were created/deleted completely
5151
dynamoWaiterDelay time.Duration
52+
53+
// use ListShards to avoid LimitExceedException from DescribeStream
54+
useListShardsForKinesisStreamReady bool
5255
}
5356

5457
// NewConfig returns a default Config struct
@@ -148,6 +151,12 @@ func (c Config) WithLogger(logger Logger) Config {
148151
return c
149152
}
150153

154+
// WithUseListShardsForKinesisStreamReady returns a config with a modified useListShardsForKinesisStreamReady toggle
155+
func (c Config) WithUseListShardsForKinesisStreamReady(shouldUse bool) Config {
156+
c.useListShardsForKinesisStreamReady = shouldUse
157+
return c
158+
}
159+
151160
// Verify that a config struct has sane and valid values
152161
func validateConfig(c *Config) error {
153162
if c.throttleDelay < 200*time.Millisecond {

config_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ func TestConfigWithMethods(t *testing.T) {
6464
WithLeaderActionFrequency(1 * time.Second).
6565
WithThrottleDelay(1 * time.Second).
6666
WithStats(stats).
67-
WithIteratorStartTimestamp(&tstamp)
67+
WithIteratorStartTimestamp(&tstamp).
68+
WithUseListShardsForKinesisStreamReady(true)
6869

6970
err := validateConfig(&config)
7071
require.NoError(t, err)
@@ -76,4 +77,5 @@ func TestConfigWithMethods(t *testing.T) {
7677
require.Equal(t, 1*time.Second, config.leaderActionFrequency)
7778
require.Equal(t, stats, config.stats)
7879
require.Equal(t, &tstamp, config.iteratorStartTimestamp)
80+
require.Equal(t, true, config.useListShardsForKinesisStreamReady)
7981
}

kinsumer.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,15 @@ func (k *Kinsumer) dynamoDeleteTableIfExists(name string) error {
325325

326326
// kinesisStreamReady returns an error if the given stream is not ACTIVE
327327
func (k *Kinsumer) kinesisStreamReady() error {
328+
if k.config.useListShardsForKinesisStreamReady {
329+
_, err := k.kinesis.ListShards(&kinesis.ListShardsInput{
330+
StreamName: aws.String(k.streamName),
331+
})
332+
if err != nil {
333+
return fmt.Errorf("error listing shards for stream %s: %v", k.streamName, err)
334+
}
335+
return nil
336+
}
328337
out, err := k.kinesis.DescribeStream(&kinesis.DescribeStreamInput{
329338
StreamName: aws.String(k.streamName),
330339
})
@@ -336,7 +345,6 @@ func (k *Kinsumer) kinesisStreamReady() error {
336345
if status != "ACTIVE" && status != "UPDATING" {
337346
return fmt.Errorf("stream %s exists but state '%s' is not 'ACTIVE' or 'UPDATING'", k.streamName, status)
338347
}
339-
340348
return nil
341349
}
342350

0 commit comments

Comments
 (0)