Skip to content

Commit 723281c

Browse files
committed
SNAPSHOT - Bulk Ops
1 parent a8da9a1 commit 723281c

File tree

5 files changed

+61
-17
lines changed

5 files changed

+61
-17
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.elasticsearch.cluster.routing.IndexRouting;
4444
import org.elasticsearch.cluster.service.ClusterService;
4545
import org.elasticsearch.common.collect.Iterators;
46+
import org.elasticsearch.common.streams.StreamType;
47+
import org.elasticsearch.common.streams.StreamsPermissionsUtils;
4648
import org.elasticsearch.common.util.concurrent.AtomicArray;
4749
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4850
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -59,6 +61,8 @@
5961

6062
import java.io.IOException;
6163
import java.util.ArrayList;
64+
import java.util.Arrays;
65+
import java.util.EnumSet;
6266
import java.util.HashMap;
6367
import java.util.Iterator;
6468
import java.util.List;
@@ -70,6 +74,7 @@
7074
import java.util.function.BiConsumer;
7175
import java.util.function.Consumer;
7276
import java.util.function.LongSupplier;
77+
import java.util.stream.Collectors;
7378

7479
import static org.elasticsearch.action.bulk.TransportBulkAction.LAZY_ROLLOVER_ORIGIN;
7580
import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
@@ -104,6 +109,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
104109
private final FailureStoreMetrics failureStoreMetrics;
105110
private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
106111
private final boolean clusterHasFailureStoreFeature;
112+
private final StreamsPermissionsUtils streamsPermissionsUtils;
107113

108114
BulkOperation(
109115
Task task,
@@ -139,7 +145,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
139145
new FailureStoreDocumentConverter(),
140146
failureStoreMetrics,
141147
dataStreamFailureStoreSettings,
142-
clusterHasFailureStoreFeature
148+
clusterHasFailureStoreFeature,
149+
StreamsPermissionsUtils.getInstance()
143150
);
144151
}
145152

@@ -160,7 +167,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
160167
FailureStoreDocumentConverter failureStoreDocumentConverter,
161168
FailureStoreMetrics failureStoreMetrics,
162169
DataStreamFailureStoreSettings dataStreamFailureStoreSettings,
163-
boolean clusterHasFailureStoreFeature
170+
boolean clusterHasFailureStoreFeature,
171+
StreamsPermissionsUtils streamsPermissionsUtils
164172
) {
165173
super(listener);
166174
this.task = task;
@@ -182,6 +190,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
182190
this.failureStoreMetrics = failureStoreMetrics;
183191
this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
184192
this.clusterHasFailureStoreFeature = clusterHasFailureStoreFeature;
193+
this.streamsPermissionsUtils = streamsPermissionsUtils;
185194
}
186195

187196
@Override
@@ -274,6 +283,30 @@ private long buildTookInMillis(long startTimeNanos) {
274283
}
275284

276285
private Map<ShardId, List<BulkItemRequest>> groupBulkRequestsByShards(ClusterState clusterState) {
286+
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState);
287+
288+
Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
289+
.filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata))
290+
.collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
291+
292+
for (StreamType streamType : enabledStreamTypes) {
293+
for (int i = 0; i < bulkRequest.requests.size(); i++) {
294+
DocWriteRequest<?> req = bulkRequest.requests.get(i);
295+
String prefix = streamType.getStreamName() + ".";
296+
if (req != null && req.index().startsWith(prefix)) {
297+
IllegalArgumentException exception = new IllegalArgumentException(
298+
"Bulk requests for streams with type ["
299+
+ streamType.getStreamName()
300+
+ "] are not supported, use the ["
301+
+ streamType.getStreamName()
302+
+ "] API instead."
303+
);
304+
IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception);
305+
addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus);
306+
}
307+
}
308+
}
309+
277310
return groupRequestsByShards(
278311
clusterState,
279312
Iterators.enumerate(bulkRequest.requests.iterator(), BulkItemRequest::new),

server/src/main/java/org/elasticsearch/common/streams/StreamTypes.java renamed to server/src/main/java/org/elasticsearch/common/streams/StreamType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@
99

1010
package org.elasticsearch.common.streams;
1111

12-
public enum StreamTypes {
12+
public enum StreamType {
1313

1414
LOGS("logs");
1515

1616
private final String streamName;
1717

18-
StreamTypes(String streamName) {
18+
StreamType(String streamName) {
1919
this.streamName = streamName;
2020
}
2121

server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public static StreamsPermissionsUtils getInstance() {
3434

3535
public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination)
3636
throws IllegalArgumentException {
37-
for (StreamTypes streamType : StreamTypes.values()) {
37+
for (StreamType streamType : StreamType.values()) {
3838
String streamName = streamType.getStreamName();
3939
if (streamTypeIsEnabled(streamType, projectMetadata)
4040
&& destination.startsWith(streamName + ".")
@@ -51,7 +51,7 @@ public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata
5151
}
5252
}
5353

54-
public boolean streamTypeIsEnabled(StreamTypes streamType, ProjectMetadata projectMetadata) {
54+
public boolean streamTypeIsEnabled(StreamType streamType, ProjectMetadata projectMetadata) {
5555
StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
5656
return switch (streamType) {
5757
case LOGS -> metadata.isLogsEnabled();

server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@
4242
import org.elasticsearch.cluster.metadata.ProjectMetadata;
4343
import org.elasticsearch.cluster.metadata.Template;
4444
import org.elasticsearch.cluster.node.DiscoveryNode;
45+
import org.elasticsearch.cluster.project.ProjectResolver;
4546
import org.elasticsearch.cluster.project.TestProjectResolvers;
4647
import org.elasticsearch.cluster.service.ClusterService;
4748
import org.elasticsearch.common.settings.ClusterSettings;
4849
import org.elasticsearch.common.settings.Settings;
50+
import org.elasticsearch.common.streams.StreamsPermissionsUtils;
4951
import org.elasticsearch.common.util.concurrent.AtomicArray;
5052
import org.elasticsearch.common.util.concurrent.EsExecutors;
5153
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -215,10 +217,18 @@ public class BulkOperationTests extends ESTestCase {
215217

216218
private TestThreadPool threadPool;
217219

220+
private StreamsPermissionsUtils streamsPermissionsUtilsMock;
221+
private ProjectResolver projectResolverMock;
222+
private IndexNameExpressionResolver indexNameExpressionResolverMock;
223+
218224
@Before
219-
public void setupThreadpool() {
225+
public void setupTest() {
220226
threadPool = new TestThreadPool(getClass().getName());
221227
threadPool.getThreadContext().putHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER, projectId.id());
228+
streamsPermissionsUtilsMock = mock(StreamsPermissionsUtils.class);
229+
when(streamsPermissionsUtilsMock.streamTypeIsEnabled(any(), any())).thenReturn(false);
230+
projectResolverMock = mock(ProjectResolver.class);
231+
indexNameExpressionResolverMock = mock(IndexNameExpressionResolver.class);
222232
}
223233

224234
@After
@@ -688,8 +698,8 @@ public void testRetryableBlockAcceptsFailureStoreDocument() throws Exception {
688698
}
689699

690700
/**
691-
* A bulk operation to a data stream with a failure store enabled may still partially fail if the cluster is experiencing a
692-
* non-retryable block when the redirected documents would be sent to the shard-level action.
701+
* A bulk operation to a data stream with a failure store enabled may still partially fail if the redirected documents experience
702+
* a shard-level failure while writing to the failure store indices.
693703
*/
694704
public void testBlockedClusterRejectsFailureStoreDocument() throws Exception {
695705
// Requests that go to two separate shards
@@ -1239,7 +1249,8 @@ private BulkOperation newBulkOperation(
12391249
failureStoreDocumentConverter,
12401250
FailureStoreMetrics.NOOP,
12411251
dataStreamFailureStoreSettings,
1242-
failureStoreNodeFeatureEnabled
1252+
failureStoreNodeFeatureEnabled,
1253+
streamsPermissionsUtilsMock
12431254
);
12441255
}
12451256

server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,22 +47,22 @@ public void testGetInstanceReturnsSingleton() {
4747
public void testStreamTypeIsEnabledReturnsTrueWhenLogsEnabled() {
4848
when(streamsMetadataMock.isLogsEnabled()).thenReturn(true);
4949

50-
boolean result = utils.streamTypeIsEnabled(StreamTypes.LOGS, projectMetadataMock);
50+
boolean result = utils.streamTypeIsEnabled(StreamType.LOGS, projectMetadataMock);
5151
assertTrue(result);
5252
}
5353

5454
public void testStreamTypeIsEnabledReturnsFalseWhenLogsDisabled() {
5555
when(streamsMetadataMock.isLogsEnabled()).thenReturn(false);
5656

57-
boolean result = utils.streamTypeIsEnabled(StreamTypes.LOGS, projectMetadataMock);
57+
boolean result = utils.streamTypeIsEnabled(StreamType.LOGS, projectMetadataMock);
5858
assertFalse(result);
5959
}
6060

6161
public void testIfRetrouteToSubstreamNotAllowedThrows() {
6262
when(streamsMetadataMock.isLogsEnabled()).thenReturn(true);
6363

6464
Set<String> indexHistory = new HashSet<>(); // empty, so reroute not allowed
65-
String destination = StreamTypes.LOGS.getStreamName() + ".substream";
65+
String destination = StreamType.LOGS.getStreamName() + ".substream";
6666

6767
IllegalArgumentException ex = expectThrows(
6868
IllegalArgumentException.class,
@@ -77,7 +77,7 @@ public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenStreamTypeDi
7777
when(streamsMetadataMock.isLogsEnabled()).thenReturn(false);
7878

7979
Set<String> indexHistory = Collections.emptySet();
80-
String destination = StreamTypes.LOGS.getStreamName() + ".substream";
80+
String destination = StreamType.LOGS.getStreamName() + ".substream";
8181

8282
// Should not throw since stream type is disabled
8383
utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination);
@@ -87,7 +87,7 @@ public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenDestinationN
8787
when(streamsMetadataMock.isLogsEnabled()).thenReturn(true);
8888

8989
Set<String> indexHistory = Collections.emptySet();
90-
String destination = StreamTypes.LOGS.getStreamName(); // not a substream
90+
String destination = StreamType.LOGS.getStreamName(); // not a substream
9191

9292
// Should not throw since destination is not a substream
9393
utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination);
@@ -97,8 +97,8 @@ public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenIndexHistory
9797
when(streamsMetadataMock.isLogsEnabled()).thenReturn(true);
9898

9999
Set<String> indexHistory = new HashSet<>();
100-
indexHistory.add(StreamTypes.LOGS.getStreamName());
101-
String destination = StreamTypes.LOGS.getStreamName() + ".substream";
100+
indexHistory.add(StreamType.LOGS.getStreamName());
101+
String destination = StreamType.LOGS.getStreamName() + ".substream";
102102

103103
// Should not throw since indexHistory contains the stream name
104104
utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination);

0 commit comments

Comments
 (0)