Skip to content

Commit fa7ad99

Browse files
authored
Merge branch 'master' into master
2 parents 301a438 + 3b35b32 commit fa7ad99

File tree

4 files changed

+60
-6
lines changed

4 files changed

+60
-6
lines changed

auth.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package kinetic
2+
3+
import (
4+
"errors"
5+
gokinesis "github.com/rewardStyle/go-kinesis"
6+
)
7+
8+
var MetaAuthenticationErr = errors.New("Authentication error: failed to auth from meta. Your IAM roles are bad, or you need to specify an AccessKey and SecretKey")
9+
10+
func authenticate(accessKey, secretKey string) (auth *gokinesis.AuthCredentials, err error) {
11+
if accessKey == "" || secretKey == "" {
12+
if auth, err = gokinesis.NewAuthFromMetadata(); err != nil {
13+
return nil, MetaAuthenticationErr
14+
}
15+
} else {
16+
auth = gokinesis.NewAuth(accessKey, secretKey)
17+
}
18+
return
19+
}

firehose.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,13 @@ func (p *Producer) Firehose() (*Producer, error) {
6565
}
6666
p.setConcurrency(conf.Concurrency.Producer)
6767
p.initChannels()
68-
68+
auth, err := authenticate(conf.AWS.AccessKey, conf.AWS.SecretKey)
6969
p.kinesis = &kinesis{
7070
stream: conf.Firehose.Stream,
71-
client: gokinesis.NewWithEndpoint(gokinesis.NewAuth(conf.AWS.AccessKey, conf.AWS.SecretKey), conf.AWS.Region, fmt.Sprintf(firehoseURL, conf.AWS.Region)),
71+
client: gokinesis.NewWithEndpoint(auth, conf.AWS.Region, fmt.Sprintf(firehoseURL, conf.AWS.Region)),
72+
}
73+
if err != nil {
74+
return p, err
7275
}
7376

7477
p.producerType = firehoseType
@@ -87,10 +90,13 @@ func (p *Producer) FirehoseC(stream, accessKey, secretKey, region string, concur
8790

8891
p.setConcurrency(concurrency)
8992
p.initChannels()
90-
93+
auth, err := authenticate(accessKey, secretKey)
9194
p.kinesis = &kinesis{
9295
stream: stream,
93-
client: gokinesis.NewWithEndpoint(gokinesis.NewAuth(accessKey, secretKey), region, fmt.Sprintf(firehoseURL, region)),
96+
client: gokinesis.NewWithEndpoint(auth, region, fmt.Sprintf(firehoseURL, region)),
97+
}
98+
if err != nil {
99+
return p, err
94100
}
95101

96102
p.producerType = firehoseType

kinesis.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,19 @@ type kinesis struct {
6565
}
6666

6767
func (k *kinesis) init(stream, shard, shardIteratorType, accessKey, secretKey, region string) (*kinesis, error) {
68+
69+
auth, err := authenticate(accessKey, secretKey)
6870
k = &kinesis{
6971
stream: stream,
7072
shard: shard,
7173
shardIteratorType: shardIteratorType,
72-
client: gokinesis.New(gokinesis.NewAuth(accessKey, secretKey), region),
74+
client: gokinesis.New(auth, region),
75+
}
76+
if err != nil {
77+
return k, err
7378
}
7479

75-
err := k.initShardIterator()
80+
err = k.initShardIterator()
7681
if err != nil {
7782
return k, err
7883
}

kinesis_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package kinetic
2+
3+
import (
4+
. "github.com/smartystreets/goconvey/convey"
5+
"testing"
6+
)
7+
8+
// TestKineticCreation tests to make sure that the Kinetic creation doesn't return early or return a nil.
9+
func TestKineticCreation(t *testing.T) {
10+
kinesisKinetic, err := new(kinesis).init("fake", "ShardId-00000001", "TRIM_HORIZON", "BADaccessKey", "BADsecretKey", "region")
11+
12+
Convey("Given an badly configured init-ed kinetic", t, func() {
13+
Convey("the error returned should not be nil", func() {
14+
So(err, ShouldNotBeNil)
15+
})
16+
Convey("the returned kinesis struct pointer should not be nil", func() {
17+
So(kinesisKinetic, ShouldNotBeNil)
18+
})
19+
Convey("it should also have some data in it", func() {
20+
So(kinesisKinetic.stream, ShouldEqual, "fake")
21+
So(kinesisKinetic.shard, ShouldEqual, "ShardId-00000001")
22+
})
23+
})
24+
}

0 commit comments

Comments
 (0)