Skip to content

Commit 0801b43

Browse files
authored
Add Elastic Api Version header (#1147)
Add header {"Elastic-Api-Version" : "2023-10-31"} to all requests when Elasticsearch is serverless. A test connection is added in health check in register phase and the pipeline will fail if the header is invalid. Combine the version check request and Elasticsearch product check request into one. Rename "build_flavour" to "build_flavor"
1 parent c111521 commit 0801b43

File tree

8 files changed

+330
-177
lines changed

8 files changed

+330
-177
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.18.0
2+
- Added request header `Elastic-Api-Version` for serverless [#1147](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1147)
3+
14
## 11.17.0
25
- Added support to http compression level. Deprecated `http_compression` in favour of `compression_level` and enabled compression level 1 by default. [#1148](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1148)
36

lib/logstash/outputs/elasticsearch.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,9 @@ def use_event_type?(noop_required_client)
596596
def install_template
597597
TemplateManager.install_template(self)
598598
rescue => e
599-
@logger.error("Failed to install template", message: e.message, exception: e.class, backtrace: e.backtrace)
599+
details = { message: e.message, exception: e.class, backtrace: e.backtrace }
600+
details[:body] = e.response_body if e.respond_to?(:response_body)
601+
@logger.error("Failed to install template", details)
600602
end
601603

602604
def setup_ecs_compatibility_related_defaults

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ def emulate_batch_error_response(actions, http_code, reason)
209209
end
210210

211211
def get(path)
212-
response = @pool.get(path, nil)
212+
response = @pool.get(path)
213213
LogStash::Json.load(response.body)
214214
end
215215

lib/logstash/outputs/elasticsearch/http_client/pool.rb

Lines changed: 106 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,18 @@ def initialize(response_code, url, request_body, response_body)
1616
@response_body = response_body
1717
end
1818

19+
def invalid_eav_header?
20+
@response_code == 400 && @response_body&.include?(ELASTIC_API_VERSION)
21+
end
22+
23+
def invalid_credentials?
24+
@response_code == 401
25+
end
26+
27+
def forbidden?
28+
@response_code == 403
29+
end
30+
1931
end
2032
class HostUnreachableError < Error;
2133
attr_reader :original_error, :url
@@ -48,7 +60,9 @@ def initialize(original_error, url)
4860
:sniffer_delay => 10,
4961
}.freeze
5062

51-
BUILD_FLAVOUR_SERVERLESS = 'serverless'.freeze
63+
BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze
64+
ELASTIC_API_VERSION = "Elastic-Api-Version".freeze
65+
DEFAULT_EAV_HEADER = { ELASTIC_API_VERSION => "2023-10-31" }.freeze
5266

5367
def initialize(logger, adapter, initial_urls=[], options={})
5468
@logger = logger
@@ -77,7 +91,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
7791
@license_checker = options[:license_checker] || LogStash::PluginMixins::ElasticSearch::NoopLicenseChecker::INSTANCE
7892

7993
@last_es_version = Concurrent::AtomicReference.new
80-
@build_flavour = Concurrent::AtomicReference.new
94+
@build_flavor = Concurrent::AtomicReference.new
8195
end
8296

8397
def start
@@ -232,39 +246,56 @@ def get_license(url)
232246
end
233247

234248
def health_check_request(url)
235-
response = perform_request_to_url(url, :head, @healthcheck_path)
236-
raise BadResponseCodeError.new(response.code, url, nil, response.body) unless (200..299).cover?(response.code)
249+
logger.debug("Running health check to see if an Elasticsearch connection is working",
250+
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
251+
begin
252+
response = perform_request_to_url(url, :head, @healthcheck_path)
253+
return response, nil
254+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
255+
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
256+
return nil, e
257+
end
237258
end
238259

239260
def healthcheck!(register_phase = true)
240261
# Try to keep locking granularity low such that we don't affect IO...
241262
@state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta|
242263
begin
243-
logger.debug("Running health check to see if an Elasticsearch connection is working",
244-
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
245-
health_check_request(url)
264+
_, health_bad_code_err = health_check_request(url)
265+
root_response, root_bad_code_err = get_root_path(url) if health_bad_code_err.nil? || register_phase
246266

247267
# when called from resurrectionist skip the product check done during register phase
248268
if register_phase
249-
if !elasticsearch?(url)
250-
raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
251-
end
269+
raise LogStash::ConfigurationError,
270+
"Could not read Elasticsearch. Please check the credentials" if root_bad_code_err&.invalid_credentials?
271+
raise LogStash::ConfigurationError,
272+
"Could not read Elasticsearch. Please check the privileges" if root_bad_code_err&.forbidden?
273+
# when customer_headers is invalid
274+
raise LogStash::ConfigurationError,
275+
"The Elastic-Api-Version header is not valid" if root_bad_code_err&.invalid_eav_header?
276+
# when it is not Elasticserach
277+
raise LogStash::ConfigurationError,
278+
"Could not connect to a compatible version of Elasticsearch" if root_bad_code_err.nil? && !elasticsearch?(root_response)
279+
280+
test_serverless_connection(url, root_response)
252281
end
282+
283+
raise health_bad_code_err if health_bad_code_err
284+
raise root_bad_code_err if root_bad_code_err
285+
253286
# If no exception was raised it must have succeeded!
254287
logger.warn("Restored connection to ES instance", url: url.sanitized.to_s)
255-
# We reconnected to this node, check its ES version
256-
version_info = get_es_version(url)
257-
es_version = version_info.fetch('number', nil)
258-
build_flavour = version_info.fetch('build_flavor', nil)
259-
260-
if es_version.nil?
261-
logger.warn("Failed to retrieve Elasticsearch version data from connected endpoint, connection aborted", :url => url.sanitized.to_s)
262-
next
263-
end
288+
289+
# We check its ES version
290+
es_version, build_flavor = parse_es_version(root_response)
291+
logger.warn("Failed to retrieve Elasticsearch build flavor") if build_flavor.nil?
292+
logger.warn("Failed to retrieve Elasticsearch version data from connected endpoint, connection aborted", :url => url.sanitized.to_s) if es_version.nil?
293+
next if es_version.nil?
294+
264295
@state_mutex.synchronize do
265296
meta[:version] = es_version
266297
set_last_es_version(es_version, url)
267-
set_build_flavour(build_flavour)
298+
set_build_flavor(build_flavor)
268299

269300
alive = @license_checker.appropriate_license?(self, url)
270301
meta[:state] = alive ? :alive : :dead
@@ -275,40 +306,21 @@ def healthcheck!(register_phase = true)
275306
end
276307
end
277308

278-
def elasticsearch?(url)
309+
def get_root_path(url, params={})
279310
begin
280-
response = perform_request_to_url(url, :get, ROOT_URI_PATH)
311+
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
312+
return resp, nil
281313
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
282-
return false if response.code == 401 || response.code == 403
283-
raise e
314+
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
315+
return nil, e
284316
end
285-
286-
version_info = LogStash::Json.load(response.body)
287-
return false if version_info['version'].nil?
288-
289-
version = ::Gem::Version.new(version_info["version"]['number'])
290-
return false if version < ::Gem::Version.new('6.0.0')
291-
292-
if VERSION_6_TO_7.satisfied_by?(version)
293-
return valid_tagline?(version_info)
294-
elsif VERSION_7_TO_7_14.satisfied_by?(version)
295-
build_flavor = version_info["version"]['build_flavor']
296-
return false if build_flavor.nil? || build_flavor != 'default' || !valid_tagline?(version_info)
297-
else
298-
# case >= 7.14
299-
lower_headers = response.headers.transform_keys {|key| key.to_s.downcase }
300-
product_header = lower_headers['x-elastic-product']
301-
return false if product_header != 'Elasticsearch'
302-
end
303-
return true
304-
rescue => e
305-
logger.error("Unable to retrieve Elasticsearch version", url: url.sanitized.to_s, exception: e.class, message: e.message)
306-
false
307317
end
308318

309-
def valid_tagline?(version_info)
310-
tagline = version_info['tagline']
311-
tagline == "You Know, for Search"
319+
def test_serverless_connection(url, root_response)
320+
_, build_flavor = parse_es_version(root_response)
321+
params = { :headers => DEFAULT_EAV_HEADER }
322+
_, bad_code_err = get_root_path(url, params) if build_flavor == BUILD_FLAVOR_SERVERLESS
323+
raise LogStash::ConfigurationError, "The Elastic-Api-Version header is not valid" if bad_code_err&.invalid_eav_header?
312324
end
313325

314326
def stop_resurrectionist
@@ -334,6 +346,7 @@ def perform_request(method, path, params={}, body=nil)
334346
end
335347

336348
def perform_request_to_url(url, method, path, params={}, body=nil)
349+
params[:headers] = DEFAULT_EAV_HEADER.merge(params[:headers] || {}) if serverless?
337350
@adapter.perform_request(url, method, path, params, body)
338351
end
339352

@@ -476,15 +489,6 @@ def return_connection(url)
476489
end
477490
end
478491

479-
def get_es_version(url)
480-
response = perform_request_to_url(url, :get, ROOT_URI_PATH)
481-
return nil unless (200..299).cover?(response.code)
482-
483-
response = LogStash::Json.load(response.body)
484-
485-
response.fetch('version', {})
486-
end
487-
488492
def last_es_version
489493
@last_es_version.get
490494
end
@@ -494,7 +498,7 @@ def maximum_seen_major_version
494498
end
495499

496500
def serverless?
497-
@build_flavour.get == BUILD_FLAVOUR_SERVERLESS
501+
@build_flavor.get == BUILD_FLAVOR_SERVERLESS
498502
end
499503

500504
private
@@ -526,9 +530,50 @@ def warn_on_higher_major_version(major, url)
526530
previous_major: @maximum_seen_major_version, new_major: major, node_url: url.sanitized.to_s)
527531
end
528532

529-
def set_build_flavour(flavour)
530-
@build_flavour.set(flavour)
533+
def set_build_flavor(flavor)
534+
@build_flavor.set(flavor)
535+
end
536+
537+
def parse_es_version(response)
538+
return nil, nil unless (200..299).cover?(response&.code)
539+
540+
response = LogStash::Json.load(response&.body)
541+
version_info = response.fetch('version', {})
542+
es_version = version_info.fetch('number', nil)
543+
build_flavor = version_info.fetch('build_flavor', nil)
544+
545+
return es_version, build_flavor
546+
end
547+
548+
def elasticsearch?(response)
549+
return false if response.nil?
550+
551+
version_info = LogStash::Json.load(response.body)
552+
return false if version_info['version'].nil?
553+
554+
version = ::Gem::Version.new(version_info["version"]['number'])
555+
return false if version < ::Gem::Version.new('6.0.0')
556+
557+
if VERSION_6_TO_7.satisfied_by?(version)
558+
return valid_tagline?(version_info)
559+
elsif VERSION_7_TO_7_14.satisfied_by?(version)
560+
build_flavor = version_info["version"]['build_flavor']
561+
return false if build_flavor.nil? || build_flavor != 'default' || !valid_tagline?(version_info)
562+
else
563+
# case >= 7.14
564+
lower_headers = response.headers.transform_keys {|key| key.to_s.downcase }
565+
product_header = lower_headers['x-elastic-product']
566+
return false if product_header != 'Elasticsearch'
567+
end
568+
return true
569+
rescue => e
570+
logger.error("Unable to retrieve Elasticsearch version", exception: e.class, message: e.message)
571+
false
531572
end
532573

574+
def valid_tagline?(version_info)
575+
tagline = version_info['tagline']
576+
tagline == "You Know, for Search"
577+
end
533578
end
534579
end; end; end; end;

lib/logstash/plugin_mixins/elasticsearch/common.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ def discover_cluster_uuid
179179
cluster_info = client.get('/')
180180
plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid'])
181181
rescue => e
182-
@logger.error("Unable to retrieve Elasticsearch cluster uuid", message: e.message, exception: e.class, backtrace: e.backtrace)
182+
details = { message: e.message, exception: e.class, backtrace: e.backtrace }
183+
details[:body] = e.response_body if e.respond_to?(:response_body)
184+
@logger.error("Unable to retrieve Elasticsearch cluster uuid", details)
183185
end
184186

185187
def retrying_submit(actions)

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.17.0'
3+
s.version = '11.18.0'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

0 commit comments

Comments
 (0)