Skip to content
11 changes: 11 additions & 0 deletions .changelog/3742.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
```release-note:enhancement
resource/mongodbatlas_stream_connection: Adds new authentication mechanism(OIDC) to the Kafka connection
```

```release-note:enhancement
data-source/mongodbatlas_stream_connection: Adds new authentication mechanism(OIDC) to the Kafka connection
```

```release-note:enhancement
data-source/mongodbatlas_stream_connections: Adds new authentication mechanism(OIDC) to the Kafka connection
```
8 changes: 7 additions & 1 deletion docs/data-sources/stream_connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ If `type` is of value `Https` the following additional attributes are defined:

### Authentication

* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, or `SCRAM-512`.
* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, `SCRAM-512`, or `OAUTHBEARER`.
* `method` - SASL OAUTHBEARER authentication method. Can only be `OIDC` currently.
* `username` - Username of the account to connect to the Kafka cluster.
* `password` - Password of the account to connect to the Kafka cluster.
* `token_endpoint_url` - OAUTH issuer(IdP provider) token endpoint HTTP(S) URI used to retrieve the token.
* `client_id` - Public identifier for the Kafka client.
* `client_secret` - Secret known only to the Kafka client and the authorization server.
* `scope` - Kafka clients use this to specify the scope of the access request to the broker.
* `sasl_oauthbearer_extensions` - Additional information to be provided to the Kafka broker.

### Security

Expand Down
8 changes: 7 additions & 1 deletion docs/data-sources/stream_connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,15 @@ If `type` is of value `Https` the following additional attributes are defined:

### Authentication

* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, or `SCRAM-512`.
* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, `SCRAM-512`, or `OAUTHBEARER`.
* `method` - SASL OAUTHBEARER authentication method. Can only be `OIDC` currently.
* `username` - Username of the account to connect to the Kafka cluster.
* `password` - Password of the account to connect to the Kafka cluster.
* `token_endpoint_url` - OAUTH issuer(IdP provider) token endpoint HTTP(S) URI used to retrieve the token.
* `client_id` - Public identifier for the Kafka client. It must be unique across all clients that the authorization server handles.
* `client_secret` - Secret known only to the Kafka client and the authorization server.
* `scope` - Kafka clients use this to specify the scope of the access request to the broker.
* `sasl_oauthbearer_extensions` - Additional information to be provided to the Kafka broker.

### Security

Expand Down
38 changes: 38 additions & 0 deletions docs/resources/stream_connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,38 @@ resource "mongodbatlas_stream_connection" "test" {
}
```

### Example Kafka SASL OAuthbearer Connection

```terraform
resource "mongodbatlas_stream_connection" "example-kafka-oauthbearer" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
connection_name = "KafkaOAuthbearerConnection"
type = "Kafka"
authentication = {
mechanism = "OAUTHBEARER"
method = "OIDC"
token_endpoint_url = "https://example.com/oauth/token"
client_id = "auth0Client"
client_secret = var.kafka_client_secret
scope = "read:messages write:messages"
sasl_oauthbearer_extensions = "logicalCluster=lkc-kmom,identityPoolId=pool-lAr"
}
bootstrap_servers = "localhost:9092,localhost:9092"
config = {
"auto.offset.reset" : "earliest"
}
security = {
protocol = "SASL_PLAINTEXT"
}
networking = {
access = {
type = "PUBLIC"
}
}
}
```

### Example Kafka SASL SSL Connection

```terraform
Expand Down Expand Up @@ -148,6 +180,12 @@ If `type` is of value `Https` the following additional attributes are defined:
* `mechanism` - Style of authentication. Can be one of `PLAIN`, `SCRAM-256`, or `SCRAM-512`.
* `username` - Username of the account to connect to the Kafka cluster.
* `password` - Password of the account to connect to the Kafka cluster.
* `method` - SASL OAUTHBEARER authentication method. Can only be `OIDC` currently.
* `token_endpoint_url` - OAUTH issuer(IdP provider) token endpoint HTTP(S) URI used to retrieve the token.
* `client_id` - Public identifier for the Kafka client.
* `client_secret` - Secret known only to the Kafka client and the authorization server.
* `scope` - Kafka clients use this to specify the scope of the access request to the broker.
* `sasl_oauthbearer_extensions` - Additional information to be provided to the Kafka broker.

### Security

Expand Down
28 changes: 28 additions & 0 deletions examples/mongodbatlas_stream_connection/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,34 @@ resource "mongodbatlas_stream_connection" "example-kafka-plaintext" {
}
}

resource "mongodbatlas_stream_connection" "example-kafka-oauthbearer" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
connection_name = "KafkaOAuthbearerConnection"
type = "Kafka"
authentication = {
mechanism = "OAUTHBEARER"
method = "OIDC"
token_endpoint_url = "https://example.com/oauth/token"
client_id = "auth0Client"
client_secret = var.kafka_client_secret
scope = "read:messages write:messages"
sasl_oauthbearer_extensions = "logicalCluster=lkc-kmom,identityPoolId=pool-lAr"
}
bootstrap_servers = "localhost:9092,localhost:9092"
config = {
"auto.offset.reset" : "earliest"
}
security = {
protocol = "SASL_PLAINTEXT"
}
networking = {
access = {
type = "PUBLIC"
}
}
}

resource "mongodbatlas_stream_connection" "example-kafka-ssl" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
Expand Down
5 changes: 5 additions & 0 deletions examples/mongodbatlas_stream_connection/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ variable "kafka_password" {
type = string
}

variable "kafka_client_secret" {
description = "Secret known only to the Kafka client and the authorization server"
type = string
}

variable "kafka_ssl_cert" {
description = "Public certificate used for SASL_SSL configuration to connect to your Kafka cluster"
type = string
Expand Down
22 changes: 17 additions & 5 deletions internal/service/streamconnection/model_stream_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@
return nil, diags
}
streamConnection.Authentication = &admin.StreamsKafkaAuthentication{
Mechanism: authenticationModel.Mechanism.ValueStringPointer(),
Password: authenticationModel.Password.ValueStringPointer(),
Username: authenticationModel.Username.ValueStringPointer(),
Mechanism: authenticationModel.Mechanism.ValueStringPointer(),
Method: authenticationModel.Method.ValueStringPointer(),

Check failure on line 31 in internal/service/streamconnection/model_stream_connection.go

View workflow job for this annotation

GitHub Actions / tf-validate

unknown field Method in struct literal of type "go.mongodb.org/atlas-sdk/v20250312007/admin".StreamsKafkaAuthentication

Check failure on line 31 in internal/service/streamconnection/model_stream_connection.go

View workflow job for this annotation

GitHub Actions / build

unknown field Method in struct literal of type "go.mongodb.org/atlas-sdk/v20250312007/admin".StreamsKafkaAuthentication

Check failure on line 31 in internal/service/streamconnection/model_stream_connection.go

View workflow job for this annotation

GitHub Actions / lint

unknown field Method in struct literal of type "go.mongodb.org/atlas-sdk/v20250312007/admin".StreamsKafkaAuthentication
Password: authenticationModel.Password.ValueStringPointer(),
Username: authenticationModel.Username.ValueStringPointer(),
TokenEndpointUrl: authenticationModel.TokenEndpointURL.ValueStringPointer(),
ClientId: authenticationModel.ClientID.ValueStringPointer(),
ClientSecret: authenticationModel.ClientSecret.ValueStringPointer(),
Scope: authenticationModel.Scope.ValueStringPointer(),
SaslOauthbearerExtensions: authenticationModel.SaslOauthbearerExtensions.ValueStringPointer(),
}
}
if !plan.Security.IsNull() {
Expand Down Expand Up @@ -215,8 +221,13 @@
func newTFConnectionAuthenticationModel(ctx context.Context, currAuthConfig *types.Object, authResp *admin.StreamsKafkaAuthentication) (*types.Object, diag.Diagnostics) {
if authResp != nil {
resultAuthModel := TFConnectionAuthenticationModel{
Mechanism: types.StringPointerValue(authResp.Mechanism),
Username: types.StringPointerValue(authResp.Username),
Mechanism: types.StringPointerValue(authResp.Mechanism),
Method: types.StringPointerValue(authResp.Method),

Check failure on line 225 in internal/service/streamconnection/model_stream_connection.go

View workflow job for this annotation

GitHub Actions / tf-validate

authResp.Method undefined (type *"go.mongodb.org/atlas-sdk/v20250312007/admin".StreamsKafkaAuthentication has no field or method Method)

Check failure on line 225 in internal/service/streamconnection/model_stream_connection.go

View workflow job for this annotation

GitHub Actions / build

authResp.Method undefined (type *"go.mongodb.org/atlas-sdk/v20250312007/admin".StreamsKafkaAuthentication has no field or method Method)

Check failure on line 225 in internal/service/streamconnection/model_stream_connection.go

View workflow job for this annotation

GitHub Actions / lint

authResp.Method undefined (type *"go.mongodb.org/atlas-sdk/v20250312007/admin".StreamsKafkaAuthentication has no field or method Method) (typecheck)
Username: types.StringPointerValue(authResp.Username),
TokenEndpointURL: types.StringPointerValue(authResp.TokenEndpointUrl),
ClientID: types.StringPointerValue(authResp.ClientId),
Scope: types.StringPointerValue(authResp.Scope),
SaslOauthbearerExtensions: types.StringPointerValue(authResp.SaslOauthbearerExtensions),
}

if currAuthConfig != nil && !currAuthConfig.IsNull() { // if config is available (create & update of resource) password value is set in new state
Expand All @@ -225,6 +236,7 @@
return nil, diags
}
resultAuthModel.Password = configAuthModel.Password
resultAuthModel.ClientSecret = configAuthModel.ClientSecret
}

resultObject, diags := types.ObjectValueFrom(ctx, ConnectionAuthenticationObjectType.AttrTypes, resultAuthModel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@ import (
)

const (
connectionName = "Connection"
typeValue = ""
clusterName = "Cluster0"
dummyProjectID = "111111111111111111111111"
instanceName = "InstanceName"
authMechanism = "PLAIN"
authUsername = "user1"
connectionName = "Connection"
typeValue = ""
clusterName = "Cluster0"
dummyProjectID = "111111111111111111111111"
instanceName = "InstanceName"
authMechanism = "PLAIN"
authMechanismOAuth = "OAUTHBEARER"
authUsername = "user1"
clientID = "auth0Client"
clientSecret = "secret"
// #nosec G101
tokenEndpointURL = "https://your-domain.com/oauth2/token"
scope = "read:messages write:messages"
saslOauthbearerExtentions = "logicalCluster=cluster-kmo17m,identityPoolId=pool-l7Arl"
method = "OIDC"
securityProtocol = "SASL_SSL"
bootstrapServers = "localhost:9092,another.host:9092"
dbRole = "customRole"
Expand Down Expand Up @@ -50,6 +58,7 @@ type sdkToTFModelTestCase struct {

func TestStreamConnectionSDKToTFModel(t *testing.T) {
var authConfigWithPasswordDefined = tfAuthenticationObject(t, authMechanism, authUsername, "raw password")
var authConfigWithOAuth = tfAuthenticationObjectForOAuth(t, authMechanismOAuth, clientID, clientSecret, tokenEndpointURL, scope, saslOauthbearerExtentions, method)

testCases := []sdkToTFModelTestCase{
{
Expand Down Expand Up @@ -146,6 +155,44 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) {
Headers: types.MapNull(types.StringType),
},
},
{
name: "Kafka connection type SDK response for OAuthBearer authentication",
SDKResp: &admin.StreamsConnection{
Name: admin.PtrString(connectionName),
Type: admin.PtrString("Kafka"),
Authentication: &admin.StreamsKafkaAuthentication{
Mechanism: admin.PtrString(authMechanismOAuth),
Method: admin.PtrString(method),
ClientId: admin.PtrString(clientID),
TokenEndpointUrl: admin.PtrString(tokenEndpointURL),
Scope: admin.PtrString(scope),
SaslOauthbearerExtensions: admin.PtrString(saslOauthbearerExtentions),
},
BootstrapServers: admin.PtrString(bootstrapServers),
Config: &configMap,
Security: &admin.StreamsKafkaSecurity{
Protocol: admin.PtrString(securityProtocol),
BrokerPublicCertificate: admin.PtrString(DummyCACert),
},
},
providedProjID: dummyProjectID,
providedInstanceName: instanceName,
providedAuthConfig: &authConfigWithOAuth,
expectedTFModel: &streamconnection.TFStreamConnectionModel{
ProjectID: types.StringValue(dummyProjectID),
InstanceName: types.StringValue(instanceName),
ConnectionName: types.StringValue(connectionName),
Type: types.StringValue("Kafka"),
Authentication: tfAuthenticationObjectForOAuth(t, authMechanismOAuth, clientID, clientSecret, tokenEndpointURL, scope, saslOauthbearerExtentions, method), // password value is obtained from config, not api resp.
BootstrapServers: types.StringValue(bootstrapServers),
Config: tfConfigMap(t, configMap),
Security: tfSecurityObject(t, DummyCACert, securityProtocol),
DBRoleToExecute: types.ObjectNull(streamconnection.DBRoleToExecuteObjectType.AttrTypes),
Networking: types.ObjectNull(streamconnection.NetworkingObjectType.AttrTypes),
AWS: types.ObjectNull(streamconnection.AWSObjectType.AttrTypes),
Headers: types.MapNull(types.StringType),
},
},
{
name: "Kafka connection type SDK response with no optional values provided",
SDKResp: &admin.StreamsConnection{
Expand Down Expand Up @@ -596,6 +643,23 @@ func tfAuthenticationObject(t *testing.T, mechanism, username, password string)
return auth
}

func tfAuthenticationObjectForOAuth(t *testing.T, mechanism, clientID, clientSecret, tokenEndpointURL, scope, saslOauthbearerExtensions, method string) types.Object {
t.Helper()
auth, diags := types.ObjectValueFrom(t.Context(), streamconnection.ConnectionAuthenticationObjectType.AttrTypes, streamconnection.TFConnectionAuthenticationModel{
Mechanism: types.StringValue(mechanism),
Method: types.StringValue(method),
ClientID: types.StringValue(clientID),
ClientSecret: types.StringValue(clientSecret),
TokenEndpointURL: types.StringValue(tokenEndpointURL),
Scope: types.StringValue(scope),
SaslOauthbearerExtensions: types.StringValue(saslOauthbearerExtensions),
})
if diags.HasError() {
t.Errorf("failed to create terraform data model: %s", diags.Errors()[0].Summary())
}
return auth
}

func tfAuthenticationObjectWithNoPassword(t *testing.T, mechanism, username string) types.Object {
t.Helper()
auth, diags := types.ObjectValueFrom(t.Context(), streamconnection.ConnectionAuthenticationObjectType.AttrTypes, streamconnection.TFConnectionAuthenticationModel{
Expand Down
19 changes: 19 additions & 0 deletions internal/service/streamconnection/resource_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,32 @@ func ResourceSchema(ctx context.Context) schema.Schema {
"mechanism": schema.StringAttribute{
Optional: true,
},
"method": schema.StringAttribute{
Optional: true,
},
"password": schema.StringAttribute{
Optional: true,
Sensitive: true,
},
"username": schema.StringAttribute{
Optional: true,
},
"token_endpoint_url": schema.StringAttribute{
Optional: true,
},
"client_id": schema.StringAttribute{
Optional: true,
},
"client_secret": schema.StringAttribute{
Optional: true,
Sensitive: true,
},
"scope": schema.StringAttribute{
Optional: true,
},
"sasl_oauthbearer_extensions": schema.StringAttribute{
Optional: true,
},
},
},
"bootstrap_servers": schema.StringAttribute{
Expand Down
24 changes: 18 additions & 6 deletions internal/service/streamconnection/resource_stream_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,27 @@ type TFStreamConnectionModel struct {
}

type TFConnectionAuthenticationModel struct {
Mechanism types.String `tfsdk:"mechanism"`
Password types.String `tfsdk:"password"`
Username types.String `tfsdk:"username"`
Mechanism types.String `tfsdk:"mechanism"`
Password types.String `tfsdk:"password"`
Username types.String `tfsdk:"username"`
Method types.String `tfsdk:"method"`
TokenEndpointURL types.String `tfsdk:"token_endpoint_url"`
ClientID types.String `tfsdk:"client_id"`
ClientSecret types.String `tfsdk:"client_secret"`
Scope types.String `tfsdk:"scope"`
SaslOauthbearerExtensions types.String `tfsdk:"sasl_oauthbearer_extensions"`
}

var ConnectionAuthenticationObjectType = types.ObjectType{AttrTypes: map[string]attr.Type{
"mechanism": types.StringType,
"password": types.StringType,
"username": types.StringType,
"mechanism": types.StringType,
"method": types.StringType,
"password": types.StringType,
"username": types.StringType,
"token_endpoint_url": types.StringType,
"client_id": types.StringType,
"client_secret": types.StringType,
"scope": types.StringType,
"sasl_oauthbearer_extensions": types.StringType,
}}

type TFConnectionSecurityModel struct {
Expand Down
Loading
Loading