Skip to content

Commit ad1a8d5

Browse files
kaisechengjsvd
andauthored
Add support to Serverless Elasticsearch (#1145)
The pool stores build flavor in health check to determine if the Elasticsearch is serverless. In serverless, ILM, legacy template API and license checker are disabled, and the plugin throws ConfigurationError when ilm_enabled => true or template_api => legacy Co-authored-by: João Duarte <[email protected]>
1 parent 0e4f213 commit ad1a8d5

File tree

14 files changed

+180
-25
lines changed

14 files changed

+180
-25
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.16.0
2+
- Added support to Serverless Elasticsearch [#1445](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1145)
3+
14
## 11.15.9
25
- allow dlq_ settings when using data streams [#1144](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1144)
36

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ def maximum_seen_major_version
9393
@pool.maximum_seen_major_version
9494
end
9595

96+
def serverless?
97+
@pool.serverless?
98+
end
99+
96100
def alive_urls_count
97101
@pool.alive_urls_count
98102
end

lib/logstash/outputs/elasticsearch/http_client/pool.rb

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ def initialize(original_error, url)
4848
:sniffer_delay => 10,
4949
}.freeze
5050

51+
BUILD_FLAVOUR_SERVERLESS = 'serverless'.freeze
52+
5153
def initialize(logger, adapter, initial_urls=[], options={})
5254
@logger = logger
5355
@adapter = adapter
@@ -75,6 +77,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
7577
@license_checker = options[:license_checker] || LogStash::PluginMixins::ElasticSearch::NoopLicenseChecker::INSTANCE
7678

7779
@last_es_version = Concurrent::AtomicReference.new
80+
@build_flavour = Concurrent::AtomicReference.new
7881
end
7982

8083
def start
@@ -250,14 +253,18 @@ def healthcheck!(register_phase = true)
250253
# If no exception was raised it must have succeeded!
251254
logger.warn("Restored connection to ES instance", url: url.sanitized.to_s)
252255
# We reconnected to this node, check its ES version
253-
es_version = get_es_version(url)
256+
version_info = get_es_version(url)
257+
es_version = version_info.fetch('number', nil)
258+
build_flavour = version_info.fetch('build_flavor', nil)
259+
254260
if es_version.nil?
255261
logger.warn("Failed to retrieve Elasticsearch version data from connected endpoint, connection aborted", :url => url.sanitized.to_s)
256262
next
257263
end
258264
@state_mutex.synchronize do
259265
meta[:version] = es_version
260266
set_last_es_version(es_version, url)
267+
set_build_flavour(build_flavour)
261268

262269
alive = @license_checker.appropriate_license?(self, url)
263270
meta[:state] = alive ? :alive : :dead
@@ -475,7 +482,7 @@ def get_es_version(url)
475482

476483
response = LogStash::Json.load(response.body)
477484

478-
response.fetch('version', {}).fetch('number', nil)
485+
response.fetch('version', {})
479486
end
480487

481488
def last_es_version
@@ -486,6 +493,10 @@ def maximum_seen_major_version
486493
@state_mutex.synchronize { @maximum_seen_major_version }
487494
end
488495

496+
def serverless?
497+
@build_flavour.get == BUILD_FLAVOUR_SERVERLESS
498+
end
499+
489500
private
490501

491502
# @private executing within @state_mutex
@@ -515,5 +526,9 @@ def warn_on_higher_major_version(major, url)
515526
previous_major: @maximum_seen_major_version, new_major: major, node_url: url.sanitized.to_s)
516527
end
517528

529+
def set_build_flavour(flavour)
530+
@build_flavour.set(flavour)
531+
end
532+
518533
end
519534
end; end; end; end;

lib/logstash/outputs/elasticsearch/ilm.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,18 @@ def ilm_in_use?
1414
return @ilm_actually_enabled if defined?(@ilm_actually_enabled)
1515
@ilm_actually_enabled =
1616
begin
17-
if @ilm_enabled == 'auto'
17+
if serverless?
18+
raise LogStash::ConfigurationError, "Invalid ILM configuration `ilm_enabled => true`. " +
19+
"Serverless Elasticsearch cluster does not support Index Lifecycle Management." if @ilm_enabled.to_s == 'true'
20+
@logger.info("ILM auto configuration (`ilm_enabled => auto` or unset) resolved to `false`. "\
21+
"Serverless Elasticsearch cluster does not support Index Lifecycle Management.") if @ilm_enabled == 'auto'
22+
false
23+
elsif @ilm_enabled == 'auto'
1824
if ilm_on_by_default?
1925
ilm_alias_set?
2026
else
21-
@logger.info("Index Lifecycle Management is set to 'auto', but will be disabled - Your Elasticsearch cluster is before 7.0.0, which is the minimum version required to automatically run Index Lifecycle Management")
27+
@logger.info("ILM auto configuration (`ilm_enabled => auto` or unset) resolved to `false`."\
28+
" Elasticsearch cluster is before 7.0.0, which is the minimum version required to automatically run Index Lifecycle Management")
2229
false
2330
end
2431
elsif @ilm_enabled.to_s == 'true'

lib/logstash/outputs/elasticsearch/license_checker.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ def initialize(logger)
1111
# @param url [LogStash::Util::SafeURI] ES node URL
1212
# @return [Boolean] true if provided license is deemed appropriate
1313
def appropriate_license?(pool, url)
14+
return true if pool.serverless?
15+
1416
license = extract_license(pool.get_license(url))
1517
case license_status(license)
1618
when 'active'

lib/logstash/outputs/elasticsearch/template_manager.rb

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ def self.install_template(plugin)
1313
"We recommend either setting `template_api => legacy` to continue providing legacy-style templates, " +
1414
"or migrating your template to the composable style and setting `template_api => composable`. " +
1515
"The legacy template API is slated for removal in Elasticsearch 9.")
16+
elsif plugin.template_api == 'legacy' && plugin.serverless?
17+
raise LogStash::ConfigurationError, "Invalid template configuration `template_api => legacy`. Serverless Elasticsearch does not support legacy template API."
1618
end
1719

20+
1821
if plugin.template
1922
plugin.logger.info("Using mapping template from", :path => plugin.template)
2023
template = read_template_file(plugin.template)
@@ -61,11 +64,13 @@ def self.resolve_template_settings(plugin, template)
6164
plugin.logger.trace("Resolving ILM template settings: under 'settings' key", :template => template, :template_api => plugin.template_api, :es_version => plugin.maximum_seen_major_version)
6265
legacy_index_template_settings(template)
6366
else
64-
template_endpoint = template_endpoint(plugin)
65-
plugin.logger.trace("Resolving ILM template settings: template doesn't have 'settings' or 'template' fields, falling back to auto detection", :template => template, :template_api => plugin.template_api, :es_version => plugin.maximum_seen_major_version, :template_endpoint => template_endpoint)
66-
template_endpoint == INDEX_TEMPLATE_ENDPOINT ?
67-
composable_index_template_settings(template) :
67+
use_index_template_api = index_template_api?(plugin)
68+
plugin.logger.trace("Resolving ILM template settings: template doesn't have 'settings' or 'template' fields, falling back to auto detection", :template => template, :template_api => plugin.template_api, :es_version => plugin.maximum_seen_major_version, :index_template_api => use_index_template_api)
69+
if use_index_template_api
70+
composable_index_template_settings(template)
71+
else
6872
legacy_index_template_settings(template)
73+
end
6974
end
7075
end
7176

@@ -100,12 +105,25 @@ def self.read_template_file(template_path)
100105
end
101106

102107
def self.template_endpoint(plugin)
103-
if plugin.template_api == 'auto'
104-
plugin.maximum_seen_major_version < 8 ? LEGACY_TEMPLATE_ENDPOINT : INDEX_TEMPLATE_ENDPOINT
105-
elsif plugin.template_api.to_s == 'legacy'
106-
LEGACY_TEMPLATE_ENDPOINT
108+
index_template_api?(plugin) ? INDEX_TEMPLATE_ENDPOINT : LEGACY_TEMPLATE_ENDPOINT
109+
end
110+
111+
def self.index_template_api?(plugin)
112+
case plugin.serverless?
113+
when true
114+
true
107115
else
108-
INDEX_TEMPLATE_ENDPOINT
116+
case plugin.template_api
117+
when 'auto'
118+
plugin.maximum_seen_major_version >= 8
119+
when 'composable'
120+
true
121+
when 'legacy'
122+
false
123+
else
124+
plugin.logger.warn("Invalid template_api value #{plugin.template_api}")
125+
true
126+
end
109127
end
110128
end
111129

lib/logstash/plugin_mixins/elasticsearch/common.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ def maximum_seen_major_version
145145
client.maximum_seen_major_version
146146
end
147147

148+
def serverless?
149+
client.serverless?
150+
end
151+
148152
def alive_urls_count
149153
client.alive_urls_count
150154
end

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.15.9'
3+
s.version = '11.16.0'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/es_spec_helper.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,14 @@ def routing_field_name
5959
end
6060

6161
def self.es_version
62-
[
63-
nilify(RSpec.configuration.filter[:es_version]),
64-
nilify(ENV['ES_VERSION']),
65-
nilify(ENV['ELASTIC_STACK_VERSION']),
66-
].compact.first
62+
{
63+
"number" => [
64+
nilify(RSpec.configuration.filter[:es_version]),
65+
nilify(ENV['ES_VERSION']),
66+
nilify(ENV['ELASTIC_STACK_VERSION']),
67+
].compact.first,
68+
"build_flavor" => 'default'
69+
}
6770
end
6871

6972
RSpec::Matchers.define :have_hits do |expected|
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"license": {
3+
"status": "active",
4+
"uid": "d85d2c6a-b96d-3cc6-96db-5571a789b156",
5+
"type": "enterprise",
6+
"issue_date": "1970-01-01T00:00:00.000Z",
7+
"issue_date_in_millis": 0,
8+
"expiry_date": "2100-01-01T00:00:00.000Z",
9+
"expiry_date_in_millis": 4102444800000,
10+
"max_nodes": null,
11+
"max_resource_units": 100000,
12+
"issued_to": "Elastic Cloud",
13+
"issuer": "API",
14+
"start_date_in_millis": 0
15+
}
16+
}

0 commit comments

Comments
 (0)