Skip to content

Commit 2d89bc0

Browse files
authored
Proxy support: introduces proxy param to setup a proxy URL which sets the setting to ES client. URL is an uri LS type. (#304)
1 parent f190c72 commit 2d89bc0

File tree

5 files changed

+85
-3
lines changed

5 files changed

+85
-3
lines changed

docs/index.asciidoc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
360360
| <<plugins-{type}s-{plugin}-hosts>> |<<array,array>>|No
361361
| <<plugins-{type}s-{plugin}-password>> | <<password,password>>|No
362362
| <<plugins-{type}s-{plugin}-pipeline_name>> | <<string,string>>|No
363+
| <<plugins-{type}s-{plugin}-proxy>> | <<uri,uri>>|No
363364
| <<plugins-{type}s-{plugin}-ssl_certificate>> | <<path,path>>|No
364365
| <<plugins-{type}s-{plugin}-ssl_certificate_authorities>> |<<array,array>>|No
365366
| <<plugins-{type}s-{plugin}-ssl_enabled>> | <<boolean,boolean>>|No
@@ -494,6 +495,16 @@ A password when using HTTP Basic Authentication to connect to {es}.
494495
* When present, the event's initial pipeline will _not_ be auto-detected from the event's data stream fields.
495496
* Value may be a {logstash-ref}/event-dependent-configuration.html#sprintf[sprintf-style] template; if any referenced fields cannot be resolved the event will not be routed to an ingest pipeline.
496497

498+
[id="plugins-{type}s-{plugin}-proxy"]
499+
===== `proxy`
500+
501+
* Value type is <<uri,uri>>
502+
* There is no default value for this setting.
503+
504+
Address of the HTTP forward proxy used to connect to the {es} cluster.
505+
An empty string is treated as if proxy was not set.
506+
Environment variables may be used to set this value, e.g. `proxy => '${LS_PROXY:}'`.
507+
497508
[id="plugins-{type}s-{plugin}-ssl_certificate"]
498509
===== `ssl_certificate`
499510

lib/logstash/filters/elastic_integration.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ class LogStash::Filters::ElasticIntegration < LogStash::Filters::Base
4040
# Any special characters present in the URLs here MUST be URL escaped! This means `#` should be put in as `%23` for instance.
4141
config :hosts, :validate => :uri, :list => true
4242

43+
# An HTTP forward proxy to use for connecting to the Elasticsearch cluster.
44+
config :proxy, :validate => :uri
45+
4346
# Cloud ID, from the Elastic Cloud web console. If set `hosts` should not be used.
4447
#
4548
# For more details, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_id[cloud documentation]
@@ -312,6 +315,7 @@ def extract_immutable_config
312315

313316
builder.setHosts @hosts&.map(&:to_s)
314317
builder.setCloudId @cloud_id
318+
builder.setProxy @proxy&.to_s
315319

316320
builder.setSslEnabled @ssl_enabled
317321

src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilder.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class ElasticsearchRestClientBuilder {
6565
private final IdentityConfig identityConfig = new IdentityConfig();
6666
private final RequestAuthConfig requestAuthConfig = new RequestAuthConfig();
6767
private final ElasticApiConfig elasticApiConfig = new ElasticApiConfig();
68+
private final ProxyConfig proxyConfig = new ProxyConfig();
6869
private String userAgentHeaderValue;
6970

7071
public static ElasticsearchRestClientBuilder forCloudId(final String cloudId) {
@@ -122,6 +123,8 @@ public static Optional<ElasticsearchRestClientBuilder> fromPluginConfiguration(f
122123
});
123124
config.cloudAuth().ifPresent(requestAuthConfig::setCloudAuth);
124125
config.apiKey().ifPresent(requestAuthConfig::setApiKey);
126+
}).configureProxy(proxyConfig -> {
127+
config.proxy().ifPresent(proxyConfig::setProxy);
125128
})
126129
);
127130
}
@@ -135,6 +138,11 @@ private static Optional<ElasticsearchRestClientBuilder> builderInit(final Plugin
135138
.or(() -> config.hosts().map(ElasticsearchRestClientBuilder::forURLs));
136139
}
137140

141+
public ElasticsearchRestClientBuilder configureProxy(final Consumer<ProxyConfig> proxyConfigurator) {
142+
proxyConfigurator.accept(this.proxyConfig);
143+
return this;
144+
}
145+
138146
private ElasticsearchRestClientBuilder(final Supplier<RestClientBuilder> restClientBuilderSupplier) {
139147
this.restClientBuilderSupplier = restClientBuilderSupplier;
140148
}
@@ -178,6 +186,7 @@ RestClient build(final RestClientBuilder restClientBuilder) {
178186
}));
179187

180188
this.elasticApiConfig.configureHttpClient(httpClientBuilder);
189+
this.proxyConfig.configureHttpClient(httpClientBuilder);
181190

182191
if (Objects.nonNull(this.userAgentHeaderValue)) {
183192
httpClientBuilder.setUserAgent(this.userAgentHeaderValue);
@@ -467,4 +476,20 @@ public void configureHttpClient(final HttpAsyncClientBuilder httpClientBuilder)
467476
HttpClientConfigurator.forAddInterceptorFirst(productOriginHeaderInterceptor).configure(httpClientBuilder);
468477
}
469478
}
479+
480+
public static class ProxyConfig {
481+
private HttpHost httpProxy;
482+
483+
public synchronized void setProxy(final String proxy) {
484+
if (Objects.nonNull(this.httpProxy)) {
485+
throw new IllegalStateException("Only one proxy may be provided");
486+
}
487+
this.httpProxy = HttpHost.create(proxy);
488+
}
489+
public void configureHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
490+
if (Objects.nonNull(httpProxy)) {
491+
httpClientBuilder.setProxy(httpProxy);
492+
}
493+
}
494+
}
470495
}

src/main/java/co/elastic/logstash/filters/elasticintegration/PluginConfiguration.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public final class PluginConfiguration {
2929
// elasticsearch-source: connection target
3030
private final String cloudId;
3131
private final List<String> hosts;
32+
private final String proxy;
3233
private final Boolean sslEnabled;
3334

3435
// elasticsearch-source: ssl connection trust config
@@ -79,6 +80,7 @@ private PluginConfiguration(final Builder builder) {
7980
this.apiKey = builder.apiKey;
8081
// pipeline name resolver
8182
this.pipelineNameTemplate = builder.pipelineNameTemplate;
83+
this.proxy = builder.proxy;
8284
}
8385

8486
private static <T> List<T> copyOfNullableList(final List<T> source) {
@@ -104,6 +106,10 @@ public Optional<List<URL>> hosts() {
104106
.map(hosts -> hosts.stream().map(PluginConfiguration::uncheckedParseURL).toList());
105107
}
106108

109+
public Optional<String> proxy() {
110+
return Optional.ofNullable(proxy);
111+
}
112+
107113
public Optional<Boolean> sslEnabled() {
108114
return Optional.ofNullable(sslEnabled);
109115
}
@@ -171,6 +177,7 @@ public String toString() {
171177
if (Objects.nonNull(id)) { config.add(String.format("id=%s", id)); }
172178
if (Objects.nonNull(cloudId)) { config.add(String.format("cloudId=%s", cloudId)); }
173179
if (Objects.nonNull(hosts)) { config.add(String.format("hosts=%s", hosts)); }
180+
if (Objects.nonNull(proxy)) { config.add(String.format("proxy=%s", proxy)); }
174181
if (Objects.nonNull(sslEnabled)) { config.add(String.format("sslEnabled=%s", sslEnabled)); }
175182
if (Objects.nonNull(sslVerificationMode)) { config.add(String.format("sslVerificationMode=%s", sslVerificationMode)); }
176183
if (Objects.nonNull(sslTruststorePath)) { config.add(String.format("sslTruststorePath=%s", sslTruststorePath)); }
@@ -203,6 +210,7 @@ public static class Builder {
203210
String id;
204211
String cloudId;
205212
List<String> hosts;
213+
String proxy;
206214
Boolean sslEnabled;
207215
String sslVerificationMode;
208216
String sslTruststorePath;
@@ -240,6 +248,13 @@ public Builder setHosts(final List<String> hosts) {
240248
return this;
241249
}
242250

251+
public Builder setProxy(final String proxy) {
252+
if (Objects.nonNull(proxy) && !proxy.isBlank()) {
253+
this.proxy = proxy;
254+
}
255+
return this;
256+
}
257+
243258
public Builder setSslEnabled(final Boolean sslEnabled) {
244259
this.sslEnabled = sslEnabled;
245260
return this;

src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
import org.apache.http.HttpHost;
1111
import org.apache.http.HttpRequest;
1212
import org.apache.http.HttpRequestInterceptor;
13-
import org.apache.http.message.BasicHeader;
1413
import org.apache.http.message.BasicHttpRequest;
1514
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
1615
import org.elasticsearch.client.RestClient;
1716
import org.elasticsearch.client.RestClientBuilder;
1817
import org.hamcrest.Matchers;
18+
import org.mockito.Mockito;
1919
import org.junit.jupiter.api.Test;
2020

2121
import java.io.IOException;
@@ -24,7 +24,6 @@
2424
import java.util.Collection;
2525
import java.util.Collections;
2626
import java.util.List;
27-
import java.util.Objects;
2827

2928
import co.elastic.logstash.filters.elasticintegration.ElasticsearchRestClientBuilder.ElasticApiConfig;
3029

@@ -33,7 +32,6 @@
3332

3433
import static org.hamcrest.MatcherAssert.assertThat;
3534
import static org.hamcrest.Matchers.containsString;
36-
import static org.hamcrest.Matchers.equalTo;
3735
import static org.hamcrest.Matchers.is;
3836
import static org.hamcrest.Matchers.stringContainsInOrder;
3937
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -67,6 +65,35 @@ public void testForCloudIdFactory() {
6765
}
6866
}
6967

68+
@Test
69+
public void testProxyConfigSetProxy() {
70+
ElasticsearchRestClientBuilder.ProxyConfig proxyConfig = new ElasticsearchRestClientBuilder.ProxyConfig();
71+
proxyConfig.setProxy("https://proxy.mycorp.com:9043");
72+
73+
// Verify the proxy is configured in the HttpAsyncClientBuilder
74+
HttpAsyncClientBuilder httpClientBuilder = Mockito.mock(HttpAsyncClientBuilder.class);
75+
proxyConfig.configureHttpClient(httpClientBuilder);
76+
77+
// Verify setProxy was called with the correct HttpHost
78+
ArgumentCaptor<HttpHost> proxyCaptor = ArgumentCaptor.forClass(HttpHost.class);
79+
verify(httpClientBuilder).setProxy(proxyCaptor.capture());
80+
81+
HttpHost capturedProxy = proxyCaptor.getValue();
82+
assertThat(capturedProxy.getHostName(), is("proxy.mycorp.com"));
83+
assertThat(capturedProxy.getPort(), is(9043));
84+
assertThat(capturedProxy.getSchemeName(), is("https"));
85+
}
86+
87+
@Test
88+
public void testNoProxyConfigurationDoesNotSetProxy() {
89+
ElasticsearchRestClientBuilder.ProxyConfig proxyConfig = new ElasticsearchRestClientBuilder.ProxyConfig();
90+
HttpAsyncClientBuilder httpClientBuilder = Mockito.mock(HttpAsyncClientBuilder.class);
91+
proxyConfig.configureHttpClient(httpClientBuilder);
92+
93+
// Verify setProxy was never called
94+
verify(httpClientBuilder, never()).setProxy(any(HttpHost.class));
95+
}
96+
7097
@Test
7198
public void testForHostsFactorySingleURL() {
7299
final Collection<URL> inputUrls = Collections.singleton(parseURL("https://169.0.0.254:9201/mypath"));

0 commit comments

Comments
 (0)