Skip to content

Commit 38c420d

Browse files
authored
[#9564] fix(authn): Fix Flink connector oauth2 mode (#9618)
### What changes were proposed in this pull request? Fix Flink connector oauth2 mode. ### Why are the changes needed? This is a follow up PR. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By hand.
1 parent 3a2dbce commit 38c420d

File tree

3 files changed

+20
-9
lines changed

3 files changed

+20
-9
lines changed

docs/flink-connector/flink-authentication-with-gravitino.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ table.catalog-store.gravitino.gravitino.metalake: my_metalake
3939
table.catalog-store.gravitino.gravitino.client.auth.type: oauth2
4040
table.catalog-store.gravitino.gravitino.client.oauth2.serverUri: https://oauth-server.example.com
4141
table.catalog-store.gravitino.gravitino.client.oauth2.tokenPath: /oauth/token
42-
table.catalog-store.gravitino.gravitino.client.oauth2.credential: your-client-credentials
42+
table.catalog-store.gravitino.gravitino.client.oauth2.credential: client-id:client-secret
4343
table.catalog-store.gravitino.gravitino.client.oauth2.scope: your-scope
4444
```
4545

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@
2525
import java.util.Arrays;
2626
import java.util.Map;
2727
import java.util.Set;
28-
import org.apache.commons.lang3.StringUtils;
2928
import org.apache.gravitino.Catalog;
30-
import org.apache.gravitino.auth.AuthenticatorType;
3129
import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
3230
import org.apache.gravitino.client.GravitinoAdminClient;
3331
import org.apache.gravitino.client.GravitinoMetalake;
@@ -66,7 +64,7 @@ private GravitinoCatalogManager(
6664

6765
// Only OAuth is explicitly configured; otherwise follow Flink security (Kerberos if enabled,
6866
// simple auth otherwise).
69-
if (AuthenticatorType.OAUTH.name().equalsIgnoreCase(authType)) {
67+
if (GravitinoCatalogStoreFactoryOptions.OAUTH2.equalsIgnoreCase(authType)) {
7068
this.gravitinoClient = buildOAuthClient(gravitinoUri, gravitinoClientConfig);
7169
} else {
7270
if (authType != null) {
@@ -76,6 +74,11 @@ private GravitinoCatalogManager(
7674
authType, GravitinoCatalogStoreFactoryOptions.AUTH_TYPE));
7775
}
7876

77+
LOG.info(
78+
"Flink security enabled: {}, Current user: {}",
79+
UserGroupInformation.isSecurityEnabled(),
80+
getUgi().getUserName());
81+
7982
if (UserGroupInformation.isSecurityEnabled()) {
8083
if (getUgi().getAuthenticationMethod()
8184
!= UserGroupInformation.AuthenticationMethod.KERBEROS) {
@@ -247,14 +250,21 @@ private static GravitinoAdminClient buildOAuthClient(
247250
String credential = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL);
248251
String path = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH);
249252
String scope = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
250-
Preconditions.checkArgument(
251-
StringUtils.isNoneBlank(serverUri, credential, path, scope),
252-
String.format(
253-
"OAuth2 authentication requires: %s, %s, %s, and %s",
253+
254+
// Remove OAuth-specific config entries from the client config map. These keys are only
255+
// used to construct the OAuth2 token provider and are not valid GravitinoAdminClient
256+
// client configuration options; passing them to withClientConfig() could cause validation
257+
// errors or other unexpected behavior.
258+
Set<String> oauthConfigKeys =
259+
Sets.newHashSet(
260+
GravitinoCatalogStoreFactoryOptions.AUTH_TYPE,
254261
GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI,
255262
GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL,
256263
GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH,
257-
GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE));
264+
GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
265+
for (String key : oauthConfigKeys) {
266+
config.remove(key);
267+
}
258268

259269
DefaultOAuth2TokenProvider provider =
260270
DefaultOAuth2TokenProvider.builder()

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ private GravitinoCatalogStoreFactoryOptions() {}
4848
.withDescription("The config of Gravitino client");
4949

5050
public static final String AUTH_TYPE = "gravitino.client.auth.type";
51+
public static final String OAUTH2 = "oauth2";
5152

5253
// OAuth2 config keys
5354
public static final String OAUTH2_SERVER_URI = "gravitino.client.oauth2.serverUri";

0 commit comments

Comments
 (0)