
Commit 5534829

SNAPSHOT - Refactor to move substream check earlier in indexing process
1 parent d4bc4fd commit 5534829

2 files changed: +54 −27 lines changed


server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 23 additions & 27 deletions
@@ -43,7 +43,6 @@
 import org.elasticsearch.cluster.routing.IndexRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.Iterators;
-import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.common.streams.StreamsPermissionsUtils;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -61,8 +60,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -74,7 +71,6 @@
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.bulk.TransportBulkAction.LAZY_ROLLOVER_ORIGIN;
 import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
@@ -283,29 +279,29 @@ private long buildTookInMillis(long startTimeNanos) {
     }
 
     private Map<ShardId, List<BulkItemRequest>> groupBulkRequestsByShards(ClusterState clusterState) {
-        ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState);
-
-        Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
-            .filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata))
-            .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
-
-        for (StreamType streamType : enabledStreamTypes) {
-            for (int i = 0; i < bulkRequest.requests.size(); i++) {
-                DocWriteRequest<?> req = bulkRequest.requests.get(i);
-                String prefix = streamType.getStreamName() + ".";
-                if (req != null && req.index().startsWith(prefix)) {
-                    IllegalArgumentException exception = new IllegalArgumentException(
-                        "Writes to child stream ["
-                            + req.index()
-                            + "] are not allowed, use the parent stream instead: ["
-                            + streamType.getStreamName()
-                            + "]"
-                    );
-                    IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception);
-                    addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus);
-                }
-            }
-        }
+        // ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState);
+        //
+        // Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
+        //     .filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata))
+        //     .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
+        //
+        // for (StreamType streamType : enabledStreamTypes) {
+        //     for (int i = 0; i < bulkRequest.requests.size(); i++) {
+        //         DocWriteRequest<?> req = bulkRequest.requests.get(i);
+        //         String prefix = streamType.getStreamName() + ".";
+        //         if (req != null && req.index().startsWith(prefix)) {
+        //             IllegalArgumentException exception = new IllegalArgumentException(
+        //                 "Writes to child stream ["
+        //                     + req.index()
+        //                     + "] are not allowed, use the parent stream instead: ["
+        //                     + streamType.getStreamName()
+        //                     + "]"
+        //             );
+        //             IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception);
+        //             addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus);
+        //         }
+        //     }
+        // }
 
         return groupRequestsByShards(
             clusterState,
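
Note: the block commented out above is the substream check being relocated; its core is a prefix test on each bulk item's target index name. A minimal, self-contained sketch of that test, using a hypothetical StreamKind enum with an illustrative "logs" constant in place of the real StreamType (whose constants are not shown in this diff):

import java.util.List;

// Minimal sketch of the child-stream prefix check; StreamKind stands in
// for org.elasticsearch.common.streams.StreamType, which the real code
// iterates over for each enabled stream type.
public class ChildStreamCheckSketch {

    // Stand-in for StreamType; the "logs" name is an illustrative assumption.
    enum StreamKind {
        LOGS("logs");

        private final String streamName;

        StreamKind(String streamName) {
            this.streamName = streamName;
        }

        String getStreamName() {
            return streamName;
        }
    }

    // An index name such as "logs.app1" targets a child stream of "logs",
    // so a direct bulk write to it is rejected in favor of the parent.
    static boolean targetsChildStream(String index, StreamKind kind) {
        return index != null && index.startsWith(kind.getStreamName() + ".");
    }

    public static void main(String[] args) {
        for (String index : List.of("logs.app1", "logs", "metrics.cpu")) {
            System.out.println(index + " -> child stream? " + targetsChildStream(index, StreamKind.LOGS));
        }
    }
}

Running this prints true only for logs.app1, since only that name begins with the parent stream name followed by a dot.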

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 31 additions & 0 deletions
@@ -31,6 +31,8 @@
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.streams.StreamType;
+import org.elasticsearch.common.streams.StreamsPermissionsUtils;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Releasable;
@@ -44,12 +46,16 @@
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 /**
  * This is an abstract base class for bulk actions. It traverses all indices that the request gets routed to, executes all applicable
@@ -396,6 +402,31 @@ private void applyPipelinesAndDoInternalExecute(
         ActionListener<BulkResponse> listener
     ) throws IOException {
         final long relativeStartTimeNanos = relativeTimeNanos();
+
+        // Validate child stream writes before processing pipelines
+        ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
+        Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
+            .filter(t -> StreamsPermissionsUtils.getInstance().streamTypeIsEnabled(t, projectMetadata))
+            .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
+
+        BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
+
+        for (StreamType streamType : enabledStreamTypes) {
+            for (int i = 0; i < bulkRequest.requests.size(); i++) {
+                DocWriteRequest<?> req = bulkRequest.requests.get(i);
+                String prefix = streamType.getStreamName() + ".";
+                if (req != null && req.index() != null && req.index().startsWith(prefix)) {
+                    IllegalArgumentException e = new IllegalArgumentException("Can't write to child stream");
+                    Boolean failureStore = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
+                    if (failureStore != null && failureStore) {
+                        bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
+                    } else {
+                        bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
+                    }
+                }
+            }
+        }
+
         if (applyPipelines(task, bulkRequest, executor, listener) == false) {
             doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
         }
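
Moving the check into applyPipelinesAndDoInternalExecute means a write to a child stream is failed, or redirected to the failure store, before any ingest pipeline runs, rather than later at shard-grouping time. A rough sketch of that fail-fast ordering under assumed names (BulkItem and runPipelines are illustrative stand-ins, not Elasticsearch API):

import java.util.ArrayList;
import java.util.List;

// Rough sketch of the fail-fast ordering introduced here: child-stream
// items are marked as failed before pipeline processing, and only the
// surviving items proceed down the bulk path.
public class EarlyValidationSketch {

    record BulkItem(int slot, String index) {}

    public static void main(String[] args) {
        List<BulkItem> items = List.of(
            new BulkItem(0, "logs.app1"), // child stream: fails validation
            new BulkItem(1, "logs"),      // parent stream itself: passes
            new BulkItem(2, "orders")     // unrelated index: passes
        );

        List<BulkItem> accepted = new ArrayList<>();
        for (BulkItem item : items) {
            if (item.index().startsWith("logs.")) {
                // The real change marks the slot as failed, or redirects it
                // to the failure store, via BulkRequestModifier.
                System.out.println("slot " + item.slot() + ": rejected child-stream write to [" + item.index() + "]");
            } else {
                accepted.add(item);
            }
        }

        // Pipelines (and the rest of the bulk path) see only accepted items.
        runPipelines(accepted);
    }

    static void runPipelines(List<BulkItem> items) {
        items.forEach(i -> System.out.println("slot " + i.slot() + ": pipelines applied"));
    }
}

Only the accepted items reach the pipeline step, mirroring how the committed code marks failing slots before applyPipelines is invoked.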
