Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-hosts>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-password>> | <<password,password>>|No
| <<plugins-{type}s-{plugin}-pipeline_name>> | <<string,string>>|No
| <<plugins-{type}s-{plugin}-proxy>> | <<uri,uri>>|No
| <<plugins-{type}s-{plugin}-ssl_certificate>> | <<path,path>>|No
| <<plugins-{type}s-{plugin}-ssl_certificate_authorities>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-ssl_enabled>> | <<boolean,boolean>>|No
Expand Down Expand Up @@ -494,6 +495,16 @@ A password when using HTTP Basic Authentication to connect to {es}.
* When present, the event's initial pipeline will _not_ be auto-detected from the event's data stream fields.
* Value may be a {logstash-ref}/event-dependent-configuration.html#sprintf[sprintf-style] template; if any referenced fields cannot be resolved the event will not be routed to an ingest pipeline.

[id="plugins-{type}s-{plugin}-proxy"]
===== `proxy`

* Value type is <<uri,uri>>
* There is no default value for this setting.

Address of the HTTP forward proxy used to connect to the {es} cluster.
An empty string is treated as if proxy was not set.
Environment variables may be used to set this value, e.g. `proxy => '${LS_PROXY:}'`.

[id="plugins-{type}s-{plugin}-ssl_certificate"]
===== `ssl_certificate`

Expand Down
4 changes: 4 additions & 0 deletions lib/logstash/filters/elastic_integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class LogStash::Filters::ElasticIntegration < LogStash::Filters::Base
# Any special characters present in the URLs here MUST be URL escaped! This means `#` should be put in as `%23` for instance.
config :hosts, :validate => :uri, :list => true

# An HTTP forward proxy to use for connecting to the Elasticsearch cluster.
config :proxy, :validate => :uri

# Cloud ID, from the Elastic Cloud web console. If set `hosts` should not be used.
#
# For more details, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_id[cloud documentation]
Expand Down Expand Up @@ -312,6 +315,7 @@ def extract_immutable_config

builder.setHosts @hosts&.map(&:to_s)
builder.setCloudId @cloud_id
builder.setProxy @proxy&.to_s

builder.setSslEnabled @ssl_enabled

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class ElasticsearchRestClientBuilder {
private final IdentityConfig identityConfig = new IdentityConfig();
private final RequestAuthConfig requestAuthConfig = new RequestAuthConfig();
private final ElasticApiConfig elasticApiConfig = new ElasticApiConfig();
private final ProxyConfig proxyConfig = new ProxyConfig();
private String userAgentHeaderValue;

public static ElasticsearchRestClientBuilder forCloudId(final String cloudId) {
Expand Down Expand Up @@ -122,6 +123,8 @@ public static Optional<ElasticsearchRestClientBuilder> fromPluginConfiguration(f
});
config.cloudAuth().ifPresent(requestAuthConfig::setCloudAuth);
config.apiKey().ifPresent(requestAuthConfig::setApiKey);
}).configureProxy(proxyConfig -> {
config.proxy().ifPresent(proxyConfig::setProxy);
})
);
}
Expand All @@ -135,6 +138,11 @@ private static Optional<ElasticsearchRestClientBuilder> builderInit(final Plugin
.or(() -> config.hosts().map(ElasticsearchRestClientBuilder::forURLs));
}

public ElasticsearchRestClientBuilder configureProxy(final Consumer<ProxyConfig> proxyConfigurator) {
proxyConfigurator.accept(this.proxyConfig);
return this;
}

private ElasticsearchRestClientBuilder(final Supplier<RestClientBuilder> restClientBuilderSupplier) {
this.restClientBuilderSupplier = restClientBuilderSupplier;
}
Expand Down Expand Up @@ -178,6 +186,7 @@ RestClient build(final RestClientBuilder restClientBuilder) {
}));

this.elasticApiConfig.configureHttpClient(httpClientBuilder);
this.proxyConfig.configureHttpClient(httpClientBuilder);

if (Objects.nonNull(this.userAgentHeaderValue)) {
httpClientBuilder.setUserAgent(this.userAgentHeaderValue);
Expand Down Expand Up @@ -467,4 +476,20 @@ public void configureHttpClient(final HttpAsyncClientBuilder httpClientBuilder)
HttpClientConfigurator.forAddInterceptorFirst(productOriginHeaderInterceptor).configure(httpClientBuilder);
}
}

public static class ProxyConfig {
private HttpHost httpProxy;

public synchronized void setProxy(final String proxy) {
if (Objects.nonNull(this.httpProxy)) {
throw new IllegalStateException("Only one proxy may be provided");
}
this.httpProxy = HttpHost.create(proxy);
}
public void configureHttpClient(final HttpAsyncClientBuilder httpClientBuilder) {
if (Objects.nonNull(httpProxy)) {
httpClientBuilder.setProxy(httpProxy);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class PluginConfiguration {
// elasticsearch-source: connection target
private final String cloudId;
private final List<String> hosts;
private final String proxy;
private final Boolean sslEnabled;

// elasticsearch-source: ssl connection trust config
Expand Down Expand Up @@ -79,6 +80,7 @@ private PluginConfiguration(final Builder builder) {
this.apiKey = builder.apiKey;
// pipeline name resolver
this.pipelineNameTemplate = builder.pipelineNameTemplate;
this.proxy = builder.proxy;
}

private static <T> List<T> copyOfNullableList(final List<T> source) {
Expand All @@ -104,6 +106,10 @@ public Optional<List<URL>> hosts() {
.map(hosts -> hosts.stream().map(PluginConfiguration::uncheckedParseURL).toList());
}

public Optional<String> proxy() {
return Optional.ofNullable(proxy);
}

public Optional<Boolean> sslEnabled() {
return Optional.ofNullable(sslEnabled);
}
Expand Down Expand Up @@ -171,6 +177,7 @@ public String toString() {
if (Objects.nonNull(id)) { config.add(String.format("id=%s", id)); }
if (Objects.nonNull(cloudId)) { config.add(String.format("cloudId=%s", cloudId)); }
if (Objects.nonNull(hosts)) { config.add(String.format("hosts=%s", hosts)); }
if (Objects.nonNull(proxy)) { config.add(String.format("proxy=%s", proxy)); }
if (Objects.nonNull(sslEnabled)) { config.add(String.format("sslEnabled=%s", sslEnabled)); }
if (Objects.nonNull(sslVerificationMode)) { config.add(String.format("sslVerificationMode=%s", sslVerificationMode)); }
if (Objects.nonNull(sslTruststorePath)) { config.add(String.format("sslTruststorePath=%s", sslTruststorePath)); }
Expand Down Expand Up @@ -203,6 +210,7 @@ public static class Builder {
String id;
String cloudId;
List<String> hosts;
String proxy;
Boolean sslEnabled;
String sslVerificationMode;
String sslTruststorePath;
Expand Down Expand Up @@ -240,6 +248,13 @@ public Builder setHosts(final List<String> hosts) {
return this;
}

public Builder setProxy(final String proxy) {
if (Objects.nonNull(proxy) && !proxy.isBlank()) {
this.proxy = proxy;
}
return this;
}

public Builder setSslEnabled(final Boolean sslEnabled) {
this.sslEnabled = sslEnabled;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.hamcrest.Matchers;
import org.mockito.Mockito;
import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand All @@ -24,7 +24,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import co.elastic.logstash.filters.elasticintegration.ElasticsearchRestClientBuilder.ElasticApiConfig;

Expand All @@ -33,7 +32,6 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.stringContainsInOrder;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -67,6 +65,35 @@ public void testForCloudIdFactory() {
}
}

@Test
public void testProxyConfigSetProxy() {
ElasticsearchRestClientBuilder.ProxyConfig proxyConfig = new ElasticsearchRestClientBuilder.ProxyConfig();
proxyConfig.setProxy("https://proxy.mycorp.com:9043");

// Verify the proxy is configured in the HttpAsyncClientBuilder
HttpAsyncClientBuilder httpClientBuilder = Mockito.mock(HttpAsyncClientBuilder.class);
proxyConfig.configureHttpClient(httpClientBuilder);

// Verify setProxy was called with the correct HttpHost
ArgumentCaptor<HttpHost> proxyCaptor = ArgumentCaptor.forClass(HttpHost.class);
verify(httpClientBuilder).setProxy(proxyCaptor.capture());

HttpHost capturedProxy = proxyCaptor.getValue();
assertThat(capturedProxy.getHostName(), is("proxy.mycorp.com"));
assertThat(capturedProxy.getPort(), is(9043));
assertThat(capturedProxy.getSchemeName(), is("https"));
}

@Test
public void testNoProxyConfigurationDoesNotSetProxy() {
ElasticsearchRestClientBuilder.ProxyConfig proxyConfig = new ElasticsearchRestClientBuilder.ProxyConfig();
HttpAsyncClientBuilder httpClientBuilder = Mockito.mock(HttpAsyncClientBuilder.class);
proxyConfig.configureHttpClient(httpClientBuilder);

// Verify setProxy was never called
verify(httpClientBuilder, never()).setProxy(any(HttpHost.class));
}

@Test
public void testForHostsFactorySingleURL() {
final Collection<URL> inputUrls = Collections.singleton(parseURL("https://169.0.0.254:9201/mypath"));
Expand Down