Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 52 additions & 10 deletions lib/logstash/filters/elastic_integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def register
initialize_event_processor!

perform_preflight_check!
check_versions_alignment
end # def register

def filter(event)
Expand Down Expand Up @@ -345,8 +346,8 @@ def initialize_elasticsearch_rest_client!

if serverless?
@elasticsearch_rest_client = _elasticsearch_rest_client(config) do |builder|
builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) }
end
builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) }
end
end
end

Expand Down Expand Up @@ -374,13 +375,24 @@ def initialize_event_processor!
end

def perform_preflight_check!
connected_es_version_info
check_user_privileges!
check_es_cluster_license!
end

def check_user_privileges!
def preflight_check_instance
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
PreflightCheck.new(@elasticsearch_rest_client).checkUserPrivileges
@preflight_check ||= PreflightCheck.new(@elasticsearch_rest_client)
end

def connected_es_version_info
@connected_es_version_info ||= preflight_check_instance.getElasticsearchVersionInfo
rescue => e
raise_config_error!(e.message)
end

def check_user_privileges!
preflight_check_instance.checkUserPrivileges
rescue => e
security_error_message = "no handler found for uri [/_security/user/_has_privileges]"
if e.message.include?(security_error_message)
Expand All @@ -403,17 +415,13 @@ def check_user_privileges!
end

def check_es_cluster_license!
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
PreflightCheck.new(@elasticsearch_rest_client).checkLicense
preflight_check_instance.checkLicense
rescue => e
raise_config_error!(e.message)
end

def serverless?
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
PreflightCheck.new(@elasticsearch_rest_client).isServerless
rescue => e
raise_config_error!(e.message)
connected_es_version_info["build_flavor"] == 'serverless'
end

##
Expand Down Expand Up @@ -454,4 +462,38 @@ def ensure_java_major_version!(minimum_major_version)
ERR
end
end

##
# compares the current plugin version with the Elasticsearch version connected to
# generates a warning or info message based on the situation where the plugin is ahead or behind of the connected Elasticsearch
def check_versions_alignment
plugin_major_version, plugin_minor_version = VERSION.split('.').map(&:to_i)
es_major_version, es_minor_version = connected_es_version_info["number"].split('.').first(2).map(&:to_i)

logger.info("This #{VERSION} version of plugin embedded Ingest node components from Elasticsearch #{plugin_major_version}.#{plugin_minor_version}")

es_full_version = connected_es_version_info["number"]

if es_major_version > plugin_major_version
logger.warn <<~WARNING
This plugin v#{VERSION} is connected to a newer MAJOR version of Elasticsearch v#{es_full_version}, and may have trouble loading or running pipelines that use new features; for the best experience, update this plugin to at least v#{es_major_version}.#{es_minor_version}
WARNING
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent of my prior suggestion to use heredocs was to enable the text to be readable with line-breaks, but I missed an important bit where even squiggle-heredocs keep the newlines 🤦🏼.

I don't think that the squiggle-heredocs are inherently more readable though, so it might be worth going back to normal strings.

Suggested change
logger.warn <<~WARNING
This plugin v#{VERSION} is connected to a newer MAJOR version of Elasticsearch v#{es_full_version}, and may have trouble loading or running pipelines that use new features; for the best experience, update this plugin to at least v#{es_major_version}.#{es_minor_version}
WARNING
logger.warn "This plugin v#{VERSION} is connected to a newer MAJOR version of Elasticsearch v#{es_full_version}, and may have trouble loading or running pipelines that use new features; for the best experience, update this plugin to at least v#{es_major_version}.#{es_minor_version}"

or with concatenation (each needs a trailing space)

Suggested change
logger.warn <<~WARNING
This plugin v#{VERSION} is connected to a newer MAJOR version of Elasticsearch v#{es_full_version}, and may have trouble loading or running pipelines that use new features; for the best experience, update this plugin to at least v#{es_major_version}.#{es_minor_version}
WARNING
logger.warn "This plugin v#{VERSION} is connected to a newer MAJOR " +
"version of Elasticsearch v#{es_full_version}, and may " +
"have trouble loading or running pipelines that use new " +
"features; for the best experience, update this plugin " +
"to at least v#{es_major_version}.#{es_minor_version}"

or a heredoc, but with trickery to turn the newlines into spaces:

Suggested change
logger.warn <<~WARNING
This plugin v#{VERSION} is connected to a newer MAJOR version of Elasticsearch v#{es_full_version}, and may have trouble loading or running pipelines that use new features; for the best experience, update this plugin to at least v#{es_major_version}.#{es_minor_version}
WARNING
logger.warn(<<~WARNING.tr("\n", " "))
This plugin v#{VERSION} is connected to a newer MAJOR version of
Elasticsearch v#{es_full_version}, and may have trouble loading or
running pipelines that use new features; for the best experience,
update this plugin to at least v#{es_major_version}.#{es_minor_version}
WARNING

Copy link
Collaborator Author

@mashhurs mashhurs Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the hardest part actually!

heredocs had a power here to highlight the warnings with multiline texts. However, when I did an actual test

  • the console look so ugly-cut-sentenced, especially when I made a half screen console - sentences alignment;
  • logs didn't align with our logging standard, in a kind of jumpy way - easy to lose concentration;

Overall, it looks to me our current standard loggings has a better readability (smooth reading experience) and concentration (not jump words) points.

In case of readability, yes a) I forgot to remove heredocs styles b) made unit tests readable instead actual logic (had a sleep disorder 5AM commit, sorry) - thank you for revisiting

My last commit rearranges (your 2nd suggestion, concatenation with trailing space) and removes heredocs style!
image

elsif es_major_version < plugin_major_version
logger.warn <<~WARNING
This plugin v#{VERSION} is connected to an older MAJOR version of Elasticsearch v#{es_full_version}, and may have trouble loading or running pipelines that use features that were deprecated before Elasticsearch v#{plugin_major_version}.0; for the best experience, align major/minor versions across the Elastic Stack.
WARNING
elsif es_minor_version > plugin_minor_version
logger.warn <<~WARNING
This plugin v#{VERSION} is connected to a newer MINOR version of Elasticsearch v#{es_full_version}, and may have trouble loading or running pipelines that use new features; for the best experience, update this plugin to at least v#{es_major_version}.#{es_minor_version}
WARNING
elsif es_minor_version < plugin_minor_version
logger.info <<~INFO
This plugin v#{VERSION} is connected to an older MINOR version of Elasticsearch v#{es_full_version}; for the best experience, align major/minor versions across the Elastic Stack.
INFO
else
logger.debug <<~DEBUG
This plugin v#{VERSION} is connected to the same MAJOR/MINOR version of Elasticsearch v#{es_full_version}.
DEBUG
end
end
end
1 change: 1 addition & 0 deletions spec/integration/elastic_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,7 @@ def path; @path; end
# plugin register fails
expected_message = "The cluster privilege `monitor` is REQUIRED in order to validate Elasticsearch license"
expect(subject).to receive(:serverless?).and_return(false)
expect(subject).to receive(:connected_es_version_info).and_return({'number' => LogStash::Filters::ElasticIntegration::VERSION, 'build_flavor' => 'default'})
expect{ subject.register }.to raise_error(LogStash::ConfigurationError).with_message(expected_message)

# send event and check
Expand Down
90 changes: 87 additions & 3 deletions spec/unit/elastic_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
subject(:plugin) { LogStash::Filters::ElasticIntegration.new(config) }

let(:mock_geoip_database_manager) { double("LogStash::GeoipDatabaseManagement::Manager", :enabled? => false) }
before(:each) { allow(plugin).to receive(:load_geoip_database_manager!).and_return(mock_geoip_database_manager) }
before(:each) {
allow(plugin).to receive(:load_geoip_database_manager!).and_return(mock_geoip_database_manager)
allow(plugin).to receive(:logger).and_return(mock_logger)
}

let(:mock_logger) { double("Logger").as_null_object }

describe 'the plugin class' do
subject { described_class }
Expand All @@ -29,7 +34,7 @@
allow(java.lang.System).to receive(:getProperty).with("java.specification.version").and_return("11.0.16.1")
end

it 'prevents initialization and presents helpful guidancee' do
it 'prevents initialization and presents helpful guidance' do
expect { described_class.new({}) }.to raise_error(LogStash::EnvironmentError)
.with_message(including("requires Java 21 or later", # reason +
"current JVM version `11.0.16.1`",
Expand Down Expand Up @@ -65,12 +70,16 @@

describe "plugin register" do

before(:each) { allow(plugin).to receive(:perform_preflight_check!).and_return(true) }
before(:each) { allow(plugin).to receive(:connected_es_version_info).and_return(connected_es_version_info) }
before(:each) { allow(plugin).to receive(:check_user_privileges!).and_return(true) }
before(:each) { allow(plugin).to receive(:check_es_cluster_license!).and_return(true) }
before(:each) { allow(plugin).to receive(:serverless?).and_return(false) }

let(:registered_plugin) { plugin.tap(&:register) }
after(:each) { plugin.close }

let(:connected_es_version_info) { {'number' => '8.17.0', 'build_flavor' => 'default'} }

shared_examples "validate `ssl_enabled`" do
it "enables SSL" do
expect(registered_plugin.ssl_enabled).to be_truthy
Expand Down Expand Up @@ -722,5 +731,80 @@
end
end

describe "plugin vs connected ES versions compatibility" do
let(:config) { super().merge("hosts" => %w[127.0.0.2:9300]) }
let(:version) { LogStash::Filters::ElasticIntegration::VERSION }
let(:plugin_major_version) { version.split('.').first.to_i }
let(:plugin_minor_version) { version.split('.')[1].to_i }
let(:base_message) { "This #{version} version of plugin embedded Ingest node components from Elasticsearch #{plugin_major_version}.#{plugin_minor_version}" }
let(:expected_message) { }

before(:each) { plugin.tap(&:check_versions_alignment) }

it "informs which version of ES the plugin is built from" do
expected_message =
"This plugin v#{version} is connected to the same MAJOR/MINOR version " +
"of Elasticsearch v#{connected_es_version_info['number']}.\n"

expect(mock_logger).to have_received(:info).with(base_message)
expect(mock_logger).to have_received(:debug).with(expected_message)
end

shared_examples "version mismatch" do |version_type, ahead_or_behind, first_log_level, second_log_level|
let(:connected_es_version_info) do
major = plugin_major_version
minor = plugin_minor_version
major += ahead_or_behind == :ahead ? -1 : 1 if version_type == :major
minor += ahead_or_behind == :ahead ? -1 : 1 if version_type == :minor
{ "number" => "#{major}.#{minor}.10", "major" => "#{major}", "minor" => "#{minor}", "build_flavor" => "default" }
end

it "informs to update" do
expect(mock_logger).to have_received(first_log_level).with(base_message)
expect(mock_logger).to have_received(second_log_level).with(expected_message)
end
end

context "plugin major version is behind" do
let(:expected_message) {
"This plugin v#{version} is connected to a newer MAJOR version of " +
"Elasticsearch v#{connected_es_version_info['number']}, and may have trouble loading or " +
"running pipelines that use new features; for the best experience, " +
"update this plugin to at least v#{connected_es_version_info['major']}.#{connected_es_version_info['minor']}\n"
}
include_examples "version mismatch", :major, :behind, :info, :warn
end

context "plugin major version is ahead" do
let(:expected_message) {
"This plugin v#{version} is connected to an older MAJOR version of " +
"Elasticsearch v#{connected_es_version_info['number']}, and may have trouble loading or " +
"running pipelines that use features that were deprecated before " +
"Elasticsearch v#{plugin_major_version}.0; for the best experience, " +
"align major/minor versions across the Elastic Stack.\n"
}
include_examples "version mismatch", :major, :ahead, :info, :warn
end

context "plugin minor version is behind" do
let(:expected_message) {
"This plugin v#{version} is connected to a newer MINOR version of " +
"Elasticsearch v#{connected_es_version_info['number']}, and may have trouble loading or " +
"running pipelines that use new features; for the best experience, " +
"update this plugin to at least v#{connected_es_version_info['major']}.#{connected_es_version_info['minor']}\n"
}
include_examples "version mismatch", :minor, :behind, :info, :warn
end

context "plugin minor version is ahead" do
let(:expected_message) {
"This plugin v#{version} is connected to an older MINOR version of " +
"Elasticsearch v#{connected_es_version_info['number']}; for the best experience, " +
"align major/minor versions across the Elastic Stack.\n"
}
include_examples "version mismatch", :minor, :ahead, :info, :info
end

end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,38 @@ public PreflightCheck(final RestClient elasticsearchRestClient) {
this.elasticsearchRestClient = elasticsearchRestClient;
}

/**
* Get Elasticsearch cluster info and store version number and build flavor.
* @return {@link Map} containing
* <ul>
* <li>"number": the key for a version number (e.g., "8.17.0")</li>
* <li>"build_flavor": the key for a build flavor (e.g., "default" "oss" or "serverless")</li>
* </ul>
* Throws {@link Failure} if an error occurs while sending the request, parsing the response or extracting the
* required information.
*/
public Map<String, String> getElasticsearchVersionInfo() {
try {
final Request request = new Request("GET", "/");
final Response response = elasticsearchRestClient.performRequest(request);

final String resBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody));

final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version");

final String elasticsearchVersion = versionNode.get("number").asText();
logger.info(String.format("Connected to Elasticsearch version: %s", elasticsearchVersion));

final String elasticsearchBuildFlavor = versionNode.get("build_flavor").asText();
logger.info(String.format("Elasticsearch build_flavor: %s", elasticsearchBuildFlavor));

return Map.of("number", elasticsearchVersion, "build_flavor", elasticsearchBuildFlavor);
} catch (Exception e) {
throw new Failure(String.format("Fetching Elasticsearch version information failed: %s", e.getMessage()), e);
}
}

public void checkUserPrivileges() {
try {
final Request hasPrivilegesRequest = new Request("POST", "/_security/user/_has_privileges");
Expand Down Expand Up @@ -105,26 +137,6 @@ public void checkLicense() {
}
}

public boolean isServerless() {
try {
final Request req = new Request("GET", "/");
final Response res = elasticsearchRestClient.performRequest(req);

final String resBody = new String(res.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody));

final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version");

final String buildFlavor = versionNode.get("build_flavor").asText();
logger.info(String.format("Elasticsearch build_flavor: %s", buildFlavor));

return buildFlavor.equals("serverless");
} catch (Exception e) {
logger.error(String.format("Exception checking serverless: %s", e.getMessage()));
throw new Failure(String.format("Preflight check failed: %s", e.getMessage()), e);
}
}

public static class Failure extends RuntimeException {
public Failure(String message, Throwable cause) {
super(message, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public void testElasticApiConfigAddsHeaders() throws HttpException, IOException
for (HttpRequestInterceptor interceptor : interceptors) {
interceptor.process(request, null);
}
assertThat(request.getFirstHeader("Elastic-Api-Version").getValue(),
is("2023-10-31"));
assertThat(request.getFirstHeader("x-elastic-product-origin").getValue(),
is("logstash-filter-elastic_integration"));
}
Expand Down
Loading