Skip to content

Commit 6576fb4

Browse files
committed
Replace OkHttp with Elastic Java Low-Level Client
1 parent 80be218 commit 6576fb4

File tree

3 files changed

+35
-59
lines changed

3 files changed

+35
-59
lines changed

flink-connector-jdbc/pom.xml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ under the License.
3838
<scala.binary.version>2.12</scala.binary.version>
3939
<scala-library.version>2.12.7</scala-library.version>
4040
<assertj.version>3.23.1</assertj.version>
41+
<jackson.version>2.15.2</jackson.version>
4142
<postgres.version>42.5.1</postgres.version>
4243
<oracle.version>21.8.0.0</oracle.version>
4344
<trino.version>418</trino.version>
4445
<byte-buddy.version>1.12.10</byte-buddy.version>
46+
<elasticsearch.version>8.8.1</elasticsearch.version>
4547
</properties>
4648

4749
<dependencies>
@@ -247,25 +249,25 @@ under the License.
247249
<dependency>
248250
<groupId>org.elasticsearch.plugin</groupId>
249251
<artifactId>x-pack-sql-jdbc</artifactId>
250-
<version>8.8.1</version>
252+
<version>${elasticsearch.version}</version>
251253
<scope>test</scope>
252254
</dependency>
253255
<dependency>
254-
<groupId>com.squareup.okhttp3</groupId>
255-
<artifactId>okhttp</artifactId>
256-
<version>4.11.0</version>
256+
<groupId>org.elasticsearch.client</groupId>
257+
<artifactId>elasticsearch-rest-client</artifactId>
258+
<version>${elasticsearch.version}</version>
257259
<scope>test</scope>
258260
</dependency>
259261
<dependency>
260262
<groupId>com.fasterxml.jackson.core</groupId>
261263
<artifactId>jackson-databind</artifactId>
262-
<version>2.15.2</version>
264+
<version>${jackson.version}</version>
263265
<scope>test</scope>
264266
</dependency>
265267
<dependency>
266268
<groupId>com.fasterxml.jackson.datatype</groupId>
267269
<artifactId>jackson-datatype-jsr310</artifactId>
268-
<version>2.15.2</version>
270+
<version>${jackson.version}</version>
269271
<scope>test</scope>
270272
</dependency>
271273

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchDatabase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class ElasticsearchDatabase extends DatabaseExtension implements Elastics
3939
.waitingFor(
4040
Wait.forHttp("/")
4141
.withBasicCredentials(USERNAME, PASSWORD)
42-
.withReadTimeout(Duration.ofMinutes(1)));
42+
.withReadTimeout(Duration.ofMinutes(2)));
4343

4444
private static ElasticsearchMetadata metadata;
4545
private static ElasticsearchRestClient client;

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchRestClient.java

Lines changed: 26 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@
2020

2121
import com.fasterxml.jackson.databind.DeserializationFeature;
2222
import com.fasterxml.jackson.databind.ObjectMapper;
23-
import okhttp3.Credentials;
24-
import okhttp3.MediaType;
25-
import okhttp3.OkHttpClient;
26-
import okhttp3.Request;
27-
import okhttp3.RequestBody;
28-
import okhttp3.Response;
29-
import okhttp3.ResponseBody;
23+
import org.apache.http.HttpHost;
24+
import org.apache.http.auth.AuthScope;
25+
import org.apache.http.auth.UsernamePasswordCredentials;
26+
import org.apache.http.client.CredentialsProvider;
27+
import org.apache.http.impl.client.BasicCredentialsProvider;
28+
import org.apache.http.util.EntityUtils;
29+
import org.elasticsearch.client.Request;
30+
import org.elasticsearch.client.RestClient;
3031
import org.junit.jupiter.api.Assertions;
3132

3233
import java.io.IOException;
@@ -41,10 +42,7 @@ public class ElasticsearchRestClient {
4142
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
4243
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
4344

44-
private final String host;
45-
private final int port;
46-
private final String username;
47-
private final String password;
45+
private final RestClient restClient;
4846

4947
public ElasticsearchRestClient(ElasticsearchMetadata metadata) {
5048
this(metadata.getContainerHost(),
@@ -54,72 +52,48 @@ public ElasticsearchRestClient(ElasticsearchMetadata metadata) {
5452
}
5553

5654
public ElasticsearchRestClient(String host, int port, String username, String password) {
57-
this.host = host;
58-
this.port = port;
59-
this.username = username;
60-
this.password = password;
55+
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
56+
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
57+
this.restClient = RestClient.builder(new HttpHost(host, port, "http"))
58+
.setHttpClientConfigCallback(builder -> builder.setDefaultCredentialsProvider(credentialsProvider))
59+
.build();
6160
}
6261

6362
public boolean trialEnabled() throws Exception {
64-
Request request = requestWithAuthorization()
65-
.url(format("http://%s:%d/_license", host, port))
66-
.get()
67-
.build();
63+
Request request = new Request("GET", "/_license");
6864
ElasticLicenseResponse response = executeRequest(request, ElasticLicenseResponse.class);
6965
return response != null && response.license.status.equals("active") && response.license.type.equals("trial");
7066
}
7167

7268
public void enableTrial() throws Exception {
73-
Request request = requestWithAuthorization()
74-
.url(format("http://%s:%d/_license/start_trial?acknowledge=true", host, port))
75-
.post(RequestBody.create(new byte[]{}))
76-
.build();
77-
executeRequest(request);
69+
executeRequest(new Request("POST", "/_license/start_trial?acknowledge=true"));
7870
}
7971

8072
public void createIndex(String indexName, String indexDefinition) throws Exception {
81-
Request request = requestWithAuthorization()
82-
.url(format("http://%s:%d/%s/", host, port, indexName))
83-
.put(RequestBody.create(indexDefinition, MediaType.get("application/json")))
84-
.build();
73+
Request request = new Request("PUT", format("/%s/", indexName));
74+
request.setJsonEntity(indexDefinition);
8575
executeRequest(request);
8676
}
8777

8878
public void deleteIndex(String indexName) throws Exception {
89-
Request request = requestWithAuthorization()
90-
.url(format("http://%s:%d/%s/", host, port, indexName))
91-
.delete()
92-
.build();
93-
executeRequest(request);
79+
executeRequest(new Request("DELETE", format("/%s/", indexName)));
9480
}
9581

9682
public void addDataBulk(String indexName, String content) throws Exception {
97-
Request request = requestWithAuthorization()
98-
.url(format("http://%s:%d/%s/_bulk?refresh=true", host, port, indexName))
99-
.post(RequestBody.create(content, MediaType.get("application/json")))
100-
.build();
83+
Request request = new Request("PUT", format("/%s/_bulk?refresh=true", indexName));
84+
request.setJsonEntity(content);
10185
executeRequest(request);
10286
}
10387

10488
private <T> T executeRequest(Request request, Class<T> outputClass) throws IOException {
105-
OkHttpClient client = new OkHttpClient();
106-
try (Response response = client.newCall(request).execute()) {
107-
ResponseBody body = response.body();
108-
Assertions.assertTrue(response.isSuccessful());
109-
Assertions.assertNotNull(body);
110-
return OBJECT_MAPPER.readValue(body.string(), outputClass);
111-
}
89+
org.elasticsearch.client.Response response = restClient.performRequest(request);
90+
Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
91+
return OBJECT_MAPPER.readValue(EntityUtils.toString(response.getEntity()), outputClass);
11292
}
11393

11494
private void executeRequest(Request request) throws IOException {
115-
OkHttpClient client = new OkHttpClient();
116-
try (Response response = client.newCall(request).execute()) {
117-
Assertions.assertTrue(response.isSuccessful());
118-
}
119-
}
120-
121-
private Request.Builder requestWithAuthorization() {
122-
return new Request.Builder().addHeader("Authorization", Credentials.basic(username, password));
95+
org.elasticsearch.client.Response response = restClient.performRequest(request);
96+
Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
12397
}
12498

12599
private static class ElasticLicenseResponse {

0 commit comments

Comments
 (0)