Skip to content

Commit b954f9e

Browse files
authored
[8.15] Fix handling of bulk requests with semantic text fields and delete ops (#117233)
* Fix handling of bulk requests with semantic text fields and delete ops (#116942): previously, delete operations were not processed correctly when followed by operations containing semantic text fields. This caused the positions of subsequent operations in the items array to shift incorrectly by one. This PR resolves the discrepancy and adds tests to ensure proper behavior.
* Fix the YAML REST test.
1 parent 618a3ae commit b954f9e

File tree

6 files changed

+87
-10
lines changed

6 files changed

+87
-10
lines changed

docs/changelog/116942.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116942
2+
summary: Fix handling of bulk requests with semantic text fields and delete ops
3+
area: Relevance
4+
type: bug
5+
issues: []

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.bulk.BulkItemResponse;
1212
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1313
import org.elasticsearch.action.bulk.BulkResponse;
14+
import org.elasticsearch.action.delete.DeleteRequestBuilder;
1415
import org.elasticsearch.action.index.IndexRequestBuilder;
1516
import org.elasticsearch.action.search.SearchRequest;
1617
import org.elasticsearch.action.search.SearchResponse;
@@ -30,8 +31,10 @@
3031
import java.util.Collection;
3132
import java.util.Collections;
3233
import java.util.HashMap;
34+
import java.util.HashSet;
3335
import java.util.Locale;
3436
import java.util.Map;
37+
import java.util.Set;
3538

3639
import static org.hamcrest.Matchers.equalTo;
3740

@@ -86,30 +89,38 @@ public void testBulkOperations() throws Exception {
8689

8790
int totalBulkReqs = randomIntBetween(2, 100);
8891
long totalDocs = 0;
92+
Set<String> ids = new HashSet<>();
8993
for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) {
9094
BulkRequestBuilder bulkReqBuilder = client().prepareBulk();
9195
int totalBulkSize = randomIntBetween(1, 100);
9296
for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) {
93-
String id = Long.toString(totalDocs);
97+
if (ids.size() > 0 && rarely(random())) {
98+
String id = randomFrom(ids);
99+
ids.remove(id);
100+
DeleteRequestBuilder request = new DeleteRequestBuilder(client(), INDEX_NAME).setId(id);
101+
bulkReqBuilder.add(request);
102+
continue;
103+
}
104+
String id = Long.toString(totalDocs++);
94105
boolean isIndexRequest = randomBoolean();
95106
Map<String, Object> source = new HashMap<>();
96107
source.put("sparse_field", isIndexRequest && rarely() ? null : randomAlphaOfLengthBetween(0, 1000));
97108
source.put("dense_field", isIndexRequest && rarely() ? null : randomAlphaOfLengthBetween(0, 1000));
98109
if (isIndexRequest) {
99110
bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(INDEX_NAME).setId(id).setSource(source));
100-
totalDocs++;
111+
ids.add(id);
101112
} else {
102113
boolean isUpsert = randomBoolean();
103114
UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(INDEX_NAME).setDoc(source);
104-
if (isUpsert || totalDocs == 0) {
115+
if (isUpsert || ids.size() == 0) {
105116
request.setDocAsUpsert(true);
106-
totalDocs++;
107117
} else {
108118
// Update already existing document
109-
id = Long.toString(randomLongBetween(0, totalDocs - 1));
119+
id = randomFrom(ids);
110120
}
111121
request.setId(id);
112122
bulkReqBuilder.add(request);
123+
ids.add(id);
113124
}
114125
}
115126
BulkResponse bulkResponse = bulkReqBuilder.get();
@@ -134,7 +145,7 @@ public void testBulkOperations() throws Exception {
134145
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
135146
SearchResponse searchResponse = client().search(new SearchRequest(INDEX_NAME).source(sourceBuilder)).get();
136147
try {
137-
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(totalDocs));
148+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) ids.size()));
138149
} finally {
139150
searchResponse.decRef();
140151
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ public Set<NodeFeature> getFeatures() {
2626

2727
@Override
2828
public Set<NodeFeature> getTestFeatures() {
29-
return Set.of(SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX, SemanticTextFieldMapper.SEMANTIC_TEXT_ZERO_SIZE_FIX);
29+
return Set.of(
30+
SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX,
31+
SemanticTextFieldMapper.SEMANTIC_TEXT_ZERO_SIZE_FIX,
32+
SemanticTextFieldMapper.SEMANTIC_TEXT_DELETE_FIX
33+
);
3034
}
3135
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -412,8 +412,8 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
412412
*/
413413
private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(BulkShardRequest bulkShardRequest) {
414414
Map<String, List<FieldInferenceRequest>> fieldRequestsMap = new LinkedHashMap<>();
415-
int itemIndex = 0;
416-
for (var item : bulkShardRequest.items()) {
415+
for (int itemIndex = 0; itemIndex < bulkShardRequest.items().length; itemIndex++) {
416+
var item = bulkShardRequest.items()[itemIndex];
417417
if (item.getPrimaryResponse() != null) {
418418
// item was already aborted/processed by a filter in the chain upstream (e.g. security)
419419
continue;
@@ -440,6 +440,7 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
440440
// ignore delete request
441441
continue;
442442
}
443+
443444
final Map<String, Object> docMap = indexRequest.sourceAsMap();
444445
for (var entry : fieldInferenceMap.values()) {
445446
String field = entry.getName();
@@ -481,7 +482,6 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
481482
}
482483
}
483484
}
484-
itemIndex++;
485485
}
486486
return fieldRequestsMap;
487487
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
public class SemanticTextFieldMapper extends FieldMapper implements InferenceFieldMapper {
8484
public static final NodeFeature SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX = new NodeFeature("semantic_text.in_object_field_fix");
8585
public static final NodeFeature SEMANTIC_TEXT_ZERO_SIZE_FIX = new NodeFeature("semantic_text.zero_size_fix");
86+
public static final NodeFeature SEMANTIC_TEXT_DELETE_FIX = new NodeFeature("semantic_text.delete_fix");
8687

8788
public static final String CONTENT_TYPE = "semantic_text";
8889

x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,3 +502,59 @@ setup:
502502
- match: { _source.level_1.dense_field.text: "another inference test" }
503503
- exists: _source.level_1.dense_field.inference.chunks.0.embeddings
504504
- match: { _source.level_1.dense_field.inference.chunks.0.text: "another inference test" }
505+
506+
---
507+
"Deletes on bulk operation":
508+
- requires:
509+
cluster_features: semantic_text.delete_fix
510+
reason: Delete operations are properly applied when subsequent operations include a semantic text field.
511+
512+
- do:
513+
bulk:
514+
index: test-index
515+
refresh: true
516+
body: |
517+
{"index":{"_id": "1"}}
518+
{"dense_field": ["you know, for testing", "now with chunks"]}
519+
{"index":{"_id": "2"}}
520+
{"dense_field": ["some more tests", "that include chunks"]}
521+
522+
- do:
523+
search:
524+
index: test-index
525+
body:
526+
query:
527+
semantic:
528+
field: dense_field
529+
query: "you know, for testing"
530+
531+
- match: { hits.total.value: 2 }
532+
- match: { hits.total.relation: eq }
533+
- match: { hits.hits.0._source.dense_field.text: ["you know, for testing", "now with chunks"] }
534+
- match: { hits.hits.1._source.dense_field.text: ["some more tests", "that include chunks"] }
535+
536+
- do:
537+
bulk:
538+
index: test-index
539+
refresh: true
540+
body: |
541+
{"delete":{ "_id": "2"}}
542+
{"update":{"_id": "1"}}
543+
{"doc":{"dense_field": "updated text", "sparse_field": "another text"}}
544+
545+
- match: { errors: false }
546+
547+
548+
- do:
549+
search:
550+
index: test-index
551+
body:
552+
query:
553+
semantic:
554+
field: dense_field
555+
query: "you know, for testing"
556+
557+
- match: { hits.total.value: 1 }
558+
- match: { hits.total.relation: eq }
559+
- match: { hits.hits.0._source.dense_field.text: "updated text" }
560+
- match: { hits.hits.0._source.sparse_field.text: "another text" }

0 commit comments

Comments
 (0)