Skip to content

Commit dab09e2

Browse files
authored
[8.16] Fix handling of bulk requests with semantic text fields and delete ops (#116961)
* Fix handling of bulk requests with semantic text fields and delete ops (#116942): Previously, delete operations were not processed correctly when followed by operations containing semantic text fields. Skipping a delete operation with `continue` bypassed the manual item index increment at the end of the loop, so the positions recorded for subsequent operations in the items array were off by one. This PR resolves the discrepancy by deriving the index from the loop counter and includes additional tests to ensure proper behavior. * Fix compilation
1 parent a80bd58 commit dab09e2

File tree

6 files changed

+80
-11
lines changed

6 files changed

+80
-11
lines changed

docs/changelog/116942.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116942
2+
summary: Fix handling of bulk requests with semantic text fields and delete ops
3+
area: Relevance
4+
type: bug
5+
issues: []

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.bulk.BulkItemResponse;
1212
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1313
import org.elasticsearch.action.bulk.BulkResponse;
14+
import org.elasticsearch.action.delete.DeleteRequestBuilder;
1415
import org.elasticsearch.action.index.IndexRequestBuilder;
1516
import org.elasticsearch.action.search.SearchRequest;
1617
import org.elasticsearch.action.search.SearchResponse;
@@ -30,8 +31,10 @@
3031
import java.util.Collection;
3132
import java.util.Collections;
3233
import java.util.HashMap;
34+
import java.util.HashSet;
3335
import java.util.Locale;
3436
import java.util.Map;
37+
import java.util.Set;
3538

3639
import static org.elasticsearch.xpack.inference.mapper.SemanticTextFieldTests.randomSemanticTextInput;
3740
import static org.hamcrest.Matchers.equalTo;
@@ -87,30 +90,38 @@ public void testBulkOperations() throws Exception {
8790

8891
int totalBulkReqs = randomIntBetween(2, 100);
8992
long totalDocs = 0;
93+
Set<String> ids = new HashSet<>();
9094
for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) {
9195
BulkRequestBuilder bulkReqBuilder = client().prepareBulk();
9296
int totalBulkSize = randomIntBetween(1, 100);
9397
for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) {
94-
String id = Long.toString(totalDocs);
98+
if (ids.size() > 0 && rarely(random())) {
99+
String id = randomFrom(ids);
100+
ids.remove(id);
101+
DeleteRequestBuilder request = new DeleteRequestBuilder(client(), INDEX_NAME).setId(id);
102+
bulkReqBuilder.add(request);
103+
continue;
104+
}
105+
String id = Long.toString(totalDocs++);
95106
boolean isIndexRequest = randomBoolean();
96107
Map<String, Object> source = new HashMap<>();
97108
source.put("sparse_field", isIndexRequest && rarely() ? null : randomSemanticTextInput());
98109
source.put("dense_field", isIndexRequest && rarely() ? null : randomSemanticTextInput());
99110
if (isIndexRequest) {
100111
bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(INDEX_NAME).setId(id).setSource(source));
101-
totalDocs++;
112+
ids.add(id);
102113
} else {
103114
boolean isUpsert = randomBoolean();
104115
UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(INDEX_NAME).setDoc(source);
105-
if (isUpsert || totalDocs == 0) {
116+
if (isUpsert || ids.size() == 0) {
106117
request.setDocAsUpsert(true);
107-
totalDocs++;
108118
} else {
109119
// Update already existing document
110-
id = Long.toString(randomLongBetween(0, totalDocs - 1));
120+
id = randomFrom(ids);
111121
}
112122
request.setId(id);
113123
bulkReqBuilder.add(request);
124+
ids.add(id);
114125
}
115126
}
116127
BulkResponse bulkResponse = bulkReqBuilder.get();
@@ -135,7 +146,7 @@ public void testBulkOperations() throws Exception {
135146
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
136147
SearchResponse searchResponse = client().search(new SearchRequest(INDEX_NAME).source(sourceBuilder)).get();
137148
try {
138-
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(totalDocs));
149+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) ids.size()));
139150
} finally {
140151
searchResponse.decRef();
141152
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ public Set<NodeFeature> getFeatures() {
3838
public Set<NodeFeature> getTestFeatures() {
3939
return Set.of(
4040
SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX,
41-
SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX
41+
SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX,
42+
SemanticTextFieldMapper.SEMANTIC_TEXT_DELETE_FIX
4243
);
4344
}
4445
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,8 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
413413
*/
414414
private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(BulkShardRequest bulkShardRequest) {
415415
Map<String, List<FieldInferenceRequest>> fieldRequestsMap = new LinkedHashMap<>();
416-
int itemIndex = 0;
417-
for (var item : bulkShardRequest.items()) {
416+
for (int itemIndex = 0; itemIndex < bulkShardRequest.items().length; itemIndex++) {
417+
var item = bulkShardRequest.items()[itemIndex];
418418
if (item.getPrimaryResponse() != null) {
419419
// item was already aborted/processed by a filter in the chain upstream (e.g. security)
420420
continue;
@@ -441,6 +441,7 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
441441
// ignore delete request
442442
continue;
443443
}
444+
444445
final Map<String, Object> docMap = indexRequest.sourceAsMap();
445446
for (var entry : fieldInferenceMap.values()) {
446447
String field = entry.getName();
@@ -483,7 +484,6 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
483484
}
484485
}
485486
}
486-
itemIndex++;
487487
}
488488
return fieldRequestsMap;
489489
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ public class SemanticTextFieldMapper extends FieldMapper implements InferenceFie
8888
public static final NodeFeature SEMANTIC_TEXT_SEARCH_INFERENCE_ID = new NodeFeature("semantic_text.search_inference_id");
8989
public static final NodeFeature SEMANTIC_TEXT_DEFAULT_ELSER_2 = new NodeFeature("semantic_text.default_elser_2");
9090
public static final NodeFeature SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX = new NodeFeature("semantic_text.in_object_field_fix");
91-
9291
public static final NodeFeature SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX = new NodeFeature("semantic_text.single_field_update_fix");
92+
public static final NodeFeature SEMANTIC_TEXT_DELETE_FIX = new NodeFeature("semantic_text.delete_fix");
9393

9494
public static final String CONTENT_TYPE = "semantic_text";
9595
public static final String DEFAULT_ELSER_2_INFERENCE_ID = DEFAULT_ELSER_ID;

x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,3 +624,55 @@ setup:
624624
- match: { _source.level_1.dense_field.text: "another inference test" }
625625
- exists: _source.level_1.dense_field.inference.chunks.0.embeddings
626626
- match: { _source.level_1.dense_field.inference.chunks.0.text: "another inference test" }
627+
628+
---
629+
"Deletes on bulk operation":
630+
- requires:
631+
cluster_features: semantic_text.delete_fix
632+
reason: Delete operations are properly applied when subsequent operations include a semantic text field.
633+
634+
- do:
635+
bulk:
636+
index: test-index
637+
refresh: true
638+
body: |
639+
{"index":{"_id": "1"}}
640+
{"dense_field": ["you know, for testing", "now with chunks"]}
641+
{"index":{"_id": "2"}}
642+
{"dense_field": ["some more tests", "that include chunks"]}
643+
644+
- do:
645+
search:
646+
index: test-index
647+
body:
648+
query:
649+
semantic:
650+
field: dense_field
651+
query: "you know, for testing"
652+
653+
- match: { hits.total.value: 2 }
654+
- match: { hits.total.relation: eq }
655+
- match: { hits.hits.0._source.dense_field.text: ["you know, for testing", "now with chunks"] }
656+
- match: { hits.hits.1._source.dense_field.text: ["some more tests", "that include chunks"] }
657+
658+
- do:
659+
bulk:
660+
index: test-index
661+
refresh: true
662+
body: |
663+
{"delete":{ "_id": "2"}}
664+
{"update":{"_id": "1"}}
665+
{"doc":{"dense_field": "updated text"}}
666+
667+
- do:
668+
search:
669+
index: test-index
670+
body:
671+
query:
672+
semantic:
673+
field: dense_field
674+
query: "you know, for testing"
675+
676+
- match: { hits.total.value: 1 }
677+
- match: { hits.total.relation: eq }
678+
- match: { hits.hits.0._source.dense_field.text: "updated text" }

0 commit comments

Comments
 (0)