@seanzatzdev seanzatzdev commented Aug 21, 2025

In our docs at https://www.elastic.co/docs/manage-data/data-store/data-streams/failure-store-recipes#create-a-pipeline-to-convert-failure-documents we currently describe how to remediate documents from the failure store using a Painless script.

To make this less error-prone, this PR encapsulates that functionality in a processor, recover_failure_document.

For example, the following request:

POST localhost:9200/_ingest/pipeline/_simulate
{
    "pipeline": {
        "processors": [
            {
                "recover_failure_document": {}
            }
        ]
    },
    "docs": [
        {
            "_index": ".fs-my-datastream-ingest-2025.05.09-000001",
            "_id": "HnTJs5YBwrYNjPmaFcri",
            "_score": 1,
            "_source": {
                "@timestamp": "2025-05-09T06:41:24.775Z",
                "document": {
                    "index": "my-datastream-ingest",
                    "source": {
                        "@timestamp": "2025-04-21T00:00:00Z",
                        "counter_name": "test"
                    }
                },
                "error": {
                    "type": "illegal_argument_exception",
                    "message": "field [counter] not present as part of path [counter]",
                    "stack_trace": "j.l.IllegalArgumentException: field [counter] not present as part of path [counter] at o.e.i.IngestDocument.getFieldValue(IngestDocument.java: 202 at o.e.i.c.SetProcessor.execute(SetProcessor.java: 86) 14 more",
                    "pipeline_trace": [
                        "complicated-processor"
                    ],
                    "pipeline": "complicated-processor",
                    "processor_type": "set",
                    "processor_tag": "copy to new counter again"
                }
            }
        }
    ]
}

returns the following response:

{
    "docs": [
        {
            "doc": {
                "_index": "my-datastream-ingest",
                "_version": "-3",
                "_id": "HnTJs5YBwrYNjPmaFcri",
                "_source": {
                    "@timestamp": "2025-04-21T00:00:00Z",
                    "counter_name": "test"
                },
                "_ingest": {
                    "pre_recovery": {
                        "@timestamp": "2025-05-09T06:41:24.775Z",
                        "_index": ".fs-my-datastream-ingest-2025.05.09-000001",
                        "document": {
                            "index": "my-datastream-ingest"
                        },
                        "_id": "HnTJs5YBwrYNjPmaFcri",
                        "error": {
                            "pipeline": "complicated-processor",
                            "processor_type": "set",
                            "processor_tag": "copy to new counter again",
                            "pipeline_trace": [
                                "complicated-processor"
                            ],
                            "stack_trace": "j.l.IllegalArgumentException: field [counter] not present as part of path [counter] at o.e.i.IngestDocument.getFieldValue(IngestDocument.java: 202 at o.e.i.c.SetProcessor.execute(SetProcessor.java: 86) 14 more",
                            "type": "illegal_argument_exception",
                            "message": "field [counter] not present as part of path [counter]"
                        },
                        "_version": -3
                    },
                    "timestamp": "2025-09-04T22:32:12.800709Z"
                }
            }
        }
    ]
}
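
As a rough illustration of the transformation visible in the request/response pair above, here is a minimal Python sketch. It is not the processor's actual implementation: the function name `recover_failure_document` is only borrowed from the processor, the helper itself is hypothetical, and it deliberately skips details such as `_version` and the `_ingest.timestamp` value that the real response includes.

```python
import copy


def recover_failure_document(doc):
    """Hypothetical helper that mirrors the example above, not the real processor."""
    source = doc["_source"]
    failed = source["document"]  # raises KeyError if this is not a failure document

    # Everything except the original source is kept as pre-recovery metadata.
    pre_recovery = {k: copy.deepcopy(v) for k, v in source.items() if k != "document"}
    pre_recovery["document"] = {
        k: copy.deepcopy(v) for k, v in failed.items() if k != "source"
    }
    pre_recovery["_index"] = doc["_index"]
    pre_recovery["_id"] = doc["_id"]

    return {
        "_index": failed["index"],                  # original target index
        "_id": doc["_id"],                          # assumption: id is carried over as in the example
        "_source": copy.deepcopy(failed["source"]),  # original document body
        "_ingest": {"pre_recovery": pre_recovery},
    }


# Trimmed-down version of the failure document from the request above.
failure_doc = {
    "_index": ".fs-my-datastream-ingest-2025.05.09-000001",
    "_id": "HnTJs5YBwrYNjPmaFcri",
    "_source": {
        "@timestamp": "2025-05-09T06:41:24.775Z",
        "document": {
            "index": "my-datastream-ingest",
            "source": {"@timestamp": "2025-04-21T00:00:00Z", "counter_name": "test"},
        },
        "error": {"type": "illegal_argument_exception", "processor_type": "set"},
    },
}

recovered = recover_failure_document(failure_doc)
```

The sketch copies rather than mutates its input, so the failure document stays available for comparison with the recovered one.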

Using this processor on a document that is not a valid failure document (here, one with no document field) results in an error, e.g.:

POST localhost:9200/_ingest/pipeline/_simulate
{
    "pipeline": {
        "processors": [
            {
                "recover_failure_document": {}
            }
        ]
    },
    "docs": [
        {
            "_index": ".fs-my-datastream-ingest-2025.05.09-000001",
            "_id": "HnTJs5YBwrYNjPmaFcri",
            "_score": 1,
            "_source": {
                "@timestamp": "2025-05-09T06:41:24.775Z",
                "error": {
                    "type": "illegal_argument_exception",
                    "message": "field [counter] not present as part of path [counter]",
                    "stack_trace": "j.l.IllegalArgumentException: field [counter] not present as part of path [counter] at o.e.i.IngestDocument.getFieldValue(IngestDocument.java: 202 at o.e.i.c.SetProcessor.execute(SetProcessor.java: 86) 14 more",
                    "pipeline_trace": [
                        "complicated-processor"
                    ],
                    "pipeline": "complicated-processor",
                    "processor_type": "set",
                    "processor_tag": "copy to new counter again"
                }
            }
        }
    ]
}

returns the following error:

{
    "docs": [
        {
            "error": {
                "root_cause": [
                    {
                        "type": "illegal_argument_exception",
                        "reason": "field [document] not present as part of path [document]"
                    }
                ],
                "type": "illegal_argument_exception",
                "reason": "field [document] not present as part of path [document]"
            }
        }
    ]
}
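
The error case can be sketched in the same spirit. The Python helper below is hypothetical and only imitates the message format shown in the response above. Only the `document` field is confirmed as required by this example; treating `document.index` and `document.source` as required too is an assumption made for illustration.

```python
def check_failure_document(source):
    """Hypothetical pre-check; raises ValueError for a malformed failure document."""
    # Assumption: besides `document`, the nested `index` and `source` fields are needed.
    for path in ("document", "document.index", "document.source"):
        node = source
        for part in path.split("."):
            if not isinstance(node, dict) or part not in node:
                raise ValueError(
                    f"field [{part}] not present as part of path [{path}]"
                )
            node = node[part]


# Source of the invalid document from the request above: it has no `document` field.
invalid_source = {
    "@timestamp": "2025-05-09T06:41:24.775Z",
    "error": {"type": "illegal_argument_exception"},
}

try:
    check_failure_document(invalid_source)
    message = None
except ValueError as exc:
    message = str(exc)

print(message)
```

Running a check like this before reindexing would surface malformed failure documents early instead of partway through a pipeline.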

Closes #132940

@seanzatzdev seanzatzdev changed the title from "Add remediate processor to remediate failure docs." to "Add remediate processor to remediate failure store docs." Sep 2, 2025
@seanzatzdev seanzatzdev changed the title from "Add remediate processor to remediate failure store docs." to "Add remediate processor to remediate failurestore docs." Sep 2, 2025
@seanzatzdev seanzatzdev self-assigned this Sep 2, 2025
@seanzatzdev seanzatzdev marked this pull request as ready for review September 2, 2025 14:04
@seanzatzdev seanzatzdev requested a review from a team as a code owner September 2, 2025 14:04
@elasticsearchmachine elasticsearchmachine added the needs:triage label Sep 2, 2025
@seanzatzdev seanzatzdev added the >feature, :Data Management/Ingest Node, and Team:Data Management labels and removed the needs:triage label Sep 2, 2025
@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@elasticsearchmachine
Collaborator

Hi @seanzatzdev, I've created a changelog YAML for you.

github-actions bot commented Sep 2, 2025

ℹ️ Important: Docs version tagging

👋 Thanks for updating the docs! Just a friendly reminder that our docs are now cumulative. This means all 9.x versions are documented on the same page and published off of the main branch, instead of creating separate pages for each minor version.

We use applies_to tags to mark version-specific features and changes.

When to use applies_to tags:

✅ At the page level to indicate which products/deployments the content applies to (mandatory)
✅ When features change state (e.g. preview, ga) in a specific version
✅ When availability differs across deployments and environments

What NOT to do:

❌ Don't remove or replace information that applies to an older version
❌ Don't add new information that applies to a specific version without an applies_to tag
❌ Don't forget that applies_to tags can be used at the page, section, and inline level

@jbaiera jbaiera left a comment

Left a couple of ideas!

@samxbr samxbr left a comment

Maybe it's just me, but I find the processor name remediate a bit confusing. The processor itself transforms a failure doc back into its original doc, but actually "remediating" the failure doc involves more steps outside of this processor, such as reindexing. I wonder if the processor should have a name that more accurately describes what it does. To throw out some ideas: something like failure_source or failure_to_origin?

@seanzatzdev seanzatzdev changed the title from "Add remediate processor to remediate failurestore docs." to "Add recover_failure_document processor to restore failurestore docs to original form" Sep 5, 2025
@seanzatzdev
Contributor Author

Thanks for the feedback, @jbaiera and @samxbr. I've updated my PR accordingly.

@samxbr samxbr left a comment

The PR generally LGTM. I want to leave the final approval to @jbaiera since he is the expert on the failure store.

@jbaiera jbaiera left a comment

This is looking really great! I left just a few small comments, but once those are handled I think it LGTM!

@seanzatzdev seanzatzdev enabled auto-merge (squash) September 9, 2025 23:46
@seanzatzdev
Contributor Author

@jbaiera thanks for the great feedback!

@seanzatzdev seanzatzdev merged commit b78acc2 into elastic:main Sep 10, 2025
35 checks passed
@seanzatzdev seanzatzdev deleted the remediate-processor branch September 10, 2025 16:23
Labels

:Data Management/Ingest Node (Execution or management of Ingest Pipelines including GeoIP)
>feature
Team:Data Management (Meta label for data/management team)
v9.2.0

Development

Successfully merging this pull request may close these issues.

Add a remediate processor that transforms a failure store document into an indexable form.

4 participants