@@ -42,6 +42,7 @@ const (
4242 authType = "authType"
4343 passwordAuthType = "password"
4444 oidcAuthType = "oidc"
45+ oidcPrivateKeyJWTAuthType = "oidc_private_key_jwt"
4546 mtlsAuthType = "mtls"
4647 awsIAMAuthType = "awsiam"
4748 noAuthType = "none"
@@ -65,36 +66,40 @@ const (
6566)
6667
6768type KafkaMetadata struct {
68- Brokers string `mapstructure:"brokers"`
69- internalBrokers []string `mapstructure:"-"`
70- ConsumerGroup string `mapstructure:"consumerGroup"`
71- ClientID string `mapstructure:"clientId"`
72- AuthType string `mapstructure:"authType"`
73- SaslUsername string `mapstructure:"saslUsername"`
74- SaslPassword string `mapstructure:"saslPassword"`
75- SaslMechanism string `mapstructure:"saslMechanism"`
76- InitialOffset string `mapstructure:"initialOffset"`
77- internalInitialOffset int64 `mapstructure:"-"`
78- MaxMessageBytes int `mapstructure:"maxMessageBytes"`
79- OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
80- OidcClientID string `mapstructure:"oidcClientID"`
81- OidcClientSecret string `mapstructure:"oidcClientSecret"`
82- OidcScopes string `mapstructure:"oidcScopes"`
83- OidcExtensions string `mapstructure:"oidcExtensions"`
84- internalOidcScopes []string `mapstructure:"-"`
85- TLSDisable bool `mapstructure:"disableTls"`
86- TLSSkipVerify bool `mapstructure:"skipVerify"`
87- TLSCaCert string `mapstructure:"caCert"`
88- TLSClientCert string `mapstructure:"clientCert"`
89- TLSClientKey string `mapstructure:"clientKey"`
90- ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
91- ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
92- HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
93- SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
94- Version string `mapstructure:"version"`
95- EscapeHeaders bool `mapstructure:"escapeHeaders"`
96- internalVersion sarama.KafkaVersion `mapstructure:"-"`
97- internalOidcExtensions map [string ]string `mapstructure:"-"`
69+ Brokers string `mapstructure:"brokers"`
70+ internalBrokers []string `mapstructure:"-"`
71+ ConsumerGroup string `mapstructure:"consumerGroup"`
72+ ClientID string `mapstructure:"clientId"`
73+ AuthType string `mapstructure:"authType"`
74+ SaslUsername string `mapstructure:"saslUsername"`
75+ SaslPassword string `mapstructure:"saslPassword"`
76+ SaslMechanism string `mapstructure:"saslMechanism"`
77+ InitialOffset string `mapstructure:"initialOffset"`
78+ internalInitialOffset int64 `mapstructure:"-"`
79+ MaxMessageBytes int `mapstructure:"maxMessageBytes"`
80+ OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
81+ OidcClientID string `mapstructure:"oidcClientID"`
82+ OidcClientSecret string `mapstructure:"oidcClientSecret"`
83+ OidcScopes string `mapstructure:"oidcScopes"`
84+ OidcExtensions string `mapstructure:"oidcExtensions"`
85+ OidcClientAssertionCert string `mapstructure:"oidcClientAssertionCert"`
86+ OidcClientAssertionKey string `mapstructure:"oidcClientAssertionKey"`
87+ OidcResource string `mapstructure:"oidcResource"`
88+ OidcAudience string `mapstructure:"oidcAudience"`
89+ internalOidcScopes []string `mapstructure:"-"`
90+ TLSDisable bool `mapstructure:"disableTls"`
91+ TLSSkipVerify bool `mapstructure:"skipVerify"`
92+ TLSCaCert string `mapstructure:"caCert"`
93+ TLSClientCert string `mapstructure:"clientCert"`
94+ TLSClientKey string `mapstructure:"clientKey"`
95+ ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
96+ ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
97+ HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
98+ SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
99+ Version string `mapstructure:"version"`
100+ EscapeHeaders bool `mapstructure:"escapeHeaders"`
101+ internalVersion sarama.KafkaVersion `mapstructure:"-"`
102+ internalOidcExtensions map [string ]string `mapstructure:"-"`
98103
99104 // configs for kafka client
100105 ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"`
@@ -251,6 +256,32 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
251256 }
252257 }
253258 k .logger .Debug ("Configuring SASL token authentication via OIDC." )
259+ case oidcPrivateKeyJWTAuthType :
260+ if m .OidcTokenEndpoint == "" {
261+ return nil , errors .New ("kafka error: missing OIDC Token Endpoint for authType 'oidc_private_key_jwt'" )
262+ }
263+ if m .OidcClientID == "" {
264+ return nil , errors .New ("kafka error: missing OIDC Client ID for authType 'oidc_private_key_jwt'" )
265+ }
266+ if m .OidcClientAssertionCert == "" {
267+ return nil , errors .New ("kafka error: missing OIDC Client Assertion Cert for authType 'oidc_private_key_jwt'" )
268+ }
269+ if m .OidcClientAssertionKey == "" {
270+ return nil , errors .New ("kafka error: missing OIDC Client Assertion Key for authType 'oidc_private_key_jwt'" )
271+ }
272+ if m .OidcScopes != "" {
273+ m .internalOidcScopes = strings .Split (m .OidcScopes , "," )
274+ } else {
275+ k .logger .Warn ("Warning: no OIDC scopes specified, using default 'openid' scope only. This is a security risk for token reuse." )
276+ m .internalOidcScopes = []string {"openid" }
277+ }
278+ if m .OidcExtensions != "" {
279+ err = json .Unmarshal ([]byte (m .OidcExtensions ), & m .internalOidcExtensions )
280+ if err != nil || len (m .internalOidcExtensions ) < 1 {
281+ return nil , errors .New ("kafka error: improper OIDC Extensions format for authType 'oidc_private_key_jwt'" )
282+ }
283+ }
284+ k .logger .Debug ("Configuring SASL token authentication via OIDC with private_key_jwt." )
254285 case mtlsAuthType :
255286 if m .TLSClientCert != "" {
256287 if ! isValidPEM (m .TLSClientCert ) {
0 commit comments