Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 58 additions & 10 deletions lib/logstash/filters/elastic_integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def register
initialize_event_processor!

perform_preflight_check!
check_versions_alignment
end # def register

def filter(event)
Expand Down Expand Up @@ -345,8 +346,8 @@ def initialize_elasticsearch_rest_client!

if serverless?
@elasticsearch_rest_client = _elasticsearch_rest_client(config) do |builder|
builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) }
end
builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) }
end
end
end

Expand Down Expand Up @@ -374,13 +375,24 @@ def initialize_event_processor!
end

def perform_preflight_check!
connected_es_version_info
check_user_privileges!
check_es_cluster_license!
end

def check_user_privileges!
def preflight_check_instance
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
PreflightCheck.new(@elasticsearch_rest_client).checkUserPrivileges
@preflight_check ||= PreflightCheck.new(@elasticsearch_rest_client)
end

def connected_es_version_info
@connected_es_version_info ||= preflight_check_instance.getElasticsearchVersionInfo
rescue => e
raise_config_error!(e.message)
end

def check_user_privileges!
preflight_check_instance.checkUserPrivileges
rescue => e
security_error_message = "no handler found for uri [/_security/user/_has_privileges]"
if e.message.include?(security_error_message)
Expand All @@ -403,17 +415,13 @@ def check_user_privileges!
end

def check_es_cluster_license!
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
PreflightCheck.new(@elasticsearch_rest_client).checkLicense
preflight_check_instance.checkLicense
rescue => e
raise_config_error!(e.message)
end

def serverless?
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
PreflightCheck.new(@elasticsearch_rest_client).isServerless
rescue => e
raise_config_error!(e.message)
connected_es_version_info["build_flavor"] == 'serverless'
end

##
Expand Down Expand Up @@ -454,4 +462,44 @@ def ensure_java_major_version!(minimum_major_version)
ERR
end
end

##
# compares the current plugin version with the Elasticsearch version connected to
# generates a warning or info message based on the situation where the plugin is ahead or behind of the connected Elasticsearch
def check_versions_alignment
plugin_major_version, plugin_minor_version = VERSION.split('.').map(&:to_i)
es_major_version, es_minor_version = connected_es_version_info["number"].split('.').first(2).map(&:to_i)

logger.info("This #{VERSION} version of plugin embedded Ingest node components from Elasticsearch #{plugin_major_version}.#{plugin_minor_version}")

es_full_version = connected_es_version_info["number"]

if es_major_version > plugin_major_version
logger.warn "This plugin v#{VERSION} is connected to a newer MAJOR " +
"version of Elasticsearch v#{es_full_version}, and may " +
"have trouble loading or running pipelines that use new " +
"features; for the best experience, update this plugin " +
"to at least v#{es_major_version}.#{es_minor_version}."
elsif es_major_version < plugin_major_version
logger.warn "This plugin v#{VERSION} is connected to an older MAJOR " +
"version of Elasticsearch v#{es_full_version}, and may " +
"have trouble loading or running pipelines that use " +
"features that were deprecated before Elasticsearch " +
"v#{plugin_major_version}.0; for the best experience, " +
"align major/minor versions across the Elastic Stack."
elsif es_minor_version > plugin_minor_version
logger.warn "This plugin v#{VERSION} is connected to a newer MINOR " +
"version of Elasticsearch v#{es_full_version}, and may " +
"have trouble loading or running pipelines that use new " +
"features; for the best experience, update this plugin to " +
"at least v#{es_major_version}.#{es_minor_version}."
elsif es_minor_version < plugin_minor_version
logger.info "This plugin v#{VERSION} is connected to an older MINOR " +
"version of Elasticsearch v#{es_full_version}; for the best experience, " +
"align major/minor versions across the Elastic Stack."
else
logger.debug "This plugin v#{VERSION} is connected to the same MAJOR/MINOR " +
"version of Elasticsearch v#{es_full_version}."
end
end
end
1 change: 1 addition & 0 deletions spec/integration/elastic_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,7 @@ def path; @path; end
# plugin register fails
expected_message = "The cluster privilege `monitor` is REQUIRED in order to validate Elasticsearch license"
expect(subject).to receive(:serverless?).and_return(false)
expect(subject).to receive(:connected_es_version_info).and_return({'number' => LogStash::Filters::ElasticIntegration::VERSION, 'build_flavor' => 'default'})
expect{ subject.register }.to raise_error(LogStash::ConfigurationError).with_message(expected_message)

# send event and check
Expand Down
90 changes: 87 additions & 3 deletions spec/unit/elastic_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
subject(:plugin) { LogStash::Filters::ElasticIntegration.new(config) }

let(:mock_geoip_database_manager) { double("LogStash::GeoipDatabaseManagement::Manager", :enabled? => false) }
before(:each) { allow(plugin).to receive(:load_geoip_database_manager!).and_return(mock_geoip_database_manager) }
before(:each) {
allow(plugin).to receive(:load_geoip_database_manager!).and_return(mock_geoip_database_manager)
allow(plugin).to receive(:logger).and_return(mock_logger)
}

let(:mock_logger) { double("Logger").as_null_object }

describe 'the plugin class' do
subject { described_class }
Expand All @@ -29,7 +34,7 @@
allow(java.lang.System).to receive(:getProperty).with("java.specification.version").and_return("11.0.16.1")
end

it 'prevents initialization and presents helpful guidancee' do
it 'prevents initialization and presents helpful guidance' do
expect { described_class.new({}) }.to raise_error(LogStash::EnvironmentError)
.with_message(including("requires Java 17 or later", # reason +
"current JVM version `11.0.16.1`",
Expand Down Expand Up @@ -65,12 +70,16 @@

describe "plugin register" do

before(:each) { allow(plugin).to receive(:perform_preflight_check!).and_return(true) }
before(:each) { allow(plugin).to receive(:connected_es_version_info).and_return(connected_es_version_info) }
before(:each) { allow(plugin).to receive(:check_user_privileges!).and_return(true) }
before(:each) { allow(plugin).to receive(:check_es_cluster_license!).and_return(true) }
before(:each) { allow(plugin).to receive(:serverless?).and_return(false) }

let(:registered_plugin) { plugin.tap(&:register) }
after(:each) { plugin.close }

let(:connected_es_version_info) { {'number' => '8.17.0', 'build_flavor' => 'default'} }

shared_examples "validate `ssl_enabled`" do
it "enables SSL" do
expect(registered_plugin.ssl_enabled).to be_truthy
Expand Down Expand Up @@ -722,5 +731,80 @@
end
end

describe "plugin vs connected ES versions compatibility" do
let(:config) { super().merge("hosts" => %w[127.0.0.2:9300]) }
let(:version) { LogStash::Filters::ElasticIntegration::VERSION }
let(:plugin_major_version) { version.split('.').first.to_i }
let(:plugin_minor_version) { version.split('.')[1].to_i }
let(:base_message) { "This #{version} version of plugin embedded Ingest node components from Elasticsearch #{plugin_major_version}.#{plugin_minor_version}" }
let(:expected_message) { }

before(:each) { plugin.tap(&:check_versions_alignment) }

it "informs which version of ES the plugin is built from" do
expected_message =
"This plugin v#{version} is connected to the same MAJOR/MINOR version " +
"of Elasticsearch v#{connected_es_version_info['number']}."

expect(mock_logger).to have_received(:info).with(base_message)
expect(mock_logger).to have_received(:debug).with(expected_message)
end

shared_examples "version mismatch" do |version_type, ahead_or_behind, first_log_level, second_log_level|
let(:connected_es_version_info) do
major = plugin_major_version
minor = plugin_minor_version
major += ahead_or_behind == :ahead ? -1 : 1 if version_type == :major
minor += ahead_or_behind == :ahead ? -1 : 1 if version_type == :minor
{ "number" => "#{major}.#{minor}.10", "major" => "#{major}", "minor" => "#{minor}", "build_flavor" => "default" }
end

it "informs to update" do
expect(mock_logger).to have_received(first_log_level).with(base_message)
expect(mock_logger).to have_received(second_log_level).with(expected_message)
end
end

context "plugin major version is behind" do
let(:expected_message) {
"This plugin v#{version} is connected to a newer MAJOR version of " +
"Elasticsearch v#{connected_es_version_info['number']}, and may have trouble loading or " +
"running pipelines that use new features; for the best experience, " +
"update this plugin to at least v#{connected_es_version_info['major']}.#{connected_es_version_info['minor']}."
}
include_examples "version mismatch", :major, :behind, :info, :warn
end

context "plugin major version is ahead" do
let(:expected_message) {
"This plugin v#{version} is connected to an older MAJOR version of " +
"Elasticsearch v#{connected_es_version_info['number']}, and may have trouble loading or " +
"running pipelines that use features that were deprecated before " +
"Elasticsearch v#{plugin_major_version}.0; for the best experience, " +
"align major/minor versions across the Elastic Stack."
}
include_examples "version mismatch", :major, :ahead, :info, :warn
end

context "plugin minor version is behind" do
let(:expected_message) {
"This plugin v#{version} is connected to a newer MINOR version of " +
"Elasticsearch v#{connected_es_version_info['number']}, and may have trouble loading or " +
"running pipelines that use new features; for the best experience, " +
"update this plugin to at least v#{connected_es_version_info['major']}.#{connected_es_version_info['minor']}."
}
include_examples "version mismatch", :minor, :behind, :info, :warn
end

context "plugin minor version is ahead" do
let(:expected_message) {
"This plugin v#{version} is connected to an older MINOR version of " +
"Elasticsearch v#{connected_es_version_info['number']}; for the best experience, " +
"align major/minor versions across the Elastic Stack."
}
include_examples "version mismatch", :minor, :ahead, :info, :info
end

end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,38 @@ public PreflightCheck(final RestClient elasticsearchRestClient) {
this.elasticsearchRestClient = elasticsearchRestClient;
}

/**
* Get Elasticsearch cluster info and store version number and build flavor.
* @return {@link Map} containing
* <ul>
* <li>"number": the key for a version number (e.g., "8.17.0")</li>
* <li>"build_flavor": the key for a build flavor (e.g., "default" "oss" or "serverless")</li>
* </ul>
* Throws {@link Failure} if an error occurs while sending the request, parsing the response or extracting the
* required information.
*/
public Map<String, String> getElasticsearchVersionInfo() {
try {
final Request request = new Request("GET", "/");
final Response response = elasticsearchRestClient.performRequest(request);

final String resBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody));

final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version");

final String elasticsearchVersion = versionNode.get("number").asText();
logger.info(String.format("Connected to Elasticsearch version: %s", elasticsearchVersion));

final String elasticsearchBuildFlavor = versionNode.get("build_flavor").asText();
logger.info(String.format("Elasticsearch build_flavor: %s", elasticsearchBuildFlavor));

return Map.of("number", elasticsearchVersion, "build_flavor", elasticsearchBuildFlavor);
} catch (Exception e) {
throw new Failure(String.format("Fetching Elasticsearch version information failed: %s", e.getMessage()), e);
}
}

public void checkUserPrivileges() {
try {
final Request hasPrivilegesRequest = new Request("POST", "/_security/user/_has_privileges");
Expand Down Expand Up @@ -105,26 +137,6 @@ public void checkLicense() {
}
}

public boolean isServerless() {
try {
final Request req = new Request("GET", "/");
final Response res = elasticsearchRestClient.performRequest(req);

final String resBody = new String(res.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody));

final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version");

final String buildFlavor = versionNode.get("build_flavor").asText();
logger.info(String.format("Elasticsearch build_flavor: %s", buildFlavor));

return buildFlavor.equals("serverless");
} catch (Exception e) {
logger.error(String.format("Exception checking serverless: %s", e.getMessage()));
throw new Failure(String.format("Preflight check failed: %s", e.getMessage()), e);
}
}

public static class Failure extends RuntimeException {
public Failure(String message, Throwable cause) {
super(message, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public void testElasticApiConfigAddsHeaders() throws HttpException, IOException
for (HttpRequestInterceptor interceptor : interceptors) {
interceptor.process(request, null);
}
assertThat(request.getFirstHeader("Elastic-Api-Version").getValue(),
is("2023-10-31"));
assertThat(request.getFirstHeader("x-elastic-product-origin").getValue(),
is("logstash-filter-elastic_integration"));
}
Expand Down
Loading