From ba4039991140b7e07da910d733e67537ed2e8083 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Mon, 14 Jul 2025 16:54:25 -0700 Subject: [PATCH 1/2] Support termiate processor. --- spec/integration/elastic_integration_spec.rb | 21 +++++++++++++++++++ .../EventProcessorBuilder.java | 1 + 2 files changed, 22 insertions(+) diff --git a/spec/integration/elastic_integration_spec.rb b/spec/integration/elastic_integration_spec.rb index 6fb89b26..905adfa9 100644 --- a/spec/integration/elastic_integration_spec.rb +++ b/spec/integration/elastic_integration_spec.rb @@ -1134,6 +1134,27 @@ def path; @path; end # end end + describe 'with terminate processor' do + let(:pipeline_processor) { + '{ + "terminate": { + "if": "ctx.error != null", + "tag": "terminated_ingest_pipeline" + } + }' + } + + it 'terminates the ingest pipeline' do + events = [LogStash::Event.new( + "message" => "Send message to pipeline which gets terminated.", + "error" => "This is intentionally placed error.", + "data_stream" => data_stream)] + + subject.multi_filter(events).each do |event| + expect(event.get("[@metadata][target_ingest_pipeline]")).to include("_none") + end + end + end end context '#multi-pipeline execution' do diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java index 84552d3c..7f1548dd 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java @@ -122,6 +122,7 @@ public EventProcessorBuilder() { org.elasticsearch.ingest.common.SetProcessor.TYPE, org.elasticsearch.ingest.common.SortProcessor.TYPE, org.elasticsearch.ingest.common.SplitProcessor.TYPE, + "terminate", // note: upstream constant is package-private org.elasticsearch.ingest.common.TrimProcessor.TYPE, org.elasticsearch.ingest.common.URLDecodeProcessor.TYPE, org.elasticsearch.ingest.common.UppercaseProcessor.TYPE, From f81763095177fa65bb98934f23b2280a4a962198 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 15 Jul 2025 16:54:10 -0700 Subject: [PATCH 2/2] Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. --- spec/integration/elastic_integration_spec.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spec/integration/elastic_integration_spec.rb b/spec/integration/elastic_integration_spec.rb index 905adfa9..f8c053cb 100644 --- a/spec/integration/elastic_integration_spec.rb +++ b/spec/integration/elastic_integration_spec.rb @@ -1141,6 +1141,12 @@ def path; @path; end "if": "ctx.error != null", "tag": "terminated_ingest_pipeline" } + }, + { + "append": { + "field": "append_field", + "value": ["integration", "test"] + } }' } @@ -1152,6 +1158,8 @@ def path; @path; end subject.multi_filter(events).each do |event| expect(event.get("[@metadata][target_ingest_pipeline]")).to include("_none") + # intentionally placed append processor to check if it is not executed after terminate processor + expect(event.get("append_field")).to be_nil end end end