diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 11853e2e..57e41a7c 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -360,6 +360,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> | <>|No | <> | <>|No +| <> | <>|No | <> | <>|No | <> |<>|No | <> | <>|No @@ -494,6 +495,16 @@ A password when using HTTP Basic Authentication to connect to {es}. * When present, the event's initial pipeline will _not_ be auto-detected from the event's data stream fields. * Value may be a {logstash-ref}/event-dependent-configuration.html#sprintf[sprintf-style] template; if any referenced fields cannot be resolved the event will not be routed to an ingest pipeline. +[id="plugins-{type}s-{plugin}-proxy"] +===== `proxy` + +* Value type is <> +* There is no default value for this setting. + +Address of the HTTP forward proxy used to connect to the {es} cluster. +An empty string is treated as if proxy was not set. +Environment variables may be used to set this value, e.g. `proxy => '${LS_PROXY:}'`. + [id="plugins-{type}s-{plugin}-ssl_certificate"] ===== `ssl_certificate` diff --git a/lib/logstash/filters/elastic_integration.rb b/lib/logstash/filters/elastic_integration.rb index 5e30a351..33e97209 100644 --- a/lib/logstash/filters/elastic_integration.rb +++ b/lib/logstash/filters/elastic_integration.rb @@ -40,6 +40,9 @@ class LogStash::Filters::ElasticIntegration < LogStash::Filters::Base # Any special characters present in the URLs here MUST be URL escaped! This means `#` should be put in as `%23` for instance. config :hosts, :validate => :uri, :list => true + # An HTTP forward proxy to use for connecting to the Elasticsearch cluster. + config :proxy, :validate => :uri + # Cloud ID, from the Elastic Cloud web console. If set `hosts` should not be used. # # For more details, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_id[cloud documentation] @@ -312,6 +315,7 @@ def extract_immutable_config builder.setHosts @hosts&.map(&:to_s) builder.setCloudId @cloud_id + builder.setProxy @proxy&.to_s builder.setSslEnabled @ssl_enabled diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilder.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilder.java index 20e8b0fd..2b6ed0ad 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilder.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilder.java @@ -65,6 +65,7 @@ public class ElasticsearchRestClientBuilder { private final IdentityConfig identityConfig = new IdentityConfig(); private final RequestAuthConfig requestAuthConfig = new RequestAuthConfig(); private final ElasticApiConfig elasticApiConfig = new ElasticApiConfig(); + private final ProxyConfig proxyConfig = new ProxyConfig(); private String userAgentHeaderValue; public static ElasticsearchRestClientBuilder forCloudId(final String cloudId) { @@ -122,6 +123,8 @@ public static Optional fromPluginConfiguration(f }); config.cloudAuth().ifPresent(requestAuthConfig::setCloudAuth); config.apiKey().ifPresent(requestAuthConfig::setApiKey); + }).configureProxy(proxyConfig -> { + config.proxy().ifPresent(proxyConfig::setProxy); }) ); } @@ -135,6 +138,11 @@ private static Optional builderInit(final Plugin .or(() -> config.hosts().map(ElasticsearchRestClientBuilder::forURLs)); } + public ElasticsearchRestClientBuilder configureProxy(final Consumer proxyConfigurator) { + proxyConfigurator.accept(this.proxyConfig); + return this; + } + private ElasticsearchRestClientBuilder(final Supplier restClientBuilderSupplier) { this.restClientBuilderSupplier = restClientBuilderSupplier; } @@ -178,6 +186,7 @@ RestClient build(final RestClientBuilder restClientBuilder) { })); this.elasticApiConfig.configureHttpClient(httpClientBuilder); + this.proxyConfig.configureHttpClient(httpClientBuilder); if (Objects.nonNull(this.userAgentHeaderValue)) { httpClientBuilder.setUserAgent(this.userAgentHeaderValue); @@ -467,4 +476,20 @@ public void configureHttpClient(final HttpAsyncClientBuilder httpClientBuilder) HttpClientConfigurator.forAddInterceptorFirst(productOriginHeaderInterceptor).configure(httpClientBuilder); } } + + public static class ProxyConfig { + private HttpHost httpProxy; + + public synchronized void setProxy(final String proxy) { + if (Objects.nonNull(this.httpProxy)) { + throw new IllegalStateException("Only one proxy may be provided"); + } + this.httpProxy = HttpHost.create(proxy); + } + public void configureHttpClient(final HttpAsyncClientBuilder httpClientBuilder) { + if (Objects.nonNull(httpProxy)) { + httpClientBuilder.setProxy(httpProxy); + } + } + } } diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/PluginConfiguration.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/PluginConfiguration.java index 39446f69..47f48055 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/PluginConfiguration.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/PluginConfiguration.java @@ -29,6 +29,7 @@ public final class PluginConfiguration { // elasticsearch-source: connection target private final String cloudId; private final List hosts; + private final String proxy; private final Boolean sslEnabled; // elasticsearch-source: ssl connection trust config @@ -79,6 +80,7 @@ private PluginConfiguration(final Builder builder) { this.apiKey = builder.apiKey; // pipeline name resolver this.pipelineNameTemplate = builder.pipelineNameTemplate; + this.proxy = builder.proxy; } private static List copyOfNullableList(final List source) { @@ -104,6 +106,10 @@ public Optional> hosts() { .map(hosts -> hosts.stream().map(PluginConfiguration::uncheckedParseURL).toList()); } + public Optional proxy() { + return Optional.ofNullable(proxy); + } + public Optional sslEnabled() { return Optional.ofNullable(sslEnabled); } @@ -171,6 +177,7 @@ public String toString() { if (Objects.nonNull(id)) { config.add(String.format("id=%s", id)); } if (Objects.nonNull(cloudId)) { config.add(String.format("cloudId=%s", cloudId)); } if (Objects.nonNull(hosts)) { config.add(String.format("hosts=%s", hosts)); } + if (Objects.nonNull(proxy)) { config.add(String.format("proxy=%s", proxy)); } if (Objects.nonNull(sslEnabled)) { config.add(String.format("sslEnabled=%s", sslEnabled)); } if (Objects.nonNull(sslVerificationMode)) { config.add(String.format("sslVerificationMode=%s", sslVerificationMode)); } if (Objects.nonNull(sslTruststorePath)) { config.add(String.format("sslTruststorePath=%s", sslTruststorePath)); } @@ -203,6 +210,7 @@ public static class Builder { String id; String cloudId; List hosts; + String proxy; Boolean sslEnabled; String sslVerificationMode; String sslTruststorePath; @@ -240,6 +248,13 @@ public Builder setHosts(final List hosts) { return this; } + public Builder setProxy(final String proxy) { + if (Objects.nonNull(proxy) && !proxy.isBlank()) { + this.proxy = proxy; + } + return this; + } + public Builder setSslEnabled(final Boolean sslEnabled) { this.sslEnabled = sslEnabled; return this; diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java index 2e4967a4..fe2d82ec 100644 --- a/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java +++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java @@ -10,12 +10,12 @@ import org.apache.http.HttpHost; import org.apache.http.HttpRequest; import org.apache.http.HttpRequestInterceptor; -import org.apache.http.message.BasicHeader; import org.apache.http.message.BasicHttpRequest; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.hamcrest.Matchers; +import org.mockito.Mockito; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -24,7 +24,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Objects; import co.elastic.logstash.filters.elasticintegration.ElasticsearchRestClientBuilder.ElasticApiConfig; @@ -33,7 +32,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.stringContainsInOrder; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -67,6 +65,35 @@ public void testForCloudIdFactory() { } } + @Test + public void testProxyConfigSetProxy() { + ElasticsearchRestClientBuilder.ProxyConfig proxyConfig = new ElasticsearchRestClientBuilder.ProxyConfig(); + proxyConfig.setProxy("https://proxy.mycorp.com:9043"); + + // Verify the proxy is configured in the HttpAsyncClientBuilder + HttpAsyncClientBuilder httpClientBuilder = Mockito.mock(HttpAsyncClientBuilder.class); + proxyConfig.configureHttpClient(httpClientBuilder); + + // Verify setProxy was called with the correct HttpHost + ArgumentCaptor proxyCaptor = ArgumentCaptor.forClass(HttpHost.class); + verify(httpClientBuilder).setProxy(proxyCaptor.capture()); + + HttpHost capturedProxy = proxyCaptor.getValue(); + assertThat(capturedProxy.getHostName(), is("proxy.mycorp.com")); + assertThat(capturedProxy.getPort(), is(9043)); + assertThat(capturedProxy.getSchemeName(), is("https")); + } + + @Test + public void testNoProxyConfigurationDoesNotSetProxy() { + ElasticsearchRestClientBuilder.ProxyConfig proxyConfig = new ElasticsearchRestClientBuilder.ProxyConfig(); + HttpAsyncClientBuilder httpClientBuilder = Mockito.mock(HttpAsyncClientBuilder.class); + proxyConfig.configureHttpClient(httpClientBuilder); + + // Verify setProxy was never called + verify(httpClientBuilder, never()).setProxy(any(HttpHost.class)); + } + @Test public void testForHostsFactorySingleURL() { final Collection inputUrls = Collections.singleton(parseURL("https://169.0.0.254:9201/mypath"));