-
Notifications
You must be signed in to change notification settings - Fork 13
Support terminate processor. #345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This pull request does not have a backport label. Could you fix it @mashhurs? 🙏
|
'{ | ||
"terminate": { | ||
"if": "ctx.error != null", | ||
"tag": "terminated_ingest_pipeline" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wished this tag be available in _ingest_document
so that I can expect(event.get("[tag]")).to eql? "terminated_ingest_pipeline"
but no tag exists.
org.elasticsearch.ingest.common.SetProcessor.TYPE, | ||
org.elasticsearch.ingest.common.SortProcessor.TYPE, | ||
org.elasticsearch.ingest.common.SplitProcessor.TYPE, | ||
"terminate", // note: upstream constant is package-private |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TerminateProcessor
is also package private like dot_expander
https://github.com/elastic/logstash-filter-elastic_integration/blob/main/src/main/java/co/elastic/logstash/filters/elasticintegration/EventProcessorBuilder.java#L103
"data_stream" => data_stream)] | ||
|
||
subject.multi_filter(events).each do |event| | ||
expect(event.get("[@metadata][target_ingest_pipeline]")).to include("_none") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you help me understand what exactly this assertion proves WRT the termination? Looks like with "if": "ctx.error != null"
the terminate
will trigger.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After plugin successfully executes the pipeline, it let's know (set the [@metadata][target_ingest_pipeline]
to _none
) es-output to avoid ingest pipeline execution in Elasticsearch side.
what exactly this assertion proves WRT the termination?
Yeah, this one I tried to highlight here that terminate
processor doesn't attach any specific fields we can validate (it seems tag
is for debug purpose). I have updated the integration test which adds append
processor after terminate
and validates to make sure append processor didn't execute.
Correspond ES query as an integration test would like this:
POST /_ingest/pipeline/_simulate
{
"pipeline" :
{
"processors": [
{
"terminate": {
"if": "ctx.error != null",
"tag": "terminated_pipeline"
}
},
{
"append": {
"field": "message",
"value": "my message value"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id1",
"_source": {
"foo": "bar",
"error": "some exception"
}
},
{
"_index": "index",
"_id": "id2",
"_source": {
"foo": "rab"
}
}
]
}
In the result, we can see id1
will not have message
since pipeline was terminated by terminate processor:
{
"docs": [
{
"doc": {
"_index": "index",
"_version": "-3",
"_id": "id1",
"_source": {
"error": "some exception",
"foo": "bar"
},
"_ingest": {
"timestamp": "2025-07-15T23:56:43.427866851Z"
}
}
},
{
"doc": {
"_index": "index",
"_version": "-3",
"_id": "id2",
"_source": {
"message": [
"my message value"
],
"foo": "rab"
},
"_ingest": {
"timestamp": "2025-07-15T23:56:43.427882279Z"
}
}
}
]
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! That is very helpful. Love that idea!
…xpectation that after terminate processor no other processors should run.
💚 Build Succeeded
History
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the detailed breakdown. That is very helpful. Appreciate the extra context.
Just FYI: |
@Mergifyio backport 8.17 8.18 8.19 9.0 9.1 |
✅ Backports have been created
|
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220)
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and version bump. --------- Co-authored-by: Mashhur <[email protected]> Co-authored-by: Mashhur <[email protected]>
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and version bump. --------- Co-authored-by: Mashhur <[email protected]> Co-authored-by: Mashhur <[email protected]>
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and bump version. --------- Co-authored-by: Mashhur <[email protected]> Co-authored-by: Mashhur <[email protected]>
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and bump version. --------- Co-authored-by: Mashhur <[email protected]> Co-authored-by: Mashhur <[email protected]>
* Support terminate processor. (#345) * Support terminate processor. * Update the integration test for terminate processor to it check the expectation that after terminate processor no other processors should run. (cherry picked from commit ce63220) * Add changelog and version bump. --------- Co-authored-by: Mashhur <[email protected]> Co-authored-by: Mashhur <[email protected]>
Registers terminate processor to the support list.