Skip to content

Commit 741a4a0

Browse files
authored
Fix incorrect accounting of semantic text indexing memory pressure (elastic#130221) (elastic#130250)
1 parent 7b59ee7 commit 741a4a0

File tree

4 files changed

+28
-20
lines changed

4 files changed

+28
-20
lines changed

docs/changelog/130221.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 130221
2+
summary: Fix incorrect accounting of semantic text indexing memory pressure
3+
area: Distributed
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,10 @@ static BytesReference fromByteArray(ByteArray byteArray, int length) {
159159
BytesReference slice(int from, int length);
160160

161161
/**
162-
* The amount of memory used by this BytesReference
162+
* The amount of memory used by this BytesReference.
163+
* <p>
164+
* Note that this is not always the same as length and can vary by implementation.
165+
* </p>
163166
*/
164167
long ramBytesUsed();
165168

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ private boolean incrementIndexingPressure(IndexRequestWithIndexingPressure index
631631
if (indexRequest.isIndexingPressureIncremented() == false) {
632632
try {
633633
// Track operation count as one operation per document source update
634-
coordinatingIndexingPressure.increment(1, indexRequest.getIndexRequest().source().ramBytesUsed());
634+
coordinatingIndexingPressure.increment(1, indexRequest.getIndexRequest().source().length());
635635
indexRequest.setIndexingPressureIncremented();
636636
} catch (EsRejectedExecutionException e) {
637637
addInferenceResponseFailure(
@@ -737,13 +737,13 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
737737
indexRequest.source(builder);
738738
}
739739
}
740-
long modifiedSourceSize = indexRequest.source().ramBytesUsed();
740+
long modifiedSourceSize = indexRequest.source().length();
741741

742742
// Add the indexing pressure from the source modifications.
743743
// Don't increment operation count because we count one source update as one operation, and we already accounted for those
744744
// in addFieldInferenceRequests.
745745
try {
746-
coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSource.ramBytesUsed());
746+
coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSource.length());
747747
} catch (EsRejectedExecutionException e) {
748748
indexRequest.source(originalSource, indexRequest.getContentType());
749749
item.abort(

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -616,14 +616,14 @@ public void testIndexingPressure() throws Exception {
616616

617617
IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating();
618618
assertThat(coordinatingIndexingPressure, notNullValue());
619-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc0Source));
620-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source));
621-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc2Source));
622-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc3Source));
623-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc4Source));
624-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc0UpdateSource));
619+
verify(coordinatingIndexingPressure).increment(1, length(doc0Source));
620+
verify(coordinatingIndexingPressure).increment(1, length(doc1Source));
621+
verify(coordinatingIndexingPressure).increment(1, length(doc2Source));
622+
verify(coordinatingIndexingPressure).increment(1, length(doc3Source));
623+
verify(coordinatingIndexingPressure).increment(1, length(doc4Source));
624+
verify(coordinatingIndexingPressure).increment(1, length(doc0UpdateSource));
625625
if (useLegacyFormat == false) {
626-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1UpdateSource));
626+
verify(coordinatingIndexingPressure).increment(1, length(doc1UpdateSource));
627627
}
628628

629629
verify(coordinatingIndexingPressure, times(useLegacyFormat ? 6 : 7)).increment(eq(0), longThat(l -> l > 0));
@@ -720,7 +720,7 @@ public void testIndexingPressureTripsOnInferenceRequestGeneration() throws Excep
720720

721721
IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating();
722722
assertThat(coordinatingIndexingPressure, notNullValue());
723-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source));
723+
verify(coordinatingIndexingPressure).increment(1, length(doc1Source));
724724
verify(coordinatingIndexingPressure, times(1)).increment(anyInt(), anyLong());
725725

726726
// Verify that the coordinating indexing pressure is maintained through downstream action filters
@@ -759,7 +759,7 @@ public void testIndexingPressureTripsOnInferenceRequestGeneration() throws Excep
759759
public void testIndexingPressureTripsOnInferenceResponseHandling() throws Exception {
760760
final XContentBuilder doc1Source = IndexRequest.getXContentBuilder(XContentType.JSON, "sparse_field", "bar");
761761
final InstrumentedIndexingPressure indexingPressure = new InstrumentedIndexingPressure(
762-
Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), (bytesUsed(doc1Source) + 1) + "b").build()
762+
Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), (length(doc1Source) + 1) + "b").build()
763763
);
764764

765765
final InferenceStats inferenceStats = new InferenceStats(mock(), mock());
@@ -802,7 +802,7 @@ public void testIndexingPressureTripsOnInferenceResponseHandling() throws Except
802802

803803
IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating();
804804
assertThat(coordinatingIndexingPressure, notNullValue());
805-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source));
805+
verify(coordinatingIndexingPressure).increment(1, length(doc1Source));
806806
verify(coordinatingIndexingPressure).increment(eq(0), longThat(l -> l > 0));
807807
verify(coordinatingIndexingPressure, times(2)).increment(anyInt(), anyLong());
808808

@@ -862,14 +862,14 @@ public void testIndexingPressurePartialFailure() throws Exception {
862862
);
863863
XContentBuilder builder = XContentFactory.jsonBuilder();
864864
semanticTextField.toXContent(builder, EMPTY_PARAMS);
865-
return bytesUsed(builder);
865+
return length(builder);
866866
};
867867

868868
final InstrumentedIndexingPressure indexingPressure = new InstrumentedIndexingPressure(
869869
Settings.builder()
870870
.put(
871871
MAX_COORDINATING_BYTES.getKey(),
872-
(bytesUsed(doc1Source) + bytesUsed(doc2Source) + estimateInferenceResultsBytes.apply(List.of("bar"), barEmbedding)
872+
(length(doc1Source) + length(doc2Source) + estimateInferenceResultsBytes.apply(List.of("bar"), barEmbedding)
873873
+ (estimateInferenceResultsBytes.apply(List.of("bazzz"), bazzzEmbedding) / 2)) + "b"
874874
)
875875
.build()
@@ -913,8 +913,8 @@ public void testIndexingPressurePartialFailure() throws Exception {
913913

914914
IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating();
915915
assertThat(coordinatingIndexingPressure, notNullValue());
916-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source));
917-
verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc2Source));
916+
verify(coordinatingIndexingPressure).increment(1, length(doc1Source));
917+
verify(coordinatingIndexingPressure).increment(1, length(doc2Source));
918918
verify(coordinatingIndexingPressure, times(2)).increment(eq(0), longThat(l -> l > 0));
919919
verify(coordinatingIndexingPressure, times(4)).increment(anyInt(), anyLong());
920920

@@ -1124,8 +1124,8 @@ private static BulkItemRequest[] randomBulkItemRequest(
11241124
new BulkItemRequest(requestId, new IndexRequest("index").source(expectedDocMap, requestContentType)) };
11251125
}
11261126

1127-
private static long bytesUsed(XContentBuilder builder) {
1128-
return BytesReference.bytes(builder).ramBytesUsed();
1127+
private static long length(XContentBuilder builder) {
1128+
return BytesReference.bytes(builder).length();
11291129
}
11301130

11311131
@SuppressWarnings({ "unchecked" })

0 commit comments

Comments
 (0)