Skip to content

Commit 115d000

Browse files
authored
Merge pull request #5 from mtfelisb/fix/httphost
Using HttpHost instead of host and port separately
2 parents f2a0e7b + 41cfc7a commit 115d000

File tree

6 files changed

+28
-70
lines changed

6 files changed

+28
-70
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.mtfelisb</groupId>
88
<artifactId>flink-connector-elasticsearch</artifactId>
9-
<version>1.0.0</version>
9+
<version>2.0.0</version>
1010
<packaging>jar</packaging>
1111

1212
<licenses>

src/main/java/com/mtfelisb/flink/connectors/elasticsearch/sink/ElasticsearchSinkBuilder.java

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
package com.mtfelisb.flink.connectors.elasticsearch.sink;
2323

24+
import org.apache.http.HttpHost;
25+
2426
import java.io.IOException;
2527

2628
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -34,16 +36,10 @@
3436
*/
3537
public class ElasticsearchSinkBuilder<T> {
3638
/**
37-
* The host where the Elasticsearch cluster is reachable
38-
*
39-
*/
40-
private String host;
41-
42-
/**
43-
* The port where the Elasticsearch cluster is reachable
39+
* The HttpHost where the Elasticsearch cluster is reachable
4440
*
4541
*/
46-
private int port;
42+
private HttpHost httpHost;
4743

4844
/**
4945
* The username to authenticate the connection with the Elasticsearch cluster
@@ -70,29 +66,15 @@ public class ElasticsearchSinkBuilder<T> {
7066
private Emitter<T> emitter;
7167

7268
/**
73-
* setHost
74-
* set the host where the Elasticsearch cluster is reachable
69+
* setHttpHost
70+
* set the HttpHost where the Elasticsearch cluster is reachable
7571
*
76-
* @param host the host address
72+
* @param httpHost the host address
7773
* @return this builder
7874
*/
79-
public ElasticsearchSinkBuilder<T> setHost(String host) {
80-
checkNotNull(host);
81-
checkState(host.length() > 0, "Host cannot be empty");
82-
this.host = host;
83-
return this;
84-
}
85-
86-
/**
87-
* setPort
88-
* set the port where the Elasticsearch cluster is reachable
89-
*
90-
* @param port the port number
91-
* @return
92-
*/
93-
public ElasticsearchSinkBuilder<T> setPort(int port) {
94-
// no need to checkNotNull() here since int can't be null
95-
this.port = port;
75+
public ElasticsearchSinkBuilder<T> setHttpHost(HttpHost httpHost) {
76+
checkNotNull(httpHost);
77+
this.httpHost = httpHost;
9678
return this;
9779
}
9880

@@ -159,7 +141,7 @@ public ElasticsearchSink<T> build() {
159141
validate();
160142

161143
return new ElasticsearchSink<T>(
162-
new NetworkConfigFactory(host, port, username, password),
144+
new NetworkConfigFactory(httpHost, username, password),
163145
emitter,
164146
threshold,
165147
new BulkRequestFactory()
@@ -172,8 +154,7 @@ public static <T> ElasticsearchSinkBuilder<T> builder() {
172154

173155
private void validate() {
174156
this.setEmitter(this.emitter);
175-
this.setHost(this.host);
176-
this.setPort(this.port);
157+
this.setHttpHost(this.httpHost);
177158
this.setThreshold(this.threshold);
178159
}
179160
}

src/main/java/com/mtfelisb/flink/connectors/elasticsearch/sink/NetworkConfigFactory.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,27 +45,24 @@
4545
public class NetworkConfigFactory implements INetworkConfigFactory {
4646
private final static Logger LOG = LogManager.getLogger(NetworkConfigFactory.class);
4747

48-
private final String host;
49-
50-
private final int port;
48+
private final HttpHost httpHost;
5149

5250
private final String username;
5351

5452
private final String password;
5553

56-
public NetworkConfigFactory(String host, int port, String username, String password) {
57-
this.host = checkNotNull(host);
58-
this.port = checkNotNull(port);
54+
public NetworkConfigFactory(HttpHost httpHost, String username, String password) {
55+
this.httpHost = checkNotNull(httpHost);
5956
this.username = username;
6057
this.password = password;
6158
}
6259

6360
@Override
6461
public ElasticsearchClient create() {
65-
LOG.debug("Http client host {} and port {}", host, port);
62+
LOG.debug("Http client hostname {}", httpHost.toHostString());
6663

6764
// Create the low-level client
68-
RestClient restClient = RestClient.builder(new HttpHost(host, port))
65+
RestClient restClient = RestClient.builder(httpHost)
6966
.setHttpClientConfigCallback(httpClientBuilder ->
7067
(username != null && password != null) ?
7168
httpClientBuilder.setDefaultCredentialsProvider(getCredentials())

src/test/java/com/mtfelisb/flink/connectors/elasticsearch/sink/ElasticsearchSinkBuilderTest.java

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
package com.mtfelisb.flink.connectors.elasticsearch.sink;
2323

24+
import org.apache.http.HttpHost;
2425
import org.junit.jupiter.api.Test;
2526

2627
import static org.junit.Assert.assertEquals;
@@ -53,8 +54,7 @@ public void sinkBuilderSetEmitter() {
5354
Throwable exception = assertThrows(
5455
NullPointerException.class,
5556
() -> ElasticsearchSinkBuilder.<String>builder()
56-
.setPort(9200)
57-
.setHost("localhost")
57+
.setHttpHost(new HttpHost("localhost", 9200))
5858
.setPassword("password")
5959
.setUsername("username")
6060
.setEmitter(null)
@@ -66,7 +66,7 @@ public void sinkBuilderSetEmitter() {
6666
}
6767

6868
/**
69-
* The host should be declared to create
69+
* The httpHost should be declared to create
7070
* a valid ElasticsearchSink instance
7171
*
7272
*/
@@ -75,30 +75,13 @@ public void sinkBuilderSetHost() {
7575
Throwable exception = assertThrows(
7676
NullPointerException.class,
7777
() -> ElasticsearchSinkBuilder.<String>builder()
78-
.setHost(null)
78+
.setHttpHost(null)
7979
.build()
8080
);
8181

8282
assertEquals(null, exception.getMessage());
8383
}
8484

85-
/**
86-
* The host should be declared to create
87-
* a valid ElasticsearchSink instance
88-
*
89-
*/
90-
@Test
91-
public void sinkBuilderSetHostEmpty() {
92-
Throwable exception = assertThrows(
93-
IllegalStateException.class,
94-
() -> ElasticsearchSinkBuilder.<String>builder()
95-
.setHost("")
96-
.build()
97-
);
98-
99-
assertEquals("Host cannot be empty", exception.getMessage());
100-
}
101-
10285
/**
10386
* The username, if provided, cannot be null to create
10487
* a valid ElasticsearchSink instance

src/test/java/com/mtfelisb/flink/connectors/elasticsearch/sink/ElasticsearchSinkTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ public void indexingByThresholdReached() throws Exception {
6060

6161
final ElasticsearchSink<DummyData> sink = ElasticsearchSinkBuilder.<DummyData>builder()
6262
.setThreshold(2L)
63-
.setHost(ES_CONTAINER.getHost())
64-
.setPort(ES_CONTAINER.getFirstMappedPort())
63+
.setHttpHost(HttpHost.create(ES_CONTAINER.getHttpHostAddress()))
6564
.setEmitter(
6665
(value, op, ctx) ->
6766
(BulkOperation.Builder) op
@@ -102,8 +101,7 @@ public void indexingByCheckpoint() throws Exception {
102101

103102
final ElasticsearchSink<DummyData> sink = ElasticsearchSinkBuilder.<DummyData>builder()
104103
.setThreshold(1000L)
105-
.setHost(ES_CONTAINER.getHost())
106-
.setPort(ES_CONTAINER.getFirstMappedPort())
104+
.setHttpHost(HttpHost.create(ES_CONTAINER.getHttpHostAddress()))
107105
.setEmitter(
108106
(value, op, ctx) ->
109107
(BulkOperation.Builder) op

src/test/java/com/mtfelisb/flink/connectors/elasticsearch/sink/NetworkConfigFactoryTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package com.mtfelisb.flink.connectors.elasticsearch.sink;
2323

2424
import co.elastic.clients.elasticsearch.ElasticsearchClient;
25+
import org.apache.http.HttpHost;
2526
import org.testcontainers.junit.jupiter.Testcontainers;
2627
import static org.junit.Assert.assertEquals;
2728
import static org.junit.Assert.assertThrows;
@@ -38,7 +39,7 @@ public class NetworkConfigFactoryTest extends ElasticsearchSinkBaseITCase {
3839
public void requiredHost() {
3940
Throwable exception = assertThrows(
4041
NullPointerException.class,
41-
() -> new NetworkConfigFactory(null, 9200, "username", "password"));
42+
() -> new NetworkConfigFactory(null, "username", "password"));
4243

4344
assertEquals(null, exception.getMessage());
4445
}
@@ -52,8 +53,7 @@ public void requiredHost() {
5253
@Test
5354
public void create() throws IOException {
5455
ElasticsearchClient esClient = new NetworkConfigFactory(
55-
ES_CONTAINER.getHost(),
56-
ES_CONTAINER.getFirstMappedPort(),
56+
HttpHost.create(ES_CONTAINER.getHttpHostAddress()),
5757
null,
5858
null
5959
).create();
@@ -70,8 +70,7 @@ public void create() throws IOException {
7070
@Test
7171
public void createWithUsernameAndPassword() throws IOException {
7272
ElasticsearchClient esClient = new NetworkConfigFactory(
73-
ES_AUTH_CONTAINER.getHost(),
74-
ES_AUTH_CONTAINER.getFirstMappedPort(),
73+
HttpHost.create(ES_CONTAINER.getHttpHostAddress()),
7574
"elastic",
7675
"generic-pass"
7776
).create();

0 commit comments

Comments
 (0)