Skip to content

Commit 47a5169

Browse files
authored
Use integration metadata to create ES actions (#1155)
Change the creation of actions that are passed down to Elasticsearch to use also the metadata fields set by an integration. The interested fields are id (document_id), index and pipeline, the field values are taken verbatim without placeholders resolution. The index, document_id and pipeline that are configured in the plugin settings have precedence on the integration ones because manifest an explicit choice made by the user.
1 parent d2d9710 commit 47a5169

File tree

4 files changed

+182
-7
lines changed

4 files changed

+182
-7
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.21.0
2+
- Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `index`, `document_id`, or `pipeline` directives [#1155](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1155)
3+
14
## 11.20.1
25
- Doc: Replace `document_already_exist_exception` with `version_conflict_engine_exception` in the `silence_errors_in_log` setting example [#1159](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1159)
36

lib/logstash/outputs/elasticsearch.rb

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -540,15 +540,16 @@ def initialize(bad_action)
540540
# @return Hash (initial) parameters for given event
541541
# @private shared event params factory between index and data_stream mode
542542
def common_event_params(event)
543-
sprintf_index = @event_target.call(event)
544-
raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation
543+
event_control = event.get("[@metadata][_ingest_document]")
544+
event_id, event_pipeline, event_index = event_control&.values_at("id","pipeline","index") rescue nil
545+
545546
params = {
546-
:_id => @document_id ? event.sprintf(@document_id) : nil,
547-
:_index => sprintf_index,
547+
:_id => resolve_document_id(event, event_id),
548+
:_index => resolve_index!(event, event_index),
548549
routing_field_name => @routing ? event.sprintf(@routing) : nil
549550
}
550551

551-
target_pipeline = resolve_pipeline(event)
552+
target_pipeline = resolve_pipeline(event, event_pipeline)
552553
# convention: empty string equates to not using a pipeline
553554
# this is useful when using a field reference in the pipeline setting, e.g.
554555
# elasticsearch {
@@ -559,7 +560,26 @@ def common_event_params(event)
559560
params
560561
end
561562

562-
def resolve_pipeline(event)
563+
def resolve_document_id(event, event_id)
564+
return event.sprintf(@document_id) if @document_id
565+
return event_id || nil
566+
end
567+
private :resolve_document_id
568+
569+
def resolve_index!(event, event_index)
570+
sprintf_index = @event_target.call(event)
571+
raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation
572+
# if it's not a data stream, sprintf_index is the @index with resolved placeholders.
573+
# if is a data stream, sprintf_index could be either the name of a data stream or the value contained in
574+
# @index without placeholders substitution. If event's metadata index is provided, it takes precedence
575+
# on datastream name or whatever is returned by the event_target provider.
576+
return event_index if @index == @default_index && event_index
577+
return sprintf_index
578+
end
579+
private :resolve_index!
580+
581+
def resolve_pipeline(event, event_pipeline)
582+
return event_pipeline if event_pipeline && !@pipeline
563583
pipeline_template = @pipeline || event.get("[@metadata][target_ingest_pipeline]")&.to_s
564584
pipeline_template && event.sprintf(pipeline_template)
565585
end

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.20.1'
3+
s.version = '11.21.0'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,158 @@
271271
end
272272
end
273273

274+
describe "with event integration metadata" do
275+
let(:event_fields) {{}}
276+
let(:event) { LogStash::Event.new(event_fields)}
277+
278+
context "when plugin's index is specified" do
279+
let(:options) { super().merge("index" => "index_from_settings")}
280+
281+
context "when the event contains an integration metadata index" do
282+
let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) }
283+
284+
it "plugin's index is used" do
285+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings")
286+
end
287+
end
288+
289+
context "when the event doesn't contains an integration metadata index" do
290+
it "plugin's index is used" do
291+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "index_from_settings")
292+
end
293+
end
294+
end
295+
296+
context "when plugin's index is NOT specified" do
297+
let(:options) { super().merge("index" => nil)}
298+
299+
context "when the event contains an integration metadata index" do
300+
let(:event_fields) { super().merge({"@metadata" => {"_ingest_document" => {"index" => "meta-document-index"}}}) }
301+
302+
it "event's metadata index is used" do
303+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index")
304+
end
305+
306+
context "when datastream settings are NOT configured" do
307+
it "event's metadata index is used" do
308+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index")
309+
end
310+
end
311+
312+
context "when datastream settings are configured" do
313+
let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) }
314+
315+
it "event's metadata index is used" do
316+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => "meta-document-index")
317+
end
318+
end
319+
end
320+
321+
context "when the event DOESN'T contain integration metadata index" do
322+
let(:default_index_resolved) { event.sprintf(subject.default_index) }
323+
324+
it "default index is used" do
325+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved)
326+
end
327+
328+
context "when datastream settings are NOT configured" do
329+
it "default index is used" do
330+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved)
331+
end
332+
end
333+
334+
context "when datastream settings are configured" do
335+
let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) }
336+
337+
it "default index is used" do
338+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_index => default_index_resolved)
339+
end
340+
end
341+
end
342+
end
343+
344+
context "when plugin's document_id is specified" do
345+
let(:options) { super().merge("document_id" => "id_from_settings")}
346+
347+
context "when the event contains an integration metadata document_id" do
348+
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"id" => "meta-document-id"}}}) }
349+
350+
it "plugin's document_id is used" do
351+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "id_from_settings")
352+
end
353+
end
354+
355+
context "when the event DOESN'T contains an integration metadata document_id" do
356+
it "plugin's document_id is used" do
357+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "id_from_settings")
358+
end
359+
end
360+
end
361+
362+
context "when plugin's document_id is NOT specified" do
363+
let(:options) { super().merge("document_id" => nil)}
364+
365+
context "when the event contains an integration metadata document_id" do
366+
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"id" => "meta-document-id"}}}) }
367+
368+
it "event's metadata document_id is used" do
369+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => "meta-document-id")
370+
end
371+
end
372+
373+
context "when the event DOESN'T contains an integration metadata document_id" do
374+
it "plugin's default id mechanism is used" do
375+
expect(subject.send(:event_action_tuple, event)[1]).to include(:_id => nil)
376+
end
377+
end
378+
end
379+
380+
context "when plugin's pipeline is specified" do
381+
let(:options) { {"pipeline" => "pipeline_from_settings" } }
382+
383+
context "when the event contains an integration metadata pipeline" do
384+
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"pipeline" => "integration-pipeline"}}}) }
385+
386+
it "plugin's pipeline is used" do
387+
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "pipeline_from_settings")
388+
end
389+
end
390+
391+
context "when the event DOESN'T contains an integration metadata pipeline" do
392+
it "plugin's pipeline is used" do
393+
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "pipeline_from_settings")
394+
end
395+
end
396+
end
397+
398+
context "when plugin's pipeline is NOT specified" do
399+
let(:options) { super().merge("pipeline" => nil)}
400+
401+
context "when the event contains an integration metadata pipeline" do
402+
let(:metadata) { {"_ingest_document" => {"pipeline" => "integration-pipeline"}} }
403+
let(:event) { LogStash::Event.new({"@metadata" => metadata}) }
404+
405+
it "event's metadata pipeline is used" do
406+
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline")
407+
end
408+
409+
context "when also target_ingest_pipeline id defined" do
410+
let(:metadata) { super().merge({"target_ingest_pipeline" => "meta-ingest-pipeline"}) }
411+
412+
it "then event's pipeline from _ingest_document is used" do
413+
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "integration-pipeline")
414+
end
415+
end
416+
end
417+
418+
context "when the event DOESN'T contains an integration metadata pipeline" do
419+
it "plugin's default pipeline mechanism is used" do
420+
expect(subject.send(:event_action_tuple, event)[1]).to_not have_key(:pipeline)
421+
end
422+
end
423+
end
424+
end
425+
274426
describe "with auth" do
275427
let(:user) { "myuser" }
276428
let(:password) { ::LogStash::Util::Password.new("mypassword") }

0 commit comments

Comments
 (0)