diff --git a/docs/changelog/130221.yaml b/docs/changelog/130221.yaml new file mode 100644 index 0000000000000..607a76f7dbf41 --- /dev/null +++ b/docs/changelog/130221.yaml @@ -0,0 +1,5 @@ +pr: 130221 +summary: Fix incorrect accounting of semantic text indexing memory pressure +area: Distributed +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java index ddcfc1ea7eed8..26df48fc9ec24 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java @@ -159,7 +159,10 @@ static BytesReference fromByteArray(ByteArray byteArray, int length) { BytesReference slice(int from, int length); /** - * The amount of memory used by this BytesReference + * The amount of memory used by this BytesReference. + *
+ * Note that this is not always the same as {@link #length()} and can vary by implementation. + *
*/ long ramBytesUsed(); diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java index 082ece347208a..3127361de6d11 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java @@ -631,7 +631,7 @@ private boolean incrementIndexingPressure(IndexRequestWithIndexingPressure index if (indexRequest.isIndexingPressureIncremented() == false) { try { // Track operation count as one operation per document source update - coordinatingIndexingPressure.increment(1, indexRequest.getIndexRequest().source().ramBytesUsed()); + coordinatingIndexingPressure.increment(1, indexRequest.getIndexRequest().source().length()); indexRequest.setIndexingPressureIncremented(); } catch (EsRejectedExecutionException e) { addInferenceResponseFailure( @@ -737,13 +737,13 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons indexRequest.source(builder); } } - long modifiedSourceSize = indexRequest.source().ramBytesUsed(); + long modifiedSourceSize = indexRequest.source().length(); // Add the indexing pressure from the source modifications. // Don't increment operation count because we count one source update as one operation, and we already accounted for those // in addFieldInferenceRequests. 
try { - coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSource.ramBytesUsed()); + coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSource.length()); } catch (EsRejectedExecutionException e) { indexRequest.source(originalSource, indexRequest.getContentType()); item.abort( diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java index a7cb0234aee59..5b4925d8fb0a3 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterTests.java @@ -616,14 +616,14 @@ public void testIndexingPressure() throws Exception { IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating(); assertThat(coordinatingIndexingPressure, notNullValue()); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc0Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc2Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc3Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc4Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc0UpdateSource)); + verify(coordinatingIndexingPressure).increment(1, length(doc0Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc1Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc2Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc3Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc4Source)); + 
verify(coordinatingIndexingPressure).increment(1, length(doc0UpdateSource)); if (useLegacyFormat == false) { - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1UpdateSource)); + verify(coordinatingIndexingPressure).increment(1, length(doc1UpdateSource)); } verify(coordinatingIndexingPressure, times(useLegacyFormat ? 6 : 7)).increment(eq(0), longThat(l -> l > 0)); @@ -720,7 +720,7 @@ public void testIndexingPressureTripsOnInferenceRequestGeneration() throws Excep IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating(); assertThat(coordinatingIndexingPressure, notNullValue()); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc1Source)); verify(coordinatingIndexingPressure, times(1)).increment(anyInt(), anyLong()); // Verify that the coordinating indexing pressure is maintained through downstream action filters @@ -759,7 +759,7 @@ public void testIndexingPressureTripsOnInferenceRequestGeneration() throws Excep public void testIndexingPressureTripsOnInferenceResponseHandling() throws Exception { final XContentBuilder doc1Source = IndexRequest.getXContentBuilder(XContentType.JSON, "sparse_field", "bar"); final InstrumentedIndexingPressure indexingPressure = new InstrumentedIndexingPressure( - Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), (bytesUsed(doc1Source) + 1) + "b").build() + Settings.builder().put(MAX_COORDINATING_BYTES.getKey(), (length(doc1Source) + 1) + "b").build() ); final InferenceStats inferenceStats = new InferenceStats(mock(), mock()); @@ -802,7 +802,7 @@ public void testIndexingPressureTripsOnInferenceResponseHandling() throws Except IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating(); assertThat(coordinatingIndexingPressure, notNullValue()); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source)); + 
verify(coordinatingIndexingPressure).increment(1, length(doc1Source)); verify(coordinatingIndexingPressure).increment(eq(0), longThat(l -> l > 0)); verify(coordinatingIndexingPressure, times(2)).increment(anyInt(), anyLong()); @@ -862,14 +862,14 @@ public void testIndexingPressurePartialFailure() throws Exception { ); XContentBuilder builder = XContentFactory.jsonBuilder(); semanticTextField.toXContent(builder, EMPTY_PARAMS); - return bytesUsed(builder); + return length(builder); }; final InstrumentedIndexingPressure indexingPressure = new InstrumentedIndexingPressure( Settings.builder() .put( MAX_COORDINATING_BYTES.getKey(), - (bytesUsed(doc1Source) + bytesUsed(doc2Source) + estimateInferenceResultsBytes.apply(List.of("bar"), barEmbedding) + (length(doc1Source) + length(doc2Source) + estimateInferenceResultsBytes.apply(List.of("bar"), barEmbedding) + (estimateInferenceResultsBytes.apply(List.of("bazzz"), bazzzEmbedding) / 2)) + "b" ) .build() @@ -913,8 +913,8 @@ public void testIndexingPressurePartialFailure() throws Exception { IndexingPressure.Coordinating coordinatingIndexingPressure = indexingPressure.getCoordinating(); assertThat(coordinatingIndexingPressure, notNullValue()); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc1Source)); - verify(coordinatingIndexingPressure).increment(1, bytesUsed(doc2Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc1Source)); + verify(coordinatingIndexingPressure).increment(1, length(doc2Source)); verify(coordinatingIndexingPressure, times(2)).increment(eq(0), longThat(l -> l > 0)); verify(coordinatingIndexingPressure, times(4)).increment(anyInt(), anyLong()); @@ -1124,8 +1124,8 @@ private static BulkItemRequest[] randomBulkItemRequest( new BulkItemRequest(requestId, new IndexRequest("index").source(expectedDocMap, requestContentType)) }; } - private static long bytesUsed(XContentBuilder builder) { - return BytesReference.bytes(builder).ramBytesUsed(); + private static long 
length(XContentBuilder builder) { + return BytesReference.bytes(builder).length(); } @SuppressWarnings({ "unchecked" })