Skip to content

Commit 3c4f36f

Browse files
authored
Merge pull request #25 from DocMerlin/master
return an error when concurrency is set to less than 1
2 parents 3b35b32 + fa7ad99 commit 3c4f36f

File tree

3 files changed

+13
-2
lines changed

3 files changed

+13
-2
lines changed

firehose.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ func (p *Producer) firehoseFlush(counter *int, timer *time.Time) bool {
6060

6161
// Initialize a producer for Kinesis Firehose with the params supplied in the configuration file
6262
func (p *Producer) Firehose() (*Producer, error) {
63+
if conf.Concurrency.Producer < 1 {
64+
return nil, BadConcurrencyError
65+
}
6366
p.setConcurrency(conf.Concurrency.Producer)
6467
p.initChannels()
6568
auth, err := authenticate(conf.AWS.AccessKey, conf.AWS.SecretKey)
@@ -78,6 +81,9 @@ func (p *Producer) Firehose() (*Producer, error) {
7881

7982
// Initialize a producer for Kinesis Firehose with the specified params
8083
func (p *Producer) FirehoseC(stream, accessKey, secretKey, region string, concurrency int) (*Producer, error) {
84+
if concurrency < 1 {
85+
return nil, BadConcurrencyError
86+
}
8187
if stream == "" {
8288
return nil, NullStreamError
8389
}

listener.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ type Listener struct {
4343

4444
func (l *Listener) init(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (*Listener, error) {
4545
var err error
46-
46+
if concurrency < 1 {
47+
return nil, BadConcurrencyError
48+
}
4749
if stream == "" {
4850
return nil, NullStreamError
4951
}

producer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const (
2424
var (
2525
ThroughputExceededError = errors.New("Configured AWS Kinesis throughput has been exceeded!")
2626
KinesisFailureError = errors.New("AWS Kinesis internal failure.")
27+
BadConcurrencyError = errors.New("Concurrency must be greater than zero.")
2728
)
2829

2930
// Producer keeps a queue of messages on a channel and continually attempts
@@ -54,7 +55,9 @@ type Producer struct {
5455

5556
func (p *Producer) init(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (*Producer, error) {
5657
var err error
57-
58+
if concurrency < 1 {
59+
return nil, BadConcurrencyError
60+
}
5861
if stream == "" {
5962
return nil, NullStreamError
6063
}

0 commit comments

Comments
 (0)