Skip to content

Commit 4859624

Browse files
authored
Fix incorrect accounting of semantic text indexing memory pressure (#130221) (#130249)
1 parent 8518c19 commit 4859624

File tree

4 files changed: 28 additions (+28) and 20 deletions (-20).

docs/changelog/130221.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 130221
2+
summary: Fix incorrect accounting of semantic text indexing memory pressure
3+
area: Distributed
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,10 @@ static BytesReference fromByteArray(ByteArray byteArray, int length) {
159159
BytesReference slice(int from, int length);
160160

161161
/**
162-
* The amount of memory used by this BytesReference
162+
* The amount of memory used by this BytesReference.
163+
* <p>
164+
* Note that this is not always the same as length and can vary by implementation.
165+
* </p>
163166
*/
164167
long ramBytesUsed();
165168

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ private boolean incrementIndexingPressure(IndexRequestWithIndexingPressure index
608608
if (indexRequest.isIndexingPressureIncremented() == false) {
609609
try {
610610
// Track operation count as one operation per document source update
611-
coordinatingIndexingPressure.increment(1, indexRequest.getIndexRequest().source().ramBytesUsed());
611+
coordinatingIndexingPressure.increment(1, indexRequest.getIndexRequest().source().length());
612612
indexRequest.setIndexingPressureIncremented();
613613
} catch (EsRejectedExecutionException e) {
614614
addInferenceResponseFailure(
@@ -714,13 +714,13 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
714714
indexRequest.source(builder);
715715
}
716716
}
717-
long modifiedSourceSize = indexRequest.source().ramBytesUsed();
717+
long modifiedSourceSize = indexRequest.source().length();
718718

719719
// Add the indexing pressure from the source modifications.
720720
// Don't increment operation count because we count one source update as one operation, and we already accounted for those
721721
// in addFieldInferenceRequests.
722722
try {
723-
coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSource.ramBytesUsed());
723+
coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSource.length());
724724
} catch (EsRejectedExecutionException e) {
725725
indexRequest.source(originalSource, indexRequest.getContentType());
726726
item.abort(

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -556,14 +556,14 @@ public void testIndexingPressure() throws Exception {
556556

557557
IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating();
558558
assertThat(coordinatingIndexingPressure, notNullValue());
559-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc0Source));
560-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source));
561-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc2Source));
562-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc3Source));
563-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc4Source));
564-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc0UpdateSource));
559+
verify(coordinatingIndexingPressure).increment(1, length(doc0Source));
560+
verify(coordinatingIndexingPressure).increment(1, length(doc1Source));
561+
verify(coordinatingIndexingPressure).increment(1, length(doc2Source));
562+
verify(coordinatingIndexingPressure).increment(1, length(doc3Source));
563+
verify(coordinatingIndexingPressure).increment(1, length(doc4Source));
564+
verify(coordinatingIndexingPressure).increment(1, length(doc0UpdateSource));
565565
if (useLegacyFormat == false) {
566-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1UpdateSource));
566+
verify(coordinatingIndexingPressure).increment(1, length(doc1UpdateSource));
567567
}
568568

569569
verify(coordinatingIndexingPressure, times(useLegacyFormat ? 6 : 7)).increment(eq(0), longThat(l -> l > 0));
@@ -658,7 +658,7 @@ public void testIndexingPressureTripsOnInferenceRequestGeneration() throws Excep
658658

659659
IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating();
660660
assertThat(coordinatingIndexingPressure, notNullValue());
661-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source));
661+
verify(coordinatingIndexingPressure).increment(1, length(doc1Source));
662662
verify(coordinatingIndexingPressure, times(1)).increment(anyInt(), anyLong());
663663

664664
// Verify that the coordinating indexing pressure is maintained through downstream action filters
@@ -697,7 +697,7 @@ public void testIndexingPressureTripsOnInferenceRequestGeneration() throws Excep
697697
public void testIndexingPressureTripsOnInferenceResponseHandling() throws Exception {
698698
final XContentBuilder doc1Source = IndexRequest.getXContentBuilder(XContentType.JSON, "sparse_field", "bar");
699699
final InstrumentedIndexingPressure indexingPressure = new InstrumentedIndexingPressure(
700-
Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), (bytesUsed(doc1Source) + 1) + "b").build()
700+
Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), (length(doc1Source) + 1) + "b").build()
701701
);
702702

703703
final StaticModel sparseModel = StaticModel.createRandomInstance(TaskType.SPARSE_EMBEDDING);
@@ -738,7 +738,7 @@ public void testIndexingPressureTripsOnInferenceResponseHandling() throws Except
738738

739739
IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating();
740740
assertThat(coordinatingIndexingPressure, notNullValue());
741-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source));
741+
verify(coordinatingIndexingPressure).increment(1, length(doc1Source));
742742
verify(coordinatingIndexingPressure).increment(eq(0), longThat(l -> l > 0));
743743
verify(coordinatingIndexingPressure, times(2)).increment(anyInt(), anyLong());
744744

@@ -798,14 +798,14 @@ public void testIndexingPressurePartialFailure() throws Exception {
798798
);
799799
XContentBuilder builder = XContentFactory.jsonBuilder();
800800
semanticTextField.toXContent(builder, EMPTY_PARAMS);
801-
return bytesUsed(builder);
801+
return length(builder);
802802
};
803803

804804
final InstrumentedIndexingPressure indexingPressure = new InstrumentedIndexingPressure(
805805
Settings.builder()
806806
.put(
807807
MAX_COORDINATING_BYTES.getKey(),
808-
(bytesUsed(doc1Source) + bytesUsed(doc2Source) + estimateInferenceResultsBytes.apply(List.of("bar"), barEmbedding)
808+
(length(doc1Source) + length(doc2Source) + estimateInferenceResultsBytes.apply(List.of("bar"), barEmbedding)
809809
+ (estimateInferenceResultsBytes.apply(List.of("bazzz"), bazzzEmbedding) / 2)) + "b"
810810
)
811811
.build()
@@ -847,8 +847,8 @@ public void testIndexingPressurePartialFailure() throws Exception {
847847

848848
IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating();
849849
assertThat(coordinatingIndexingPressure, notNullValue());
850-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source));
851-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc2Source));
850+
verify(coordinatingIndexingPressure).increment(1, length(doc1Source));
851+
verify(coordinatingIndexingPressure).increment(1, length(doc2Source));
852852
verify(coordinatingIndexingPressure, times(2)).increment(eq(0), longThat(l -> l > 0));
853853
verify(coordinatingIndexingPressure, times(4)).increment(anyInt(), anyLong());
854854

@@ -1053,8 +1053,8 @@ private static BulkItemRequest[] randomBulkItemRequest(
10531053
new BulkItemRequest(requestId, new IndexRequest("index").source(expectedDocMap, requestContentType)) };
10541054
}
10551055

1056-
private static long bytesUsed(XContentBuilder builder) {
1057-
return BytesReference.bytes(builder).ramBytesUsed();
1056+
private static long length(XContentBuilder builder) {
1057+
return BytesReference.bytes(builder).length();
10581058
}
10591059

10601060
@SuppressWarnings({ "unchecked" })

0 commit comments

Comments
 (0)