diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index fd56a627026b6..24b651da291af 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -20,7 +20,7 @@ esplugin { restResources { restApi { - include '_common', 'streams' + include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search" } } @@ -38,4 +38,6 @@ artifacts { dependencies { testImplementation project(path: ':test:test-clusters') + clusterModules project(':modules:ingest-common') + clusterModules project(':modules:reindex') } diff --git a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java index 9d5a1033faf57..e01594fe51c76 100644 --- a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java +++ b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java @@ -29,7 +29,12 @@ public static Iterable parameters() throws Exception { } @ClassRule - public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build(); + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .module("streams") + .module("ingest-common") + .module("reindex") + .feature(FeatureFlag.LOGS_STREAM) + .build(); @Override protected String getTestRestCluster() { diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml new file mode 100644 index 0000000000000..7fb76496fab4c --- /dev/null +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml @@ -0,0 +1,156 @@ +--- +"Check User Can't Write To Substream Directly": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + bulk: + body: | + { "index": { "_index": "logs.foo" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" } + +--- +"Check User Can't Write To Substream Directly With Single Doc": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + catch: bad_request + index: + index: logs.foo + id: "1" + body: + foo: bar + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" } + +--- +"Check Bulk Index With Reroute Processor To Substream Is Rejected": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + ingest.put_pipeline: + id: "reroute-to-logs-foo" + body: + processors: + - reroute: + destination: "logs.foo" + - do: + indices.create: + index: "bad-index" + body: + settings: + index.default_pipeline: "reroute-to-logs-foo" + - do: + bulk: + body: | + { "index": { "_index": "bad-index" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [reroute-to-logs-foo] tried to reroute this document from index [bad-index] to index [logs.foo]. Reroute history: bad-index" } + +--- +"Check Bulk Index With Script Processor To Substream Is Rejected": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + ingest.put_pipeline: + id: "script-to-logs-foo" + body: + processors: + - script: + source: "ctx._index = 'logs.foo'" + - do: + indices.create: + index: "bad-index-script" + body: + settings: + index.default_pipeline: "script-to-logs-foo" + - do: + bulk: + body: | + { "index": { "_index": "bad-index-script" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [script-to-logs-foo] tried to reroute this document from index [bad-index-script] to index [logs.foo]. Reroute history: bad-index-script" } + +--- +"Check Delete By Query Directly On Substream After Reroute Succeeds": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + ingest.put_pipeline: + id: "reroute-to-logs-foo-success" + body: + processors: + - reroute: + destination: "logs.foo" + - do: + indices.create: + index: "logs" + body: + settings: + index.default_pipeline: "reroute-to-logs-foo-success" + - do: + bulk: + refresh: true + body: | + { "index": { "_index": "logs" } } + { "foo": "bar", "baz": "qux" } + - match: { errors: false } + - match: { items.0.index.status: 201 } + + - do: + delete_by_query: + index: logs.foo + refresh: true + body: + query: + match: + foo: "bar" + - match: { deleted: 1 } + - match: { total: 1 } + + - do: + search: + index: logs.foo + body: + query: + match_all: {} + - match: { hits.total.value: 0 } diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 90cd3c669a52c..c3d6d2f4d97df 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -213,6 +213,7 @@ exports org.elasticsearch.common.regex; exports org.elasticsearch.common.scheduler; exports org.elasticsearch.common.settings; + exports org.elasticsearch.common.streams; exports org.elasticsearch.common.text; exports org.elasticsearch.common.time; exports org.elasticsearch.common.transport; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java index dda5294b2c0a2..76a0bc36469a3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -101,22 +102,33 @@ BulkRequest getBulkRequest() { } } + ActionListener wrapActionListenerIfNeeded(ActionListener actionListener) { + return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener); + } + + ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { + return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener); + } + /** * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest * service with the results returned from running the remaining write operations. * - * @param ingestTookInMillis Time elapsed for ingestion to be passed to final result. + * @param ingestTimeProviderFunction A function to provide the ingest time taken for this response * @param actionListener The action listener that expects the final bulk response. * @return An action listener that combines ingest failure results with the results from writing the remaining documents. */ - ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { + private ActionListener doWrapActionListenerIfNeeded( + Function ingestTimeProviderFunction, + ActionListener actionListener + ) { if (itemResponses.isEmpty()) { return actionListener.map( response -> new BulkResponse( response.getItems(), - response.getTook().getMillis(), - ingestTookInMillis, + response.getTookInMillis(), + ingestTimeProviderFunction.apply(response), response.getIncrementalState() ) ); @@ -143,6 +155,8 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, assertResponsesAreCorrect(bulkResponses, allResponses); } + var ingestTookInMillis = ingestTimeProviderFunction.apply(response); + return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState()); }); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java index 567b433d94daf..87616ed0716c3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java @@ -19,7 +19,9 @@ import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; +import java.util.Objects; /** * A response of a bulk execution. Holding a response for each item responding (in order) of the @@ -166,4 +168,21 @@ public Iterator toXContentChunked(ToXContent.Params params return builder.startArray(ITEMS); }), Iterators.forArray(responses), Iterators.single((builder, p) -> builder.endArray().endObject())); } + + @Override + public boolean equals(Object o) { + if (o instanceof BulkResponse that) { + return tookInMillis == that.tookInMillis + && ingestTookInMillis == that.ingestTookInMillis + && Objects.deepEquals(responses, that.responses) + && Objects.equals(incrementalState, that.incrementalState); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(responses), tookInMillis, ingestTookInMillis, incrementalState); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index f003cd3fc107d..ccbd7079fe821 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Releasable; @@ -44,12 +45,16 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; +import java.util.stream.Collectors; /** * This is an abstract base class for bulk actions. It traverses all indices that the request gets routed to, executes all applicable @@ -396,8 +401,40 @@ private void applyPipelinesAndDoInternalExecute( ActionListener listener ) throws IOException { final long relativeStartTimeNanos = relativeTimeNanos(); - if (applyPipelines(task, bulkRequest, executor, listener) == false) { - doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos); + + // Validate child stream writes before processing pipelines + ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); + Set enabledStreamTypes = Arrays.stream(StreamType.values()) + .filter(t -> t.streamTypeIsEnabled(projectMetadata)) + .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); + + BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest); + + for (StreamType streamType : enabledStreamTypes) { + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest req = bulkRequestModifier.bulkRequest.requests.get(i); + String prefix = streamType.getStreamName() + "."; + + if (req instanceof IndexRequest ir && ir.index().startsWith(prefix) && ir.isPipelineResolved() == false) { + IllegalArgumentException e = new IllegalArgumentException( + "Direct writes to child streams are prohibited. Index directly into the [" + + streamType.getStreamName() + + "] stream instead" + ); + Boolean failureStore = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); + if (failureStore != null && failureStore) { + bulkRequestModifier.markItemForFailureStore(i, req.index(), e); + } else { + bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED); + } + } + } + } + + var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); + + if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) { + doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos); } } diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamType.java b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java new file mode 100644 index 0000000000000..2bfe3596cf281 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.streams; + +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.StreamsMetadata; + +public enum StreamType { + + LOGS("logs"); + + private final String streamName; + + StreamType(String streamName) { + this.streamName = streamName; + } + + public String getStreamName() { + return streamName; + } + + public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) { + StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + return switch (this) { + case LOGS -> metadata.isLogsEnabled(); + }; + } + +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b4233cb94e21b..91aee6f42fbd3 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -1198,6 +1199,30 @@ private void executePipelines( return; // document failed! } + for (StreamType streamType : StreamType.values()) { + if (streamType.streamTypeIsEnabled(project)) { + if (newIndex.startsWith(streamType.getStreamName() + ".") + && ingestDocument.getIndexHistory().stream().noneMatch(s -> s.equals(streamType.getStreamName()))) { + exceptionHandler.accept( + new IngestPipelineException( + pipelineId, + new IllegalArgumentException( + format( + "Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute " + + "this document from index [%s] to index [%s]. Reroute history: %s", + pipelineId, + originalIndex, + newIndex, + String.join(" -> ", ingestDocument.getIndexHistory()) + ) + ) + ) + ); + return; // document failed! + } + } + } + // add the index to the document's index history, and check for cycles in the visited indices boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false; if (cycle) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index a54cd08c3738a..7ecd24971844a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -80,6 +80,7 @@ import java.util.function.Function; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -544,11 +545,11 @@ public void testIngestForward() throws Exception { indexRequest.source(Collections.emptyMap()); indexRequest.setPipeline("testpipeline"); bulkRequest.add(indexRequest); - BulkResponse bulkResponse = mock(BulkResponse.class); + BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[0], 1234); AtomicBoolean responseCalled = new AtomicBoolean(false); ActionListener listener = ActionTestUtils.assertNoFailureListener(response -> { responseCalled.set(true); - assertSame(bulkResponse, response); + assertThat(bulkResponse, equalTo(bulkResponse)); }); ActionTestUtils.execute(action, null, bulkRequest, listener);