
Conversation

@lukewhiting
Contributor

This PR prevents indexing into child streams when streams mode is enabled.

In this case, we define a child stream as any index matching logs.*, and the restrictions apply both to direct indexing via the index and bulk APIs and to indirect indexing attempts via pipelines that use reroute, script, or other processors to change the target index or routing.

Deletes are still permitted on these child streams, but updates such as _update_by_query are prevented.
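For illustration, here is a minimal sketch in the same request format as the example below (the child stream name logs.abc.def is hypothetical and the exact responses depend on the stream setup). Per the behaviour described above, the first request (a delete by query) should be permitted, while the second (an update by query) is rejected.

POST {{host}}/logs.abc.def/_delete_by_query
Content-Type: application/json

{ "query": { "match_all": {} } }

POST {{host}}/logs.abc.def/_update_by_query
Content-Type: application/json

{ "script": { "source": "ctx._source.flagged = true" } }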

Example

Input

  • logs redirects to logs.abc.def via a reroute processor on its default pipeline (see the pipeline sketch below)
  • bad-index redirects to logs.abc via a script processor that changes ctx._index
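Below is a minimal sketch of pipeline definitions that could produce the routing described above. The pipeline names are illustrative, and wiring them up as the default pipelines for logs and bad-index (for example via index templates or the index.default_pipeline setting) is assumed rather than shown.

PUT {{host}}/_ingest/pipeline/logs-default
Content-Type: application/json

{
  "processors": [
    { "reroute": { "destination": "logs.abc.def" } }
  ]
}

PUT {{host}}/_ingest/pipeline/bad-index-default
Content-Type: application/json

{
  "processors": [
    { "script": { "source": "ctx._index = 'logs.abc'" } }
  ]
}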
PUT {{host}}/_bulk
Content-Type: application/json

{ "create":{"_index": "logs" } } 
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg1.jpg HTTP/1.0\" 200 24736" }
{ "create":{"_index": "logs.abc" } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }
{ "create":{"_index": "bad-index" } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }

Output

{
  "errors": true,
  "took": 200,
  "ingest_took": 0,
  "items": [
    {
      "create": {
        "_index": "logs.abc.def",
        "_id": "wmsjUZgBpF-FKxj59Ma4",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 0,
        "_primary_term": 1,
        "status": 201
      }
    },
    {
      "create": {
        "_index": "logs.abc",
        "_id": "auto-generated",
        "status": 400,
        "failure_store": "not_enabled",
        "error": {
          "type": "illegal_argument_exception",
          "reason": "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead"
        }
      }
    },
    {
      "create": {
        "_index": "logs.abc",
        "_id": "auto-generated",
        "status": 400,
        "error": {
          "type": "illegal_argument_exception",
          "reason": "Pipelines can't re-route documents to child streams, but pipeline [pipeline1] tried to reroute this document from index [bad-index] to index [logs.abc]. Reroute history: bad-index"
        }
      }
    }
  ]
}

Fixes ES-11941

@elasticsearchmachine added the Team:Data Management label (Meta label for data/management team) on Jul 28, 2025
@elasticsearchmachine
Collaborator

Hi @lukewhiting, I've created a changelog YAML for you.

@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@lukewhiting
Contributor Author

Requested review from @jbaiera as this touches the failure store

@lukewhiting force-pushed the es-11941-streams-logs-bulk-transport-changes branch from 5936fd2 to 1a861cd on July 28, 2025 at 13:19
Contributor

@szybia left a comment

I guess I don't have a lot of higher-level context to approve, but the code LGTM!

Left a few small suggestions and questions for learning.

@lukewhiting requested a review from Copilot on July 29, 2025 at 09:07
Contributor

Copilot AI left a comment

Pull Request Overview

This PR implements restrictions on direct indexing to child streams when streams mode is enabled, specifically preventing writes to indices matching the logs.* pattern while allowing operations through the parent logs stream.

  • Adds validation logic to prevent direct writes to child streams via bulk operations and single document indexing
  • Introduces pipeline-level validation to prevent rerouting documents to child streams through ingest processors
  • Allows delete operations on child streams while blocking create/update operations

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Summary per file:

  • StreamType.java: New enum defining stream types, with validation methods for enabled streams and child-stream matching
  • TransportAbstractBulkAction.java: Adds pre-pipeline validation to reject direct writes to child streams
  • IngestService.java: Implements pipeline validation to prevent rerouting documents to child streams
  • BulkRequestModifier.java: Refactors listener-wrapping methods to support flexible ingest time calculation
  • BulkResponse.java: Adds equals and hashCode methods for proper response comparison
  • TransportBulkActionIngestTests.java: Updates a test to use a concrete BulkResponse instead of a mock for equality testing
  • 20_substream_restrictions.yml: Comprehensive integration tests covering various restriction scenarios
  • StreamsYamlTestSuiteIT.java: Adds required modules for integration testing
  • build.gradle: Includes additional REST API endpoints and test dependencies

Contributor

@szybia left a comment

LGTM!

But someone more experienced should give the final approval 🚀

for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(project)) {
    if (streamType.matchesStreamPrefix(newIndex)
        && ingestDocument.getIndexHistory().contains(streamType.getStreamName()) == false) {
Member

I think this would let me through if I rerouted from, say, logs to logs.abc.def. That is, it allows me to write to any descendant stream, not just a direct child stream. I assume that's OK, right? The real goal is to prevent things outside of the stream from writing to child streams.

Contributor Author

Correct, but yeah, I don't think it's in scope to enforce the hierarchy in ES. At least not at this stage.

Member

@masseyke left a comment

I left a couple of comments, but LGTM.

Member

@jbaiera left a comment

Left some comments. There are a couple of easy-to-miss things that might need to be addressed for the failure store, and I left a few small questions and suggestions, but otherwise it's looking good. Marking as approved for once the important things are addressed.

BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);

for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) {
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
Member

You're iterating using the locally available bulk request, but you're dereferencing the bulk request from the request modifier to get the documents. This assumes that the request modifier will never make any changes to its internal state. I think we can avoid that kind of snag if we iterate over the request items and check each stream type per document instead of iterating over the request items multiple times for each eventual stream type.

Member

I looked around where we use bulk request modifier in other places and it's actually an iterable itself. In the IngestService we just iterate over it like a regular iterable, and maintain a slot counter separately. Might read more clearly if we do that here too.

Contributor Author

Have switched this to use the bulk modifier as an iterator and moved the stream types to be the inner loop, so best of both worlds :-)

}

@Override
public boolean equals(Object o) {
Member

Are we adding this for completeness' sake, or are we using it somewhere? Checking bulk response equality seems like something that could be unintentionally expensive for a large or complicated request.

Contributor Author

Ahhh, this was needed to fix a unit test that relied on a sameInstance assertion, which became invalid after we started wrapping everything at the higher level. However, it's no longer required with the short-circuit wrapping logic added here: #132011 (comment), so I have reverted the change, as the response goes back to being the same instance when wrapped with no modifications.

Comment on lines 1250 to 1251
"Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute "
+ "this document from index [%s] to index [%s]. Reroute history: %s",
Member

@jbaiera, Aug 1, 2025

Nit: the term "reroute" reads to me as meaning the reroute processor, but I think you can get here via any method of changing the index name. Also, maybe we should elaborate on what we mean by child stream.

A rough suggestion like

Suggested change
"Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute "
+ "this document from index [%s] to index [%s]. Reroute history: %s",
"pipeline [%s] can't change the target index (from [%s] to [%s] child stream [%s]) for document [%s]. History: [%s]",

Or along those lines? e.g. (from [my-index-name] to [logs] child stream [logs.nginx.prod])

Contributor Author

Updated the message, although I omitted the document part, as this error is rendered in line with the document ID / slot.

@lukewhiting merged commit 12a57e0 into elastic:main on Aug 13, 2025
33 checks passed
@lukewhiting deleted the es-11941-streams-logs-bulk-transport-changes branch on August 14, 2025 at 10:31
