Skip to content

Commit ab310d5

Browse files
Add token refresh and token exchange OAuth2 flows to polaris-synchronizer tool. (#8)
* Add token refresh and token exchange auth flow * use credential instead of client id and client secret * Added token exchange flow * Remove spurious change * Addressed comments * Removed token exchnage * update comment * Services properly close resources and enforce closeability on abstractions * Removed credential and token from parent auth * addressed comments * Closed iceberg catalog service properly
1 parent 22c82fc commit ab310d5

File tree

15 files changed

+318
-360
lines changed

15 files changed

+318
-360
lines changed

polaris-synchronizer/README.md

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ with 10 concurrent catalog setup threads:
6161
java -jar cli/build/libs/polaris-synchronizer-cli.jar create-omnipotent-principal \
6262
--polaris-api-connection-properties base-url=http://localhost:8181 \
6363
--polaris-api-connection-properties oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
64-
--polaris-api-connection-properties client-id=root \
65-
--polaris-api-connection-properties client-secret=<client_secret> \
64+
--polaris-api-connection-properties credential=<client_id>:<client_secret> \
6665
--polaris-api-connection-properties scope=PRINCIPAL_ROLE:ALL \
6766
--replace \ # replace it if it already exists
6867
--concurrency 10 # 10 concurrent catalog setup threads
@@ -105,8 +104,7 @@ java -jar cli/build/libs/polaris-synchronizer-cli.jar \
105104
create-omnipotent-principal \
106105
--polaris-api-connection-properties base-url=http://localhost:8181 \
107106
--polaris-api-connection-properties oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
108-
--polaris-api-connection-properties client-id=root \
109-
--polaris-api-connection-properties client-secret=<client_secret> \
107+
--polaris-api-connection-properties credential=<client_id>:<client_secret> \
110108
--polaris-api-connection-properties scope=PRINCIPAL_ROLE:ALL \
111109
--replace \ # replace if it already exists
112110
--concurrency 10 \ # 10 concurrent catalog setup threads
@@ -143,22 +141,18 @@ diff between the source and target Polaris instances. This can be achieved using
143141
> Polaris instance. The new credentials will be logged to stdout, ONLY for each newly created or overwritten principal.
144142
> Please note that this output should be securely managed, client credentials should only ever be stored in a secure vault.
145143
146-
**Example** Running the synchronization between source Polaris instance using an access token, and a target Polaris instance
144+
**Example** Running the synchronization between source Polaris instance using a bearer token, and a target Polaris instance
147145
using client credentials.
148146
```
149147
java -jar cli/build/libs/polaris-synchronizer-cli.jar sync-polaris \
150148
--source-properties base-url=http://localhost:8181 \
151-
--source-properties client-id=root \
152-
--source-properties client-secret=<client_secret> \
153-
--source-properties oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
154-
--source-properties scope=PRINCIPAL_ROLE:ALL \
149+
--source-properties token=<bearer_token> \
155150
--source-properties omnipotent-principal-name=omnipotent-principal-XXXXX \
156151
--source-properties omnipotent-principal-client-id=589550e8b23d271e \
157152
--source-properties omnipotent-principal-client-secret=<omni_client_secret> \
158153
--source-properties omnipotent-principal-oauth2-server-uri=http://localhost:8181/api/catalog/v1/oauth/tokens \
159154
--target-properties base-url=http://localhost:5858 \
160-
--target-properties client-id=root \
161-
--target-properties client-secret=<client_secret> \
155+
--target-properties credential=<client_id>:<client_secret> \
162156
--target-properties oauth2-server-uri=http://localhost:5858/api/catalog/v1/oauth/tokens \
163157
--target-properties scope=PRINCIPAL_ROLE:ALL \
164158
--target-properties omnipotent-principal-name=omnipotent-principal-YYYYY \

polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -630,41 +630,30 @@ public void syncCatalogs() {
630630
}
631631

632632
for (Catalog catalog : catalogSyncPlan.entitiesToSyncChildren()) {
633-
IcebergCatalogService sourceIcebergCatalogService;
634633

635-
try {
636-
sourceIcebergCatalogService = source.initializeIcebergCatalogService(catalog.getName());
634+
try (IcebergCatalogService sourceIcebergCatalogService = source.initializeIcebergCatalogService(catalog.getName())) {
637635
clientLogger.info(
638-
"Initialized Iceberg REST catalog for Polaris catalog {} on source.",
639-
catalog.getName());
640-
} catch (Exception e) {
641-
if (haltOnFailure) throw e;
642-
clientLogger.error(
643-
"Failed to initialize Iceberg REST catalog for Polaris catalog {} on source.",
644-
catalog.getName(),
645-
e);
646-
continue;
647-
}
636+
"Initialized Iceberg REST catalog for Polaris catalog {} on source.",
637+
catalog.getName());
648638

649-
IcebergCatalogService targetIcebergCatalogService;
639+
try (IcebergCatalogService targetIcebergCatalogService = target.initializeIcebergCatalogService(catalog.getName())) {
640+
clientLogger.info(
641+
"Initialized Iceberg REST catalog for Polaris catalog {} on target.",
642+
catalog.getName());
643+
644+
syncNamespaces(
645+
catalog.getName(), Namespace.empty(), sourceIcebergCatalogService, targetIcebergCatalogService);
646+
}
650647

651-
try {
652-
targetIcebergCatalogService = target.initializeIcebergCatalogService(catalog.getName());
653-
clientLogger.info(
654-
"Initialized Iceberg REST catalog for Polaris catalog {} on target.",
655-
catalog.getName());
656648
} catch (Exception e) {
657-
if (haltOnFailure) throw e;
658649
clientLogger.error(
659-
"Failed to initialize Iceberg REST catalog for Polaris catalog {} on target.",
660-
catalog.getName(),
661-
e);
650+
"Failed to synchronize Iceberg REST catalog for Polaris catalog {}.",
651+
catalog.getName(),
652+
e);
653+
if (haltOnFailure) throw new RuntimeException(e);
662654
continue;
663655
}
664656

665-
syncNamespaces(
666-
catalog.getName(), Namespace.empty(), sourceIcebergCatalogService, targetIcebergCatalogService);
667-
668657
// NOTE: Grants are synced on a per catalog role basis, so we need to ensure that catalog roles
669658
// are only synced AFTER Iceberg catalog entities, because they may depend on the Iceberg catalog
670659
// entities already existing
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package org.apache.polaris.tools.sync.polaris.auth;
2+
3+
import org.apache.iceberg.rest.HTTPClient;
4+
import org.apache.iceberg.rest.RESTClient;
5+
import org.apache.iceberg.rest.auth.AuthConfig;
6+
import org.apache.iceberg.rest.auth.OAuth2Properties;
7+
import org.apache.iceberg.rest.auth.OAuth2Util;
8+
import org.apache.iceberg.util.ThreadPools;
9+
10+
import java.io.Closeable;
11+
import java.io.IOException;
12+
import java.util.Map;
13+
import java.util.UUID;
14+
import java.util.concurrent.ScheduledExecutorService;
15+
16+
/**
17+
* Wraps {@link OAuth2Util.AuthSession} to provide supported authentication flows.
18+
*/
19+
public class AuthenticationSessionWrapper implements Closeable {
20+
21+
private final RESTClient restClient;
22+
23+
private final OAuth2Util.AuthSession authSession;
24+
25+
private final ScheduledExecutorService executor;
26+
27+
public AuthenticationSessionWrapper(Map<String, String> properties) {
28+
this.restClient = HTTPClient.builder(Map.of())
29+
.uri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI))
30+
.build();
31+
this.authSession = this.newAuthSession(this.restClient, properties);
32+
this.executor = ThreadPools.newScheduledPool(UUID.randomUUID() + "-token-refresh", 1);
33+
}
34+
35+
/**
36+
* Initializes a new authentication session. Supports client_credentials and bearer token flow.
37+
* @param properties properties to initialize the session with
38+
* @return an authentication session, with token refresh if applicable
39+
*/
40+
private OAuth2Util.AuthSession newAuthSession(RESTClient restClient, Map<String, String> properties) {
41+
OAuth2Util.AuthSession parent = new OAuth2Util.AuthSession(
42+
Map.of(),
43+
AuthConfig.builder()
44+
.scope(properties.get(OAuth2Properties.SCOPE))
45+
.oauth2ServerUri(properties.get(OAuth2Properties.OAUTH2_SERVER_URI))
46+
.optionalOAuthParams(OAuth2Util.buildOptionalParam(properties))
47+
.build()
48+
);
49+
50+
// This is for client_credentials flow
51+
if (properties.containsKey(OAuth2Properties.CREDENTIAL)) {
52+
return OAuth2Util.AuthSession.fromCredential(
53+
restClient,
54+
// threads created here will be daemon threads, so termination of main program
55+
// will terminate the token refresh thread automatically
56+
this.executor,
57+
properties.get(OAuth2Properties.CREDENTIAL),
58+
parent
59+
);
60+
}
61+
62+
// This is for regular bearer token flow
63+
if (properties.containsKey(OAuth2Properties.TOKEN)) {
64+
return OAuth2Util.AuthSession.fromAccessToken(
65+
restClient,
66+
// threads created here will be daemon threads, so termination of main program
67+
// will terminate the token refresh thread automatically
68+
this.executor,
69+
properties.get(OAuth2Properties.TOKEN),
70+
null, /* defaultExpiresAtMillis */
71+
parent
72+
);
73+
}
74+
75+
throw new IllegalArgumentException("Unable to construct authenticated session with the provided properties.");
76+
}
77+
78+
/**
79+
* Get refreshed authentication headers for session.
80+
*/
81+
public Map<String, String> getSessionHeaders() {
82+
return this.authSession.headers();
83+
}
84+
85+
@Override
86+
public void close() throws IOException {
87+
try (restClient; executor) {}
88+
}
89+
90+
}

polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/ETagManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* Generic interface to provide and store ETags for tables within catalogs. This allows the storage
2727
* of the ETag to be completely independent from the tool.
2828
*/
29-
public interface ETagManager {
29+
public interface ETagManager extends AutoCloseable {
3030

3131
/**
3232
* Used to initialize the instance for use. Should be called prior to

polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/NoOpETagManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,8 @@ public String getETag(String catalogName, TableIdentifier tableIdentifier) {
3535

3636
@Override
3737
public void storeETag(String catalogName, TableIdentifier tableIdentifier, String etag) {}
38+
39+
@Override
40+
public void close() {}
41+
3842
}

polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/catalog/PolarisCatalog.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.apache.iceberg.rest.ResourcePaths;
4141
import org.apache.iceberg.rest.responses.LoadTableResponse;
4242
import org.apache.iceberg.rest.responses.LoadTableResponseParser;
43-
import org.apache.polaris.tools.sync.polaris.http.OAuth2Util;
43+
import org.apache.polaris.tools.sync.polaris.auth.AuthenticationSessionWrapper;
4444

4545
/**
4646
* Overrides loadTable default implementation to issue a custom loadTable request to the Polaris
@@ -58,14 +58,14 @@ public class PolarisCatalog extends RESTCatalog
5858

5959
private Map<String, String> properties = null;
6060

61-
private String accessToken = null;
62-
6361
private HttpClient httpClient = null;
6462

6563
private ObjectMapper objectMapper = null;
6664

6765
private ResourcePaths resourcePaths = null;
6866

67+
private AuthenticationSessionWrapper authenticationSession = null;
68+
6969
public PolarisCatalog() {
7070
super();
7171
}
@@ -82,22 +82,8 @@ public void initialize(String name, Map<String, String> props) {
8282

8383
super.initialize(name, props);
8484

85-
if (accessToken == null || httpClient == null || this.objectMapper == null) {
86-
String oauth2ServerUri = props.get("uri") + "/v1/oauth/tokens";
87-
String credential = props.get("credential");
88-
89-
String clientId = credential.split(":")[0];
90-
String clientSecret = credential.split(":")[1];
91-
92-
String scope = props.get("scope");
93-
94-
// TODO: Add token refresh
95-
try {
96-
this.accessToken = OAuth2Util.fetchToken(oauth2ServerUri, clientId, clientSecret, scope);
97-
} catch (IOException e) {
98-
throw new RuntimeException(e);
99-
}
100-
85+
if (authenticationSession == null || httpClient == null || this.objectMapper == null) {
86+
this.authenticationSession = new AuthenticationSessionWrapper(this.properties);
10187
this.httpClient = HttpClient.newBuilder().build();
10288
this.objectMapper = new ObjectMapper();
10389
}
@@ -127,9 +113,10 @@ public Table loadTable(TableIdentifier ident, String etag) {
127113
HttpRequest.Builder requestBuilder =
128114
HttpRequest.newBuilder()
129115
.uri(URI.create(tablePath))
130-
.header(HttpHeaders.AUTHORIZATION, "Bearer " + accessToken)
131116
.GET();
132117

118+
this.authenticationSession.getSessionHeaders().forEach(requestBuilder::header);
119+
133120
// specify last known etag in if-none-match header
134121
if (etag != null) {
135122
requestBuilder.header(HttpHeaders.IF_NONE_MATCH, etag);
@@ -171,4 +158,21 @@ public Table loadTable(TableIdentifier ident, String etag) {
171158

172159
return new BaseTable(ops, CatalogUtil.fullTableName(catalogName, ident));
173160
}
161+
162+
@Override
163+
public void close() throws IOException {
164+
final AuthenticationSessionWrapper session = this.authenticationSession;
165+
final HttpClient httpClient = this.httpClient;
166+
167+
try (session; httpClient) {
168+
super.close();
169+
} finally {
170+
this.authenticationSession = null;
171+
this.httpClient = null;
172+
this.objectMapper = null;
173+
this.resourcePaths = null;
174+
}
175+
176+
super.close();
177+
}
174178
}

polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/http/HttpUtil.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

0 commit comments

Comments
 (0)