diff --git a/lib/logstash/filters/elastic_integration.rb b/lib/logstash/filters/elastic_integration.rb index 8fce6cd4..5e30a351 100644 --- a/lib/logstash/filters/elastic_integration.rb +++ b/lib/logstash/filters/elastic_integration.rb @@ -135,6 +135,7 @@ def register initialize_event_processor! perform_preflight_check! + check_versions_alignment end # def register def filter(event) @@ -345,8 +346,8 @@ def initialize_elasticsearch_rest_client! if serverless? @elasticsearch_rest_client = _elasticsearch_rest_client(config) do |builder| - builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) } - end + builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) } + end end end @@ -374,13 +375,24 @@ def initialize_event_processor! end def perform_preflight_check! + connected_es_version_info check_user_privileges! check_es_cluster_license! end - def check_user_privileges! + def preflight_check_instance java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck') - PreflightCheck.new(@elasticsearch_rest_client).checkUserPrivileges + @preflight_check ||= PreflightCheck.new(@elasticsearch_rest_client) + end + + def connected_es_version_info + @connected_es_version_info ||= preflight_check_instance.getElasticsearchVersionInfo + rescue => e + raise_config_error!(e.message) + end + + def check_user_privileges! + preflight_check_instance.checkUserPrivileges rescue => e security_error_message = "no handler found for uri [/_security/user/_has_privileges]" if e.message.include?(security_error_message) @@ -403,17 +415,13 @@ def check_user_privileges! end def check_es_cluster_license! - java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck') - PreflightCheck.new(@elasticsearch_rest_client).checkLicense + preflight_check_instance.checkLicense rescue => e raise_config_error!(e.message) end def serverless? - java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck') - PreflightCheck.new(@elasticsearch_rest_client).isServerless - rescue => e - raise_config_error!(e.message) + connected_es_version_info["build_flavor"] == 'serverless' end ## @@ -454,4 +462,44 @@ def ensure_java_major_version!(minimum_major_version) ERR end end + + ## + # compares the current plugin version with the Elasticsearch version connected to + # generates a warning or info message based on the situation where the plugin is ahead or behind of the connected Elasticsearch + def check_versions_alignment + plugin_major_version, plugin_minor_version = VERSION.split('.').map(&:to_i) + es_major_version, es_minor_version = connected_es_version_info["number"].split('.').first(2).map(&:to_i) + + logger.info("This #{VERSION} version of plugin embedded Ingest node components from Elasticsearch #{plugin_major_version}.#{plugin_minor_version}") + + es_full_version = connected_es_version_info["number"] + + if es_major_version > plugin_major_version + logger.warn "This plugin v#{VERSION} is connected to a newer MAJOR " + + "version of Elasticsearch v#{es_full_version}, and may " + + "have trouble loading or running pipelines that use new " + + "features; for the best experience, update this plugin " + + "to at least v#{es_major_version}.#{es_minor_version}." + elsif es_major_version < plugin_major_version + logger.warn "This plugin v#{VERSION} is connected to an older MAJOR " + + "version of Elasticsearch v#{es_full_version}, and may " + + "have trouble loading or running pipelines that use " + + "features that were deprecated before Elasticsearch " + + "v#{plugin_major_version}.0; for the best experience, " + + "align major/minor versions across the Elastic Stack." + elsif es_minor_version > plugin_minor_version + logger.warn "This plugin v#{VERSION} is connected to a newer MINOR " + + "version of Elasticsearch v#{es_full_version}, and may " + + "have trouble loading or running pipelines that use new " + + "features; for the best experience, update this plugin to " + + "at least v#{es_major_version}.#{es_minor_version}." + elsif es_minor_version < plugin_minor_version + logger.info "This plugin v#{VERSION} is connected to an older MINOR " + + "version of Elasticsearch v#{es_full_version}; for the best experience, " + + "align major/minor versions across the Elastic Stack." + else + logger.debug "This plugin v#{VERSION} is connected to the same MAJOR/MINOR " + + "version of Elasticsearch v#{es_full_version}." + end + end end \ No newline at end of file diff --git a/spec/integration/elastic_integration_spec.rb b/spec/integration/elastic_integration_spec.rb index 43755611..6fb89b26 100644 --- a/spec/integration/elastic_integration_spec.rb +++ b/spec/integration/elastic_integration_spec.rb @@ -1300,6 +1300,7 @@ def path; @path; end # plugin register fails expected_message = "The cluster privilege `monitor` is REQUIRED in order to validate Elasticsearch license" expect(subject).to receive(:serverless?).and_return(false) + expect(subject).to receive(:connected_es_version_info).and_return({'number' => LogStash::Filters::ElasticIntegration::VERSION, 'build_flavor' => 'default'}) expect{ subject.register }.to raise_error(LogStash::ConfigurationError).with_message(expected_message) # send event and check diff --git a/spec/unit/elastic_integration_spec.rb b/spec/unit/elastic_integration_spec.rb index e6fba65c..88b16615 100644 --- a/spec/unit/elastic_integration_spec.rb +++ b/spec/unit/elastic_integration_spec.rb @@ -16,7 +16,12 @@ subject(:plugin) { LogStash::Filters::ElasticIntegration.new(config) } let(:mock_geoip_database_manager) { double("LogStash::GeoipDatabaseManagement::Manager", :enabled? => false) } - before(:each) { allow(plugin).to receive(:load_geoip_database_manager!).and_return(mock_geoip_database_manager) } + before(:each) { + allow(plugin).to receive(:load_geoip_database_manager!).and_return(mock_geoip_database_manager) + allow(plugin).to receive(:logger).and_return(mock_logger) + } + + let(:mock_logger) { double("Logger").as_null_object } describe 'the plugin class' do subject { described_class } @@ -29,7 +34,7 @@ allow(java.lang.System).to receive(:getProperty).with("java.specification.version").and_return("11.0.16.1") end - it 'prevents initialization and presents helpful guidancee' do + it 'prevents initialization and presents helpful guidance' do expect { described_class.new({}) }.to raise_error(LogStash::EnvironmentError) .with_message(including("requires Java 21 or later", # reason + "current JVM version `11.0.16.1`", @@ -65,12 +70,16 @@ describe "plugin register" do - before(:each) { allow(plugin).to receive(:perform_preflight_check!).and_return(true) } + before(:each) { allow(plugin).to receive(:connected_es_version_info).and_return(connected_es_version_info) } + before(:each) { allow(plugin).to receive(:check_user_privileges!).and_return(true) } + before(:each) { allow(plugin).to receive(:check_es_cluster_license!).and_return(true) } before(:each) { allow(plugin).to receive(:serverless?).and_return(false) } let(:registered_plugin) { plugin.tap(&:register) } after(:each) { plugin.close } + let(:connected_es_version_info) { {'number' => '8.17.0', 'build_flavor' => 'default'} } + shared_examples "validate `ssl_enabled`" do it "enables SSL" do expect(registered_plugin.ssl_enabled).to be_truthy @@ -722,5 +731,80 @@ end end + describe "plugin vs connected ES versions compatibility" do + let(:config) { super().merge("hosts" => %w[127.0.0.2:9300]) } + let(:version) { LogStash::Filters::ElasticIntegration::VERSION } + let(:plugin_major_version) { version.split('.').first.to_i } + let(:plugin_minor_version) { version.split('.')[1].to_i } + let(:base_message) { "This #{version} version of plugin embedded Ingest node components from Elasticsearch #{plugin_major_version}.#{plugin_minor_version}" } + let(:expected_message) { } + + before(:each) { plugin.tap(&:check_versions_alignment) } + + it "informs which version of ES the plugin is built from" do + expected_message = + "This plugin v#{version} is connected to the same MAJOR/MINOR version " + + "of Elasticsearch v#{connected_es_version_info['number']}." + + expect(mock_logger).to have_received(:info).with(base_message) + expect(mock_logger).to have_received(:debug).with(expected_message) + end + + shared_examples "version mismatch" do |version_type, ahead_or_behind, first_log_level, second_log_level| + let(:connected_es_version_info) do + major = plugin_major_version + minor = plugin_minor_version + major += ahead_or_behind == :ahead ? -1 : 1 if version_type == :major + minor += ahead_or_behind == :ahead ? -1 : 1 if version_type == :minor + { "number" => "#{major}.#{minor}.10", "major" => "#{major}", "minor" => "#{minor}", "build_flavor" => "default" } + end + + it "informs to update" do + expect(mock_logger).to have_received(first_log_level).with(base_message) + expect(mock_logger).to have_received(second_log_level).with(expected_message) + end + end + + context "plugin major version is behind" do + let(:expected_message) { + "This plugin v#{version} is connected to a newer MAJOR version of " + + "Elasticsearch v#{connected_es_version_info['number']}, and may have trouble loading or " + + "running pipelines that use new features; for the best experience, " + + "update this plugin to at least v#{connected_es_version_info['major']}.#{connected_es_version_info['minor']}." + } + include_examples "version mismatch", :major, :behind, :info, :warn + end + + context "plugin major version is ahead" do + let(:expected_message) { + "This plugin v#{version} is connected to an older MAJOR version of " + + "Elasticsearch v#{connected_es_version_info['number']}, and may have trouble loading or " + + "running pipelines that use features that were deprecated before " + + "Elasticsearch v#{plugin_major_version}.0; for the best experience, " + + "align major/minor versions across the Elastic Stack." + } + include_examples "version mismatch", :major, :ahead, :info, :warn + end + + context "plugin minor version is behind" do + let(:expected_message) { + "This plugin v#{version} is connected to a newer MINOR version of " + + "Elasticsearch v#{connected_es_version_info['number']}, and may have trouble loading or " + + "running pipelines that use new features; for the best experience, " + + "update this plugin to at least v#{connected_es_version_info['major']}.#{connected_es_version_info['minor']}." + } + include_examples "version mismatch", :minor, :behind, :info, :warn + end + + context "plugin minor version is ahead" do + let(:expected_message) { + "This plugin v#{version} is connected to an older MINOR version of " + + "Elasticsearch v#{connected_es_version_info['number']}; for the best experience, " + + "align major/minor versions across the Elastic Stack." + } + include_examples "version mismatch", :minor, :ahead, :info, :info + end + + end end end \ No newline at end of file diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/PreflightCheck.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/PreflightCheck.java index 374284e0..c87f3968 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/PreflightCheck.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/PreflightCheck.java @@ -50,6 +50,38 @@ public PreflightCheck(final RestClient elasticsearchRestClient) { this.elasticsearchRestClient = elasticsearchRestClient; } + /** + * Get Elasticsearch cluster info and store version number and build flavor. + * @return {@link Map} containing + * + * Throws {@link Failure} if an error occurs while sending the request, parsing the response or extracting the + * required information. + */ + public Map getElasticsearchVersionInfo() { + try { + final Request request = new Request("GET", "/"); + final Response response = elasticsearchRestClient.performRequest(request); + + final String resBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); + logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody)); + + final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version"); + + final String elasticsearchVersion = versionNode.get("number").asText(); + logger.info(String.format("Connected to Elasticsearch version: %s", elasticsearchVersion)); + + final String elasticsearchBuildFlavor = versionNode.get("build_flavor").asText(); + logger.info(String.format("Elasticsearch build_flavor: %s", elasticsearchBuildFlavor)); + + return Map.of("number", elasticsearchVersion, "build_flavor", elasticsearchBuildFlavor); + } catch (Exception e) { + throw new Failure(String.format("Fetching Elasticsearch version information failed: %s", e.getMessage()), e); + } + } + public void checkUserPrivileges() { try { final Request hasPrivilegesRequest = new Request("POST", "/_security/user/_has_privileges"); @@ -105,26 +137,6 @@ public void checkLicense() { } } - public boolean isServerless() { - try { - final Request req = new Request("GET", "/"); - final Response res = elasticsearchRestClient.performRequest(req); - - final String resBody = new String(res.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); - logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody)); - - final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version"); - - final String buildFlavor = versionNode.get("build_flavor").asText(); - logger.info(String.format("Elasticsearch build_flavor: %s", buildFlavor)); - - return buildFlavor.equals("serverless"); - } catch (Exception e) { - logger.error(String.format("Exception checking serverless: %s", e.getMessage())); - throw new Failure(String.format("Preflight check failed: %s", e.getMessage()), e); - } - } - public static class Failure extends RuntimeException { public Failure(String message, Throwable cause) { super(message, cause); diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java index 08c6018b..2e4967a4 100644 --- a/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java +++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/ElasticsearchRestClientBuilderTest.java @@ -156,6 +156,8 @@ public void testElasticApiConfigAddsHeaders() throws HttpException, IOException for (HttpRequestInterceptor interceptor : interceptors) { interceptor.process(request, null); } + assertThat(request.getFirstHeader("Elastic-Api-Version").getValue(), + is("2023-10-31")); assertThat(request.getFirstHeader("x-elastic-product-origin").getValue(), is("logstash-filter-elastic_integration")); } diff --git a/src/test/java/co/elastic/logstash/filters/elasticintegration/PreflightCheckTest.java b/src/test/java/co/elastic/logstash/filters/elasticintegration/PreflightCheckTest.java index f03caa22..95cbafa9 100644 --- a/src/test/java/co/elastic/logstash/filters/elasticintegration/PreflightCheckTest.java +++ b/src/test/java/co/elastic/logstash/filters/elasticintegration/PreflightCheckTest.java @@ -6,7 +6,6 @@ */ package co.elastic.logstash.filters.elasticintegration; -import co.elastic.logstash.api.Password; import com.github.tomakehurst.wiremock.extension.responsetemplating.ResponseTemplateTransformer; import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.junit5.WireMockExtension; @@ -19,6 +18,7 @@ import java.net.URL; import java.nio.file.Path; import java.util.Collections; +import java.util.Map; import java.util.function.Consumer; import static co.elastic.logstash.filters.elasticintegration.util.ResourcesUtil.readResource; @@ -77,7 +77,6 @@ void checkCredentialsPrivilegesMissingReadPipeline() throws Exception { } - @Test void checkCredentialsPrivilegesMissingManageIndexTemplates() throws Exception { wireMock.stubFor( @@ -197,57 +196,50 @@ void checkLicenseConnectionError() throws Exception { } @Test - void checkIsServerlessTrue() throws Exception { + void checkServerlessVersionInfo() throws Exception { wireMock.stubFor(get("/") - .willReturn(okJson(getBodyFixture("is_serverless.true.json")) + .willReturn(okJson(getBodyFixture("serverless.response.json")) .withTransformers("response-template"))); withWiremockElasticsearch((restClient -> { final Logger logger = Mockito.mock(Logger.class); - boolean result = new PreflightCheck(logger, restClient).isServerless(); + Map result = new PreflightCheck(logger, restClient).getElasticsearchVersionInfo(); - assertTrue(result); + assertNotNull(result); + assertThat(result.size(), equalTo(2)); + assertEquals(result.get("number"), "8.11.0"); + assertEquals(result.get("build_flavor"), "serverless"); + Mockito.verify(logger).info(argThat(containsString("Connected to Elasticsearch version: 8.11.0"))); Mockito.verify(logger).info(argThat(containsString("Elasticsearch build_flavor: serverless"))); })); } @Test - void checkIsServerlessFalse() throws Exception { + void testElasticsearchVersionInfo() throws Exception { wireMock.stubFor(get("/") - .willReturn(okJson(getBodyFixture("is_serverless.false.json")) + .willReturn(okJson(getBodyFixture("elasticsearch.default.response.json")) .withTransformers("response-template"))); withWiremockElasticsearch((restClient -> { final Logger logger = Mockito.mock(Logger.class); - boolean result = new PreflightCheck(logger, restClient).isServerless(); + Map versionInfo = new PreflightCheck(logger, restClient).getElasticsearchVersionInfo(); - assertFalse(result); + assertNotNull(versionInfo); + assertThat(versionInfo.size(), equalTo(2)); + assertEquals(versionInfo.get("number"), "8.7.0"); + assertEquals(versionInfo.get("build_flavor"), "default"); + Mockito.verify(logger).info(argThat(containsString("Connected to Elasticsearch version: 8.7.0"))); Mockito.verify(logger).info(argThat(containsString("Elasticsearch build_flavor: default"))); })); } @Test - void checkIsServerless401Response() throws Exception { + void testElasticsearch401Response() throws Exception { wireMock.stubFor(get("/") - .willReturn(jsonResponse(getBodyFixture("is_serverless.resp.401.json"), 401))); + .willReturn(jsonResponse(getBodyFixture("elasticsearch.401.response.json"), 401))); withWiremockElasticsearch((restClient -> { final PreflightCheck.Failure failure = assertThrows(PreflightCheck.Failure.class, () -> { - new PreflightCheck(restClient).isServerless(); + new PreflightCheck(restClient).getElasticsearchVersionInfo(); }); - assertThat(failure.getMessage(), hasToString(stringContainsInOrder("Preflight check failed", "401 Unauthorized"))); - })); - } - - public static final String ELASTIC_API_VERSION = "2023-10-31"; - public static final String ENCODED_API_KEY = "iamapikeyiamapikeyiamapikeyiamapikeyiamapikeyiamapikeyiama=="; - @Test - void checkServerlessRequestContainsHeaders() throws Exception { - wireMock.stubFor(get("/") - .withHeader("Elastic-Api-Version", containing(ELASTIC_API_VERSION)) - .withHeader("Authorization", containing(ENCODED_API_KEY)) - .willReturn(okJson(getBodyFixture("is_serverless.true.json")) - .withTransformers("response-template"))); - withWiremockServerlessElasticsearch((restClient -> { - boolean result = new PreflightCheck(restClient).isServerless(); - assertTrue(result); + assertThat(failure.getMessage(), hasToString(stringContainsInOrder("Fetching Elasticsearch version information failed", "401 Unauthorized"))); })); } @@ -258,17 +250,6 @@ private void withWiremockElasticsearch(final Consumer handler) throw } } - private void withWiremockServerlessElasticsearch(final Consumer handler) throws Exception{ - final URL wiremockElasticsearch = new URL("http", "127.0.0.1", wireMock.getRuntimeInfo().getHttpPort(),"/"); - try (RestClient restClient = ElasticsearchRestClientBuilder - .forURLs(Collections.singletonList(wiremockElasticsearch)) - .configureElasticApi(c -> c.setApiVersion(ELASTIC_API_VERSION)) - .configureRequestAuth(c -> c.setApiKey(new Password(ENCODED_API_KEY))) - .build()) { - handler.accept(restClient); - } - } - static String getBodyFixture(final String name) { return readResource(PreflightCheck.class, Path.of("preflight-check", name).toString()); } diff --git a/src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/is_serverless.resp.401.json b/src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/elasticsearch.401.response.json similarity index 100% rename from src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/is_serverless.resp.401.json rename to src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/elasticsearch.401.response.json diff --git a/src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/is_serverless.false.json b/src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/elasticsearch.default.response.json similarity index 100% rename from src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/is_serverless.false.json rename to src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/elasticsearch.default.response.json diff --git a/src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/is_serverless.true.json b/src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/serverless.response.json similarity index 100% rename from src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/is_serverless.true.json rename to src/test/resources/co/elastic/logstash/filters/elasticintegration/preflight-check/serverless.response.json