@@ -3,6 +3,7 @@ package flink
33import (
44 "fmt"
55 "slices"
6+ "strings"
67 "time"
78
89 "github.com/samber/lo"
@@ -96,6 +97,7 @@ func AddConnectionSecretFlags(cmd *cobra.Command) {
9697 cmd .Flags ().String ("client-secret" , "" , fmt .Sprintf ("Specify OAuth2 client secret for the type: %s." , utils .ArrayToCommaDelimitedString (flink .ConnectionSecretTypeMapping ["client-secret" ], "or" )))
9798 cmd .Flags ().String ("scope" , "" , fmt .Sprintf ("Specify OAuth2 scope for the type: %s." , utils .ArrayToCommaDelimitedString (flink .ConnectionSecretTypeMapping ["scope" ], "or" )))
9899 cmd .Flags ().String ("sse-endpoint" , "" , fmt .Sprintf ("Specify SSE endpoint for the type: %s." , utils .ArrayToCommaDelimitedString (flink .ConnectionSecretTypeMapping ["sse-endpoint" ], "or" )))
100+ cmd .Flags ().String ("transport-type" , "" , fmt .Sprintf ("Specify transport type for the type: %s. Default: SSE." , utils .ArrayToCommaDelimitedString (flink .ConnectionSecretTypeMapping ["transport-type" ], "or" )))
99101 cmd .MarkFlagsRequiredTogether ("username" , "password" )
100102 cmd .MarkFlagsRequiredTogether ("aws-access-key" , "aws-secret-key" )
101103 cmd .MarkFlagsRequiredTogether ("token-endpoint" , "client-id" , "client-secret" , "scope" )
@@ -109,29 +111,51 @@ func validateConnectionType(connectionType string) error {
109111 return nil
110112}
111113
112- func validateConnectionSecrets (cmd * cobra.Command , connectionType string ) (map [string ]string , error ) {
113- var connectionSecrets []string
114- connectionSecrets = append (connectionSecrets , flink .ConnectionTypeSecretMapping [connectionType ]... )
114+ func validateSecretCompatibility (cmd * cobra.Command , connectionType string ) error {
115+ connectionSecrets := flink .ConnectionTypeSecretMapping [connectionType ]
115116
116117 for key := range flink .ConnectionSecretTypeMapping {
117118 secret , err := cmd .Flags ().GetString (key )
118119 if err != nil {
119- return nil , err
120+ return err
120121 }
121122 if secret != "" && ! slices .Contains (connectionSecrets , key ) {
122- return nil , errors .NewErrorWithSuggestions (fmt .Sprintf ("%s is invalid for connection %s." , key , connectionType ), fmt .Sprintf ("Valid secret types are %s." , utils .ArrayToCommaDelimitedString (connectionSecrets , "or" )))
123+ return errors .NewErrorWithSuggestions (
124+ fmt .Sprintf ("%s is invalid for connection %s." , key , connectionType ),
125+ fmt .Sprintf ("Valid secret types are %s." , utils .ArrayToCommaDelimitedString (connectionSecrets , "or" )))
123126 }
124127 }
128+ return nil
129+ }
125130
126- requiredSecretKeys := flink .ConnectionRequiredSecretMapping [connectionType ]
127- var optionalSecretKeys []string
128- for _ , secretKey := range flink .ConnectionTypeSecretMapping [connectionType ] {
129- if ! slices .Contains (requiredSecretKeys , secretKey ) {
130- optionalSecretKeys = append (optionalSecretKeys , secretKey )
131+ func validateSecretValues (cmd * cobra.Command ) error {
132+ for key , allowedValues := range flink .ConnectionSecretAllowedValues {
133+ secretValue , err := cmd .Flags ().GetString (key )
134+ if err != nil {
135+ return err
136+ }
137+ if secretValue != "" && ! containsCaseInsensitive (allowedValues , secretValue ) {
138+ return errors .NewErrorWithSuggestions (
139+ fmt .Sprintf ("%s is invalid value for flag %s." , secretValue , key ),
140+ fmt .Sprintf ("Valid values for flag %s are %s." , key , utils .ArrayToCommaDelimitedString (allowedValues , "or" )))
141+ }
142+ }
143+ return nil
144+ }
145+
146+ func containsCaseInsensitive (slice []string , item string ) bool {
147+ for _ , s := range slice {
148+ if strings .EqualFold (s , item ) {
149+ return true
131150 }
132151 }
152+ return false
153+ }
133154
155+ func buildRequiredSecretsMap (cmd * cobra.Command , connectionType string ) (map [string ]string , error ) {
156+ requiredSecretKeys := flink .ConnectionRequiredSecretMapping [connectionType ]
134157 secretMap := map [string ]string {}
158+
135159 for _ , requiredKey := range requiredSecretKeys {
136160 secret , err := cmd .Flags ().GetString (requiredKey )
137161 if err != nil {
@@ -146,23 +170,47 @@ func validateConnectionSecrets(cmd *cobra.Command, connectionType string) (map[s
146170 }
147171 secretMap [backendKey ] = secret
148172 }
173+ return secretMap , nil
174+ }
175+
176+ func addOptionalSecretsToMap (cmd * cobra.Command , connectionType string , secretMap map [string ]string ) error {
177+ requiredSecretKeys := flink .ConnectionRequiredSecretMapping [connectionType ]
178+ var optionalSecretKeys []string
179+
180+ for _ , secretKey := range flink .ConnectionTypeSecretMapping [connectionType ] {
181+ if ! slices .Contains (requiredSecretKeys , secretKey ) {
182+ optionalSecretKeys = append (optionalSecretKeys , secretKey )
183+ }
184+ }
149185
150186 for _ , optionalSecretKey := range optionalSecretKeys {
151187 secret , err := cmd .Flags ().GetString (optionalSecretKey )
152188 if err != nil {
153- return nil , err
189+ return err
154190 }
155191
156192 backendKey , ok := flink .ConnectionSecretBackendKeyMapping [optionalSecretKey ]
157193 if ! ok {
158- return nil , fmt .Errorf ("backend key not found for %s" , optionalSecretKey )
194+ return fmt .Errorf ("backend key not found for %s" , optionalSecretKey )
159195 }
160196
161197 if secret != "" {
162198 secretMap [backendKey ] = secret
163199 }
164200 }
201+ return nil
202+ }
203+
204+ func validateTransportTypeRules (secretMap map [string ]string ) error {
205+ if transportType , ok := secretMap ["TRANSPORT_TYPE" ]; ok && transportType == "STREAMABLE_HTTP" {
206+ if _ , ok := secretMap ["SSE_ENDPOINT" ]; ok {
207+ return fmt .Errorf ("sse-endpoint flag is not allowed for STREAMABLE_HTTP transport-type" )
208+ }
209+ }
210+ return nil
211+ }
165212
213+ func determineAndSetAuthType (secretMap map [string ]string ) {
166214 if _ , ok := secretMap ["API_KEY" ]; ok {
167215 secretMap [authType ] = "API_KEY"
168216 } else if _ , ok := secretMap ["USERNAME" ]; ok {
@@ -172,13 +220,52 @@ func validateConnectionSecrets(cmd *cobra.Command, connectionType string) (map[s
172220 } else if _ , ok := secretMap ["OAUTH2_CLIENT_ID" ]; ok {
173221 secretMap [authType ] = "OAUTH2"
174222 }
223+ }
175224
225+ func validateRequiredAuthSecrets (connectionType string , secretMap map [string ]string ) error {
176226 if secretMap [authType ] == "" && slices .Contains (types .GetKeys (flink .ConnectionOneOfRequiredSecretsMapping ), connectionType ) {
177- return nil , fmt .Errorf ("no secrets provided for type %s, one of the required secrets %s must be provided" , connectionType ,
227+ return fmt .Errorf ("no secrets provided for type %s, one of the required secrets %s must be provided" , connectionType ,
178228 utils .ArrayToCommaDelimitedString (lo .Map (flink .ConnectionOneOfRequiredSecretsMapping [connectionType ], func (item []string , _ int ) string {
179229 return fmt .Sprintf ("%s" , item )
180230 }), "or" ))
181231 }
232+ return nil
233+ }
234+
235+ func validateConnectionSecrets (cmd * cobra.Command , connectionType string ) (map [string ]string , error ) {
236+ // Validate secret compatibility with connection type
237+ if err := validateSecretCompatibility (cmd , connectionType ); err != nil {
238+ return nil , err
239+ }
240+
241+ // Validate secret values against allowed values
242+ if err := validateSecretValues (cmd ); err != nil {
243+ return nil , err
244+ }
245+
246+ // Build secret map from required secrets
247+ secretMap , err := buildRequiredSecretsMap (cmd , connectionType )
248+ if err != nil {
249+ return nil , err
250+ }
251+
252+ // Add optional secrets to the map
253+ if err := addOptionalSecretsToMap (cmd , connectionType , secretMap ); err != nil {
254+ return nil , err
255+ }
256+
257+ // Validate transport type specific rules
258+ if err := validateTransportTypeRules (secretMap ); err != nil {
259+ return nil , err
260+ }
261+
262+ // Determine and set authentication type
263+ determineAndSetAuthType (secretMap )
264+
265+ // Validate required authentication secrets
266+ if err := validateRequiredAuthSecrets (connectionType , secretMap ); err != nil {
267+ return nil , err
268+ }
182269
183270 return secretMap , nil
184271}
0 commit comments