Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions docs/content/concepts/rest-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,48 +51,48 @@ Paimon REST Catalog provides a lightweight implementation to access the catalog
```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
'type' = 'paimon',
'uri' = '<catalog server url>',
'metastore' = 'rest',
'token.provider' = 'bear'
'token' = '<token>'
'type' = 'paimon',
'uri' = '<catalog server url>',
'metastore' = 'rest',
'token.provider' = 'bear'
'token' = '<token>'
);
```
- DLF ak
```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
'type' = 'paimon',
'uri' = '<catalog server url>',
'metastore' = 'rest',
'token.provider' = 'dlf',
'dlf.accessKeyId'='<accessKeyId>',
'dlf.accessKeySecret'='<accessKeySecret>',
'type' = 'paimon',
'uri' = '<catalog server url>',
'metastore' = 'rest',
'token.provider' = 'dlf',
'dlf.accessKeyId'='<accessKeyId>',
'dlf.accessKeySecret'='<accessKeySecret>',
);
```

- DLF sts token
```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
'type' = 'paimon',
'uri' = '<catalog server url>',
'metastore' = 'rest',
'token.provider' = 'dlf',
'dlf.accessKeyId'='<accessKeyId>',
'dlf.accessKeySecret'='<accessKeySecret>',
'dlf.securityToken'='<securityToken>'
'type' = 'paimon',
'uri' = '<catalog server url>',
'metastore' = 'rest',
'token.provider' = 'dlf',
'dlf.accessKeyId'='<accessKeyId>',
'dlf.accessKeySecret'='<accessKeySecret>',
'dlf.securityToken'='<securityToken>'
);
```

- DLF sts token path
```sql
CREATE CATALOG `paimon-rest-catalog`
WITH (
'type' = 'paimon',
'uri' = '<catalog server url>',
'metastore' = 'rest',
'token.provider' = 'dlf'
'type' = 'paimon',
'uri' = '<catalog server url>',
'metastore' = 'rest',
'token.provider' = 'dlf'
);
```

Expand Down
78 changes: 18 additions & 60 deletions paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.rest;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.auth.RESTAuthFunction;
import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.RESTException;
Expand All @@ -29,57 +28,51 @@

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
import static okhttp3.ConnectionSpec.MODERN_TLS;
import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;

/** HTTP client for REST catalog. */
public class HttpClient implements RESTClient {

private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
private static final OkHttpClient HTTP_CLIENT =
new OkHttpClient.Builder()
.retryOnConnectionFailure(true)
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT))
.addInterceptor(new ExponentialHttpRetryInterceptor(5))
.connectTimeout(Duration.ofMinutes(3))
.readTimeout(Duration.ofMinutes(3))
.build();

private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");
private static final int CONNECTION_KEEP_ALIVE_DURATION_MS = 300_000;

private final OkHttpClient okHttpClient;
private final String uri;

private ErrorHandler errorHandler;

public HttpClient(Options options) {
this(HttpClientOptions.create(options));
}

public HttpClient(HttpClientOptions httpClientOptions) {
if (httpClientOptions.uri() != null && httpClientOptions.uri().endsWith("/")) {
this.uri = httpClientOptions.uri().substring(0, httpClientOptions.uri().length() - 1);
public HttpClient(String uri) {
if (uri != null && uri.endsWith("/")) {
this.uri = uri.substring(0, uri.length() - 1);
} else {
this.uri = httpClientOptions.uri();
this.uri = uri;
}
this.okHttpClient = createHttpClient(httpClientOptions);
this.errorHandler = DefaultErrorHandler.getInstance();
}

Expand Down Expand Up @@ -160,14 +153,8 @@ public <T extends RESTResponse> T delete(
}
}

@Override
public void close() throws IOException {
okHttpClient.dispatcher().cancelAll();
okHttpClient.connectionPool().evictAll();
}

private <T extends RESTResponse> T exec(Request request, Class<T> responseType) {
try (Response response = okHttpClient.newCall(request).execute()) {
try (Response response = HTTP_CLIENT.newCall(request).execute()) {
String responseBodyStr = response.body() != null ? response.body().string() : null;
if (!response.isSuccessful()) {
ErrorResponse error;
Expand Down Expand Up @@ -203,38 +190,6 @@ private static RequestBody buildRequestBody(String body) throws JsonProcessingEx
return RequestBody.create(body.getBytes(StandardCharsets.UTF_8), MEDIA_TYPE);
}

private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions) {
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
ExecutorService executorService =
createCachedThreadPool(httpClientOptions.threadPoolSize(), THREAD_NAME, workQueue);
ConnectionPool connectionPool =
new ConnectionPool(
httpClientOptions.maxConnections(),
CONNECTION_KEEP_ALIVE_DURATION_MS,
TimeUnit.MILLISECONDS);
Dispatcher dispatcher = new Dispatcher(executorService);
// set max requests per host use max connections
dispatcher.setMaxRequestsPerHost(httpClientOptions.maxConnections());
OkHttpClient.Builder builder =
new OkHttpClient.Builder()
.dispatcher(dispatcher)
.retryOnConnectionFailure(true)
.connectionPool(connectionPool)
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT))
.addInterceptor(
new ExponentialHttpRetryInterceptor(
httpClientOptions.maxRetries()));
httpClientOptions
.connectTimeout()
.ifPresent(
timeoutDuration -> {
builder.connectTimeout(timeoutDuration);
builder.readTimeout(timeoutDuration);
});

return builder.build();
}

private String getRequestUrl(String path) {
return StringUtils.isNullOrWhitespaceOnly(path) ? uri : uri + path;
}
Expand Down Expand Up @@ -274,4 +229,7 @@ protected static Pair<String, Map<String, String>> parsePath(String path) {
));
return Pair.of(resourcePath, parameters);
}

@Override
public void close() {}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public RESTCatalog(CatalogContext context) {
}

public RESTCatalog(CatalogContext context, boolean configRequired) {
this.client = new HttpClient(context.options());
this.client = new HttpClient(context.options().get(RESTCatalogOptions.URI));
AuthSession catalogAuth = createAuthSession(context.options(), tokenRefreshExecutor());
Options options = context.options();
Map<String, String> baseHeaders = Collections.emptyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,6 @@ public class RESTCatalogOptions {
.noDefaultValue()
.withDescription("REST Catalog server's uri.");

public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
ConfigOptions.key("rest.client.connection-timeout")
.durationType()
.defaultValue(Duration.ofSeconds(180))
.withDescription("REST Catalog http client connect timeout.");

public static final ConfigOption<Integer> MAX_CONNECTIONS =
ConfigOptions.key("rest.client.max-connections")
.intType()
.defaultValue(100)
.withDescription("REST Catalog http client's max connections.");

public static final ConfigOption<Integer> MAX_RETIES =
ConfigOptions.key("rest.client.max-retries")
.intType()
.defaultValue(5)
.withDescription("REST Catalog http client's max retry times.");

public static final ConfigOption<Integer> THREAD_POOL_SIZE =
ConfigOptions.key("rest.client.num-threads")
.intType()
.defaultValue(1)
.withDescription("REST Catalog http client thread num.");

public static final ConfigOption<String> TOKEN =
ConfigOptions.key("token")
.stringType()
Expand Down Expand Up @@ -98,12 +74,6 @@ public class RESTCatalogOptions {
.noDefaultValue()
.withDescription("REST Catalog auth DLF security token");

public static final ConfigOption<String> DLF_ROLE_SESSION_NAME =
ConfigOptions.key("dlf.roleSessionName")
.stringType()
.noDefaultValue()
.withDescription("REST Catalog auth DLF role session name");

public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED =
ConfigOptions.key("data-token.enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.junit.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -63,14 +62,12 @@ public void setUp() throws Exception {
server = new TestHttpWebServer(MOCK_PATH);
server.start();
errorHandler = DefaultErrorHandler.getInstance();
HttpClientOptions httpClientOptions =
new HttpClientOptions(server.getBaseUrl(), Duration.ofSeconds(3), 1, 10, 2);
mockResponseData = new MockRESTData(MOCK_PATH);
mockResponseDataStr = server.createResponseBody(mockResponseData);
errorResponseStr =
server.createResponseBody(
new ErrorResponse(ErrorResponseResourceType.DATABASE, "test", "test", 400));
httpClient = new HttpClient(httpClientOptions);
httpClient = new HttpClient(server.getBaseUrl());
httpClient.setErrorHandler(errorHandler);
AuthProvider authProvider = new BearTokenAuthProvider(TOKEN);
AuthSession authSession = new AuthSession(authProvider);
Expand Down Expand Up @@ -134,10 +131,7 @@ public void testDeleteFail() {

@Test
public void testRetry() {
HttpClient httpClient =
new HttpClient(
new HttpClientOptions(
server.getBaseUrl(), Duration.ofSeconds(30), 1, 10, 2));
HttpClient httpClient = new HttpClient(server.getBaseUrl());
server.enqueueResponse(mockResponseDataStr, 429);
server.enqueueResponse(mockResponseDataStr, 200);
assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class, restAuthFunction));
Expand Down
Loading
Loading