diff --git a/docs/flink-connector/flink-authentication-with-gravitino.md b/docs/flink-connector/flink-authentication-with-gravitino.md index da5c099fd3b..93455a12a0e 100644 --- a/docs/flink-connector/flink-authentication-with-gravitino.md +++ b/docs/flink-connector/flink-authentication-with-gravitino.md @@ -39,7 +39,7 @@ table.catalog-store.gravitino.gravitino.metalake: my_metalake table.catalog-store.gravitino.gravitino.client.auth.type: oauth2 table.catalog-store.gravitino.gravitino.client.oauth2.serverUri: https://oauth-server.example.com table.catalog-store.gravitino.gravitino.client.oauth2.tokenPath: /oauth/token -table.catalog-store.gravitino.gravitino.client.oauth2.credential: your-client-credentials +table.catalog-store.gravitino.gravitino.client.oauth2.credential: client-id:client-secret table.catalog-store.gravitino.gravitino.client.oauth2.scope: your-scope ``` diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java index bd355ab6f51..7eeb4fab71a 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java @@ -25,9 +25,7 @@ import java.util.Arrays; import java.util.Map; import java.util.Set; -import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; -import org.apache.gravitino.auth.AuthenticatorType; import org.apache.gravitino.client.DefaultOAuth2TokenProvider; import org.apache.gravitino.client.GravitinoAdminClient; import org.apache.gravitino.client.GravitinoMetalake; @@ -66,7 +64,7 @@ private GravitinoCatalogManager( // Only OAuth is explicitly configured; otherwise follow Flink security (Kerberos if enabled, // simple auth otherwise). - if (AuthenticatorType.OAUTH.name().equalsIgnoreCase(authType)) { + if (GravitinoCatalogStoreFactoryOptions.OAUTH2.equalsIgnoreCase(authType)) { this.gravitinoClient = buildOAuthClient(gravitinoUri, gravitinoClientConfig); } else { if (authType != null) { @@ -76,6 +74,11 @@ private GravitinoCatalogManager( authType, GravitinoCatalogStoreFactoryOptions.AUTH_TYPE)); } + LOG.info( + "Flink security enabled: {}, Current user: {}", + UserGroupInformation.isSecurityEnabled(), + getUgi().getUserName()); + if (UserGroupInformation.isSecurityEnabled()) { if (getUgi().getAuthenticationMethod() != UserGroupInformation.AuthenticationMethod.KERBEROS) { @@ -247,14 +250,21 @@ private static GravitinoAdminClient buildOAuthClient( String credential = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL); String path = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH); String scope = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE); - Preconditions.checkArgument( - StringUtils.isNoneBlank(serverUri, credential, path, scope), - String.format( - "OAuth2 authentication requires: %s, %s, %s, and %s", + + // Remove OAuth-specific config entries from the client config map. These keys are only + // used to construct the OAuth2 token provider and are not valid GravitinoAdminClient + // client configuration options; passing them to withClientConfig() could cause validation + // errors or other unexpected behavior. + Set oauthConfigKeys = + Sets.newHashSet( + GravitinoCatalogStoreFactoryOptions.AUTH_TYPE, GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI, GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL, GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH, - GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE)); + GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE); + for (String key : oauthConfigKeys) { + config.remove(key); + } DefaultOAuth2TokenProvider provider = DefaultOAuth2TokenProvider.builder() diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java index f989c3d1381..cda42978817 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java @@ -48,6 +48,7 @@ private GravitinoCatalogStoreFactoryOptions() {} .withDescription("The config of Gravitino client"); public static final String AUTH_TYPE = "gravitino.client.auth.type"; + public static final String OAUTH2 = "oauth2"; // OAuth2 config keys public static final String OAUTH2_SERVER_URI = "gravitino.client.oauth2.serverUri";