Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 54 additions & 10 deletions lib/logstash/filters/elastic_integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def register
initialize_event_processor!

perform_preflight_check!
check_versions_alignment
end # def register

def filter(event)
Expand Down Expand Up @@ -345,8 +346,8 @@ def initialize_elasticsearch_rest_client!

if serverless?
@elasticsearch_rest_client = _elasticsearch_rest_client(config) do |builder|
builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) }
end
builder.configureElasticApi { |elasticApi| elasticApi.setApiVersion(ELASTIC_API_VERSION) }
end
end
end

Expand Down Expand Up @@ -374,13 +375,24 @@ def initialize_event_processor!
end

def perform_preflight_check!
connected_es_version_info
check_user_privileges!
check_es_cluster_license!
end

def check_user_privileges!
def preflight_check_instance
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
PreflightCheck.new(@elasticsearch_rest_client).checkUserPrivileges
@preflight_check ||= PreflightCheck.new(@elasticsearch_rest_client)
end

def connected_es_version_info
@connected_es_version_info ||= preflight_check_instance.getElasticsearchVersionInfo
rescue => e
raise_config_error!(e.message)
end

def check_user_privileges!
preflight_check_instance.checkUserPrivileges
rescue => e
security_error_message = "no handler found for uri [/_security/user/_has_privileges]"
if e.message.include?(security_error_message)
Expand All @@ -403,17 +415,13 @@ def check_user_privileges!
end

def check_es_cluster_license!
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
PreflightCheck.new(@elasticsearch_rest_client).checkLicense
preflight_check_instance.checkLicense
rescue => e
raise_config_error!(e.message)
end

def serverless?
java_import('co.elastic.logstash.filters.elasticintegration.PreflightCheck')
PreflightCheck.new(@elasticsearch_rest_client).isServerless
rescue => e
raise_config_error!(e.message)
connected_es_version_info["build_flavor"] == 'serverless'
end

##
Expand Down Expand Up @@ -454,4 +462,40 @@ def ensure_java_major_version!(minimum_major_version)
ERR
end
end

##
# compares the current plugin version with the Elasticsearch version connected to
# generates a warning or info message based on the situation where the plugin is ahead or behind of the connected Elasticsearch
def check_versions_alignment
plugin_version_parts = VERSION.split('.')
plugin_major_version = plugin_version_parts[0].to_i
plugin_minor_version = plugin_version_parts[1].to_i

base_message = "This #{VERSION} version of plugin embedded Ingest node components from Elasticsearch #{plugin_major_version}.#{plugin_minor_version}"
logger.info(base_message)

es_version_parts = connected_es_version_info["number"].split('.')
es_major_version = es_version_parts[0].to_i
es_minor_version = es_version_parts[1].to_i

This comment was marked as outdated.


return if (plugin_major_version == es_major_version && plugin_minor_version == es_minor_version)

descriptive_message = "Upgrade the plugin and/or stack to the same `major.minor` to get the minimal disruptive experience"

if plugin_major_version != es_major_version
# major version difference may alarm the warning, it doesn't seem useful to inform minor here
# when plugin is ahead of connected ES, not much concern but recommended to keep the same version
# when plugin is behind of connected ES, BIG concern as it cannot utilize the features of the Ingest Node
period_indicator = plugin_major_version > es_major_version ? "newer" : "older"
message = "This #{VERSION} version of plugin is compiled with #{period_indicator} Elasticsearch version than" +
" currently connected Elasticsearch #{connected_es_version_info["number"]} version. #{descriptive_message}"
logger.warn(message) if plugin_major_version < es_major_version
logger.info(message) if plugin_major_version > es_major_version
else
period_indicator = plugin_minor_version > es_minor_version ? "newer" : "older"
message = "This #{VERSION} version of plugin is compiled with #{period_indicator} Elasticsearch version than" +
" currently connected Elasticsearch #{connected_es_version_info["number"]} version. #{descriptive_message}"
logger.info(message)
end
end
end
1 change: 1 addition & 0 deletions spec/integration/elastic_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,7 @@ def path; @path; end
# plugin register fails
expected_message = "The cluster privilege `monitor` is REQUIRED in order to validate Elasticsearch license"
expect(subject).to receive(:serverless?).and_return(false)
expect(subject).to receive(:connected_es_version_info).and_return({'number' => LogStash::Filters::ElasticIntegration::VERSION, 'build_flavor' => 'default'})
expect{ subject.register }.to raise_error(LogStash::ConfigurationError).with_message(expected_message)

# send event and check
Expand Down
64 changes: 61 additions & 3 deletions spec/unit/elastic_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
subject(:plugin) { LogStash::Filters::ElasticIntegration.new(config) }

let(:mock_geoip_database_manager) { double("LogStash::GeoipDatabaseManagement::Manager", :enabled? => false) }
before(:each) { allow(plugin).to receive(:load_geoip_database_manager!).and_return(mock_geoip_database_manager) }
before(:each) {
allow(plugin).to receive(:load_geoip_database_manager!).and_return(mock_geoip_database_manager)
allow(plugin).to receive(:logger).and_return(mock_logger)
}

let(:mock_logger) { double("Logger").as_null_object }

describe 'the plugin class' do
subject { described_class }
Expand All @@ -29,7 +34,7 @@
allow(java.lang.System).to receive(:getProperty).with("java.specification.version").and_return("11.0.16.1")
end

it 'prevents initialization and presents helpful guidancee' do
it 'prevents initialization and presents helpful guidance' do
expect { described_class.new({}) }.to raise_error(LogStash::EnvironmentError)
.with_message(including("requires Java 21 or later", # reason +
"current JVM version `11.0.16.1`",
Expand Down Expand Up @@ -65,12 +70,16 @@

describe "plugin register" do

before(:each) { allow(plugin).to receive(:perform_preflight_check!).and_return(true) }
before(:each) { allow(plugin).to receive(:connected_es_version_info).and_return(connected_es_version_info) }
before(:each) { allow(plugin).to receive(:check_user_privileges!).and_return(true) }
before(:each) { allow(plugin).to receive(:check_es_cluster_license!).and_return(true) }
before(:each) { allow(plugin).to receive(:serverless?).and_return(false) }

let(:registered_plugin) { plugin.tap(&:register) }
after(:each) { plugin.close }

let(:connected_es_version_info) { {'number' => '8.17.0', 'build_flavor' => 'default'} }

shared_examples "validate `ssl_enabled`" do
it "enables SSL" do
expect(registered_plugin.ssl_enabled).to be_truthy
Expand Down Expand Up @@ -722,5 +731,54 @@
end
end

describe "plugin vs connected ES versions compatibility" do
let(:config) { super().merge("hosts" => %w[127.0.0.2:9300]) }
let(:version) { LogStash::Filters::ElasticIntegration::VERSION }
let(:plugin_major_version) { version.split('.').first.to_i }
let(:plugin_minor_version) { version.split('.')[1].to_i }
let(:base_message) { "This #{version} version of plugin embedded Ingest node components from Elasticsearch #{plugin_major_version}.#{plugin_minor_version}" }
let(:descriptive_message) { "Upgrade the plugin and/or stack to the same `major.minor` to get the minimal disruptive experience" }

before(:each) { plugin.tap(&:check_versions_alignment) }

it "informs which version of ES the plugin is built from" do
expect(mock_logger).to have_received(:info).with(base_message)
end

shared_examples "version mismatch" do |version_type, ahead_or_behind, first_log_level, second_log_level|
let(:connected_es_version_info) do
major = plugin_major_version
minor = plugin_minor_version
major += ahead_or_behind == :ahead ? -1 : 1 if version_type == :major
minor += ahead_or_behind == :ahead ? -1 : 1 if version_type == :minor
{ 'number' => "#{major}.#{minor}.10", 'build_flavor' => 'default' }
end

it "informs to update" do
ahead_or_behind_msg = ahead_or_behind == :ahead ? "newer" : "older"
message = "This #{version} version of plugin is compiled with #{ahead_or_behind_msg} Elasticsearch version than " \
"currently connected Elasticsearch #{connected_es_version_info['number']} version. #{descriptive_message}"

expect(mock_logger).to have_received(first_log_level).with(base_message)
expect(mock_logger).to have_received(second_log_level).with(message)
end
end

context "plugin major version is behind" do
include_examples "version mismatch", :major, :behind, :info, :warn
end

context "plugin major version is ahead" do
include_examples "version mismatch", :major, :ahead, :info, :info
end

context "plugin minor version is behind" do
include_examples "version mismatch", :minor, :behind, :info, :info
end

context "plugin minor version is ahead" do
include_examples "version mismatch", :minor, :ahead, :info, :info
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,38 @@ public PreflightCheck(final RestClient elasticsearchRestClient) {
this.elasticsearchRestClient = elasticsearchRestClient;
}

/**
* Get Elasticsearch cluster info and store version number and build flavor.
* @return {@link Map} containing
* <ul>
* <li>"number": the key for a version number (e.g., "8.17.0")</li>
* <li>"build_flavor": the key for a build flavor (e.g., "default" "oss" or "serverless")</li>
* </ul>
* Throws {@link Failure} if an error occurs while sending the request, parsing the response or extracting the
* required information.
*/
public Map<String, String> getElasticsearchVersionInfo() {
try {
final Request request = new Request("GET", "/");
final Response response = elasticsearchRestClient.performRequest(request);

final String resBody = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody));

final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version");

final String elasticsearchVersion = versionNode.get("number").asText();
logger.info(String.format("Connected to Elasticsearch version: %s", elasticsearchVersion));

final String elasticsearchBuildFlavor = versionNode.get("build_flavor").asText();
logger.info(String.format("Elasticsearch build_flavor: %s", elasticsearchBuildFlavor));

return Map.of("number", elasticsearchVersion, "build_flavor", elasticsearchBuildFlavor);
} catch (Exception e) {
throw new Failure(String.format("Fetching Elasticsearch version information failed: %s", e.getMessage()), e);
}
}

public void checkUserPrivileges() {
try {
final Request hasPrivilegesRequest = new Request("POST", "/_security/user/_has_privileges");
Expand Down Expand Up @@ -105,26 +137,6 @@ public void checkLicense() {
}
}

public boolean isServerless() {
try {
final Request req = new Request("GET", "/");
final Response res = elasticsearchRestClient.performRequest(req);

final String resBody = new String(res.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
logger.debug(() -> String.format("Elasticsearch '/' RAW: %s", resBody));

final JsonNode versionNode = OBJECT_MAPPER.readTree(resBody).get("version");

final String buildFlavor = versionNode.get("build_flavor").asText();
logger.info(String.format("Elasticsearch build_flavor: %s", buildFlavor));

return buildFlavor.equals("serverless");
} catch (Exception e) {
logger.error(String.format("Exception checking serverless: %s", e.getMessage()));
throw new Failure(String.format("Preflight check failed: %s", e.getMessage()), e);
}
}

public static class Failure extends RuntimeException {
public Failure(String message, Throwable cause) {
super(message, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ public void testElasticApiConfigAddsHeaders() throws HttpException, IOException
for (HttpRequestInterceptor interceptor : interceptors) {
interceptor.process(request, null);
}
assertThat(request.getFirstHeader("Elastic-Api-Version").getValue(),
is("2023-10-31"));
assertThat(request.getFirstHeader("x-elastic-product-origin").getValue(),
is("logstash-filter-elastic_integration"));
}
Expand Down
Loading