Skip to content

Commit 671c5b4

Browse files
authored
[core] RESTCatalog: support DLFECSTokenLoader (#5312)
1 parent 92a6b27 commit 671c5b4

File tree

7 files changed

+395
-0
lines changed

7 files changed

+395
-0
lines changed

docs/content/concepts/rest-catalog.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,23 @@ WITH (
109109
);
110110
```
111111

112+
- DLF sts token using aliyun ecs role
113+
```sql
114+
CREATE CATALOG `paimon-rest-catalog`
115+
WITH (
116+
'type' = 'paimon',
117+
'uri' = '<catalog server url>',
118+
'metastore' = 'rest',
119+
'warehouse' = 'my_instance_name',
120+
'token.provider' = 'dlf',
121+
'dlf.token-loader' = 'ecs'
122+
'dlf.token-ecs-role-name' = 'my_ecs_role_name'
123+
);
124+
```
125+
112126
{{< hint info >}}
113127
The `'warehouse'` is your catalog instance name on the server, not the path.
128+
The `'dlf.token-ecs-role-name'` is an optional parameter,dlf token loader can automatically obtain it through ecs metadata service. More information about ecs role can be found [here](https://help.aliyun.com/zh/ecs/user-guide/attach-an-instance-ram-role-to-an-ecs-instance?spm=a2c4g.11186623.0.0.7e774adcRAJ0wK).
114129
{{< /hint >}}
115130

116131
## Conclusion

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,17 @@ public class RESTCatalogOptions {
8585
.stringType()
8686
.noDefaultValue()
8787
.withDescription("REST Catalog auth DLF token loader.");
88+
89+
public static final ConfigOption<String> DLF_TOKEN_ECS_METADATA_URL =
90+
ConfigOptions.key("dlf.token-ecs-metadata-url")
91+
.stringType()
92+
.defaultValue(
93+
"http://100.100.100.200/latest/meta-data/Ram/security-credentials/")
94+
.withDescription("REST Catalog auth DLF token ecs metadata url.");
95+
96+
public static final ConfigOption<String> DLF_TOKEN_ECS_ROLE_NAME =
97+
ConfigOptions.key("dlf.token-ecs-role-name")
98+
.stringType()
99+
.noDefaultValue()
100+
.withDescription("REST Catalog auth DLF token ecs role name.");
88101
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.rest.auth;
20+
21+
import org.apache.paimon.annotation.VisibleForTesting;
22+
import org.apache.paimon.rest.ExponentialHttpRetryInterceptor;
23+
24+
import okhttp3.OkHttpClient;
25+
import okhttp3.Request;
26+
import okhttp3.Response;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import javax.annotation.Nullable;
31+
32+
import java.io.IOException;
33+
import java.io.UncheckedIOException;
34+
import java.time.Duration;
35+
import java.util.Arrays;
36+
37+
import static okhttp3.ConnectionSpec.CLEARTEXT;
38+
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
39+
import static okhttp3.ConnectionSpec.MODERN_TLS;
40+
import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
41+
42+
/** DLF Token Loader for ECS Metadata Service. */
43+
public class DLFECSTokenLoader implements DLFTokenLoader {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(DLFECSTokenLoader.class);
46+
47+
private static final OkHttpClient HTTP_CLIENT =
48+
new OkHttpClient.Builder()
49+
.retryOnConnectionFailure(true)
50+
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT))
51+
.addInterceptor(new ExponentialHttpRetryInterceptor(3))
52+
.connectTimeout(Duration.ofMinutes(3))
53+
.readTimeout(Duration.ofMinutes(3))
54+
.build();
55+
56+
private String ecsMetadataURL;
57+
58+
private String roleName;
59+
60+
public DLFECSTokenLoader(String ecsMetaDataURL, @Nullable String roleName) {
61+
this.ecsMetadataURL = ecsMetaDataURL;
62+
this.roleName = roleName;
63+
}
64+
65+
@Override
66+
public DLFToken loadToken() {
67+
if (roleName == null) {
68+
roleName = getRole(ecsMetadataURL);
69+
}
70+
return getToken(ecsMetadataURL + roleName);
71+
}
72+
73+
private static String getRole(String url) {
74+
try {
75+
return getResponseBody(url);
76+
} catch (Exception e) {
77+
throw new RuntimeException("get role failed, error : " + e.getMessage(), e);
78+
}
79+
}
80+
81+
private static DLFToken getToken(String url) {
82+
try {
83+
String token = getResponseBody(url);
84+
return OBJECT_MAPPER.readValue(token, DLFToken.class);
85+
} catch (IOException e) {
86+
throw new UncheckedIOException(e);
87+
} catch (Exception e) {
88+
throw new RuntimeException("get token failed, error : " + e.getMessage(), e);
89+
}
90+
}
91+
92+
@VisibleForTesting
93+
protected static String getResponseBody(String url) {
94+
Request request = new Request.Builder().url(url).get().build();
95+
long startTime = System.currentTimeMillis();
96+
try (Response response = HTTP_CLIENT.newCall(request).execute()) {
97+
if (response == null) {
98+
throw new RuntimeException("get response failed, response is null");
99+
}
100+
if (!response.isSuccessful()) {
101+
throw new RuntimeException("get response failed, response : " + response);
102+
}
103+
String responseBodyStr = response.body() != null ? response.body().string() : null;
104+
if (responseBodyStr == null) {
105+
throw new RuntimeException("get response failed, response body is null");
106+
}
107+
if (LOG.isDebugEnabled()) {
108+
LOG.debug(
109+
"get response success, url : {}, cost : {} ms",
110+
url,
111+
System.currentTimeMillis() - startTime);
112+
}
113+
return responseBodyStr;
114+
} catch (RuntimeException e) {
115+
throw e;
116+
} catch (Exception e) {
117+
throw new RuntimeException("get response failed, error : " + e.getMessage(), e);
118+
}
119+
}
120+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.rest.auth;
20+
21+
import org.apache.paimon.options.Options;
22+
23+
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL;
24+
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME;
25+
26+
/** Factory for {@link DLFECSTokenLoader}. */
27+
public class DLFECSTokenLoaderFactory implements DLFTokenLoaderFactory {
28+
29+
@Override
30+
public String identifier() {
31+
return "ecs";
32+
}
33+
34+
@Override
35+
public DLFTokenLoader create(Options options) {
36+
String ecsMetadataURL = options.get(DLF_TOKEN_ECS_METADATA_URL);
37+
String roleName = options.get(DLF_TOKEN_ECS_ROLE_NAME);
38+
return new DLFECSTokenLoader(ecsMetadataURL, roleName);
39+
}
40+
}

paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@ org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadataFactory
4141
org.apache.paimon.rest.auth.BearTokenAuthProviderFactory
4242
org.apache.paimon.rest.auth.DLFAuthProviderFactory
4343
org.apache.paimon.rest.auth.DLFLocalFileTokenLoaderFactory
44+
org.apache.paimon.rest.auth.DLFECSTokenLoaderFactory

paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_SECRET;
4747
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_REGION;
4848
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_SECURITY_TOKEN;
49+
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL;
50+
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME;
4951
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_LOADER;
5052
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_TOKEN_PATH;
5153
import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN;
@@ -291,6 +293,124 @@ public void testCreateDlfAuthProviderByCustomDLFTokenLoader()
291293
Assert.assertEquals(fetchToken.getSecurityToken(), customToken.getSecurityToken());
292294
}
293295

296+
@Test
297+
public void testCreateDlfAuthProviderByECSTokenProvider()
298+
throws IOException, InterruptedException {
299+
MockECSMetadataService mockECSMetadataService = new MockECSMetadataService("EcsTestRole");
300+
mockECSMetadataService.start();
301+
try {
302+
DLFToken theFirstMockToken = generateToken();
303+
mockECSMetadataService.setMockToken(theFirstMockToken);
304+
String theFirstMockTokenStr =
305+
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
306+
long tokenRefreshInMills = 1000;
307+
// create options with token loader
308+
Options options = new Options();
309+
options.set(DLF_TOKEN_LOADER.key(), "ecs");
310+
options.set(
311+
DLF_TOKEN_ECS_METADATA_URL.key(),
312+
mockECSMetadataService.getUrl() + "latest/meta-data/Ram/security-credentials/");
313+
options.set(RESTCatalogOptions.URI.key(), "serverUrl");
314+
options.set(DLF_REGION.key(), "cn-hangzhou");
315+
options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
316+
AuthProvider authProvider =
317+
AuthProviderFactory.createAuthProvider(
318+
AuthProviderEnum.DLF.identifier(), options);
319+
ScheduledExecutorService executor =
320+
ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
321+
AuthSession session = AuthSession.fromRefreshAuthProvider(executor, authProvider);
322+
DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider();
323+
String theFirstFetchTokenStr =
324+
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
325+
assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
326+
327+
DLFToken theSecondMockToken = generateToken();
328+
String theSecondMockTokenStr =
329+
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
330+
mockECSMetadataService.setMockToken(theSecondMockToken);
331+
Thread.sleep(tokenRefreshInMills * 2);
332+
String theSecondFetchTokenStr =
333+
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
334+
assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
335+
} finally {
336+
mockECSMetadataService.shutdown();
337+
}
338+
}
339+
340+
@Test
341+
public void testCreateDlfAuthProviderByECSTokenProviderWithDefineRole()
342+
throws IOException, InterruptedException {
343+
MockECSMetadataService mockECSMetadataService = new MockECSMetadataService("CustomRole");
344+
mockECSMetadataService.start();
345+
try {
346+
DLFToken theFirstMockToken = generateToken();
347+
mockECSMetadataService.setMockToken(theFirstMockToken);
348+
String theFirstMockTokenStr =
349+
OBJECT_MAPPER_INSTANCE.writeValueAsString(theFirstMockToken);
350+
long tokenRefreshInMills = 1000;
351+
// create options with token loader
352+
Options options = new Options();
353+
options.set(DLF_TOKEN_LOADER.key(), "ecs");
354+
options.set(
355+
DLF_TOKEN_ECS_METADATA_URL.key(),
356+
mockECSMetadataService.getUrl() + "latest/meta-data/Ram/security-credentials/");
357+
options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
358+
options.set(RESTCatalogOptions.URI.key(), "serverUrl");
359+
options.set(DLF_REGION.key(), "cn-hangzhou");
360+
options.set(TOKEN_REFRESH_TIME.key(), tokenRefreshInMills + "ms");
361+
AuthProvider authProvider =
362+
AuthProviderFactory.createAuthProvider(
363+
AuthProviderEnum.DLF.identifier(), options);
364+
ScheduledExecutorService executor =
365+
ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token");
366+
AuthSession session = AuthSession.fromRefreshAuthProvider(executor, authProvider);
367+
DLFAuthProvider dlfAuthProvider = (DLFAuthProvider) session.getAuthProvider();
368+
String theFirstFetchTokenStr =
369+
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
370+
assertEquals(theFirstFetchTokenStr, theFirstMockTokenStr);
371+
372+
DLFToken theSecondMockToken = generateToken();
373+
String theSecondMockTokenStr =
374+
OBJECT_MAPPER_INSTANCE.writeValueAsString(theSecondMockToken);
375+
mockECSMetadataService.setMockToken(theSecondMockToken);
376+
Thread.sleep(tokenRefreshInMills * 2);
377+
String theSecondFetchTokenStr =
378+
OBJECT_MAPPER_INSTANCE.writeValueAsString(dlfAuthProvider.token);
379+
assertEquals(theSecondFetchTokenStr, theSecondMockTokenStr);
380+
} finally {
381+
mockECSMetadataService.shutdown();
382+
}
383+
}
384+
385+
@Test
386+
public void testCreateDlfAuthProviderByECSTokenProviderWithInvalidRole()
387+
throws IOException, InterruptedException {
388+
MockECSMetadataService mockECSMetadataService = new MockECSMetadataService("EcsTestRole");
389+
mockECSMetadataService.start();
390+
try {
391+
DLFToken theFirstMockToken = generateToken();
392+
mockECSMetadataService.setMockToken(theFirstMockToken);
393+
// create options with token loader
394+
Options options = new Options();
395+
options.set(DLF_TOKEN_LOADER.key(), "ecs");
396+
options.set(
397+
DLF_TOKEN_ECS_METADATA_URL.key(),
398+
mockECSMetadataService.getUrl() + "latest/meta-data/Ram/security-credentials/");
399+
options.set(DLF_TOKEN_ECS_ROLE_NAME.key(), "CustomRole");
400+
options.set(RESTCatalogOptions.URI.key(), "serverUrl");
401+
options.set(DLF_REGION.key(), "cn-hangzhou");
402+
assertThrows(
403+
RuntimeException.class,
404+
() -> {
405+
AuthProvider authProvider =
406+
AuthProviderFactory.createAuthProvider(
407+
AuthProviderEnum.DLF.identifier(), options);
408+
});
409+
} finally {
410+
mockECSMetadataService.shutdown();
411+
}
412+
}
413+
294414
@Test
295415
public void testDLFAuthProviderAuthHeaderWhenDataIsNotEmpty() throws Exception {
296416
String fileName = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)