From cd6ee432e6defce65f10f42c0dee1dcc930220da Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Wed, 16 Jul 2025 08:25:30 -0700 Subject: [PATCH 1/2] Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220f73500beeed37e676febc7c8a7cb157c1) --- spec/integration/elastic_integration_spec.rb | 29 +++++++++++++++++++ .../EventProcessorBuilder.java | 1 + 2 files changed, 30 insertions(+) diff --git a/spec/integration/elastic_integration_spec.rb b/spec/integration/elastic_integration_spec.rb index 6fb89b26..f8c053cb 100644 --- a/spec/integration/elastic_integration_spec.rb +++ b/spec/integration/elastic_integration_spec.rb @@ -1134,6 +1134,35 @@ def path; @path; end # end end + describe 'with terminate processor' do + let(:pipeline_processor) { + '{ + "terminate": { + "if": "ctx.error != null", + "tag": "terminated_ingest_pipeline" + } + }, + { + "append": { + "field": "append_field", + "value": ["integration", "test"] + } + }' + } + + it 'terminates the ingest pipeline' do + events = [LogStash::Event.new( + "message" => "Send message to pipeline which gets terminated.", + "error" => "This is intentionally placed error.", + "data_stream" => data_stream)] + + subject.multi_filter(events).each do |event| + expect(event.get("[@metadata][target_ingest_pipeline]")).to include("_none") + # intentionally placed append processor to check if it is not executed after terminate processor + expect(event.get("append_field")).to be_nil + end + end + end end context '#multi-pipeline execution' do diff --git a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java index 84552d3c..7f1548dd 100644 --- a/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java +++ b/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java @@ -122,6 +122,7 @@ public EventProcessorBuilder() { org.elasticsearch.ingest.common.SetProcessor.TYPE, org.elasticsearch.ingest.common.SortProcessor.TYPE, org.elasticsearch.ingest.common.SplitProcessor.TYPE, + "terminate", // note: upstream constant is package-private org.elasticsearch.ingest.common.TrimProcessor.TYPE, org.elasticsearch.ingest.common.URLDecodeProcessor.TYPE, org.elasticsearch.ingest.common.UppercaseProcessor.TYPE, From 0d040d6b7554f2a783faaf6244c91005200a659b Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 16 Jul 2025 08:48:39 -0700 Subject: [PATCH 2/2] Add changelog and version bump. --- CHANGELOG.md | 3 +++ VERSION | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9273c533..1ecd5f7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 9.1.1 + - Add `terminate` processor support [#345](https://github.com/elastic/logstash-filter-elastic_integration/pull/345) + ## 9.1.0 - Introduces `proxy` param to support proxy [#316](https://github.com/elastic/logstash-filter-elastic_integration/pull/316) - Embeds Ingest Node components from Elasticsearch 9.1 diff --git a/VERSION b/VERSION index 47da986f..44931da2 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -9.1.0 +9.1.1