Skip to content

Commit c30af7d

Browse files
authored
Feat: Support client JWT auth method for kafka oidc (#4057)
Signed-off-by: Albert Callarisa <[email protected]>
1 parent 0ad2ae7 commit c30af7d

File tree

15 files changed

+792
-72
lines changed

15 files changed

+792
-72
lines changed

bindings/kafka/metadata.yaml

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ builtinAuthenticationProfiles:
3333
type: string
3434
required: false
3535
description: |
36-
This maintains backwards compatibility with existing fields.
36+
This maintains backwards compatibility with existing fields.
3737
It will be deprecated as of Dapr 1.17. Use 'accessKey' instead.
3838
If both fields are set, then 'accessKey' value will be used.
3939
AWS access key associated with an IAM account.
@@ -43,7 +43,7 @@ builtinAuthenticationProfiles:
4343
required: false
4444
sensitive: true
4545
description: |
46-
This maintains backwards compatibility with existing fields.
46+
This maintains backwards compatibility with existing fields.
4747
It will be deprecated as of Dapr 1.17. Use 'secretKey' instead.
4848
If both fields are set, then 'secretKey' value will be used.
4949
The secret key associated with the access key.
@@ -52,7 +52,7 @@ builtinAuthenticationProfiles:
5252
type: string
5353
sensitive: true
5454
description: |
55-
This maintains backwards compatibility with existing fields.
55+
This maintains backwards compatibility with existing fields.
5656
It will be deprecated as of Dapr 1.17. Use 'sessionToken' instead.
5757
If both fields are set, then 'sessionToken' value will be used.
5858
AWS session token to use. A session token is only required if you are using temporary security credentials.
@@ -61,15 +61,15 @@ builtinAuthenticationProfiles:
6161
type: string
6262
required: false
6363
description: |
64-
This maintains backwards compatibility with existing fields.
64+
This maintains backwards compatibility with existing fields.
6565
It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead.
6666
If both fields are set, then 'assumeRoleArn' value will be used.
6767
IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
6868
example: '"arn:aws:iam::123456789:role/mskRole"'
6969
- name: awsStsSessionName
7070
type: string
7171
description: |
72-
This maintains backwards compatibility with existing fields.
72+
This maintains backwards compatibility with existing fields.
7373
It will be deprecated as of Dapr 1.17. Use 'sessionName' instead.
7474
If both fields are set, then 'sessionName' value will be used.
7575
Represents the session name for assuming a role.
@@ -78,7 +78,7 @@ builtinAuthenticationProfiles:
7878
authenticationProfiles:
7979
- title: "OIDC Authentication"
8080
description: |
81-
Authenticate using OpenID Connect.
81+
Authenticate using OpenID Connect providing a client secret.
8282
metadata:
8383
- name: authType
8484
type: string
@@ -121,6 +121,72 @@ authenticationProfiles:
121121
example: |
122122
{"cluster":"kafka","poolid":"kafkapool"}
123123
type: string
124+
- title: "OIDC Private Key JWT Authentication"
125+
description: |
126+
Authenticate using OpenID Connect providing a client certificate and private key.
127+
metadata:
128+
- name: authType
129+
type: string
130+
required: true
131+
description: |
132+
Authentication type.
133+
This must be set to "oidc_private_key_jwt" for this authentication profile.
134+
example: '"oidc_private_key_jwt"'
135+
allowedValues:
136+
- "oidc_private_key_jwt"
137+
- name: oidcTokenEndpoint
138+
type: string
139+
required: true
140+
description: |
141+
URL of the OAuth2 identity provider access token endpoint.
142+
example: '"https://identity.example.com/v1/token"'
143+
- name: oidcClientID
144+
description: |
145+
The OAuth2 client ID that has been provisioned in the identity provider.
146+
example: '"my-client-id"'
147+
type: string
148+
required: true
149+
- name: oidcClientAssertionCert
150+
type: string
151+
required: true
152+
description: |
153+
PEM-encoded X.509 certificate used to advertise the client certificate in the x5c header.
154+
example: |
155+
-----BEGIN CERTIFICATE-----\n...
156+
- name: oidcClientAssertionKey
157+
type: string
158+
required: true
159+
sensitive: true
160+
description: |
161+
PEM-encoded private key used to sign the client certificate.
162+
example: |
163+
-----BEGIN PRIVATE KEY-----\n...
164+
- name: oidcResource
165+
type: string
166+
required: false
167+
description: |
168+
Optional OAuth2 resource (audience) parameter to include in the token request when required by the identity provider.
169+
example: '"api://kafka"'
170+
- name: oidcAudience
171+
type: string
172+
required: false
173+
description: |
174+
Overrides the JWT client assertion audience (aud). If not set, the component uses the
175+
issuer derived from the token endpoint URL when available; otherwise, it falls back to the token URL.
176+
example: '"http://<idp-host>/realms/local"'
177+
- name: oidcScopes
178+
type: string
179+
description: |
180+
Comma-delimited list of OAuth2/OIDC scopes to request with the access token.
181+
Although not required, this field is recommended.
182+
example: '"openid,kafka-prod"'
183+
default: '"openid"'
184+
- name: oidcExtensions
185+
description: |
186+
String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token.
187+
example: |
188+
{"cluster":"kafka","poolid":"kafkapool"}
189+
type: string
124190
- title: "SASL Authentication"
125191
description: |
126192
Authenticate using SASL.
@@ -348,7 +414,7 @@ metadata:
348414
type: bool
349415
required: false
350416
description: |
351-
Enables URL escaping of the message header values.
417+
Enables URL escaping of the message header values.
352418
It allows sending headers with special characters that are usually not allowed in HTTP headers.
353419
example: "true"
354420
default: "false"

common/component/kafka/auth.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,20 @@ func updateOidcAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error {
8888

8989
return nil
9090
}
91+
92+
func updateOidcPrivateKeyJWTAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error {
93+
tokenProvider := metadata.getOAuthTokenSourcePrivateKeyJWT()
94+
95+
if metadata.TLSCaCert != "" {
96+
err := tokenProvider.addCa(metadata.TLSCaCert)
97+
if err != nil {
98+
return fmt.Errorf("kafka: error setting oauth client trusted CA: %w", err)
99+
}
100+
}
101+
102+
config.Net.SASL.Enable = true
103+
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
104+
config.Net.SASL.TokenProvider = tokenProvider
105+
106+
return nil
107+
}

common/component/kafka/kafka.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,12 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
181181
if err != nil {
182182
return err
183183
}
184+
case oidcPrivateKeyJWTAuthType:
185+
k.logger.Info("Configuring SASL OAuth2/OIDC authentication with private key JWT")
186+
err = updateOidcPrivateKeyJWTAuthInfo(config, meta)
187+
if err != nil {
188+
return err
189+
}
184190
case passwordAuthType:
185191
k.logger.Info("Configuring SASL Password authentication")
186192
k.saslUsername = meta.SaslUsername

common/component/kafka/metadata.go

Lines changed: 61 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
authType = "authType"
4343
passwordAuthType = "password"
4444
oidcAuthType = "oidc"
45+
oidcPrivateKeyJWTAuthType = "oidc_private_key_jwt"
4546
mtlsAuthType = "mtls"
4647
awsIAMAuthType = "awsiam"
4748
noAuthType = "none"
@@ -65,36 +66,40 @@ const (
6566
)
6667

6768
type KafkaMetadata struct {
68-
Brokers string `mapstructure:"brokers"`
69-
internalBrokers []string `mapstructure:"-"`
70-
ConsumerGroup string `mapstructure:"consumerGroup"`
71-
ClientID string `mapstructure:"clientId"`
72-
AuthType string `mapstructure:"authType"`
73-
SaslUsername string `mapstructure:"saslUsername"`
74-
SaslPassword string `mapstructure:"saslPassword"`
75-
SaslMechanism string `mapstructure:"saslMechanism"`
76-
InitialOffset string `mapstructure:"initialOffset"`
77-
internalInitialOffset int64 `mapstructure:"-"`
78-
MaxMessageBytes int `mapstructure:"maxMessageBytes"`
79-
OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
80-
OidcClientID string `mapstructure:"oidcClientID"`
81-
OidcClientSecret string `mapstructure:"oidcClientSecret"`
82-
OidcScopes string `mapstructure:"oidcScopes"`
83-
OidcExtensions string `mapstructure:"oidcExtensions"`
84-
internalOidcScopes []string `mapstructure:"-"`
85-
TLSDisable bool `mapstructure:"disableTls"`
86-
TLSSkipVerify bool `mapstructure:"skipVerify"`
87-
TLSCaCert string `mapstructure:"caCert"`
88-
TLSClientCert string `mapstructure:"clientCert"`
89-
TLSClientKey string `mapstructure:"clientKey"`
90-
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
91-
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
92-
HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
93-
SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
94-
Version string `mapstructure:"version"`
95-
EscapeHeaders bool `mapstructure:"escapeHeaders"`
96-
internalVersion sarama.KafkaVersion `mapstructure:"-"`
97-
internalOidcExtensions map[string]string `mapstructure:"-"`
69+
Brokers string `mapstructure:"brokers"`
70+
internalBrokers []string `mapstructure:"-"`
71+
ConsumerGroup string `mapstructure:"consumerGroup"`
72+
ClientID string `mapstructure:"clientId"`
73+
AuthType string `mapstructure:"authType"`
74+
SaslUsername string `mapstructure:"saslUsername"`
75+
SaslPassword string `mapstructure:"saslPassword"`
76+
SaslMechanism string `mapstructure:"saslMechanism"`
77+
InitialOffset string `mapstructure:"initialOffset"`
78+
internalInitialOffset int64 `mapstructure:"-"`
79+
MaxMessageBytes int `mapstructure:"maxMessageBytes"`
80+
OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
81+
OidcClientID string `mapstructure:"oidcClientID"`
82+
OidcClientSecret string `mapstructure:"oidcClientSecret"`
83+
OidcScopes string `mapstructure:"oidcScopes"`
84+
OidcExtensions string `mapstructure:"oidcExtensions"`
85+
OidcClientAssertionCert string `mapstructure:"oidcClientAssertionCert"`
86+
OidcClientAssertionKey string `mapstructure:"oidcClientAssertionKey"`
87+
OidcResource string `mapstructure:"oidcResource"`
88+
OidcAudience string `mapstructure:"oidcAudience"`
89+
internalOidcScopes []string `mapstructure:"-"`
90+
TLSDisable bool `mapstructure:"disableTls"`
91+
TLSSkipVerify bool `mapstructure:"skipVerify"`
92+
TLSCaCert string `mapstructure:"caCert"`
93+
TLSClientCert string `mapstructure:"clientCert"`
94+
TLSClientKey string `mapstructure:"clientKey"`
95+
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
96+
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
97+
HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
98+
SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
99+
Version string `mapstructure:"version"`
100+
EscapeHeaders bool `mapstructure:"escapeHeaders"`
101+
internalVersion sarama.KafkaVersion `mapstructure:"-"`
102+
internalOidcExtensions map[string]string `mapstructure:"-"`
98103

99104
// configs for kafka client
100105
ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"`
@@ -251,6 +256,32 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
251256
}
252257
}
253258
k.logger.Debug("Configuring SASL token authentication via OIDC.")
259+
case oidcPrivateKeyJWTAuthType:
260+
if m.OidcTokenEndpoint == "" {
261+
return nil, errors.New("kafka error: missing OIDC Token Endpoint for authType 'oidc_private_key_jwt'")
262+
}
263+
if m.OidcClientID == "" {
264+
return nil, errors.New("kafka error: missing OIDC Client ID for authType 'oidc_private_key_jwt'")
265+
}
266+
if m.OidcClientAssertionCert == "" {
267+
return nil, errors.New("kafka error: missing OIDC Client Assertion Cert for authType 'oidc_private_key_jwt'")
268+
}
269+
if m.OidcClientAssertionKey == "" {
270+
return nil, errors.New("kafka error: missing OIDC Client Assertion Key for authType 'oidc_private_key_jwt'")
271+
}
272+
if m.OidcScopes != "" {
273+
m.internalOidcScopes = strings.Split(m.OidcScopes, ",")
274+
} else {
275+
k.logger.Warn("Warning: no OIDC scopes specified, using default 'openid' scope only. This is a security risk for token reuse.")
276+
m.internalOidcScopes = []string{"openid"}
277+
}
278+
if m.OidcExtensions != "" {
279+
err = json.Unmarshal([]byte(m.OidcExtensions), &m.internalOidcExtensions)
280+
if err != nil || len(m.internalOidcExtensions) < 1 {
281+
return nil, errors.New("kafka error: improper OIDC Extensions format for authType 'oidc_private_key_jwt'")
282+
}
283+
}
284+
k.logger.Debug("Configuring SASL token authentication via OIDC with private_key_jwt.")
254285
case mtlsAuthType:
255286
if m.TLSClientCert != "" {
256287
if !isValidPEM(m.TLSClientCert) {

common/component/kafka/metadata_test.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func TestMissingSaslValuesOnUpgrade(t *testing.T) {
216216
require.Equal(t, fmt.Sprintf("kafka error: missing SASL Username for authType '%s'", passwordAuthType), err.Error())
217217
}
218218

219-
func TestMissingOidcValues(t *testing.T) {
219+
func TestMissingOidcClientSecretValues(t *testing.T) {
220220
k := getKafka()
221221
m := map[string]string{"brokers": "akfak.com:9092", "authType": oidcAuthType}
222222
meta, err := k.getKafkaMetadata(m)
@@ -243,6 +243,38 @@ func TestMissingOidcValues(t *testing.T) {
243243
require.Contains(t, meta.internalOidcScopes, "openid")
244244
}
245245

246+
func TestMissingOidcPrivateKeyJwtValues(t *testing.T) {
247+
k := getKafka()
248+
m := map[string]string{"brokers": "akfak.com:9092", "authType": oidcPrivateKeyJWTAuthType}
249+
meta, err := k.getKafkaMetadata(m)
250+
require.Error(t, err)
251+
require.Nil(t, meta)
252+
require.Equal(t, "kafka error: missing OIDC Token Endpoint for authType 'oidc_private_key_jwt'", err.Error())
253+
254+
m["oidcTokenEndpoint"] = "https://sassa.fra/"
255+
meta, err = k.getKafkaMetadata(m)
256+
require.Error(t, err)
257+
require.Nil(t, meta)
258+
require.Equal(t, "kafka error: missing OIDC Client ID for authType 'oidc_private_key_jwt'", err.Error())
259+
260+
m["oidcClientID"] = "sassafras"
261+
meta, err = k.getKafkaMetadata(m)
262+
require.Error(t, err)
263+
require.Nil(t, meta)
264+
require.Equal(t, "kafka error: missing OIDC Client Assertion Cert for authType 'oidc_private_key_jwt'", err.Error())
265+
266+
m["oidcClientAssertionCert"] = "sassapass"
267+
meta, err = k.getKafkaMetadata(m)
268+
require.Error(t, err)
269+
require.Nil(t, meta)
270+
require.Equal(t, "kafka error: missing OIDC Client Assertion Key for authType 'oidc_private_key_jwt'", err.Error())
271+
272+
m["oidcClientAssertionKey"] = "sassapass"
273+
meta, err = k.getKafkaMetadata(m)
274+
require.NoError(t, err)
275+
require.Contains(t, meta.internalOidcScopes, "openid")
276+
}
277+
246278
func TestPresentSaslValues(t *testing.T) {
247279
k := getKafka()
248280
m := map[string]string{

common/component/kafka/sasl_oauthbearer.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ import (
1717
ctx "context"
1818
"crypto/tls"
1919
"crypto/x509"
20-
"encoding/pem"
2120
"errors"
2221
"fmt"
2322
"net/http"
2423
"time"
2524

25+
"github.com/dapr/kit/crypto/pem"
26+
2627
"github.com/IBM/sarama"
2728
"golang.org/x/oauth2"
2829
ccred "golang.org/x/oauth2/clientcredentials"
@@ -51,26 +52,21 @@ func (m KafkaMetadata) getOAuthTokenSource() *OAuthTokenSource {
5152
}
5253
}
5354

54-
var tokenRequestTimeout, _ = time.ParseDuration("30s")
55-
5655
func (ts *OAuthTokenSource) addCa(caPem string) error {
5756
pemBytes := []byte(caPem)
5857

59-
block, _ := pem.Decode(pemBytes)
60-
61-
if block == nil || block.Type != "CERTIFICATE" {
62-
return errors.New("PEM data not valid or not of a valid type (CERTIFICATE)")
63-
}
64-
65-
caCert, err := x509.ParseCertificate(block.Bytes)
58+
caCerts, err := pem.DecodePEMCertificates(pemBytes)
6659
if err != nil {
6760
return fmt.Errorf("error parsing PEM certificate: %w", err)
6861
}
62+
if len(caCerts) > 1 {
63+
return fmt.Errorf("expected 1 certificate, got %d", len(caCerts))
64+
}
6965

7066
if ts.trustedCas == nil {
7167
ts.trustedCas = make([]*x509.Certificate, 0)
7268
}
73-
ts.trustedCas = append(ts.trustedCas, caCert)
69+
ts.trustedCas = append(ts.trustedCas, caCerts[0])
7470

7571
return nil
7672
}
@@ -113,9 +109,15 @@ func (ts *OAuthTokenSource) Token() (*sarama.AccessToken, error) {
113109
return nil, errors.New("cannot generate token, OAuthTokenSource not fully configured")
114110
}
115111

116-
oidcCfg := ccred.Config{ClientID: ts.ClientID, ClientSecret: ts.ClientSecret, Scopes: ts.Scopes, TokenURL: ts.TokenEndpoint.TokenURL, AuthStyle: ts.TokenEndpoint.AuthStyle}
112+
oidcCfg := ccred.Config{
113+
ClientID: ts.ClientID,
114+
ClientSecret: ts.ClientSecret,
115+
Scopes: ts.Scopes,
116+
TokenURL: ts.TokenEndpoint.TokenURL,
117+
AuthStyle: ts.TokenEndpoint.AuthStyle,
118+
}
117119

118-
timeoutCtx, cancel := ctx.WithTimeout(ctx.TODO(), tokenRequestTimeout)
120+
timeoutCtx, cancel := ctx.WithTimeout(ctx.TODO(), 30*time.Second)
119121
defer cancel()
120122

121123
ts.configureClient()

0 commit comments

Comments
 (0)