@@ -180,7 +180,8 @@ private record FieldInferenceResponse(
     private record FieldInferenceResponseAccumulator(
         int id,
         Map<String, List<FieldInferenceResponse>> responses,
-        List<Exception> failures
+        List<Exception> failures,
+        Map<String, Object> source
     ) {
         void addOrUpdateResponse(FieldInferenceResponse response) {
             synchronized (this) {
@@ -376,17 +377,17 @@ private void onFinish() {
             .chunkedInfer(inferenceProvider.model(), null, inputs, Map.of(), InputType.INGEST, TimeValue.MAX_VALUE, completionListener);
     }

-    private FieldInferenceResponseAccumulator ensureResponseAccumulatorSlot(int id) {
+    private FieldInferenceResponseAccumulator ensureResponseAccumulatorSlot(int id, Map<String, Object> source) {
         FieldInferenceResponseAccumulator acc = inferenceResults.get(id);
         if (acc == null) {
-            acc = new FieldInferenceResponseAccumulator(id, new HashMap<>(), new ArrayList<>());
+            acc = new FieldInferenceResponseAccumulator(id, new HashMap<>(), new ArrayList<>(), source);
             inferenceResults.set(id, acc);
         }
         return acc;
     }

     private void addInferenceResponseFailure(int id, Exception failure) {
-        var acc = ensureResponseAccumulatorSlot(id);
+        var acc = ensureResponseAccumulatorSlot(id, null);
         acc.addFailure(failure);
     }

@@ -404,7 +405,7 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
         }

         final IndexRequest indexRequest = getIndexRequestOrNull(item.request());
-        var newDocMap = indexRequest.sourceAsMap();
+        var newDocMap = response.source();

Contributor Author

This reuses the parsed source from line 511 instead of parsing it again.

Parsing it again causes the full input string to be extracted and stored again, which is another copy of a potentially large byte[]. The savings here are very similar to "Remove matched text from chunks".
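
As a rough illustration of the intent, here is a minimal sketch with simplified, hypothetical types (not the actual filter code): the source is parsed once when the inference requests are built, carried on the accumulator, and reused when the responses are applied, so sourceAsMap() is never called a second time for the same document.

    import java.util.HashMap;
    import java.util.Map;

    class ParsedSourceReuseSketch {
        // Hypothetical, simplified stand-in for FieldInferenceResponseAccumulator.
        record Accumulator(int id, Map<String, Object> source) {}

        // Request side: the source Map is parsed once per document and captured here.
        Accumulator createRequests(int id, Map<String, Object> parsedSource) {
            return new Accumulator(id, parsedSource);
        }

        // Response side: reuse the captured Map instead of re-parsing the raw source.
        Map<String, Object> applyResponses(Accumulator acc, Map<String, Object> inferenceFields) {
            Map<String, Object> newDocMap = new HashMap<>(acc.source());
            newDocMap.putAll(inferenceFields);
            return newDocMap;
        }
    }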

Contributor

I am not sure we can really call it a saving. With this change we'll keep all the parsed sources in memory for the entire batch, which is exactly what we wanted to avoid by re-parsing eagerly.
One possible optimisation here would be to parse and write the value without keeping the intermediate Map object.
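
One way to read that suggestion, sketched with Jackson's streaming API purely for illustration (the real code path uses Elasticsearch's XContent machinery, and the interception of the inference fields is only hinted at in a comment): stream the source from a parser straight into a generator so no intermediate Map is ever materialised.

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.core.JsonParser;

    import java.io.ByteArrayOutputStream;

    class StreamingSourceRewriteSketch {
        // Copies every token from the input to the output without building a Map.
        // A real implementation would intercept the semantic_text fields here and
        // write the inference results in their place.
        static byte[] rewrite(byte[] source) throws Exception {
            JsonFactory factory = new JsonFactory();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (JsonParser parser = factory.createParser(source);
                 JsonGenerator generator = factory.createGenerator(out)) {
                while (parser.nextToken() != null) {
                    generator.copyCurrentEvent(parser);
                }
            }
            return out.toByteArray();
        }
    }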

Contributor Author (@jan-elastic, Mar 6, 2025)

With this change there's one less copy of the (potentially long) input string around. In my experiments that's a clear win, because the source doesn't contain anything else.

It comes at the expense of keeping a parsed version of the rest of the source around. That does cost some memory, and it becomes a net memory loss if the rest of the source is larger than the input string. You might also argue that it's a computational win, because the parsed source is cached.

I'm not sure what you mean by:

One possible optimisation here would be to parse and write the value without keeping the intermediate Map object.

In the end, a parsed version of the source is needed for indexing. If you re-parse the source, you get an additional copy of the input string, which is what I'm trying to prevent.

If you prefer, I could keep the source around only if size(input) > size(source) / 2 or so, because in that case it's (most likely) a memory win, and re-parse it otherwise. WDYT?
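
A minimal sketch of that heuristic, assuming both sizes can be measured cheaply in bytes (the method and parameter names are illustrative, not existing APIs):

    class SourceCachingHeuristicSketch {
        // Keep the parsed Map only when the inference input makes up more than half of
        // the raw source, so caching it is (most likely) a net memory win; otherwise
        // drop it and re-parse later, trading a second parse for lower peak memory.
        static boolean shouldKeepParsedSource(long inputBytes, long rawSourceBytes) {
            return inputBytes > rawSourceBytes / 2;
        }
    }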

Contributor

It's a batch API; we have more than one source at a time. This change would put all the parsed sources in memory for the entire duration of the batch inference calls. We're not trying to optimise for the case where the bulk API contains a single document. This is something we can improve, but this change alone doesn't save anything memory-wise. The parsed source is ephemeral and kept for a single document at a time.

Contributor Author

The input string is not ephemeral: it's referenced in the FieldInferenceRequest and the FieldInferenceResponse, and therefore arrives here.

Maybe we can instead remove the input string from the FieldInferenceResponse? That shouldn't be necessary, right?

Contributor

What does the input string have to do with this change? Sorry, I'm not following.

Contributor (@jimczi, Mar 6, 2025)

Removing the input from the response is something we forgot to follow up on after moving to the new format:
#118893 (comment)
cc @Mikep86

Contributor

Sorry, I lost track of that one @jimczi; I'll create a task to address it soon. But isn't that change orthogonal to this PR? We still need the parsed source both when generating the inference request (to get the values to perform inference on) and when handling the inference response (to update the source).

Contributor Author

Yes, that change seems orthogonal indeed.

Right now, profiling shows that when you get to the line

var newDocMap = indexRequest.sourceAsMap();

there's already a parsed version of the (potentially long) input string in memory, and this line is generating an additional copy. This PR prevents that, but I agree it also introduces some other memory usage.

I'm investigating right now whether I can prevent it without that side effect. To be continued...

Contributor

I opened #124313 to reduce the overall memory needed to execute the bulk request. @Mikep86 I included the removal of the input from the response since it's a minimal change.

         Map<String, Object> inferenceFieldsMap = new HashMap<>();
         for (var entry : response.responses.entrySet()) {
             var fieldName = entry.getKey();
@@ -542,7 +543,7 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
                      * This ensures that the field is treated as intentionally cleared,
                      * preventing any unintended carryover of prior inference results.
                      */
-                    var slot = ensureResponseAccumulatorSlot(itemIndex);
+                    var slot = ensureResponseAccumulatorSlot(itemIndex, docMap);
                     slot.addOrUpdateResponse(
                         new FieldInferenceResponse(field, sourceField, null, order++, 0, null, EMPTY_CHUNKED_INFERENCE)
                     );
@@ -563,7 +564,7 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
                         }
                         continue;
                     }
-                    ensureResponseAccumulatorSlot(itemIndex);
+                    ensureResponseAccumulatorSlot(itemIndex, docMap);
                     final List<String> values;
                     try {
                         values = SemanticTextUtils.nodeStringValues(field, valueObj);