Skip to content
This repository was archived by the owner on Mar 15, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sinks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ func ManufactureSink() (e EventSinkInterface) {
panic("s3 sink specified but s3SinkBucketDir not specified")
}

endpoint := viper.GetString("s3SinkEndpoint")

// By default the json is pushed to s3 in not flatenned rfc5424 write format
// The option to write to s3 is in the flattened json format which will help in
// using the data in redshift with least effort
Expand All @@ -124,7 +126,7 @@ func ManufactureSink() (e EventSinkInterface) {
bufferSize := viper.GetInt("s3SinkBufferSize")
overflow := viper.GetBool("s3SinkDiscardMessages")

s, err := NewS3Sink(accessKeyID, secretAccessKey, region, bucket, bucketDir, uploadInterval, overflow, bufferSize, outputFormat)
s, err := NewS3Sink(accessKeyID, secretAccessKey, region, bucket, bucketDir, uploadInterval, overflow, bufferSize, outputFormat, endpoint)
if err != nil {
panic(err.Error())
}
Expand Down
6 changes: 5 additions & 1 deletion sinks/s3sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ type S3Sink struct {
}

// NewS3Sink is the factory method constructing a new S3Sink
func NewS3Sink(awsAccessKeyID string, s3SinkSecretAccessKey string, s3SinkRegion string, s3SinkBucket string, s3SinkBucketDir string, s3SinkUploadInterval int, overflow bool, bufferSize int, outputFormat string) (*S3Sink, error) {
func NewS3Sink(awsAccessKeyID string, s3SinkSecretAccessKey string, s3SinkRegion string, s3SinkBucket string, s3SinkBucketDir string, s3SinkUploadInterval int, overflow bool, bufferSize int, outputFormat string, s3SinkEndpoint string) (*S3Sink, error) {
awsConfig := &aws.Config{
Region: aws.String(s3SinkRegion),
Credentials: credentials.NewStaticCredentials(awsAccessKeyID, s3SinkSecretAccessKey, ""),
}

if s3SinkEndpoint != "" {
awsConfig = awsConfig.WithEndpoint(s3SinkEndpoint)
}

awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true)
sess, err := session.NewSession(awsConfig)
if err != nil {
Expand Down