Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions spec/integration/elastic_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,35 @@ def path; @path; end
# end
end

describe 'with terminate processor' do
let(:pipeline_processor) {
'{
"terminate": {
"if": "ctx.error != null",
"tag": "terminated_ingest_pipeline"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wished this tag be available in _ingest_document so that I can expect(event.get("[tag]")).to eql? "terminated_ingest_pipeline" but no tag exists.

}
},
{
"append": {
"field": "append_field",
"value": ["integration", "test"]
}
}'
}

it 'terminates the ingest pipeline' do
events = [LogStash::Event.new(
"message" => "Send message to pipeline which gets terminated.",
"error" => "This is intentionally placed error.",
"data_stream" => data_stream)]

subject.multi_filter(events).each do |event|
expect(event.get("[@metadata][target_ingest_pipeline]")).to include("_none")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help me understand what exactly this assertion proves WRT the termination? Looks like with "if": "ctx.error != null" the terminate will trigger.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After plugin successfully executes the pipeline, it let's know (set the [@metadata][target_ingest_pipeline] to _none) es-output to avoid ingest pipeline execution in Elasticsearch side.

what exactly this assertion proves WRT the termination?

Yeah, this one I tried to highlight here that terminate processor doesn't attach any specific fields we can validate (it seems tag is for debug purpose). I have updated the integration test which adds append processor after terminate and validates to make sure append processor didn't execute.
Correspond ES query as an integration test would like this:

POST /_ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "processors": [
      {
        "terminate": {
          "if": "ctx.error != null",
          "tag": "terminated_pipeline"
        }
      },
      {
        "append": {
          "field": "message",
          "value": "my message value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id1",
      "_source": {
        "foo": "bar",
        "error": "some exception"
      }
    },
    {
      "_index": "index",
      "_id": "id2",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

In the result, we can see id1 will not have message since pipeline was terminated by terminate processor:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_version": "-3",
        "_id": "id1",
        "_source": {
          "error": "some exception",
          "foo": "bar"
        },
        "_ingest": {
          "timestamp": "2025-07-15T23:56:43.427866851Z"
        }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_version": "-3",
        "_id": "id2",
        "_source": {
          "message": [
            "my message value"
          ],
          "foo": "rab"
        },
        "_ingest": {
          "timestamp": "2025-07-15T23:56:43.427882279Z"
        }
      }
    }
  ]
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! That is very helpful. Love that idea!

# intentionally placed append processor to check if it is not executed after terminate processor
expect(event.get("append_field")).to be_nil
end
end
end
end

context '#multi-pipeline execution' do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public EventProcessorBuilder() {
org.elasticsearch.ingest.common.SetProcessor.TYPE,
org.elasticsearch.ingest.common.SortProcessor.TYPE,
org.elasticsearch.ingest.common.SplitProcessor.TYPE,
"terminate", // note: upstream constant is package-private
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.elasticsearch.ingest.common.TrimProcessor.TYPE,
org.elasticsearch.ingest.common.URLDecodeProcessor.TYPE,
org.elasticsearch.ingest.common.UppercaseProcessor.TYPE,
Expand Down