Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/129320.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 129320
summary: Refine indexing pressure accounting in semantic bulk inference filter
area: Relevance
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,9 @@ private void recordRequestCountMetrics(Model model, int incrementBy, Throwable t
*/
private long addFieldInferenceRequests(BulkItemRequest item, int itemIndex, Map<String, List<FieldInferenceRequest>> requestsMap) {
boolean isUpdateRequest = false;
final IndexRequestWithIndexingPressure indexRequest;
final IndexRequest indexRequest;
if (item.request() instanceof IndexRequest ir) {
indexRequest = new IndexRequestWithIndexingPressure(ir);
indexRequest = ir;
} else if (item.request() instanceof UpdateRequest updateRequest) {
isUpdateRequest = true;
if (updateRequest.script() != null) {
Expand All @@ -493,13 +493,13 @@ private long addFieldInferenceRequests(BulkItemRequest item, int itemIndex, Map<
);
return 0;
}
indexRequest = new IndexRequestWithIndexingPressure(updateRequest.doc());
indexRequest = updateRequest.doc();
} else {
// ignore delete request
return 0;
}

final Map<String, Object> docMap = indexRequest.getIndexRequest().sourceAsMap();
final Map<String, Object> docMap = indexRequest.sourceAsMap();
long inputLength = 0;
for (var entry : fieldInferenceMap.values()) {
String field = entry.getName();
Expand Down Expand Up @@ -535,10 +535,6 @@ private long addFieldInferenceRequests(BulkItemRequest item, int itemIndex, Map<
* This ensures that the field is treated as intentionally cleared,
* preventing any unintended carryover of prior inference results.
*/
if (incrementIndexingPressure(indexRequest, itemIndex) == false) {
return inputLength;
}

var slot = ensureResponseAccumulatorSlot(itemIndex);
slot.addOrUpdateResponse(
new FieldInferenceResponse(field, sourceField, null, order++, 0, null, EMPTY_CHUNKED_INFERENCE)
Expand Down Expand Up @@ -578,10 +574,6 @@ private long addFieldInferenceRequests(BulkItemRequest item, int itemIndex, Map<
List<FieldInferenceRequest> requests = requestsMap.computeIfAbsent(inferenceId, k -> new ArrayList<>());
int offsetAdjustment = 0;
for (String v : values) {
if (incrementIndexingPressure(indexRequest, itemIndex) == false) {
return inputLength;
}

if (v.isBlank()) {
slot.addOrUpdateResponse(
new FieldInferenceResponse(field, sourceField, v, order++, 0, null, EMPTY_CHUNKED_INFERENCE)
Expand All @@ -604,50 +596,6 @@ private long addFieldInferenceRequests(BulkItemRequest item, int itemIndex, Map<
return inputLength;
}

private static class IndexRequestWithIndexingPressure {
private final IndexRequest indexRequest;
private boolean indexingPressureIncremented;

private IndexRequestWithIndexingPressure(IndexRequest indexRequest) {
this.indexRequest = indexRequest;
this.indexingPressureIncremented = false;
}

private IndexRequest getIndexRequest() {
return indexRequest;
}

private boolean isIndexingPressureIncremented() {
return indexingPressureIncremented;
}

private void setIndexingPressureIncremented() {
this.indexingPressureIncremented = true;
}
}

private boolean incrementIndexingPressure(IndexRequestWithIndexingPressure indexRequest, int itemIndex) {
boolean success = true;
if (indexRequest.isIndexingPressureIncremented() == false) {
try {
// Track operation count as one operation per document source update
coordinatingIndexingPressure.increment(1, indexRequest.getIndexRequest().source().ramBytesUsed());
indexRequest.setIndexingPressureIncremented();
} catch (EsRejectedExecutionException e) {
addInferenceResponseFailure(
itemIndex,
new InferenceException(
"Insufficient memory available to update source on document [" + indexRequest.getIndexRequest().id() + "]",
e
)
);
success = false;
}
}

return success;
}

private FieldInferenceResponseAccumulator ensureResponseAccumulatorSlot(int id) {
FieldInferenceResponseAccumulator acc = inferenceResults.get(id);
if (acc == null) {
Expand Down Expand Up @@ -740,16 +688,17 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
long modifiedSourceSize = indexRequest.source().ramBytesUsed();

// Add the indexing pressure from the source modifications.
// Don't increment operation count because we count one source update as one operation, and we already accounted for those
// in addFieldInferenceRequests.
try {
coordinatingIndexingPressure.increment(0, modifiedSourceSize - originalSource.ramBytesUsed());
coordinatingIndexingPressure.increment(1, modifiedSourceSize - originalSource.ramBytesUsed());
} catch (EsRejectedExecutionException e) {
indexRequest.source(originalSource, indexRequest.getContentType());
inferenceStats.bulkRejection().incrementBy(1);
item.abort(
item.index(),
new InferenceException(
"Insufficient memory available to insert inference results into document [" + indexRequest.id() + "]",
"Unable to insert inference results into document ["
+ indexRequest.id()
+ "] due to memory pressure. Please retry the bulk request with fewer documents or smaller document sizes.",
e
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import java.util.Map;
import java.util.Objects;

public record InferenceStats(LongCounter requestCount, LongHistogram inferenceDuration) {
public record InferenceStats(LongCounter requestCount, LongHistogram inferenceDuration, LongCounter bulkRejection) {

public InferenceStats {
Objects.requireNonNull(requestCount);
Objects.requireNonNull(inferenceDuration);
Objects.requireNonNull(bulkRejection);
}

public static InferenceStats create(MeterRegistry meterRegistry) {
Expand All @@ -38,6 +39,11 @@ public static InferenceStats create(MeterRegistry meterRegistry) {
"es.inference.requests.time",
"Inference API request counts for a particular service, task type, model ID",
"ms"
),
meterRegistry.registerLongCounter(
"es.inference.bulk.rejection.total",
"Count of bulk request rejections for semantic text processing due to insufficient available memory",
"operations"
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void setUp() throws Exception {
licenseState = mock();
modelRegistry = mock();
serviceRegistry = mock();
inferenceStats = new InferenceStats(mock(), mock());
inferenceStats = new InferenceStats(mock(), mock(), mock());
streamingTaskManager = mock();

action = createAction(
Expand Down
Loading
Loading