From 8d47828e86dc4bb82d11e291777ab00f4f5a3b8f Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Fri, 27 Jun 2025 10:42:30 -0400 Subject: [PATCH 1/5] Use length() instead of ramBytesUsed() --- .../action/filter/ShardBulkInferenceActionFilter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java index 082ece347208a..3127361de6d11 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java @@ -631,7 +631,7 @@ private boolean incrementIndexingPressure(IndexRequestWithIndexingPressure index if (indexRequest.isIndexingPressureIncremented() == false) { try { // Track operation count as one operation per document source update - coordinatingIndexingPressure.increment(1, indexRequest.getIndexRequest().source().ramBytesUsed()); + coordinatingIndexingPressure.increment(1, indexRequest.getIndexRequest().source().length()); indexRequest.setIndexingPressureIncremented(); } catch (EsRejectedExecutionException e) { addInferenceResponseFailure( @@ -737,13 +737,13 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons indexRequest.source(builder); } } - long modifiedSourceSize = indexRequest.source().ramBytesUsed(); + long modifiedSourceSize = indexRequest.source().length(); // Add the indexing pressure from the source modifications. // Don't increment operation count because we count one source update as one operation, and we already accounted for those // in addFieldInferenceRequests. try { - coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSource.ramBytesUsed()); + coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSource.length()); } catch (EsRejectedExecutionException e) { indexRequest.source(originalSource, indexRequest.getContentType()); item.abort( From 6201f1fa2e7158ce8b14903a0865bf2a1792d516 Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Fri, 27 Jun 2025 10:51:27 -0400 Subject: [PATCH 2/5] Update docs/changelog/130221.yaml --- docs/changelog/130221.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/130221.yaml diff --git a/docs/changelog/130221.yaml b/docs/changelog/130221.yaml new file mode 100644 index 0000000000000..83f94c2ccc6f9 --- /dev/null +++ b/docs/changelog/130221.yaml @@ -0,0 +1,5 @@ +pr: 130221 +summary: Fix incorrect accounting of semantic text indexing memory pressure +area: "Distributed, Relevance" +type: bug +issues: [] From 0c8ac1d3665d4781100847750293726cb2accb0b Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Fri, 27 Jun 2025 10:52:56 -0400 Subject: [PATCH 3/5] Fix changelog --- docs/changelog/130221.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/130221.yaml b/docs/changelog/130221.yaml index 83f94c2ccc6f9..607a76f7dbf41 100644 --- a/docs/changelog/130221.yaml +++ b/docs/changelog/130221.yaml @@ -1,5 +1,5 @@ pr: 130221 summary: Fix incorrect accounting of semantic text indexing memory pressure -area: "Distributed, Relevance" +area: Distributed type: bug issues: [] From 41dec6d197a7ca202bdbc2602593e6052b447872 Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Fri, 27 Jun 2025 12:42:02 -0400 Subject: [PATCH 4/5] Added documentation about ramBytesUsed --- .../java/org/elasticsearch/common/bytes/BytesReference.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java index ddcfc1ea7eed8..26df48fc9ec24 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java @@ -159,7 +159,10 @@ static BytesReference fromByteArray(ByteArray byteArray, int length) { BytesReference slice(int from, int length); /** - * The amount of memory used by this BytesReference + * The amount of memory used by this BytesReference. + *

+ * Note that this is not always the same as length and can vary by implementation. + *

*/ long ramBytesUsed(); From d31c2e8ac150e531a040edd1da9bf29e4e7bb710 Mon Sep 17 00:00:00 2001 From: Mike Pellegrini Date: Fri, 27 Jun 2025 12:53:21 -0400 Subject: [PATCH 5/5] Remove ramBytesUsed reference in test --- .../ShardBulkInferenceActionFilterTests.java | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java index a7cb0234aee59..5b4925d8fb0a3 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java @@ -616,14 +616,14 @@ public void testIndexingPressure() throws Exception { IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating(); assertThat(coordinatingIndexingPressure, notNullValue()); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc0Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc2Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc3Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc4Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc0UpdateSource)); + verify(coordinatingIndexingPressure).increment(1, length(doc0Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc1Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc2Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc3Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc4Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc0UpdateSource)); if (useLegacyFormat == false) { - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1UpdateSource)); + verify(coordinatingIndexingPressure).increment(1, length(doc1UpdateSource)); } verify(coordinatingIndexingPressure, times(useLegacyFormat ? 6 : 7)).increment(eq(0), longThat(l -> l > 0)); @@ -720,7 +720,7 @@ public void testIndexingPressureTripsOnInferenceRequestGeneration() throws Excep IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating(); assertThat(coordinatingIndexingPressure, notNullValue()); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc1Source)); verify(coordinatingIndexingPressure, times(1)).increment(anyInt(), anyLong()); // Verify that the coordinating indexing pressure is maintained through downstream action filters @@ -759,7 +759,7 @@ public void testIndexingPressureTripsOnInferenceRequestGeneration() throws Excep public void testIndexingPressureTripsOnInferenceResponseHandling() throws Exception { final XContentBuilder doc1Source = IndexRequest.getXContentBuilder(XContentType.JSON, "sparse_field", "bar"); final InstrumentedIndexingPressure indexingPressure = new InstrumentedIndexingPressure( - Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), (bytesUsed(doc1Source) + 1) + "b").build() + Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), (length(doc1Source) + 1) + "b").build() ); final InferenceStats inferenceStats = new InferenceStats(mock(), mock()); @@ -802,7 +802,7 @@ public void testIndexingPressureTripsOnInferenceResponseHandling() throws Except IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating(); assertThat(coordinatingIndexingPressure, notNullValue()); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc1Source)); verify(coordinatingIndexingPressure).increment(eq(0), longThat(l -> l > 0)); verify(coordinatingIndexingPressure, times(2)).increment(anyInt(), anyLong()); @@ -862,14 +862,14 @@ public void testIndexingPressurePartialFailure() throws Exception { ); XContentBuilder builder = XContentFactory.jsonBuilder(); semanticTextField.toXContent(builder, EMPTY_PARAMS); - return bytesUsed(builder); + return length(builder); }; final InstrumentedIndexingPressure indexingPressure = new InstrumentedIndexingPressure( Settings.builder() .put( MAX_COORDINATING_BYTES.getKey(), - (bytesUsed(doc1Source) + bytesUsed(doc2Source) + estimateInferenceResultsBytes.apply(List.of("bar"), barEmbedding) + (length(doc1Source) + length(doc2Source) + estimateInferenceResultsBytes.apply(List.of("bar"), barEmbedding) + (estimateInferenceResultsBytes.apply(List.of("bazzz"), bazzzEmbedding) / 2)) + "b" ) .build() @@ -913,8 +913,8 @@ public void testIndexingPressurePartialFailure() throws Exception { IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating(); assertThat(coordinatingIndexingPressure, notNullValue()); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc2Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc1Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc2Source)); verify(coordinatingIndexingPressure, times(2)).increment(eq(0), longThat(l -> l > 0)); verify(coordinatingIndexingPressure, times(4)).increment(anyInt(), anyLong()); @@ -1124,8 +1124,8 @@ private static BulkItemRequest[] randomBulkItemRequest( new BulkItemRequest(requestId, new IndexRequest("index").source(expectedDocMap, requestContentType)) }; } - private static long bytesUsed(XContentBuilder builder) { - return BytesReference.bytes(builder).ramBytesUsed(); + private static long length(XContentBuilder builder) { + return BytesReference.bytes(builder).length(); } @SuppressWarnings({ "unchecked" })