Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions config/burrow.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,18 @@ template-close="conf/default-http-delete.tmpl"
method-close="DELETE"
send-close=true
threshold=1

# TLS+IAM example
# This example assumes EKS pod identity; otherwise, one needs to
# provide the role-arn value explicitly.
#[iam.eks]
#region = "us-west-1"
### Not needed with pod identity
### role-arn = "arn:aws:iam::123456789012:role/burrow-eks-role"


#[client-profile.msk-iam]
#kafka-version = "2.0.0"
#client-id = "burrow"
#tls = "msk-tls"
#iam = "eks"
42 changes: 42 additions & 0 deletions core/internal/helpers/iam.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package helpers

import (
"context"

sarama "github.com/IBM/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
)

var (
signerGenerateAuthToken = signer.GenerateAuthToken
signerGenerateAuthTokenFromRole = signer.GenerateAuthTokenFromRole
signerGenerateAuthTokenFromProfile = signer.GenerateAuthTokenFromProfile
)

type iamTokenProvider struct {
region, roleArn, profile string
}

func (p *iamTokenProvider) Token() (*sarama.AccessToken, error) {
var tok string
var err error

switch {
case p.roleArn != "":
tok, _, err = signerGenerateAuthTokenFromRole(
context.TODO(), p.region, p.roleArn, "burrow-session")

case p.profile != "":
tok, _, err = signerGenerateAuthTokenFromProfile(
context.TODO(), p.region, p.profile)

default:
tok, _, err = signerGenerateAuthToken(
context.TODO(), p.region)
}

if err != nil {
return nil, err
}
return &sarama.AccessToken{Token: tok}, nil
}
156 changes: 156 additions & 0 deletions core/internal/helpers/iam_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package helpers

import (
"context"
"errors"
"testing"

sarama "github.com/IBM/sarama"
)

// stubAll replaces the three signer helpers and guarantees they are restored.
func stubAll(
t *testing.T,
authFn func(ctx context.Context, region string) (string, int64, error),
roleFn func(ctx context.Context, region, roleArn, session string) (string, int64, error),
profFn func(ctx context.Context, region, profile string) (string, int64, error),
) func() {
origAuth := signerGenerateAuthToken
origRole := signerGenerateAuthTokenFromRole
origProf := signerGenerateAuthTokenFromProfile

signerGenerateAuthToken = authFn
signerGenerateAuthTokenFromRole = roleFn
signerGenerateAuthTokenFromProfile = profFn

return func() {
signerGenerateAuthToken = origAuth
signerGenerateAuthTokenFromRole = origRole
signerGenerateAuthTokenFromProfile = origProf
}
}

// helpers that must *not* be invoked in a given path
func mustNotCallAuth(t *testing.T) func(ctx context.Context, region string) (string, int64, error) {
return func(context.Context, string) (string, int64, error) {
t.Fatalf("unexpected call to GenerateAuthToken")
return "", 0, nil
}
}
func mustNotCallRole(t *testing.T) func(context.Context, string, string, string) (string, int64, error) {
return func(context.Context, string, string, string) (string, int64, error) {
t.Fatalf("unexpected call to GenerateAuthTokenFromRole")
return "", 0, nil
}
}
func mustNotCallProf(t *testing.T) func(context.Context, string, string) (string, int64, error) {
return func(context.Context, string, string) (string, int64, error) {
t.Fatalf("unexpected call to GenerateAuthTokenFromProfile")
return "", 0, nil
}
}

func TestIamTokenProvider(t *testing.T) {
tests := []struct {
name string
provider iamTokenProvider
setupStub func(t *testing.T) (restore func())
wantTok string
wantErr bool
}{
{
name: "default credential chain",
provider: iamTokenProvider{region: "eu-central-1"},
setupStub: func(t *testing.T) func() {
return stubAll(
t,
func(_ context.Context, region string) (string, int64, error) {
if region != "eu-central-1" {
t.Fatalf("region = %s, want eu-central-1", region)
}
return "tok-default", 0, nil
},
mustNotCallRole(t),
mustNotCallProf(t),
)
},
wantTok: "tok-default",
},
{
name: "assume‑role path overrides profile",
provider: iamTokenProvider{region: "us-east-1", roleArn: "arn:aws:iam::123456789012:role/test", profile: "ignored"},
setupStub: func(t *testing.T) func() {
return stubAll(
t,
mustNotCallAuth(t),
func(_ context.Context, region, role, sess string) (string, int64, error) {
if region != "us-east-1" || role != "arn:aws:iam::123456789012:role/test" {
t.Fatalf("bad args to role helper")
}
return "tok-role", 0, nil
},
mustNotCallProf(t),
)
},
wantTok: "tok-role",
},
{
name: "named profile path",
provider: iamTokenProvider{region: "ap-south-1", profile: "burrow"},
setupStub: func(t *testing.T) func() {
return stubAll(
t,
mustNotCallAuth(t),
mustNotCallRole(t),
func(_ context.Context, region, profile string) (string, int64, error) {
if region != "ap-south-1" || profile != "burrow" {
t.Fatalf("bad args to profile helper")
}
return "tok-profile", 0, nil
},
)
},
wantTok: "tok-profile",
},
{
name: "signer returns error",
provider: iamTokenProvider{region: "eu-west-1"},
setupStub: func(t *testing.T) func() {
return stubAll(
t,
func(context.Context, string) (string, int64, error) {
return "", 0, errors.New("boom")
},
mustNotCallRole(t),
mustNotCallProf(t),
)
},
wantErr: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
restore := tc.setupStub(t)
defer restore()

got, err := tc.provider.Token()

if tc.wantErr {
if err == nil {
t.Fatalf("expected error, got nil")
}
return
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got == nil || got.Token != tc.wantTok {
t.Fatalf("token = %v, want %s", got, tc.wantTok)
}
if _, ok := interface{}(got).(*sarama.AccessToken); !ok {
t.Fatalf("return type is not *sarama.AccessToken")
}
})
}
}
24 changes: 24 additions & 0 deletions core/internal/helpers/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package helpers
import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"time"

Expand Down Expand Up @@ -134,6 +135,29 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config {
saramaConfig.Net.SASL.Password = viper.GetString("sasl." + saslName + ".password")
}

if iamName := viper.GetString(configRoot + ".iam"); iamName != "" {
iamRoot := "iam." + iamName
region := viper.GetString(iamRoot + ".region")
if region == "" {
panic(fmt.Sprintf("iam.%s: region is required", iamName))
}

// IAM auth *requires* TLS
if !saramaConfig.Net.TLS.Enable {
panic(fmt.Sprintf("client-profile %s uses iam.%s but has no tls profile",
profileName, iamName))
}

saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.Handshake = true
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaConfig.Net.SASL.TokenProvider = &iamTokenProvider{
region: region,
roleArn: viper.GetString(iamRoot + ".role-arn"),
profile: viper.GetString(iamRoot + ".profile"),
}
}

// Timeout for the initial connection
if viper.IsSet(configRoot + ".dial-timeout") {
saramaConfig.Net.DialTimeout = time.Duration(viper.GetInt(configRoot+".dial-timeout")) * time.Second
Expand Down
14 changes: 14 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.24
require (
github.com/IBM/sarama v1.45.2
github.com/OneOfOne/xxhash v1.2.8
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4
github.com/julienschmidt/httprouter v1.3.0
github.com/karrick/goswarm v1.10.0
github.com/linkedin/go-zk v0.1.4
Expand All @@ -21,6 +22,19 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2 v1.32.4 // indirect
github.com/aws/aws-sdk-go-v2/config v1.28.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.43 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
28 changes: 28 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,34 @@ github.com/IBM/sarama v1.45.2 h1:8m8LcMCu3REcwpa7fCP6v2fuPuzVwXDAM2DOv3CBrKw=
github.com/IBM/sarama v1.45.2/go.mod h1:ppaoTcVdGv186/z6MEKsMm70A5fwJfRTpstI37kVn3Y=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 h1:2jAwFwA0Xgcx94dUId+K24yFabsKYDtAhCgyMit6OqE=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
github.com/aws/aws-sdk-go-v2 v1.32.4 h1:S13INUiTxgrPueTmrm5DZ+MiAo99zYzHEFh1UNkOxNE=
github.com/aws/aws-sdk-go-v2 v1.32.4/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/aws/aws-sdk-go-v2/config v1.28.2 h1:FLvWA97elBiSPdIol4CXfIAY1wlq3KzoSgkMuZSuSe8=
github.com/aws/aws-sdk-go-v2/config v1.28.2/go.mod h1:hNmQsKfUqpKz2yfnZUB60GCemPmeqAalVTui0gOxjAE=
github.com/aws/aws-sdk-go-v2/credentials v1.17.43 h1:SEGdVOOE1Wyr2XFKQopQ5GYjym3nYHcphesdt78rNkY=
github.com/aws/aws-sdk-go-v2/credentials v1.17.43/go.mod h1:3aiza5kSyAE4eujSanOkSkAmX/RnVqslM+GRQ/Xvv4c=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 h1:woXadbf0c7enQ2UGCi8gW/WuKmE0xIzxBF/eD94jMKQ=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19/go.mod h1:zminj5ucw7w0r65bP6nhyOd3xL6veAUMc3ElGMoLVb4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 h1:A2w6m6Tmr+BNXjDsr7M90zkWjsu4JXHwrzPg235STs4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23/go.mod h1:35EVp9wyeANdujZruvHiQUAo9E3vbhnIO1mTCAxMlY0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 h1:pgYW9FCabt2M25MoHYCfMrVY2ghiiBKYWUVXfwZs+sU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23/go.mod h1:c48kLgzO19wAu3CPkDWC28JbaJ+hfQlsdl7I2+oqIbk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 h1:tHxQi/XHPK0ctd/wdOw0t7Xrc2OxcRCnVzv8lwWPu0c=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4/go.mod h1:4GQbF1vJzG60poZqWatZlhP31y8PGCCVTvIGPdaaYJ0=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.4 h1:BqE3NRG6bsODh++VMKMsDmFuJTHrdD4rJZqHjDeF6XI=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.4/go.mod h1:wrMCEwjFPms+V86TCQQeOxQF/If4vT44FGIOFiMC2ck=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 h1:zcx9LiGWZ6i6pjdcoE9oXAB6mUdeyC36Ia/QEiIvYdg=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4/go.mod h1:Tp/ly1cTjRLGBBmNccFumbZ8oqpZlpdhFf80SrRh4is=
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 h1:yDxvkz3/uOKfxnv8YhzOi9m+2OGIxF+on3KOISbK5IU=
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8=
github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down
Loading