Restrict Indexing To Child Streams When Streams Is Enabled #132011
New YAML REST test exercising the child-stream write restrictions (new file, @@ -0,0 +1,156 @@):

```yaml
---
"Check User Can't Write To Substream Directly":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      bulk:
        body: |
          { "index": { "_index": "logs.foo" } }
          { "foo": "bar" }
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: "illegal_argument_exception" }
  - match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check User Can't Write To Substream Directly With Single Doc":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      catch: bad_request
      index:
        index: logs.foo
        id: "1"
        body:
          foo: bar
  - match: { error.type: "illegal_argument_exception" }
  - match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check Bulk Index With Reroute Processor To Substream Is Rejected":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      ingest.put_pipeline:
        id: "reroute-to-logs-foo"
        body:
          processors:
            - reroute:
                destination: "logs.foo"
  - do:
      indices.create:
        index: "bad-index"
        body:
          settings:
            index.default_pipeline: "reroute-to-logs-foo"
  - do:
      bulk:
        body: |
          { "index": { "_index": "bad-index" } }
          { "foo": "bar" }
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: "illegal_argument_exception" }
  - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [reroute-to-logs-foo] tried to reroute this document from index [bad-index] to index [logs.foo]. Reroute history: bad-index" }

---
"Check Bulk Index With Script Processor To Substream Is Rejected":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      ingest.put_pipeline:
        id: "script-to-logs-foo"
        body:
          processors:
            - script:
                source: "ctx._index = 'logs.foo'"
  - do:
      indices.create:
        index: "bad-index-script"
        body:
          settings:
            index.default_pipeline: "script-to-logs-foo"
  - do:
      bulk:
        body: |
          { "index": { "_index": "bad-index-script" } }
          { "foo": "bar" }
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: "illegal_argument_exception" }
  - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [script-to-logs-foo] tried to reroute this document from index [bad-index-script] to index [logs.foo]. Reroute history: bad-index-script" }

---
"Check Delete By Query Directly On Substream After Reroute Succeeds":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  - do:
      ingest.put_pipeline:
        id: "reroute-to-logs-foo-success"
        body:
          processors:
            - reroute:
                destination: "logs.foo"
  - do:
      indices.create:
        index: "logs"
        body:
          settings:
            index.default_pipeline: "reroute-to-logs-foo-success"
  - do:
      bulk:
        refresh: true
        body: |
          { "index": { "_index": "logs" } }
          { "foo": "bar", "baz": "qux" }
  - match: { errors: false }
  - match: { items.0.index.status: 201 }

  - do:
      delete_by_query:
        index: logs.foo
        refresh: true
        body:
          query:
            match:
              foo: "bar"
  - match: { deleted: 1 }
  - match: { total: 1 }

  - do:
      search:
        index: logs.foo
        body:
          query:
            match_all: {}
  - match: { hits.total.value: 0 }
```
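To summarize the scenarios: direct indexing into `logs.foo` is rejected (both bulk and single-document), re-routing into it from an ingest pipeline is rejected with its own error, and operations such as delete-by-query and search against the child stream keep working. As a small Java sketch of the kind of write the first test sends (the index and field names are just the fixtures used above, and the snippet is illustrative, not taken from the PR):

```java
import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;

// A bulk request addressed directly at the "logs.foo" child stream. With logs streams
// enabled, the server answers this item with a 400 illegal_argument_exception
// ("Direct writes to child streams are prohibited...") rather than indexing it.
BulkRequest bulk = new BulkRequest();
bulk.add(new IndexRequest("logs.foo").source(Map.of("foo", "bar")));
```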
BulkResponse.java: add equals() and hashCode().

```diff
@@ -19,7 +19,9 @@
 import org.elasticsearch.xcontent.ToXContent;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Objects;
 
 /**
  * A response of a bulk execution. Holding a response for each item responding (in order) of the
```

```diff
@@ -166,4 +168,21 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
             return builder.startArray(ITEMS);
         }), Iterators.forArray(responses), Iterators.<ToXContent>single((builder, p) -> builder.endArray().endObject()));
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof BulkResponse that) {
+            return tookInMillis == that.tookInMillis
+                && ingestTookInMillis == that.ingestTookInMillis
+                && Arrays.equals(responses, that.responses)
+                && Objects.equals(incrementalState, that.incrementalState);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(Arrays.hashCode(responses), tookInMillis, ingestTookInMillis, incrementalState);
+    }
 }
```
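If the intent is to let callers compare whole responses by value (a reasonable reading of the change, not stated in the diff), usage would look roughly like the sketch below; it assumes the existing (items, took, ingestTook) constructor:

```java
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;

// Two responses built from the same items and timings now compare equal by value,
// so assertions can target the whole response instead of individual fields.
BulkItemResponse[] items = new BulkItemResponse[0];
BulkResponse first = new BulkResponse(items, 5L, 2L);
BulkResponse second = new BulkResponse(items, 5L, 2L);
assert first.equals(second) && first.hashCode() == second.hashCode();
```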
Abstract base class for bulk actions (applyPipelinesAndDoInternalExecute): fail bulk items that target a child stream of an enabled stream type before ingest pipelines run.

```diff
@@ -31,6 +31,7 @@
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Releasable;
```

```diff
@@ -44,12 +45,16 @@
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 /**
  * This is an abstract base class for bulk actions. It traverses all indices that the request gets routed to, executes all applicable
```

```diff
@@ -396,8 +401,40 @@ private void applyPipelinesAndDoInternalExecute(
         ActionListener<BulkResponse> listener
     ) throws IOException {
         final long relativeStartTimeNanos = relativeTimeNanos();
-        if (applyPipelines(task, bulkRequest, executor, listener) == false) {
-            doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
+
+        // Validate child stream writes before processing pipelines
+        ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
+        Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
+            .filter(t -> t.streamTypeIsEnabled(projectMetadata))
+            .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
+
+        BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
+
+        for (StreamType streamType : enabledStreamTypes) {
+            for (int i = 0; i < bulkRequest.requests.size(); i++) {
+                DocWriteRequest<?> req = bulkRequestModifier.bulkRequest.requests.get(i);
+                String prefix = streamType.getStreamName() + ".";
+
+                if (req instanceof IndexRequest ir && ir.index().startsWith(prefix) && ir.isPipelineResolved() == false) {
+                    IllegalArgumentException e = new IllegalArgumentException(
+                        "Direct writes to child streams are prohibited. Index directly into the ["
+                            + streamType.getStreamName()
+                            + "] stream instead"
+                    );
+                    Boolean failureStore = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
+                    if (failureStore != null && failureStore) {
+                        bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
+                    } else {
+                        bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
+                    }
+                }
+            }
+        }
+
+        var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
+
+        if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) {
+            doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
         }
     }
```
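In plain terms, the new check treats a bulk item as an illegal direct write when its target index name carries the "&lt;stream&gt;." prefix of an enabled stream type and its ingest pipeline has not been resolved yet, i.e. the name came straight from the client rather than from ingest processing. A simplified, illustrative helper capturing just that predicate (the method name and shape are mine, not the production code):

```java
import java.util.Set;

import org.elasticsearch.common.streams.StreamType;

// Illustrative sketch only: returns true when a write addressed at "targetIndex" should be
// rejected as a direct child-stream write, mirroring the condition used in the loop above.
static boolean isDirectChildStreamWrite(String targetIndex, Set<StreamType> enabledStreamTypes, boolean pipelineResolved) {
    if (pipelineResolved) {
        // Pipeline already resolved: the index name was set after ingest processing,
        // which is validated separately (see the re-route tests above).
        return false;
    }
    for (StreamType type : enabledStreamTypes) {
        if (targetIndex.startsWith(type.getStreamName() + ".")) {
            return true;
        }
    }
    return false;
}
```

Items that fail this check are either marked as failed with a 400 or, when the target index has a failure store enabled, redirected there via the BulkRequestModifier.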
New file StreamType.java in org.elasticsearch.common.streams: an enum of top-level stream types with a per-project check for whether the stream type is enabled (@@ -0,0 +1,36 @@):

```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.common.streams;

import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.StreamsMetadata;

public enum StreamType {

    LOGS("logs");

    private final String streamName;

    StreamType(String streamName) {
        this.streamName = streamName;
    }

    public String getStreamName() {
        return streamName;
    }

    public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) {
        StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
        return switch (this) {
            case LOGS -> metadata.isLogsEnabled();
        };
    }

}
```
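One design consequence worth noting: streamTypeIsEnabled dispatches through an exhaustive switch, so a future stream type cannot be added without also wiring up its enabled flag. Purely as a hypothetical sketch (METRICS and isMetricsEnabled are placeholders, not part of this PR and not present on StreamsMetadata today), an extension would look like:

```java
// Hypothetical only: adding a second constant forces a matching switch arm,
// otherwise the exhaustive switch no longer compiles.
public enum StreamType {

    LOGS("logs"),
    METRICS("metrics"); // placeholder constant for illustration

    private final String streamName;

    StreamType(String streamName) {
        this.streamName = streamName;
    }

    public String getStreamName() {
        return streamName;
    }

    public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) {
        StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
        return switch (this) {
            case LOGS -> metadata.isLogsEnabled();
            case METRICS -> metadata.isMetricsEnabled(); // assumed accessor; does not exist today
        };
    }
}
```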