4 changes: 3 additions & 1 deletion modules/streams/build.gradle
@@ -20,7 +20,7 @@ esplugin {

restResources {
restApi {
include '_common', 'streams'
include '_common', 'streams', 'bulk', 'index', 'ingest', 'indices', 'delete_by_query', 'search'
}
}

@@ -38,4 +38,6 @@ artifacts {

dependencies {
testImplementation project(path: ':test:test-clusters')
clusterModules project(':modules:ingest-common')
clusterModules project(':modules:reindex')
}
@@ -29,7 +29,12 @@ public static Iterable<Object[]> parameters() throws Exception {
}

@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build();
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.module("streams")
.module("ingest-common")
.module("reindex")
.feature(FeatureFlag.LOGS_STREAM)
.build();

@Override
protected String getTestRestCluster() {
@@ -0,0 +1,156 @@
---
"Check User Can't Write To Substream Directly":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
bulk:
body: |
{ "index": { "_index": "logs.foo" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check User Can't Write To Substream Directly With Single Doc":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
catch: bad_request
index:
index: logs.foo
id: "1"
body:
foo: bar
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check Bulk Index With Reroute Processor To Substream Is Rejected":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
ingest.put_pipeline:
id: "reroute-to-logs-foo"
body:
processors:
- reroute:
destination: "logs.foo"
- do:
indices.create:
index: "bad-index"
body:
settings:
index.default_pipeline: "reroute-to-logs-foo"
- do:
bulk:
body: |
{ "index": { "_index": "bad-index" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [reroute-to-logs-foo] tried to reroute this document from index [bad-index] to index [logs.foo]. Reroute history: bad-index" }

---
"Check Bulk Index With Script Processor To Substream Is Rejected":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
ingest.put_pipeline:
id: "script-to-logs-foo"
body:
processors:
- script:
source: "ctx._index = 'logs.foo'"
- do:
indices.create:
index: "bad-index-script"
body:
settings:
index.default_pipeline: "script-to-logs-foo"
- do:
bulk:
body: |
{ "index": { "_index": "bad-index-script" } }
{ "foo": "bar" }
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [script-to-logs-foo] tried to reroute this document from index [bad-index-script] to index [logs.foo]. Reroute history: bad-index-script" }

---
"Check Delete By Query Directly On Substream After Reroute Succeeds":
- do:
streams.logs_enable: { }
- is_true: acknowledged

- do:
streams.status: { }
- is_true: logs.enabled

- do:
ingest.put_pipeline:
id: "reroute-to-logs-foo-success"
body:
processors:
- reroute:
destination: "logs.foo"
- do:
indices.create:
index: "logs"
body:
settings:
index.default_pipeline: "reroute-to-logs-foo-success"
- do:
bulk:
refresh: true
body: |
{ "index": { "_index": "logs" } }
{ "foo": "bar", "baz": "qux" }
- match: { errors: false }
- match: { items.0.index.status: 201 }

- do:
delete_by_query:
index: logs.foo
refresh: true
body:
query:
match:
foo: "bar"
- match: { deleted: 1 }
- match: { total: 1 }

- do:
search:
index: logs.foo
body:
query:
match_all: {}
- match: { hits.total.value: 0 }
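
Taken together, these tests pin down the rejection contract at both API surfaces: bulk writes fail per item with a 400 and an illegal_argument_exception, single-document writes fail the whole request, pipeline reroutes into child streams are blocked, and deletes and searches against a child stream keep working after a legitimate reroute. As a usage illustration only (host, port, and the logs.foo target are taken from the tests above, not prescribed by this change), the first scenario looks like this from the low-level Java REST client:

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class DirectChildStreamWriteDemo {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request bulk = new Request("POST", "/_bulk");
            // NDJSON body targeting the child stream directly, as in the first test.
            bulk.setJsonEntity("""
                { "index": { "_index": "logs.foo" } }
                { "foo": "bar" }
                """);
            // The bulk endpoint itself still answers 200; the rejection surfaces
            // per item as items.0.index.status == 400 with reason "Direct writes to
            // child streams are prohibited. Index directly into the [logs] stream instead".
            Response response = client.performRequest(bulk);
            System.out.println(response.getStatusLine());
        }
    }
}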
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
@@ -213,6 +213,7 @@
exports org.elasticsearch.common.regex;
exports org.elasticsearch.common.scheduler;
exports org.elasticsearch.common.settings;
exports org.elasticsearch.common.streams;
exports org.elasticsearch.common.text;
exports org.elasticsearch.common.time;
exports org.elasticsearch.common.transport;
@@ -30,6 +30,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -101,22 +102,33 @@ BulkRequest getBulkRequest() {
}
}

ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener);
}

ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener);
}

/**
* If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
* updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
* service with the results returned from running the remaining write operations.
*
* @param ingestTookInMillis Time elapsed for ingestion to be passed to final result.
* @param ingestTimeProviderFunction A function to provide the ingest time taken for this response
* @param actionListener The action listener that expects the final bulk response.
* @return An action listener that combines ingest failure results with the results from writing the remaining documents.
*/
ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
private ActionListener<BulkResponse> doWrapActionListenerIfNeeded(
Function<BulkResponse, Long> ingestTimeProviderFunction,
ActionListener<BulkResponse> actionListener
) {
if (itemResponses.isEmpty()) {
return actionListener.map(
response -> new BulkResponse(
response.getItems(),
response.getTook().getMillis(),
ingestTookInMillis,
response.getTookInMillis(),
ingestTimeProviderFunction.apply(response),
response.getIncrementalState()
)
);
@@ -143,6 +155,8 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
assertResponsesAreCorrect(bulkResponses, allResponses);
}

var ingestTookInMillis = ingestTimeProviderFunction.apply(response);

return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
});
}
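
The refactor above keeps a single wrapping implementation and abstracts only where the ingest-took value comes from: the pre-existing overload pins it to a known long, while the new overload reads it off the response itself. A standalone sketch of that pattern, with hypothetical reduced types rather than the real BulkResponse:

import java.util.function.Function;

final class IngestTookPattern {
    // Stand-in for BulkResponse, reduced to the two fields that matter here.
    record Resp(long tookInMillis, long ingestTookInMillis) {}

    // New overload: the response already carries the ingest time.
    static Resp rewrite(Resp response) {
        return doRewrite(Resp::ingestTookInMillis, response);
    }

    // Pre-existing overload: the caller supplies the ingest time up front.
    static Resp rewrite(long ingestTookInMillis, Resp response) {
        return doRewrite(ignored -> ingestTookInMillis, response);
    }

    // Shared implementation; the Function decides where the ingest time comes from.
    private static Resp doRewrite(Function<Resp, Long> ingestTime, Resp response) {
        return new Resp(response.tookInMillis(), ingestTime.apply(response));
    }
}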
@@ -19,7 +19,9 @@
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;

/**
* A response of a bulk execution. Holding a response for each item responding (in order) of the
@@ -166,4 +168,21 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
return builder.startArray(ITEMS);
}), Iterators.forArray(responses), Iterators.<ToXContent>single((builder, p) -> builder.endArray().endObject()));
}

@Override
public boolean equals(Object o) {
if (o instanceof BulkResponse that) {
return tookInMillis == that.tookInMillis
&& ingestTookInMillis == that.ingestTookInMillis
&& Objects.deepEquals(responses, that.responses)
&& Objects.equals(incrementalState, that.incrementalState);
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(responses), tookInMillis, ingestTookInMillis, incrementalState);
}
}
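
One detail worth noting in the new equals/hashCode pair: responses is an array, so a plain Objects.equals would compare references, not contents. Pairing Objects.deepEquals with Arrays.hashCode keeps the equals/hashCode contract intact. A standalone demonstration of the pitfall (plain JDK, not Elasticsearch code):

import java.util.Arrays;
import java.util.Objects;

public class ArrayEqualityDemo {
    public static void main(String[] args) {
        String[] a = { "x", "y" };
        String[] b = { "x", "y" };
        System.out.println(Objects.equals(a, b));      // false: reference comparison only
        System.out.println(Objects.deepEquals(a, b));  // true: element-wise comparison
        // Hashing must agree with equality: Arrays.hashCode is content-based,
        // so arrays with equal contents produce equal hashes.
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true
    }
}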
@@ -31,6 +31,7 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.streams.StreamType;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Releasable;
@@ -44,12 +45,16 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
* This is an abstract base class for bulk actions. It traverses all indices that the request gets routed to, executes all applicable
@@ -396,8 +401,40 @@ private void applyPipelinesAndDoInternalExecute(
ActionListener<BulkResponse> listener
) throws IOException {
final long relativeStartTimeNanos = relativeTimeNanos();
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);

// Validate child stream writes before processing pipelines
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
.filter(t -> t.streamTypeIsEnabled(projectMetadata))
.collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));

BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);

for (StreamType streamType : enabledStreamTypes) {
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> req = bulkRequestModifier.bulkRequest.requests.get(i);
String prefix = streamType.getStreamName() + ".";

if (req instanceof IndexRequest ir && ir.index().startsWith(prefix) && ir.isPipelineResolved() == false) {
IllegalArgumentException e = new IllegalArgumentException(
"Direct writes to child streams are prohibited. Index directly into the ["
+ streamType.getStreamName()
+ "] stream instead"
);
Boolean failureStore = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
if (failureStore != null && failureStore) {
bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
} else {
bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
}
}
}
}

var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);

if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) {
doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
}
}

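
The guard's core predicate is small: a request is a prohibited direct write when its target index sits under an enabled stream's dotted namespace and no pipeline has resolved the request yet (reroutes performed by pipelines are caught separately, as the reroute tests above show). A minimal sketch of that predicate, with illustrative names rather than the production signature:

final class ChildStreamGuard {
    // Illustrative only: mirrors the startsWith/isPipelineResolved check above.
    static boolean isDirectChildStreamWrite(String targetIndex, String streamName, boolean pipelineResolved) {
        return targetIndex.startsWith(streamName + ".") && pipelineResolved == false;
    }
}

// isDirectChildStreamWrite("logs.foo", "logs", false) -> true  (rejected with a 400)
// isDirectChildStreamWrite("logs",     "logs", false) -> false (writes to the parent stream are allowed)
// isDirectChildStreamWrite("logs.foo", "logs", true)  -> false (pipeline-resolved, handled by the reroute checks)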
@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.streams;

import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.StreamsMetadata;

public enum StreamType {

LOGS("logs");

private final String streamName;

StreamType(String streamName) {
this.streamName = streamName;
}

public String getStreamName() {
return streamName;
}

public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) {
StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
return switch (this) {
case LOGS -> metadata.isLogsEnabled();
};
}

}
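
For reference, this is how TransportAbstractBulkAction above consumes the enum; collecting into an EnumSet keeps the membership checks in the per-request loop cheap, since EnumSet is backed by a bitset over the constants. A condensed sketch, assuming the imports below and a ProjectMetadata in scope:

import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.streams.StreamType;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

final class EnabledStreams {
    // Condensed from the TransportAbstractBulkAction change above.
    static Set<StreamType> enabled(ProjectMetadata projectMetadata) {
        return Arrays.stream(StreamType.values())
            .filter(t -> t.streamTypeIsEnabled(projectMetadata))
            .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
    }
}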