
Commit 06a96e9

Address review comments
1 parent 381a85a commit 06a96e9

File tree: 1 file changed (+40, -37 lines)


x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java

Lines changed: 40 additions & 37 deletions
@@ -25,6 +25,7 @@
 import org.elasticsearch.cluster.metadata.InferenceFieldMetadata;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -49,6 +50,7 @@
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.inference.results.ChunkedInferenceError;
 import org.elasticsearch.xpack.inference.InferenceException;
@@ -91,6 +93,8 @@ public class ShardBulkInferenceActionFilter implements MappedActionFilter {
     public static Setting<ByteSizeValue> INDICES_INFERENCE_BATCH_SIZE = Setting.byteSizeSetting(
         "indices.inference.batch_size",
         DEFAULT_BATCH_SIZE,
+        ByteSizeValue.ONE,
+        ByteSizeValue.ofBytes(100),
         Setting.Property.NodeScope,
         Setting.Property.OperatorDynamic
     );
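The two added arguments give the setting explicit lower and upper bounds, so out-of-range values are rejected when the setting is parsed rather than accepted silently. A minimal sketch of the bounded-setting behavior, assuming the Elasticsearch core jar is on the classpath; the default below is a placeholder chosen to fall inside the bounds, since DEFAULT_BATCH_SIZE is defined elsewhere in this file:

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;

public class BoundedBatchSizeSketch {
    // Mirrors INDICES_INFERENCE_BATCH_SIZE above, with a placeholder default.
    static final Setting<ByteSizeValue> BATCH_SIZE = Setting.byteSizeSetting(
        "indices.inference.batch_size",
        ByteSizeValue.ofBytes(10),   // placeholder for DEFAULT_BATCH_SIZE
        ByteSizeValue.ONE,           // new lower bound from the diff
        ByteSizeValue.ofBytes(100),  // new upper bound from the diff
        Setting.Property.NodeScope,
        Setting.Property.OperatorDynamic
    );

    public static void main(String[] args) {
        // In range: parses normally.
        var settings = Settings.builder().put("indices.inference.batch_size", "50b").build();
        System.out.println(BATCH_SIZE.get(settings)); // 50b

        // Out of range: the bounded parser throws IllegalArgumentException.
        try {
            BATCH_SIZE.get(Settings.builder().put("indices.inference.batch_size", "1gb").build());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}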
@@ -252,17 +256,15 @@ private void executeNext(int itemOffset) {
         Map<String, List<FieldInferenceRequest>> fieldRequestsMap = new HashMap<>();
         long totalInputLength = 0;
         int itemIndex = itemOffset;
-        for (; itemIndex < bulkShardRequest.items().length; itemIndex++) {
+        while (itemIndex < items.length && totalInputLength < batchSizeInBytes) {
             var item = items[itemIndex];
             totalInputLength += addFieldInferenceRequests(item, itemIndex, fieldRequestsMap);
-            if (totalInputLength >= batchSizeInBytes) {
-                break;
-            }
+            itemIndex += 1;
         }
-        int nextItemIndex = itemIndex + 1;
+        int nextItemOffset = itemIndex;
         Runnable onInferenceCompletion = () -> {
             try {
-                int limit = Math.min(nextItemIndex, items.length);
+                int limit = Math.min(nextItemOffset, items.length);
                 for (int i = itemOffset; i < limit; i++) {
                     var result = inferenceResults.get(i);
                     if (result == null) {
@@ -278,7 +280,7 @@ private void executeNext(int itemOffset) {
                     inferenceResults.set(i, null);
                 }
             } finally {
-                executeNext(nextItemIndex);
+                executeNext(nextItemOffset);
             }
         };
 
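The loop rewrite folds the old break condition into the while bound and increments itemIndex after each consumed item, so on exit it already points at the first item of the next batch; that is why nextItemOffset no longer needs the +1 that nextItemIndex had, and why the Math.min guard is now a cheap invariant rather than a necessity. A standalone sketch of the pattern, with array and method names invented for illustration:

// Consumes items starting at itemOffset until the byte budget is met or the
// items run out; returns the offset where the next batch should begin.
static int nextBatchOffset(long[] inputLengths, int itemOffset, long batchSizeInBytes) {
    long totalInputLength = 0;
    int itemIndex = itemOffset;
    while (itemIndex < inputLengths.length && totalInputLength < batchSizeInBytes) {
        totalInputLength += inputLengths[itemIndex];
        itemIndex += 1; // now one past the last consumed item
    }
    return itemIndex; // plays the role of nextItemOffset in the hunks above
}

For example, nextBatchOffset(new long[] { 40, 40, 40 }, 0, 100) returns 3: the budget is checked before each item rather than after, so the item that pushes the total past the budget is still included in the current batch.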
@@ -613,43 +615,44 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
                 }
                 indexRequest.source(newDocMap, indexRequest.getContentType());
             } else {
-                /**
-                 * Merges the original source with the new inference metadata field and writes the result
-                 * directly to an {@link XContentBuilder}, avoiding the need to materialize the original source as a {@link Map}.
-                 */
                 try (XContentBuilder builder = XContentBuilder.builder(indexRequest.getContentType().xContent())) {
-                    builder.startObject();
-
-                    // append the original source
-                    try (
-                        XContentParser parser = XContentHelper.createParserNotCompressed(
-                            XContentParserConfiguration.EMPTY,
-                            indexRequest.source(),
-                            indexRequest.getContentType()
-                        )
-                    ) {
-                        // skip start object
-                        parser.nextToken();
-                        while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
-                            builder.copyCurrentStructure(parser);
-                        }
-                    }
-
-                    // add the inference metadata field
-                    builder.field(InferenceMetadataFieldsMapper.NAME);
-                    try (
-                        XContentParser parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, inferenceFieldsMap)
-                    ) {
-                        builder.copyCurrentStructure(parser);
-                    }
-
-                    builder.endObject();
+                    appendSourceAndInferenceMetadata(builder, indexRequest.source(), indexRequest.getContentType(), inferenceFieldsMap);
                     indexRequest.source(builder);
                 }
             }
         }
     }
 
+    /**
+     * Appends the original source and the new inference metadata field directly to the provided
+     * {@link XContentBuilder}, avoiding the need to materialize the original source as a {@link Map}.
+     */
+    private static void appendSourceAndInferenceMetadata(
+        XContentBuilder builder,
+        BytesReference source,
+        XContentType xContentType,
+        Map<String, Object> inferenceFieldsMap
+    ) throws IOException {
+        builder.startObject();
+
+        // append the original source
+        try (XContentParser parser = XContentHelper.createParserNotCompressed(XContentParserConfiguration.EMPTY, source, xContentType)) {
+            // skip start object
+            parser.nextToken();
+            while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
+                builder.copyCurrentStructure(parser);
+            }
+        }
+
+        // add the inference metadata field
+        builder.field(InferenceMetadataFieldsMapper.NAME);
+        try (XContentParser parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, inferenceFieldsMap)) {
+            builder.copyCurrentStructure(parser);
+        }
+
+        builder.endObject();
+    }
+
     static IndexRequest getIndexRequestOrNull(DocWriteRequest<?> docWriteRequest) {
         if (docWriteRequest instanceof IndexRequest indexRequest) {
             return indexRequest;
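Extracting the merge into appendSourceAndInferenceMetadata keeps the behavior of the inlined block while making it reusable and unit-testable: the first parser skips the source's leading START_OBJECT and copies its top-level fields into the already-open builder object, then the inference metadata is appended as a sibling field under InferenceMetadataFieldsMapper.NAME. A hedged sketch of a call site, assuming a JSON source such as {"title":"hello"} (the field value is invented for illustration):

// Sketch only: mirrors the call site in the diff, with a JSON content type assumed.
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
    appendSourceAndInferenceMetadata(builder, indexRequest.source(), XContentType.JSON, inferenceFieldsMap);
    indexRequest.source(builder);
}
// The builder now holds the original top-level fields plus one extra key, e.g.:
// {"title":"hello", "<InferenceMetadataFieldsMapper.NAME>": { ...inference results... }}

Because the source is streamed token by token, the merge never builds an intermediate Map of the document, which matters for large bulk payloads.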
