Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion cfg/kafka_client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package cfg

import (
"context"
"errors"
"os"

"github.com/ozontech/file.d/xoauth"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/oauth"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kzap"
Expand All @@ -27,6 +31,8 @@ type KafkaClientSaslConfig struct {
SaslMechanism string
SaslUsername string
SaslPassword string

SaslOAuth KafkaClientOAuthConfig
}

type KafkaClientSslConfig struct {
Expand All @@ -36,7 +42,63 @@ type KafkaClientSslConfig struct {
SslSkipVerify bool
}

func GetKafkaClientOptions(c KafkaClientConfig, l *zap.Logger) []kgo.Opt {
type KafkaClientOAuthConfig struct {
// static
Token string `json:"token"`

// dynamic
ClientID string `json:"client_id"`
ClientSecret string `json:"client_secret"`
TokenURL string `json:"token_url"`
Scopes []string `json:"scopes" slice:"true"`
AuthStyle string `json:"auth_style" default:"params" options:"params|header"`
}

func (c *KafkaClientOAuthConfig) isStatic() bool {
return c.Token != ""
}

func (c *KafkaClientOAuthConfig) isDynamic() bool {
return c.ClientID != "" && c.ClientSecret != "" && c.TokenURL != ""
}

func (c *KafkaClientOAuthConfig) isValid() bool {
return c.isStatic() || c.isDynamic()
}

func GetKafkaClientOAuthTokenSource(ctx context.Context, cfg KafkaClientConfig) (xoauth.TokenSource, error) {
saslCfg := cfg.GetSaslConfig()

if !cfg.IsSaslEnabled() || saslCfg.SaslMechanism != "OAUTHBEARER" {
return nil, nil
}

saslOAuth := saslCfg.SaslOAuth
if !saslOAuth.isValid() {
return nil, errors.New("invalid SASL OAUTHBEARER config")
}

if saslOAuth.isDynamic() {
authStyle := xoauth.AuthStyleInParams
if saslOAuth.AuthStyle == "header" {
authStyle = xoauth.AuthStyleInHeader
}

return xoauth.NewReuseTokenSource(ctx, &xoauth.Config{
ClientID: saslOAuth.ClientID,
ClientSecret: saslOAuth.ClientSecret,
TokenURL: saslOAuth.TokenURL,
Scopes: saslOAuth.Scopes,
AuthStyle: authStyle,
})
}

return xoauth.NewStaticTokenSource(&xoauth.Token{
AccessToken: saslOAuth.Token,
}), nil
}

func GetKafkaClientOptions(c KafkaClientConfig, l *zap.Logger, tokenSource xoauth.TokenSource) []kgo.Opt {
opts := []kgo.Opt{
kgo.SeedBrokers(c.GetBrokers()...),
kgo.ClientID(c.GetClientID()),
Expand Down Expand Up @@ -66,6 +128,18 @@ func GetKafkaClientOptions(c KafkaClientConfig, l *zap.Logger) []kgo.Opt {
AccessKey: saslConfig.SaslUsername,
SecretKey: saslConfig.SaslPassword,
}.AsManagedStreamingIAMMechanism()))
case "OAUTHBEARER":
authFn := func(ctx context.Context) (oauth.Auth, error) {
if tokenSource == nil {
return oauth.Auth{}, errors.New("uninitialized token source")
}
t := tokenSource.Token(ctx)
if t == nil {
return oauth.Auth{}, errors.New("empty token from token source")
}
return oauth.Auth{Token: t.AccessToken}, nil
}
opts = append(opts, kgo.SASL(oauth.Oauth(authFn)))
}
}

Expand Down
7 changes: 4 additions & 3 deletions e2e/kafka_auth/kafka_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
}

kafka_out.NewClient(config,
kafka_out.NewClient(context.Background(), config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
nil,
)
},
func() {
Expand Down Expand Up @@ -154,9 +155,9 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
}

kafka_in.NewClient(config,
kafka_in.NewClient(context.Background(), config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
Consumer{},
Consumer{}, nil,
)
},
}
Expand Down
3 changes: 2 additions & 1 deletion e2e/kafka_file/kafka_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ func (c *Config) Send(t *testing.T) {
BatchSize_: c.Count,
}

client := kafka_out.NewClient(config,
client := kafka_out.NewClient(context.Background(), config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
nil,
)
adminClient := kadm.NewClient(client)
_, err := adminClient.CreateTopic(context.TODO(), 1, 1, nil, c.Topics[0])
Expand Down
4 changes: 2 additions & 2 deletions e2e/split_join/split_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
HeartbeatInterval_: 10 * time.Second,
}

c.client = kafka_in.NewClient(config,
c.client = kafka_in.NewClient(context.Background(), config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
Consumer{},
Consumer{}, nil,
)

adminClient := kadm.NewClient(c.client)
Expand Down
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ require (
github.com/twmb/franz-go/plugin/kzap v1.1.2
github.com/twmb/tlscfg v1.2.1
github.com/valyala/fasthttp v1.48.0
github.com/xdg-go/scram v1.1.2
go.uber.org/atomic v1.11.0
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.27.0
golang.org/x/net v0.47.0
golang.org/x/oauth2 v0.27.0
google.golang.org/protobuf v1.36.5
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -131,8 +131,6 @@ require (
github.com/twmb/franz-go/pkg/kmsg v1.12.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
Expand All @@ -143,7 +141,6 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/mod v0.29.0 // indirect
golang.org/x/oauth2 v0.27.0 // indirect
golang.org/x/sync v0.18.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
Expand Down
7 changes: 0 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,6 @@ github.com/valyala/fasthttp v1.48.0 h1:oJWvHb9BIZToTQS3MuQ2R3bJZiNSa2KiNdeI8A+79
github.com/valyala/fasthttp v1.48.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
Expand Down Expand Up @@ -499,7 +493,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
Expand Down
18 changes: 17 additions & 1 deletion plugin/input/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ If set, the plugin will use SASL authentications mechanism.

<br>

**`sasl_mechanism`** *`string`* *`default=SCRAM-SHA-512`* *`options=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512|AWS_MSK_IAM`*
**`sasl_mechanism`** *`string`* *`default=SCRAM-SHA-512`* *`options=PLAIN|SCRAM-SHA-256|SCRAM-SHA-512|AWS_MSK_IAM|OAUTHBEARER`*

SASL mechanism to use.

Expand All @@ -158,6 +158,22 @@ SASL password.

<br>

**`sasl_oauth`** *`cfg.KafkaClientOAuthConfig`*

SASL OAUTHBEARER config. It works only if `sasl_mechanism:"OAUTHBEARER"`.
> There are 2 options - a static token or a dynamically updated.

`OAuthConfig` params:
* **`token`** *`string`* - static token
---
* **`client_id`** *`string`* - client ID
* **`client_secret`** *`string`* - client secret
* **`token_url`** *`string`* - resource server's token endpoint URL
* **`scopes`** *`string`* - optional requested permissions
* **`auth_style`** *`string`* - specifies how the endpoint wants the client ID & client secret sent

<br>

**`is_ssl_enabled`** *`bool`* *`default=false`*

If set, the plugin will use SSL/TLS connections method.
Expand Down
9 changes: 5 additions & 4 deletions plugin/input/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/xoauth"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
)

func NewClient(c *Config, l *zap.Logger, s Consumer) *kgo.Client {
opts := cfg.GetKafkaClientOptions(c, l)
func NewClient(ctx context.Context, c *Config, l *zap.Logger, s Consumer, tokenSource xoauth.TokenSource) *kgo.Client {
opts := cfg.GetKafkaClientOptions(c, l, tokenSource)
opts = append(opts, []kgo.Opt{
kgo.ConsumerGroup(c.ConsumerGroup),
kgo.ConsumeTopics(c.Topics...),
Expand Down Expand Up @@ -53,10 +54,10 @@ func NewClient(c *Config, l *zap.Logger, s Consumer) *kgo.Client {
l.Fatal("can't create kafka client", zap.Error(err))
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
pingCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

err = client.Ping(ctx)
err = client.Ping(pingCtx)
if err != nil {
l.Fatal("can't connect to kafka", zap.Error(err))
}
Expand Down
Loading