Skip to content

Commit 78cf0ef

Browse files
committed
Additional PR changes and cleanup
1 parent fbfd61b commit 78cf0ef

File tree

4 files changed

+13
-10
lines changed

4 files changed

+13
-10
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,12 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
171171

172172
@Override
173173
public boolean equals(Object o) {
174-
return o == this || (o instanceof BulkResponse that
175-
&& tookInMillis == that.tookInMillis
176-
&& ingestTookInMillis == that.ingestTookInMillis
177-
&& Arrays.equals(responses, that.responses)
178-
&& Objects.equals(incrementalState, that.incrementalState));
179-
}
174+
return o == this
175+
|| (o instanceof BulkResponse that
176+
&& tookInMillis == that.tookInMillis
177+
&& ingestTookInMillis == that.ingestTookInMillis
178+
&& Arrays.equals(responses, that.responses)
179+
&& Objects.equals(incrementalState, that.incrementalState));
180180
}
181181

182182
@Override

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,16 +413,15 @@ private void applyPipelinesAndDoInternalExecute(
413413
for (StreamType streamType : enabledStreamTypes) {
414414
for (int i = 0; i < bulkRequest.requests.size(); i++) {
415415
DocWriteRequest<?> req = bulkRequestModifier.bulkRequest.requests.get(i);
416-
String StreamTypePrefix = streamType.getStreamName() + ".";
417416

418-
if (req instanceof IndexRequest ir && ir.index().startsWith(prefix) && ir.isPipelineResolved() == false) {
417+
if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) {
419418
IllegalArgumentException e = new IllegalArgumentException(
420419
"Direct writes to child streams are prohibited. Index directly into the ["
421420
+ streamType.getStreamName()
422421
+ "] stream instead"
423422
);
424423
Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
425-
if (Boolean.TRUE.equals(failureStore)) {
424+
if (Boolean.TRUE.equals(failureStoreEnabled)) {
426425
bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
427426
} else {
428427
bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);

server/src/main/java/org/elasticsearch/common/streams/StreamType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,8 @@ public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) {
3333
};
3434
}
3535

36+
public boolean matchesStreamPrefix(String indexName) {
37+
return indexName.startsWith(streamName + ".");
38+
}
39+
3640
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1241,7 +1241,7 @@ private void executePipelines(
12411241

12421242
for (StreamType streamType : StreamType.values()) {
12431243
if (streamType.streamTypeIsEnabled(project)) {
1244-
if (newIndex.startsWith(streamType.getStreamName() + ".")
1244+
if (streamType.matchesStreamPrefix(newIndex)
12451245
&& ingestDocument.getIndexHistory().stream().noneMatch(s -> s.equals(streamType.getStreamName()))) {
12461246
exceptionHandler.accept(
12471247
new IngestPipelineException(

0 commit comments

Comments
 (0)