Skip to content

Commit 301a438

Browse files
Jorge Emrys LandivarJorge Emrys Landivar
authored andcommitted
return an error when concurrency is set to less than 1
1 parent f720db0 commit 301a438

File tree

3 files changed

+13
-2
lines changed

3 files changed

+13
-2
lines changed

firehose.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ func (p *Producer) firehoseFlush(counter *int, timer *time.Time) bool {
6060

6161
// Initialize a producer for Kinesis Firehose with the params supplied in the configuration file
6262
func (p *Producer) Firehose() (*Producer, error) {
63+
if conf.Concurrency.Producer < 1 {
64+
return nil, BadConcurrencyError
65+
}
6366
p.setConcurrency(conf.Concurrency.Producer)
6467
p.initChannels()
6568

@@ -75,6 +78,9 @@ func (p *Producer) Firehose() (*Producer, error) {
7578

7679
// Initialize a producer for Kinesis Firehose with the specified params
7780
func (p *Producer) FirehoseC(stream, accessKey, secretKey, region string, concurrency int) (*Producer, error) {
81+
if concurrency < 1 {
82+
return nil, BadConcurrencyError
83+
}
7884
if stream == "" {
7985
return nil, NullStreamError
8086
}

listener.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ type Listener struct {
4343

4444
func (l *Listener) init(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (*Listener, error) {
4545
var err error
46-
46+
if concurrency < 1 {
47+
return nil, BadConcurrencyError
48+
}
4749
if stream == "" {
4850
return nil, NullStreamError
4951
}

producer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const (
2424
var (
2525
ThroughputExceededError = errors.New("Configured AWS Kinesis throughput has been exceeded!")
2626
KinesisFailureError = errors.New("AWS Kinesis internal failure.")
27+
BadConcurrencyError = errors.New("Concurrency must be greater than zero.")
2728
)
2829

2930
// Producer keeps a queue of messages on a channel and continually attempts
@@ -54,7 +55,9 @@ type Producer struct {
5455

5556
func (p *Producer) init(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (*Producer, error) {
5657
var err error
57-
58+
if concurrency < 1 {
59+
return nil, BadConcurrencyError
60+
}
5861
if stream == "" {
5962
return nil, NullStreamError
6063
}

0 commit comments

Comments
 (0)