Skip to content

Commit ddaa1a3

Browse files
committed
Logging improvements: informing which ES version considered when building this plugin version, when plugin and connected ES versions do not match warn/inform incompatibility risk and show the guidance.
1 parent 63aa130 commit ddaa1a3

File tree

8 files changed

+117
-76
lines changed

8 files changed

+117
-76
lines changed

lib/logstash/filters/elastic_integration.rb

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ def register
135135
initialize_event_processor!
136136

137137
perform_preflight_check!
138+
if serverless?
139+
set_api_version_to_rest_client!
140+
end
141+
check_versions_alignment
138142
end # def register
139143

140144
def filter(event)
@@ -342,12 +346,6 @@ def extract_immutable_config
342346
def initialize_elasticsearch_rest_client!
343347
config = extract_immutable_config
344348
@elasticsearch_rest_client = _elasticsearch_rest_client(config)
345-
346-
if serverless?
347-
@elasticsearch_rest_client = _elasticsearch_rest_client(config) do |builder|
348-
builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) }
349-
end
350-
end
351349
end
352350

353351
def _elasticsearch_rest_client(config, &builder_interceptor)
@@ -374,13 +372,21 @@ def initialize_event_processor!
374372
end
375373

376374
def perform_preflight_check!
375+
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
376+
@preflight_check = PreflightCheck.new(@elasticsearch_rest_client)
377+
connected_es_version_info
377378
check_user_privileges!
378379
check_es_cluster_license!
380+
rescue => e
381+
raise_config_error!(e.message)
382+
end
383+
384+
def connected_es_version_info
385+
@connected_es_version_info |= @preflight_check.getElasticsearchVersionInfo
379386
end
380387

381388
def check_user_privileges!
382-
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
383-
PreflightCheck.new(@elasticsearch_rest_client).checkUserPrivileges
389+
@preflight_check.checkUserPrivileges
384390
rescue => e
385391
security_error_message = "no handler found for uri [/_security/user/_has_privileges]"
386392
if e.message.include?(security_error_message)
@@ -403,17 +409,19 @@ def check_user_privileges!
403409
end
404410

405411
def check_es_cluster_license!
406-
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
407-
PreflightCheck.new(@elasticsearch_rest_client).checkLicense
412+
@preflight_check.checkLicense
408413
rescue => e
409414
raise_config_error!(e.message)
410415
end
411416

412417
def serverless?
413-
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
414-
PreflightCheck.new(@elasticsearch_rest_client).isServerless
415-
rescue => e
416-
raise_config_error!(e.message)
418+
connected_es_version_info["build_flavor"] == 'serverless'
419+
end
420+
421+
def set_api_version_to_rest_client!
422+
@elasticsearch_rest_client = _elasticsearch_rest_client(config) do |builder|
423+
builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) }
424+
end
417425
end
418426

419427
##
@@ -454,4 +462,40 @@ def ensure_java_major_version!(minimum_major_version)
454462
ERR
455463
end
456464
end
465+
466+
##
467+
# compares the current plugin version with the Elasticsearch version connected to
468+
# generates a warning or info message based on the situation where the plugin is ahead or behind of the Elasticsearch
469+
def check_versions_alignment
470+
plugin_version_parts = VERSION.split('.')
471+
plugin_major_version = plugin_version_parts[0].to_i
472+
plugin_minor_version = plugin_version_parts[1].to_i
473+
474+
base_message = "This #{VERSION} version of plugin embedded Ingest node components from Elasticsearch #{plugin_major_version}.#{plugin_minor_version}"
475+
@logger.info(base_message)
476+
477+
es_version_parts = connected_es_version_info["number"].split('.')
478+
es_major_version = es_version_parts[0].to_i
479+
es_minor_version = es_version_parts[1].to_i
480+
481+
return if (plugin_major_version == es_major_version && plugin_minor_version == es_minor_version)
482+
483+
descriptive_message = "Upgrade the plugin and/or stack to the same `major.minor` to get the minimal disruptive experience"
484+
485+
if plugin_major_version != es_major_version
486+
# major version difference may alarm the warning, it doesn't seem useful to inform minor here
487+
# when plugin is ahead of connected ES, not much concern but recommended to keep the same version
488+
# when plugin is behind of connected ES, BIG concern as it cannot utilize the features of the Ingest Node
489+
period_indicator = plugin_major_version > es_major_version ? "newer" : "older"
490+
message = "This #{VERSION} version of plugin is compiled with #{period_indicator} Elasticsearch version than" +
491+
" currently connected Elasticsearch #{es_major_version}.#{es_minor_version} version. #{descriptive_message}"
492+
@logger.info(message) if plugin_major_version > es_major_version
493+
@logger.warn(message) if plugin_major_version < es_major_version
494+
else
495+
period_indicator = plugin_minor_version > es_minor_version ? "newer" : "older"
496+
message = "This #{VERSION} version of plugin is compiled with #{period_indicator} Elasticsearch version than" +
497+
" currently connected Elasticsearch #{es_major_version}.#{es_minor_version} version. #{descriptive_message}"
498+
@logger.info(message) unless plugin_minor_version == es_minor_version
499+
end
500+
end
457501
end

spec/unit/elastic_integration_spec.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
allow(java.lang.System).to receive(:getProperty).with("java.specification.version").and_return("11.0.16.1")
3030
end
3131

32-
it 'prevents initialization and presents helpful guidancee' do
32+
it 'prevents initialization and presents helpful guidance' do
3333
expect { described_class.new({}) }.to raise_error(LogStash::EnvironmentError)
3434
.with_message(including("requires Java 21 or later", # reason +
3535
"current JVM version `11.0.16.1`",
@@ -65,7 +65,9 @@
6565

6666
describe "plugin register" do
6767

68-
before(:each) { allow(plugin).to receive(:perform_preflight_check!).and_return(true) }
68+
before(:each) { allow(plugin).to receive(:connected_es_version_info).and_return({'number' => '8.17.0', 'build_flavor' => 'default'}) }
69+
before(:each) { allow(plugin).to receive(:check_user_privileges!).and_return(true) }
70+
before(:each) { allow(plugin).to receive(:check_es_cluster_license!).and_return(true) }
6971
before(:each) { allow(plugin).to receive(:serverless?).and_return(false) }
7072

7173
let(:registered_plugin) { plugin.tap(&:register) }

src/main/java/co/elastic/logstash/filters/elasticintegration/PreflightCheck.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,38 @@ public PreflightCheck(final RestClient elasticsearchRestClient) {
5050
this.elasticsearchRestClient = elasticsearchRestClient;
5151
}
5252

53+
/**
54+
* Get Elasticsearch cluster info and store version number and build flavor.
55+
* @return {@link Map} containing
56+
* <ul>
57+
* <li>"number": the key for a version number (e.g., "8.17.0")</li>
58+
* <li>"build_flavor": the key for a build flavor (e.g., "default" "oss" or "serverless")</li>
59+
* </ul>
60+
* Throws {@link Failure} if an error occurs while sending the request, parsing the response or extracting the
61+
* required information.
62+
*/
63+
public Map<String, String> getElasticsearchVersionInfo() {
64+
try {
65+
final Request request = new Request("GET", "/");
66+
final Response response = elasticsearchRestClient.performRequest(request);
67+
68+
final String resBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
69+
logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody));
70+
71+
final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version");
72+
73+
final String elasticsearchVersion = versionNode.get("number").asText();
74+
logger.info(String.format("Connected to Elasticsearch version: %s", elasticsearchVersion));
75+
76+
final String elasticsearchBuildFlavor = versionNode.get("build_flavor").asText();
77+
logger.info(String.format("Elasticsearch build_flavor: %s", elasticsearchBuildFlavor));
78+
79+
return Map.of("number", elasticsearchVersion, "build_flavor", elasticsearchBuildFlavor);
80+
} catch (Exception e) {
81+
throw new Failure(String.format("Fetching Elasticsearch version information failed: %s", e.getMessage()), e);
82+
}
83+
}
84+
5385
public void checkUserPrivileges() {
5486
try {
5587
final Request hasPrivilegesRequest = new Request("POST", "/_security/user/_has_privileges");
@@ -105,26 +137,6 @@ public void checkLicense() {
105137
}
106138
}
107139

108-
public boolean isServerless() {
109-
try {
110-
final Request req = new Request("GET", "/");
111-
final Response res = elasticsearchRestClient.performRequest(req);
112-
113-
final String resBody = new String(res.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
114-
logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody));
115-
116-
final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version");
117-
118-
final String buildFlavor = versionNode.get("build_flavor").asText();
119-
logger.info(String.format("Elasticsearch build_flavor: %s", buildFlavor));
120-
121-
return buildFlavor.equals("serverless");
122-
} catch (Exception e) {
123-
logger.error(String.format("Exception checking serverless: %s", e.getMessage()));
124-
throw new Failure(String.format("Preflight check failed: %s", e.getMessage()), e);
125-
}
126-
}
127-
128140
public static class Failure extends RuntimeException {
129141
public Failure(String message, Throwable cause) {
130142
super(message, cause);

src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ public void testElasticApiConfigAddsHeaders() throws HttpException, IOException
156156
for (HttpRequestInterceptor interceptor : interceptors) {
157157
interceptor.process(request, null);
158158
}
159+
assertThat(request.getFirstHeader("Elastic-Api-Version").getValue(),
160+
is("2023-10-31"));
159161
assertThat(request.getFirstHeader("x-elastic-product-origin").getValue(),
160162
is("logstash-filter-elastic_integration"));
161163
}

src/test/java/co/elastic/logstash/filters/elasticintegration/PreflightCheckTest.java

Lines changed: 21 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77
package co.elastic.logstash.filters.elasticintegration;
88

9-
import co.elastic.logstash.api.Password;
109
import com.github.tomakehurst.wiremock.extension.responsetemplating.ResponseTemplateTransformer;
1110
import com.github.tomakehurst.wiremock.http.Fault;
1211
import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
@@ -19,6 +18,7 @@
1918
import java.net.URL;
2019
import java.nio.file.Path;
2120
import java.util.Collections;
21+
import java.util.Map;
2222
import java.util.function.Consumer;
2323

2424
import static co.elastic.logstash.filters.elasticintegration.util.ResourcesUtil.readResource;
@@ -77,7 +77,6 @@ void checkCredentialsPrivilegesMissingReadPipeline() throws Exception {
7777
}
7878

7979

80-
8180
@Test
8281
void checkCredentialsPrivilegesMissingManageIndexTemplates() throws Exception {
8382
wireMock.stubFor(
@@ -197,57 +196,50 @@ void checkLicenseConnectionError() throws Exception {
197196
}
198197

199198
@Test
200-
void checkIsServerlessTrue() throws Exception {
199+
void checkServerlessVersionInfo() throws Exception {
201200
wireMock.stubFor(get("/")
202-
.willReturn(okJson(getBodyFixture("is_serverless.true.json"))
201+
.willReturn(okJson(getBodyFixture("serverless.response.json"))
203202
.withTransformers("response-template")));
204203
withWiremockElasticsearch((restClient -> {
205204
final Logger logger = Mockito.mock(Logger.class);
206-
boolean result = new PreflightCheck(logger, restClient).isServerless();
205+
Map<String, String> result = new PreflightCheck(logger, restClient).getElasticsearchVersionInfo();
207206

208-
assertTrue(result);
207+
assertNotNull(result);
208+
assertThat(result.size(), equalTo(2));
209+
assertEquals(result.get("number"), "8.11.0");
210+
assertEquals(result.get("build_flavor"), "serverless");
211+
Mockito.verify(logger).info(argThat(containsString("Connected to Elasticsearch version: 8.11.0")));
209212
Mockito.verify(logger).info(argThat(containsString("Elasticsearch build_flavor: serverless")));
210213
}));
211214
}
212215

213216
@Test
214-
void checkIsServerlessFalse() throws Exception {
217+
void testElasticsearchVersionInfo() throws Exception {
215218
wireMock.stubFor(get("/")
216-
.willReturn(okJson(getBodyFixture("is_serverless.false.json"))
219+
.willReturn(okJson(getBodyFixture("elasticsearch.default.response.json"))
217220
.withTransformers("response-template")));
218221
withWiremockElasticsearch((restClient -> {
219222
final Logger logger = Mockito.mock(Logger.class);
220-
boolean result = new PreflightCheck(logger, restClient).isServerless();
223+
Map<String, String> versionInfo = new PreflightCheck(logger, restClient).getElasticsearchVersionInfo();
221224

222-
assertFalse(result);
225+
assertNotNull(versionInfo);
226+
assertThat(versionInfo.size(), equalTo(2));
227+
assertEquals(versionInfo.get("number"), "8.7.0");
228+
assertEquals(versionInfo.get("build_flavor"), "default");
229+
Mockito.verify(logger).info(argThat(containsString("Connected to Elasticsearch version: 8.7.0")));
223230
Mockito.verify(logger).info(argThat(containsString("Elasticsearch build_flavor: default")));
224231
}));
225232
}
226233

227234
@Test
228-
void checkIsServerless401Response() throws Exception {
235+
void testElasticsearch401Response() throws Exception {
229236
wireMock.stubFor(get("/")
230-
.willReturn(jsonResponse(getBodyFixture("is_serverless.resp.401.json"), 401)));
237+
.willReturn(jsonResponse(getBodyFixture("elasticsearch.401.response.json"), 401)));
231238
withWiremockElasticsearch((restClient -> {
232239
final PreflightCheck.Failure failure = assertThrows(PreflightCheck.Failure.class, () -> {
233-
new PreflightCheck(restClient).isServerless();
240+
new PreflightCheck(restClient).getElasticsearchVersionInfo();
234241
});
235-
assertThat(failure.getMessage(), hasToString(stringContainsInOrder("Preflight check failed", "401 Unauthorized")));
236-
}));
237-
}
238-
239-
public static final String ELASTIC_API_VERSION = "2023-10-31";
240-
public static final String ENCODED_API_KEY = "iamapikeyiamapikeyiamapikeyiamapikeyiamapikeyiamapikeyiama==";
241-
@Test
242-
void checkServerlessRequestContainsHeaders() throws Exception {
243-
wireMock.stubFor(get("/")
244-
.withHeader("Elastic-Api-Version", containing(ELASTIC_API_VERSION))
245-
.withHeader("Authorization", containing(ENCODED_API_KEY))
246-
.willReturn(okJson(getBodyFixture("is_serverless.true.json"))
247-
.withTransformers("response-template")));
248-
withWiremockServerlessElasticsearch((restClient -> {
249-
boolean result = new PreflightCheck(restClient).isServerless();
250-
assertTrue(result);
242+
assertThat(failure.getMessage(), hasToString(stringContainsInOrder("Fetching Elasticsearch version information failed", "401 Unauthorized")));
251243
}));
252244
}
253245

@@ -258,17 +250,6 @@ private void withWiremockElasticsearch(final Consumer<RestClient> handler) throw
258250
}
259251
}
260252

261-
private void withWiremockServerlessElasticsearch(final Consumer<RestClient> handler) throws Exception{
262-
final URL wiremockElasticsearch = new URL("http", "127.0.0.1", wireMock.getRuntimeInfo().getHttpPort(),"/");
263-
try (RestClient restClient = ElasticsearchRestClientBuilder
264-
.forURLs(Collections.singletonList(wiremockElasticsearch))
265-
.configureElasticApi(c -> c.setApiVersion(ELASTIC_API_VERSION))
266-
.configureRequestAuth(c -> c.setApiKey(new Password(ENCODED_API_KEY)))
267-
.build()) {
268-
handler.accept(restClient);
269-
}
270-
}
271-
272253
static String getBodyFixture(final String name) {
273254
return readResource(PreflightCheck.class, Path.of("preflight-check", name).toString());
274255
}

0 commit comments

Comments
 (0)