@@ -15,11 +15,13 @@ import (
1515
1616const (
1717 kinesisReaderMaxBatchSize = 10000
18+ kinesisReaderDefaultConcurrency = 5
1819)
1920
2021// kinesisReaderOptions a struct that holds all of the KinesisReader's configurable parameters.
2122type kinesisReaderOptions struct {
2223 batchSize int // maximum records per GetRecordsRequest call
24+ concurrency int // maximum number of concurrent GetRecord or GetRecords calls allowed
2325 shardIterator * ShardIterator // shard iterator for Kinesis stream
2426 responseReadTimeout time.Duration // response read time out for GetRecordsRequest API call
2527 logLevel aws.LogLevelType // log level for configuring the LogHelper's log level
@@ -30,6 +32,7 @@ type kinesisReaderOptions struct {
3032func defaultKinesisReaderOptions () * kinesisReaderOptions {
3133 return & kinesisReaderOptions {
3234 batchSize : kinesisReaderMaxBatchSize ,
35+ concurrency : kinesisReaderDefaultConcurrency ,
3336 shardIterator : NewShardIterator (),
3437 responseReadTimeout : time .Second ,
3538 Stats : & NilConsumerStatsCollector {},
@@ -52,6 +55,18 @@ func KinesisReaderBatchSize(size int) KinesisReaderOptionsFn {
5255 }
5356}
5457
58+ // KinesisReaderConcurrency is a functional option method for configuring the KinesisReader's
59+ // concurrency.
60+ func KinesisReaderConcurrency (count int ) KinesisReaderOptionsFn {
61+ return func (o * KinesisReader ) error {
62+ if count > 0 {
63+ o .concurrency = count
64+ return nil
65+ }
66+ return ErrInvalidConcurrency
67+ }
68+ }
69+
5570// KinesisReaderShardIterator is a functional option method for configuring the KinesisReader's
5671// shard iterator.
5772func KinesisReaderShardIterator (shardIterator * ShardIterator ) KinesisReaderOptionsFn {
@@ -109,13 +124,13 @@ func NewKinesisReader(c *aws.Config, stream string, shard string, optionFns ...K
109124 kinesisReaderOptions : defaultKinesisReaderOptions (),
110125 stream : stream ,
111126 shard : shard ,
112- throttleSem : make (chan empty , 5 ),
113127 client : kinesis .New (sess ),
114128 }
115129 for _ , optionFn := range optionFns {
116130 optionFn (kinesisReader )
117131 }
118132
133+ kinesisReader .throttleSem = make (chan empty , kinesisReader .concurrency )
119134 kinesisReader .LogHelper = & LogHelper {
120135 LogLevel : kinesisReader .logLevel ,
121136 Logger : c .Logger ,
0 commit comments