Skip to content

Commit 60de619

Browse files
roryqiCopilot
andauthored
[#9564] feat(flink): Flink connector supports user authentication (#9565)
### What changes were proposed in this pull request?

The Flink connector now supports user authentication.

### Why are the changes needed?

Fix: #9564

### Does this PR introduce _any_ user-facing change?

Adds the documentation for Flink connector authentication.

### How was this patch tested?

Added integration tests.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 15379bf commit 60de619

File tree

9 files changed

+622
-43
lines changed

9 files changed

+622
-43
lines changed

clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java

Lines changed: 168 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@
2424
import java.io.File;
2525
import java.io.IOException;
2626
import java.nio.charset.StandardCharsets;
27+
import java.time.Instant;
2728
import java.util.Base64;
2829
import java.util.List;
2930
import java.util.Set;
3031
import java.util.concurrent.Callable;
32+
import javax.security.auth.Subject;
33+
import javax.security.auth.kerberos.KerberosKey;
34+
import javax.security.auth.kerberos.KerberosPrincipal;
3135
import javax.security.auth.kerberos.KerberosTicket;
3236
import javax.security.auth.login.LoginContext;
3337
import javax.security.auth.login.LoginException;
@@ -49,7 +53,7 @@ public final class KerberosTokenProvider implements AuthDataProvider {
4953
private String clientPrincipal;
5054
private String keytabFile;
5155
private String host = "localhost";
52-
private LoginContext loginContext;
56+
private SubjectProvider subjectProvider;
5357

5458
private KerberosTokenProvider() {}
5559

@@ -82,22 +86,11 @@ public byte[] getTokenData() {
8286
private byte[] getTokenInternal() throws Exception {
8387
@SuppressWarnings("null")
8488
List<String> principalComponents = Splitter.on('@').splitToList(clientPrincipal);
85-
// Gravitino server's principal must start with HTTP. This restriction follows
86-
// the style of Apache Hadoop.
8789
String serverPrincipal = "HTTP/" + host + "@" + principalComponents.get(1);
88-
89-
synchronized (this) {
90-
if (loginContext == null) {
91-
loginContext = KerberosUtils.login(clientPrincipal, keytabFile);
92-
} else if (isLoginTicketExpired() && keytabFile != null) {
93-
// We only support use keytab to re-login context
94-
loginContext.logout();
95-
loginContext = KerberosUtils.login(clientPrincipal, keytabFile);
96-
}
97-
}
90+
Subject currentSubject = subjectProvider.get();
9891

9992
return KerberosUtils.doAs(
100-
loginContext.getSubject(),
93+
currentSubject,
10194
new Callable<byte[]>() {
10295
@Override
10396
public byte[] call() throws Exception {
@@ -127,24 +120,11 @@ public byte[] call() throws Exception {
127120
});
128121
}
129122

130-
@SuppressWarnings("JavaUtilDate")
131-
private boolean isLoginTicketExpired() {
132-
Set<KerberosTicket> tickets =
133-
loginContext.getSubject().getPrivateCredentials(KerberosTicket.class);
134-
135-
if (tickets.isEmpty()) {
136-
return false;
137-
}
138-
139-
return tickets.iterator().next().getEndTime().getTime() < System.currentTimeMillis();
140-
}
141-
142-
/** Closes the KerberosTokenProvider and releases any underlying resources. */
143123
@Override
144124
public void close() throws IOException {
145125
try {
146-
if (loginContext != null) {
147-
loginContext.logout();
126+
if (subjectProvider != null) {
127+
subjectProvider.close();
148128
}
149129
} catch (LoginException le) {
150130
throw new IOException("Fail to close login context", le);
@@ -155,6 +135,113 @@ void setHost(String host) {
155135
this.host = host;
156136
}
157137

138+
/**
139+
* Strategy interface for providing Kerberos Subject credentials.
140+
*
141+
* <p>There are two strategies:
142+
*
143+
* <ul>
144+
* <li>{@link ExistingSubjectProvider} - Reuses credentials maintained by the framework (e.g.,
145+
* Flink)
146+
* <li>{@link LoginSubjectProvider} - Creates and manages new credentials using keytab and
147+
* principal
148+
* </ul>
149+
*/
150+
private interface SubjectProvider {
151+
/**
 * Returns the Subject under which the Kerberos token negotiation is executed.
 *
 * @return the Subject to run the token request as
 * @throws LoginException if the implementation has to log in and the login fails
 */
Subject get() throws LoginException;
152+
153+
/**
 * Releases whatever this provider holds; invoked when the enclosing token provider is closed.
 *
 * @throws LoginException if releasing the underlying login fails
 */
void close() throws LoginException;
154+
}
155+
156+
/**
157+
* SubjectProvider that reuses existing Kerberos credentials from the framework.
158+
*
159+
* <p>When Flink (or another framework) has already logged in using keytab and principal, this
160+
* provider reuses those credentials instead of creating new ones. This approach:
161+
*
162+
* <ul>
163+
* <li>Avoids redundant Kerberos logins
164+
* <li>Lets the framework manage credential lifecycle (renewal, expiration)
165+
* <li>Reduces complexity - the client doesn't need to maintain credentials
166+
* </ul>
167+
*
168+
* <p>The Subject is obtained from the current AccessControlContext and contains KerberosKey or
169+
* KerberosTicket credentials managed by the framework.
170+
*/
171+
private static final class ExistingSubjectProvider implements SubjectProvider {
172+
private final Subject subject;
173+
174+
ExistingSubjectProvider(Subject subject) {
175+
this.subject = subject;
176+
}
177+
178+
@Override
179+
public Subject get() {
180+
return subject;
181+
}
182+
183+
@Override
184+
public void close() {
185+
// no-op: The framework owns the Subject and is responsible for its lifecycle
186+
}
187+
}
188+
189+
/**
190+
* SubjectProvider that performs Kerberos login using keytab and principal.
191+
*
192+
* <p>This provider is used when no existing Kerberos credentials are available from the
193+
* framework. It:
194+
*
195+
* <ul>
196+
* <li>Performs initial login using the provided keytab file and principal
197+
* <li>Manages the LoginContext lifecycle
198+
* <li>Automatically re-authenticates when the TGT (Ticket Granting Ticket) expires
199+
* </ul>
200+
*
201+
* <p>This provider is responsible for credential lifecycle management including renewal and
202+
* cleanup.
203+
*/
204+
private static final class LoginSubjectProvider implements SubjectProvider {
205+
private final String principal;
206+
private final String keytabFile;
207+
private LoginContext loginContext;
208+
209+
LoginSubjectProvider(String principal, String keytabFile) {
210+
this.principal = principal;
211+
this.keytabFile = keytabFile;
212+
}
213+
214+
@Override
215+
public synchronized Subject get() throws LoginException {
216+
// Perform initial login if not already logged in
217+
if (loginContext == null) {
218+
loginContext = KerberosUtils.login(principal, keytabFile);
219+
} else if (keytabFile != null && isLoginTicketExpired(loginContext)) {
220+
// If the TGT (Ticket Granting Ticket) has expired, logout and re-authenticate
221+
// This ensures we always have valid credentials for GSS context negotiation
222+
loginContext.logout();
223+
loginContext = KerberosUtils.login(principal, keytabFile);
224+
}
225+
return loginContext.getSubject();
226+
}
227+
228+
@Override
229+
public void close() throws LoginException {
230+
if (loginContext != null) {
231+
loginContext.logout();
232+
}
233+
}
234+
235+
private boolean isLoginTicketExpired(LoginContext ctx) {
236+
Set<KerberosTicket> tickets = ctx.getSubject().getPrivateCredentials(KerberosTicket.class);
237+
if (tickets.isEmpty()) {
238+
return false;
239+
}
240+
// For one principal, there should be only one TGT ticket
241+
return tickets.iterator().next().getEndTime().toInstant().isBefore(Instant.now());
242+
}
243+
}
244+
158245
/**
159246
* Creates a new instance of the KerberosTokenProvider.Builder
160247
*
@@ -194,19 +281,41 @@ public Builder withKeyTabFile(File file) {
194281
/**
195282
* Builds the instance of the KerberosTokenProvider.
196283
*
284+
* <p>This method determines the authentication strategy:
285+
*
286+
* <ul>
287+
* <li>If Kerberos credentials already exist in the current context (e.g., Flink has logged
288+
* in), use {@link ExistingSubjectProvider} to reuse those credentials
289+
* <li>Otherwise, use {@link LoginSubjectProvider} to perform a new Kerberos login using the
290+
* provided keytab and principal
291+
* </ul>
292+
*
197293
* @return The built KerberosTokenProvider instance.
198294
*/
199-
@SuppressWarnings("null")
295+
@SuppressWarnings("removal")
200296
public KerberosTokenProvider build() {
201297
KerberosTokenProvider provider = new KerberosTokenProvider();
202298

203-
Preconditions.checkArgument(
204-
StringUtils.isNotBlank(clientPrincipal),
205-
"KerberosTokenProvider must set clientPrincipal");
206-
Preconditions.checkArgument(
207-
Splitter.on('@').splitToList(clientPrincipal).size() == 2,
208-
"Principal has the wrong format");
209-
provider.clientPrincipal = clientPrincipal;
299+
// Check if the framework (e.g., Flink) has already established Kerberos credentials
300+
java.security.AccessControlContext context = java.security.AccessController.getContext();
301+
Subject subject = Subject.getSubject(context);
302+
303+
// If credentials exist (KerberosKey or KerberosTicket), reuse them
304+
// This avoids redundant logins when Flink has already authenticated with Kerberos
305+
if (subject != null
306+
&& (!subject.getPrivateCredentials(KerberosKey.class).isEmpty()
307+
|| !subject.getPrivateCredentials(KerberosTicket.class).isEmpty())) {
308+
// Use ExistingSubjectProvider: framework manages the credentials
309+
provider.subjectProvider = new ExistingSubjectProvider(subject);
310+
311+
extractPrincipalFromSubject(subject);
312+
setProviderClientPrincipal(provider);
313+
314+
return provider;
315+
}
316+
317+
// No existing credentials found - we need to login ourselves
318+
setProviderClientPrincipal(provider);
210319

211320
if (keyTabFile != null) {
212321
Preconditions.checkArgument(
@@ -216,7 +325,29 @@ public KerberosTokenProvider build() {
216325
provider.keytabFile = keyTabFile.getAbsolutePath();
217326
}
218327

328+
// Use LoginSubjectProvider: client manages the credentials
329+
// This provider will login using keytab/principal and handle ticket renewal
330+
provider.subjectProvider =
331+
new LoginSubjectProvider(provider.clientPrincipal, provider.keytabFile);
219332
return provider;
220333
}
334+
335+
private void setProviderClientPrincipal(KerberosTokenProvider provider) {
336+
Preconditions.checkArgument(
337+
StringUtils.isNotBlank(clientPrincipal),
338+
"KerberosTokenProvider must set clientPrincipal");
339+
Preconditions.checkArgument(
340+
Splitter.on('@').splitToList(clientPrincipal).size() == 2,
341+
"Principal has the wrong format");
342+
provider.clientPrincipal = clientPrincipal;
343+
}
344+
345+
private void extractPrincipalFromSubject(Subject subject) {
346+
clientPrincipal =
347+
subject.getPrincipals(KerberosPrincipal.class).stream()
348+
.findFirst()
349+
.map(Object::toString)
350+
.orElse(null);
351+
}
221352
}
222353
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
---
2+
title: "Flink authentication with Gravitino server"
3+
slug: /flink-connector/flink-authentication
4+
keyword: flink connector authentication oauth2 kerberos
5+
license: "This software is licensed under the Apache License version 2."
6+
---
7+
8+
## Overview
9+
10+
The Flink connector supports `simple`, `oauth2`, and `kerberos` authentication when accessing the Gravitino server.
11+
12+
| Property | Type | Default Value | Description | Required | Since Version |
13+
|-----------------------------------------------------------|--------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
14+
| table.catalog-store.gravitino.gravitino.client.auth.type | string | (none) | When explicitly set, only `oauth2` is supported. If unset, Flink selects Kerberos or simple authentication based on its security settings. | No | 1.2.0 |
15+
16+
## Simple mode
17+
18+
In simple mode, the username originates from Flink. The resolution order is:
19+
1. `HADOOP_USER_NAME` environment variable
20+
2. The logged-in OS user
21+
22+
## OAuth2 mode
23+
24+
In OAuth2 mode, configure the following settings to fetch an OAuth2 token to access the Gravitino server:
25+
26+
| Property | Type | Default Value | Description | Required | Since Version |
27+
|-----------------------------------------------------------------------|--------|---------------|--------------------------------------------------|--------------------------------|---------------|
28+
| table.catalog-store.gravitino.gravitino.client.oauth2.serverUri | string | (none) | The OAuth2 server URI. | Yes, for OAuth2 mode | 1.2.0 |
29+
| table.catalog-store.gravitino.gravitino.client.oauth2.tokenPath | string | (none) | The token endpoint path on the OAuth2 server. | Yes, for OAuth2 mode | 1.2.0 |
30+
| table.catalog-store.gravitino.gravitino.client.oauth2.credential | string | (none) | The credential used to request the OAuth2 token. | Yes, for OAuth2 mode | 1.2.0 |
31+
| table.catalog-store.gravitino.gravitino.client.oauth2.scope | string | (none) | The scope used to request the OAuth2 token. | Yes, for OAuth2 mode | 1.2.0 |
32+
33+
### OAuth2 Configuration Example
34+
35+
```yaml
36+
table.catalog-store.kind: gravitino
37+
table.catalog-store.gravitino.gravitino.uri: http://localhost:8090
38+
table.catalog-store.gravitino.gravitino.metalake: my_metalake
39+
table.catalog-store.gravitino.gravitino.client.auth.type: oauth2
40+
table.catalog-store.gravitino.gravitino.client.oauth2.serverUri: https://oauth-server.example.com
41+
table.catalog-store.gravitino.gravitino.client.oauth2.tokenPath: /oauth/token
42+
table.catalog-store.gravitino.gravitino.client.oauth2.credential: your-client-credentials
43+
table.catalog-store.gravitino.gravitino.client.oauth2.scope: your-scope
44+
```
45+
46+
## Kerberos mode
47+
48+
In Kerberos mode, use Flink security configurations to obtain a Kerberos ticket for accessing the Gravitino server. Configure `security.kerberos.login.principal` and `security.kerberos.login.keytab` for the Kerberos principal and keytab.
49+
50+
The Gravitino server principal follows the pattern `HTTP/$host@$realm`; ensure `$host` matches the host specified in the Gravitino server URI. Ensure `krb5.conf` is available to Flink, for example via `-Djava.security.krb5.conf=/path/to/krb5.conf` in Flink JVM options.

flink-connector/flink/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ dependencies {
9696
testImplementation(libs.testcontainers.mysql)
9797
testImplementation(libs.metrics.core)
9898
testImplementation(libs.flinkjdbc)
99+
testImplementation(libs.minikdc)
99100

100101
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
101102
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")

0 commit comments

Comments
 (0)