diff --git a/config/burrow.toml b/config/burrow.toml index ea2dc4fc..55dc5fe9 100644 --- a/config/burrow.toml +++ b/config/burrow.toml @@ -68,3 +68,18 @@ template-close="conf/default-http-delete.tmpl" method-close="DELETE" send-close=true threshold=1 + +# TLS+IAM example +# This example assumes EKS pod identity; otherwise, one needs to +# provide the role-arn value explicitly. +#[iam.eks] +#region = "us-west-1" +### Not needed with pod identity +### role-arn = "arn:aws:iam::123456789012:role/burrow-eks-role" + + +#[client-profile.msk-iam] +#kafka-version = "2.0.0" +#client-id = "burrow" +#tls = "msk-tls" +#iam = "eks" diff --git a/core/internal/helpers/iam.go b/core/internal/helpers/iam.go new file mode 100644 index 00000000..6ce95c22 --- /dev/null +++ b/core/internal/helpers/iam.go @@ -0,0 +1,42 @@ +package helpers + +import ( + "context" + + sarama "github.com/IBM/sarama" + "github.com/aws/aws-msk-iam-sasl-signer-go/signer" +) + +var ( + signerGenerateAuthToken = signer.GenerateAuthToken + signerGenerateAuthTokenFromRole = signer.GenerateAuthTokenFromRole + signerGenerateAuthTokenFromProfile = signer.GenerateAuthTokenFromProfile +) + +type iamTokenProvider struct { + region, roleArn, profile string +} + +func (p *iamTokenProvider) Token() (*sarama.AccessToken, error) { + var tok string + var err error + + switch { + case p.roleArn != "": + tok, _, err = signerGenerateAuthTokenFromRole( + context.TODO(), p.region, p.roleArn, "burrow-session") + + case p.profile != "": + tok, _, err = signerGenerateAuthTokenFromProfile( + context.TODO(), p.region, p.profile) + + default: + tok, _, err = signerGenerateAuthToken( + context.TODO(), p.region) + } + + if err != nil { + return nil, err + } + return &sarama.AccessToken{Token: tok}, nil +} diff --git a/core/internal/helpers/iam_test.go b/core/internal/helpers/iam_test.go new file mode 100644 index 00000000..87875c0f --- /dev/null +++ b/core/internal/helpers/iam_test.go @@ -0,0 +1,156 @@ +package helpers + +import ( + "context" + "errors" + "testing" + + sarama "github.com/IBM/sarama" +) + +// stubAll replaces the three signer helpers and guarantees they are restored. +func stubAll( + t *testing.T, + authFn func(ctx context.Context, region string) (string, int64, error), + roleFn func(ctx context.Context, region, roleArn, session string) (string, int64, error), + profFn func(ctx context.Context, region, profile string) (string, int64, error), +) func() { + origAuth := signerGenerateAuthToken + origRole := signerGenerateAuthTokenFromRole + origProf := signerGenerateAuthTokenFromProfile + + signerGenerateAuthToken = authFn + signerGenerateAuthTokenFromRole = roleFn + signerGenerateAuthTokenFromProfile = profFn + + return func() { + signerGenerateAuthToken = origAuth + signerGenerateAuthTokenFromRole = origRole + signerGenerateAuthTokenFromProfile = origProf + } +} + +// helpers that must *not* be invoked in a given path +func mustNotCallAuth(t *testing.T) func(ctx context.Context, region string) (string, int64, error) { + return func(context.Context, string) (string, int64, error) { + t.Fatalf("unexpected call to GenerateAuthToken") + return "", 0, nil + } +} +func mustNotCallRole(t *testing.T) func(context.Context, string, string, string) (string, int64, error) { + return func(context.Context, string, string, string) (string, int64, error) { + t.Fatalf("unexpected call to GenerateAuthTokenFromRole") + return "", 0, nil + } +} +func mustNotCallProf(t *testing.T) func(context.Context, string, string) (string, int64, error) { + return func(context.Context, string, string) (string, int64, error) { + t.Fatalf("unexpected call to GenerateAuthTokenFromProfile") + return "", 0, nil + } +} + +func TestIamTokenProvider(t *testing.T) { + tests := []struct { + name string + provider iamTokenProvider + setupStub func(t *testing.T) (restore func()) + wantTok string + wantErr bool + }{ + { + name: "default credential chain", + provider: iamTokenProvider{region: "eu-central-1"}, + setupStub: func(t *testing.T) func() { + return stubAll( + t, + func(_ context.Context, region string) (string, int64, error) { + if region != "eu-central-1" { + t.Fatalf("region = %s, want eu-central-1", region) + } + return "tok-default", 0, nil + }, + mustNotCallRole(t), + mustNotCallProf(t), + ) + }, + wantTok: "tok-default", + }, + { + name: "assume‑role path overrides profile", + provider: iamTokenProvider{region: "us-east-1", roleArn: "arn:aws:iam::123456789012:role/test", profile: "ignored"}, + setupStub: func(t *testing.T) func() { + return stubAll( + t, + mustNotCallAuth(t), + func(_ context.Context, region, role, sess string) (string, int64, error) { + if region != "us-east-1" || role != "arn:aws:iam::123456789012:role/test" { + t.Fatalf("bad args to role helper") + } + return "tok-role", 0, nil + }, + mustNotCallProf(t), + ) + }, + wantTok: "tok-role", + }, + { + name: "named profile path", + provider: iamTokenProvider{region: "ap-south-1", profile: "burrow"}, + setupStub: func(t *testing.T) func() { + return stubAll( + t, + mustNotCallAuth(t), + mustNotCallRole(t), + func(_ context.Context, region, profile string) (string, int64, error) { + if region != "ap-south-1" || profile != "burrow" { + t.Fatalf("bad args to profile helper") + } + return "tok-profile", 0, nil + }, + ) + }, + wantTok: "tok-profile", + }, + { + name: "signer returns error", + provider: iamTokenProvider{region: "eu-west-1"}, + setupStub: func(t *testing.T) func() { + return stubAll( + t, + func(context.Context, string) (string, int64, error) { + return "", 0, errors.New("boom") + }, + mustNotCallRole(t), + mustNotCallProf(t), + ) + }, + wantErr: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + restore := tc.setupStub(t) + defer restore() + + got, err := tc.provider.Token() + + if tc.wantErr { + if err == nil { + t.Fatalf("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got == nil || got.Token != tc.wantTok { + t.Fatalf("token = %v, want %s", got, tc.wantTok) + } + if _, ok := interface{}(got).(*sarama.AccessToken); !ok { + t.Fatalf("return type is not *sarama.AccessToken") + } + }) + } +} diff --git a/core/internal/helpers/sarama.go b/core/internal/helpers/sarama.go index 97713ba7..a95dae62 100644 --- a/core/internal/helpers/sarama.go +++ b/core/internal/helpers/sarama.go @@ -12,6 +12,7 @@ package helpers import ( "crypto/tls" "crypto/x509" + "fmt" "os" "time" @@ -134,6 +135,29 @@ func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config { saramaConfig.Net.SASL.Password = viper.GetString("sasl." + saslName + ".password") } + if iamName := viper.GetString(configRoot + ".iam"); iamName != "" { + iamRoot := "iam." + iamName + region := viper.GetString(iamRoot + ".region") + if region == "" { + panic(fmt.Sprintf("iam.%s: region is required", iamName)) + } + + // IAM auth *requires* TLS + if !saramaConfig.Net.TLS.Enable { + panic(fmt.Sprintf("client-profile %s uses iam.%s but has no tls profile", + profileName, iamName)) + } + + saramaConfig.Net.SASL.Enable = true + saramaConfig.Net.SASL.Handshake = true + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + saramaConfig.Net.SASL.TokenProvider = &iamTokenProvider{ + region: region, + roleArn: viper.GetString(iamRoot + ".role-arn"), + profile: viper.GetString(iamRoot + ".profile"), + } + } + // Timeout for the initial connection if viper.IsSet(configRoot + ".dial-timeout") { saramaConfig.Net.DialTimeout = time.Duration(viper.GetInt(configRoot+".dial-timeout")) * time.Second diff --git a/go.mod b/go.mod index a42e89cd..09b152d3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24 require ( github.com/IBM/sarama v1.45.2 github.com/OneOfOne/xxhash v1.2.8 + github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 github.com/julienschmidt/httprouter v1.3.0 github.com/karrick/goswarm v1.10.0 github.com/linkedin/go-zk v0.1.4 @@ -21,6 +22,19 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2 v1.32.4 // indirect + github.com/aws/aws-sdk-go-v2/config v1.28.2 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.43 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.4 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 // indirect + github.com/aws/smithy-go v1.22.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 65ff9ec2..3a7f7140 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,34 @@ github.com/IBM/sarama v1.45.2 h1:8m8LcMCu3REcwpa7fCP6v2fuPuzVwXDAM2DOv3CBrKw= github.com/IBM/sarama v1.45.2/go.mod h1:ppaoTcVdGv186/z6MEKsMm70A5fwJfRTpstI37kVn3Y= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4 h1:2jAwFwA0Xgcx94dUId+K24yFabsKYDtAhCgyMit6OqE= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.4/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI= +github.com/aws/aws-sdk-go-v2 v1.32.4 h1:S13INUiTxgrPueTmrm5DZ+MiAo99zYzHEFh1UNkOxNE= +github.com/aws/aws-sdk-go-v2 v1.32.4/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= +github.com/aws/aws-sdk-go-v2/config v1.28.2 h1:FLvWA97elBiSPdIol4CXfIAY1wlq3KzoSgkMuZSuSe8= +github.com/aws/aws-sdk-go-v2/config v1.28.2/go.mod h1:hNmQsKfUqpKz2yfnZUB60GCemPmeqAalVTui0gOxjAE= +github.com/aws/aws-sdk-go-v2/credentials v1.17.43 h1:SEGdVOOE1Wyr2XFKQopQ5GYjym3nYHcphesdt78rNkY= +github.com/aws/aws-sdk-go-v2/credentials v1.17.43/go.mod h1:3aiza5kSyAE4eujSanOkSkAmX/RnVqslM+GRQ/Xvv4c= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 h1:woXadbf0c7enQ2UGCi8gW/WuKmE0xIzxBF/eD94jMKQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19/go.mod h1:zminj5ucw7w0r65bP6nhyOd3xL6veAUMc3ElGMoLVb4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 h1:A2w6m6Tmr+BNXjDsr7M90zkWjsu4JXHwrzPg235STs4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23/go.mod h1:35EVp9wyeANdujZruvHiQUAo9E3vbhnIO1mTCAxMlY0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 h1:pgYW9FCabt2M25MoHYCfMrVY2ghiiBKYWUVXfwZs+sU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23/go.mod h1:c48kLgzO19wAu3CPkDWC28JbaJ+hfQlsdl7I2+oqIbk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 h1:tHxQi/XHPK0ctd/wdOw0t7Xrc2OxcRCnVzv8lwWPu0c= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4/go.mod h1:4GQbF1vJzG60poZqWatZlhP31y8PGCCVTvIGPdaaYJ0= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.4 h1:BqE3NRG6bsODh++VMKMsDmFuJTHrdD4rJZqHjDeF6XI= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.4/go.mod h1:wrMCEwjFPms+V86TCQQeOxQF/If4vT44FGIOFiMC2ck= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 h1:zcx9LiGWZ6i6pjdcoE9oXAB6mUdeyC36Ia/QEiIvYdg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4/go.mod h1:Tp/ly1cTjRLGBBmNccFumbZ8oqpZlpdhFf80SrRh4is= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 h1:yDxvkz3/uOKfxnv8YhzOi9m+2OGIxF+on3KOISbK5IU= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.4/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8= +github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= +github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/iam-auth.md b/iam-auth.md new file mode 100644 index 00000000..e78ab531 --- /dev/null +++ b/iam-auth.md @@ -0,0 +1,83 @@ +# AWS MSK IAM authentication for Burrow + +Burrow can now connect to **Amazon MSK clusters that use IAM (SASL/OAUTHBEARER) + TLS** +without proxies or sidecars. +The feature is built on: + +* [`github.com/aws/aws-msk-iam-sasl-signer-go` v1.0.4] – generates the SigV4 + bearer token expected by the broker. +* [`github.com/IBM/sarama` ≥ v1.45.2] – already exposes the generic + *OAUTHBEARER* mechanism and `AccessTokenProvider` hook we plug into. + +Supported credential sources +---------------------------- + +| Credential chain node | Works? | Notes | +|-----------------------|--------|-------| +| EC2 / EKS instance profile | ✅ | No extra config needed. | +| **EKS Pod Identity** | ✅ | Preferred on EKS ≥ 1.29. | +| **IRSA (IAM Roles for SA)** | ✅ | Works via the same default chain. | +| Shared credentials / static keys | ✅ | Standard `~/.aws/credentials` or env vars. | +| STS AssumeRole w/ session tags | ✅ | Add `role-arn` in the TOML (requires `sts:AssumeRole` + `sts:TagSession`). | + +> IAM auth **requires** the broker listener `TLS + IAM` (port **9098** or **9198**) to be enabled. + +Quick-start +----------- + +1. Enable the *SASL/IAM* and *TLS* security settings on your MSK cluster. +2. Make sure Burrow’s container can reach the IAM listener (security groups, NACLs, etc.). +3. Add the sections shown in `examples/burrow.toml` (below) and restart Burrow. + +Example configuration +--------------------- + +```toml +####################################################################### +# TLS: trust the Amazon root CAs shipped in most Linux distros +####################################################################### +[tls.msk-tls] +# For Debian/Ubuntu/RHEL/… the CA bundle is already present: +cacert = "/etc/ssl/certs/ca-certificates.crt" + +####################################################################### +# IAM: pick one of the credential modes below +####################################################################### + +# --- A) Use the pod / instance credentials directly --------------- +[iam.msk-iam] +region = "us-west-1" + +# --- B) Re-assume a dedicated read-only role ---------------------- +#[iam.msk-iam] +#region = "us-west-1" +#role-arn = "arn:aws:iam::123456789012:role/burrow-readonly" +#profile = "burrow" # optional named profile + +####################################################################### +[client-profile.msk-iam] +kafka-version = "2.8.0" +client-id = "burrow" +tls = "msk-tls" +iam = "msk-iam" + +####################################################################### +# Cluster + consumer definitions (use the TLS+IAM listener) +####################################################################### +[cluster.prod] +class-name = "kafka" +client-profile = "msk-iam" +servers = [ + "b-1.prod.abcd.c2.kafka.us-west-1.amazonaws.com:9098", + "b-2.prod.abcd.c2.kafka.us-west-1.amazonaws.com:9098", + "b-3.prod.abcd.c2.kafka.us-west-1.amazonaws.com:9098" +] + +[consumer.prod] +class-name = "kafka" +client-profile = "msk-iam" +cluster = "prod" +start-latest = true +group-denylist = '^(console-consumer-|.*\s.*).*$' +servers = ${cluster.prod.servers} # reuse same list +```