@@ -11,13 +11,21 @@ import (
11
11
)
12
12
13
13
const (
14
- connectionName = "Connection"
15
- typeValue = ""
16
- clusterName = "Cluster0"
17
- dummyProjectID = "111111111111111111111111"
18
- instanceName = "InstanceName"
19
- authMechanism = "PLAIN"
20
- authUsername = "user1"
14
+ connectionName = "Connection"
15
+ typeValue = ""
16
+ clusterName = "Cluster0"
17
+ dummyProjectID = "111111111111111111111111"
18
+ instanceName = "InstanceName"
19
+ authMechanism = "PLAIN"
20
+ authMechanismOAuth = "OAUTHBEARER"
21
+ authUsername = "user1"
22
+ clientID = "auth0Client"
23
+ clientSecret = "secret"
24
+ // #nosec G101
25
+ tokenEndpointURL = "https://your-domain.com/oauth2/token"
26
+ scope = "read:messages write:messages"
27
+ saslOauthbearerExtentions = "logicalCluster=cluster-kmo17m,identityPoolId=pool-l7Arl"
28
+ httpsCaPem = "MHWER3343"
21
29
securityProtocol = "SASL_SSL"
22
30
bootstrapServers = "localhost:9092,another.host:9092"
23
31
dbRole = "customRole"
@@ -50,6 +58,7 @@ type sdkToTFModelTestCase struct {
50
58
51
59
func TestStreamConnectionSDKToTFModel (t * testing.T ) {
52
60
var authConfigWithPasswordDefined = tfAuthenticationObject (t , authMechanism , authUsername , "raw password" )
61
+ var authConfigWithOAuth = tfAuthenticationObjectForOAuth (t , authMechanismOAuth , clientID , clientSecret , tokenEndpointURL , scope , saslOauthbearerExtentions , httpsCaPem )
53
62
54
63
testCases := []sdkToTFModelTestCase {
55
64
{
@@ -146,6 +155,44 @@ func TestStreamConnectionSDKToTFModel(t *testing.T) {
146
155
Headers : types .MapNull (types .StringType ),
147
156
},
148
157
},
158
+ {
159
+ name : "Kafka connection type SDK response for OAuthBearer authentication" ,
160
+ SDKResp : & admin.StreamsConnection {
161
+ Name : admin .PtrString (connectionName ),
162
+ Type : admin .PtrString ("Kafka" ),
163
+ Authentication : & admin.StreamsKafkaAuthentication {
164
+ Mechanism : admin .PtrString (authMechanismOAuth ),
165
+ ClientId : admin .PtrString (clientID ),
166
+ TokenEndpointUrl : admin .PtrString (tokenEndpointURL ),
167
+ Scope : admin .PtrString (scope ),
168
+ SaslOauthbearerExtensions : admin .PtrString (saslOauthbearerExtentions ),
169
+ HttpsCaPem : admin .PtrString (httpsCaPem ),
170
+ },
171
+ BootstrapServers : admin .PtrString (bootstrapServers ),
172
+ Config : & configMap ,
173
+ Security : & admin.StreamsKafkaSecurity {
174
+ Protocol : admin .PtrString (securityProtocol ),
175
+ BrokerPublicCertificate : admin .PtrString (DummyCACert ),
176
+ },
177
+ },
178
+ providedProjID : dummyProjectID ,
179
+ providedInstanceName : instanceName ,
180
+ providedAuthConfig : & authConfigWithOAuth ,
181
+ expectedTFModel : & streamconnection.TFStreamConnectionModel {
182
+ ProjectID : types .StringValue (dummyProjectID ),
183
+ InstanceName : types .StringValue (instanceName ),
184
+ ConnectionName : types .StringValue (connectionName ),
185
+ Type : types .StringValue ("Kafka" ),
186
+ Authentication : tfAuthenticationObjectForOAuth (t , authMechanismOAuth , clientID , clientSecret , tokenEndpointURL , scope , saslOauthbearerExtentions , httpsCaPem ), // password value is obtained from config, not api resp.
187
+ BootstrapServers : types .StringValue (bootstrapServers ),
188
+ Config : tfConfigMap (t , configMap ),
189
+ Security : tfSecurityObject (t , DummyCACert , securityProtocol ),
190
+ DBRoleToExecute : types .ObjectNull (streamconnection .DBRoleToExecuteObjectType .AttrTypes ),
191
+ Networking : types .ObjectNull (streamconnection .NetworkingObjectType .AttrTypes ),
192
+ AWS : types .ObjectNull (streamconnection .AWSObjectType .AttrTypes ),
193
+ Headers : types .MapNull (types .StringType ),
194
+ },
195
+ },
149
196
{
150
197
name : "Kafka connection type SDK response with no optional values provided" ,
151
198
SDKResp : & admin.StreamsConnection {
@@ -596,6 +643,23 @@ func tfAuthenticationObject(t *testing.T, mechanism, username, password string)
596
643
return auth
597
644
}
598
645
646
+ func tfAuthenticationObjectForOAuth (t * testing.T , mechanism , clientID , clientSecret , tokenEndpointURL , scope , saslOauthbearerExtensions , httpsCaPem string ) types.Object {
647
+ t .Helper ()
648
+ auth , diags := types .ObjectValueFrom (t .Context (), streamconnection .ConnectionAuthenticationObjectType .AttrTypes , streamconnection.TFConnectionAuthenticationModel {
649
+ Mechanism : types .StringValue (mechanism ),
650
+ ClientID : types .StringValue (clientID ),
651
+ ClientSecret : types .StringValue (clientSecret ),
652
+ TokenEndpointURL : types .StringValue (tokenEndpointURL ),
653
+ Scope : types .StringValue (scope ),
654
+ SaslOauthbearerExtensions : types .StringValue (saslOauthbearerExtensions ),
655
+ HTTPSCaPem : types .StringValue (httpsCaPem ),
656
+ })
657
+ if diags .HasError () {
658
+ t .Errorf ("failed to create terraform data model: %s" , diags .Errors ()[0 ].Summary ())
659
+ }
660
+ return auth
661
+ }
662
+
599
663
func tfAuthenticationObjectWithNoPassword (t * testing.T , mechanism , username string ) types.Object {
600
664
t .Helper ()
601
665
auth , diags := types .ObjectValueFrom (t .Context (), streamconnection .ConnectionAuthenticationObjectType .AttrTypes , streamconnection.TFConnectionAuthenticationModel {
0 commit comments