Skip to content

Commit a103e3c

Browse files
authored
Infrastructure for metering the update requests (#105063)
udpate request that are sending a document (or part of it) should allow for metering the size of that doc the update request that are using a script should not be metered - reported size 0. this commit is following up on #104859 The parsing is of the update's document is being done in UpdateHelper - the same pattern we use to meter parsing in IngestService. If the script is being used, the size observed will be 0. The value observed is then reported in the TransportShardBulkAction and thanks to the value being 0 or positive it will not be metering the modified document again. This commit also renames the getDocumentParsingSupplier to getDocumentParsingProvider (this was accidentally omitted in the #104859)
1 parent 44b0047 commit a103e3c

File tree

11 files changed

+92
-41
lines changed

11 files changed

+92
-41
lines changed

docs/changelog/105063.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 105063
2+
summary: Infrastructure for metering the update requests
3+
area: Infra/Metrics
4+
type: enhancement
5+
issues: []

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverWithPipelinesIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public static class TestDocumentParsingProviderPlugin extends Plugin implements
7878
public TestDocumentParsingProviderPlugin() {}
7979

8080
@Override
81-
public DocumentParsingProvider getDocumentParsingSupplier() {
81+
public DocumentParsingProvider getDocumentParsingProvider() {
8282
// returns a static instance, because we want to assert that the wrapping is called only once
8383
return new DocumentParsingProvider() {
8484
@Override

server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ public void testDocumentIsReportedUponBulk() throws IOException {
3939
new IndexRequest(TEST_INDEX_NAME).id("1").source(jsonBuilder().startObject().field("test", "I am sam i am").endObject())
4040
).actionGet();
4141
assertTrue(hasWrappedParser);
42-
// there are more assertions in a TestDocumentParsingSupplierPlugin
42+
// there are more assertions in a TestDocumentParsingProviderPlugin
4343

4444
hasWrappedParser = false;
4545
// the format of the request does not matter
4646
client().index(
4747
new IndexRequest(TEST_INDEX_NAME).id("2").source(cborBuilder().startObject().field("test", "I am sam i am").endObject())
4848
).actionGet();
4949
assertTrue(hasWrappedParser);
50-
// there are more assertions in a TestDocumentParsingSupplierPlugin
50+
// there are more assertions in a TestDocumentParsingProviderPlugin
5151

5252
hasWrappedParser = false;
5353
// white spaces does not matter
@@ -59,7 +59,7 @@ public void testDocumentIsReportedUponBulk() throws IOException {
5959
}
6060
""", XContentType.JSON)).actionGet();
6161
assertTrue(hasWrappedParser);
62-
// there are more assertions in a TestDocumentParsingSupplierPlugin
62+
// there are more assertions in a TestDocumentParsingProviderPlugin
6363
}
6464

6565
@Override
@@ -72,7 +72,7 @@ public static class TestDocumentParsingProviderPlugin extends Plugin implements
7272
public TestDocumentParsingProviderPlugin() {}
7373

7474
@Override
75-
public DocumentParsingProvider getDocumentParsingSupplier() {
75+
public DocumentParsingProvider getDocumentParsingProvider() {
7676
return new DocumentParsingProvider() {
7777

7878
@Override

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -455,16 +455,20 @@ public void onFailure(Exception e) {
455455
}
456456

457457
/**
458-
* Creates a new document size observerl
458+
* Creates a new document size observer
459459
* @param documentParsingProvider a provider to create a new observer.
460460
* @param request an index request to provide information about bytes being already parsed.
461-
* @return a Fixed version of DocumentSizeObserver if parsing already happened (in IngestService).
461+
* @return a Fixed version of DocumentSizeObserver if parsing already happened (in IngestService, UpdateHelper)
462+
* and there is a value to be reported >0
462463
* It would be pre-populated with information about how many bytes were already parsed
463-
* or return a new 'empty' DocumentSizeObserver.
464+
* or a noop instance if parsed bytes in IngestService/UpdateHelper was 0 (like when empty doc or script in update)
465+
* or return a new DocumentSizeObserver that will be used when parsing.
464466
*/
465467
private static DocumentSizeObserver getDocumentSizeObserver(DocumentParsingProvider documentParsingProvider, IndexRequest request) {
466468
if (request.getNormalisedBytesParsed() != -1) {
467469
return documentParsingProvider.newFixedSizeDocumentObserver(request.getNormalisedBytesParsed());
470+
} else if (request.getNormalisedBytesParsed() == 0) {
471+
return DocumentSizeObserver.EMPTY_INSTANCE;
468472
}
469473
return documentParsingProvider.newDocumentSizeObserver();
470474
}

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@
5959

6060
/**
6161
* Index request to index a typed JSON document into a specific index and make it searchable.
62-
*
62+
* <p>
6363
* The index requires the {@link #index()}, {@link #id(String)} and
6464
* {@link #source(byte[], XContentType)} to be set.
65-
*
65+
* <p>
6666
* The source (content to index) can be set in its bytes form using ({@link #source(byte[], XContentType)}),
6767
* its string form ({@link #source(String, XContentType)}) or using a {@link org.elasticsearch.xcontent.XContentBuilder}
6868
* ({@link #source(org.elasticsearch.xcontent.XContentBuilder)}).
69-
*
69+
* <p>
7070
* If the {@link #id(String)} is not set, it will be automatically generated.
7171
*
7272
* @see IndexResponse
@@ -453,7 +453,7 @@ public IndexRequest source(Map<String, ?> source, XContentType contentType, bool
453453

454454
/**
455455
* Sets the document source to index.
456-
*
456+
* <p>
457457
* Note, its preferable to either set it using {@link #source(org.elasticsearch.xcontent.XContentBuilder)}
458458
* or using the {@link #source(byte[], XContentType)}.
459459
*/
@@ -632,7 +632,7 @@ public IndexRequest versionType(VersionType versionType) {
632632
/**
633633
* only perform this indexing request if the document was last modification was assigned the given
634634
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
635-
*
635+
* <p>
636636
* If the document last modification was assigned a different sequence number a
637637
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
638638
*/
@@ -647,7 +647,7 @@ public IndexRequest setIfSeqNo(long seqNo) {
647647
/**
648648
* only performs this indexing request if the document was last modification was assigned the given
649649
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
650-
*
650+
* <p>
651651
* If the document last modification was assigned a different term a
652652
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
653653
*/
@@ -670,7 +670,7 @@ public long ifSeqNo() {
670670

671671
/**
672672
* If set, only perform this indexing request if the document was last modification was assigned this primary term.
673-
*
673+
* <p>
674674
* If the document last modification was assigned a different term a
675675
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
676676
*/
@@ -933,16 +933,36 @@ public void setRawTimestamp(Object rawTimestamp) {
933933
this.rawTimestamp = rawTimestamp;
934934
}
935935

936+
/**
937+
* Returns a number of bytes observed when parsing a document in earlier stages of ingestion (like update/ingest service)
938+
* Defaults to -1 when a document size was not observed in earlier stages.
939+
* @return a number of bytes observed
940+
*/
936941
public long getNormalisedBytesParsed() {
937942
return normalisedBytesParsed;
938943
}
939944

940-
public void setNormalisedBytesParsed(long normalisedBytesParsed) {
945+
/**
946+
* Sets number of bytes observed by a <code>DocumentSizeObserver</code>
947+
* @return an index request
948+
*/
949+
public IndexRequest setNormalisedBytesParsed(long normalisedBytesParsed) {
941950
this.normalisedBytesParsed = normalisedBytesParsed;
951+
return this;
952+
}
953+
954+
/**
955+
* when observing document size while parsing, this method indicates that this request should not be recorded.
956+
* @return an index request
957+
*/
958+
public IndexRequest noParsedBytesToReport() {
959+
this.normalisedBytesParsed = 0;
960+
return this;
942961
}
943962

944963
/**
945964
* Adds the pipeline to the list of executed pipelines, if listExecutedPipelines is true
965+
*
946966
* @param pipeline
947967
*/
948968
public void addPipeline(String pipeline) {
@@ -957,6 +977,7 @@ public void addPipeline(String pipeline) {
957977
/**
958978
* This returns the list of pipelines executed on the document for this request. If listExecutedPipelines is false, the response will be
959979
* null, even if pipelines were executed. If listExecutedPipelines is true but no pipelines were executed, the list will be empty.
980+
*
960981
* @return
961982
*/
962983
@Nullable

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.elasticsearch.index.mapper.RoutingFieldMapper;
2727
import org.elasticsearch.index.shard.IndexShard;
2828
import org.elasticsearch.index.shard.ShardId;
29+
import org.elasticsearch.plugins.internal.DocumentParsingProvider;
30+
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
2931
import org.elasticsearch.script.Script;
3032
import org.elasticsearch.script.ScriptService;
3133
import org.elasticsearch.script.UpdateCtxMap;
@@ -47,9 +49,11 @@ public class UpdateHelper {
4749
private static final Logger logger = LogManager.getLogger(UpdateHelper.class);
4850

4951
private final ScriptService scriptService;
52+
private final DocumentParsingProvider documentParsingProvider;
5053

51-
public UpdateHelper(ScriptService scriptService) {
54+
public UpdateHelper(ScriptService scriptService, DocumentParsingProvider documentParsingProvider) {
5255
this.scriptService = scriptService;
56+
this.documentParsingProvider = documentParsingProvider;
5357
}
5458

5559
/**
@@ -174,14 +178,19 @@ static String calculateRouting(GetResult getResult, @Nullable IndexRequest updat
174178
* Prepare the request for merging the existing document with a new one, can optionally detect a noop change. Returns a {@code Result}
175179
* containing a new {@code IndexRequest} to be executed on the primary and replicas.
176180
*/
177-
static Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
181+
Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
178182
final IndexRequest currentRequest = request.doc();
179183
final String routing = calculateRouting(getResult, currentRequest);
184+
final DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver();
180185
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
181186
final XContentType updateSourceContentType = sourceAndContent.v1();
182187
final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();
183188

184-
final boolean noop = XContentHelper.update(updatedSourceAsMap, currentRequest.sourceAsMap(), detectNoop) == false;
189+
final boolean noop = XContentHelper.update(
190+
updatedSourceAsMap,
191+
currentRequest.sourceAsMap(documentSizeObserver),
192+
detectNoop
193+
) == false;
185194

186195
// We can only actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle cases
187196
// where users repopulating multi-fields or adding synonyms, etc.
@@ -216,7 +225,8 @@ static Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request,
216225
.setIfPrimaryTerm(getResult.getPrimaryTerm())
217226
.waitForActiveShards(request.waitForActiveShards())
218227
.timeout(request.timeout())
219-
.setRefreshPolicy(request.getRefreshPolicy());
228+
.setRefreshPolicy(request.getRefreshPolicy())
229+
.setNormalisedBytesParsed(documentSizeObserver.normalisedBytesParsed());
220230
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
221231
}
222232
}
@@ -258,7 +268,8 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
258268
.setIfPrimaryTerm(getResult.getPrimaryTerm())
259269
.waitForActiveShards(request.waitForActiveShards())
260270
.timeout(request.timeout())
261-
.setRefreshPolicy(request.getRefreshPolicy());
271+
.setRefreshPolicy(request.getRefreshPolicy())
272+
.noParsedBytesToReport();
262273
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
263274
}
264275
case DELETE -> {

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,12 @@ static NodeConstruction prepareConstruction(
256256

257257
SearchModule searchModule = constructor.createSearchModule(settingsModule.getSettings(), threadPool);
258258
constructor.createClientAndRegistries(settingsModule.getSettings(), threadPool, searchModule);
259+
DocumentParsingProvider documentParsingProvider = constructor.getDocumentParsingProvider();
259260

260261
ScriptService scriptService = constructor.createScriptService(settingsModule, threadPool, serviceProvider);
261262

263+
constructor.createUpdateHelper(documentParsingProvider, scriptService);
264+
262265
constructor.construct(
263266
threadPool,
264267
settingsModule,
@@ -267,7 +270,8 @@ static NodeConstruction prepareConstruction(
267270
constructor.createAnalysisRegistry(),
268271
serviceProvider,
269272
forbidPrivateIndexSettings,
270-
telemetryProvider
273+
telemetryProvider,
274+
documentParsingProvider
271275
);
272276

273277
return constructor;
@@ -570,14 +574,18 @@ private ScriptService createScriptService(SettingsModule settingsModule, ThreadP
570574
threadPool::absoluteTimeInMillis
571575
);
572576
ScriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
573-
modules.add(b -> {
574-
b.bind(ScriptService.class).toInstance(scriptService);
575-
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
576-
});
577+
modules.add(b -> { b.bind(ScriptService.class).toInstance(scriptService); });
577578

578579
return scriptService;
579580
}
580581

582+
private UpdateHelper createUpdateHelper(DocumentParsingProvider documentParsingProvider, ScriptService scriptService) {
583+
UpdateHelper updateHelper = new UpdateHelper(scriptService, documentParsingProvider);
584+
585+
modules.add(b -> { b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService, documentParsingProvider)); });
586+
return updateHelper;
587+
}
588+
581589
private AnalysisRegistry createAnalysisRegistry() throws IOException {
582590
AnalysisRegistry registry = new AnalysisModule(
583591
environment,
@@ -596,7 +604,8 @@ private void construct(
596604
AnalysisRegistry analysisRegistry,
597605
NodeServiceProvider serviceProvider,
598606
boolean forbidPrivateIndexSettings,
599-
TelemetryProvider telemetryProvider
607+
TelemetryProvider telemetryProvider,
608+
DocumentParsingProvider documentParsingProvider
600609
) throws IOException {
601610

602611
Settings settings = settingsModule.getSettings();
@@ -612,12 +621,10 @@ private void construct(
612621
).collect(Collectors.toSet()),
613622
telemetryProvider.getTracer()
614623
);
615-
final Tracer tracer = telemetryProvider.getTracer();
616624

617625
ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
618626
clusterService.addStateApplier(scriptService);
619627

620-
DocumentParsingProvider documentParsingProvider = getDocumentParsingSupplier();
621628
modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);
622629

623630
final IngestService ingestService = new IngestService(
@@ -1298,8 +1305,8 @@ private void postInjection(
12981305
logger.info("initialized");
12991306
}
13001307

1301-
private DocumentParsingProvider getDocumentParsingSupplier() {
1302-
return getSinglePlugin(DocumentParsingProviderPlugin.class).map(DocumentParsingProviderPlugin::getDocumentParsingSupplier)
1308+
private DocumentParsingProvider getDocumentParsingProvider() {
1309+
return getSinglePlugin(DocumentParsingProviderPlugin.class).map(DocumentParsingProviderPlugin::getDocumentParsingProvider)
13031310
.orElse(DocumentParsingProvider.EMPTY_INSTANCE);
13041311
}
13051312

server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProviderPlugin.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@
99
package org.elasticsearch.plugins.internal;
1010

1111
/**
12-
* An internal plugin that will return a supplier of DocumentParsingSupplier.
12+
* An internal plugin that will return a DocumentParsingProvider.
1313
*/
1414
public interface DocumentParsingProviderPlugin {
1515

1616
/**
17-
* @return a DocumentParsingSupplier to create instances of observer and reporter of parsing events
17+
* @return a DocumentParsingProvider to create instances of observer and reporter of parsing events
1818
*/
19-
DocumentParsingProvider getDocumentParsingSupplier();
19+
DocumentParsingProvider getDocumentParsingProvider();
2020
}

server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.env.Environment;
2323
import org.elasticsearch.index.get.GetResult;
2424
import org.elasticsearch.index.shard.ShardId;
25+
import org.elasticsearch.plugins.internal.DocumentParsingProvider;
2526
import org.elasticsearch.script.MockScriptEngine;
2627
import org.elasticsearch.script.Script;
2728
import org.elasticsearch.script.ScriptEngine;
@@ -59,6 +60,7 @@
5960
import static org.hamcrest.Matchers.equalTo;
6061
import static org.hamcrest.Matchers.instanceOf;
6162
import static org.hamcrest.Matchers.notNullValue;
63+
import static org.mockito.Mockito.mock;
6264

6365
public class UpdateRequestTests extends ESTestCase {
6466

@@ -114,7 +116,7 @@ public void setUp() throws Exception {
114116
final MockScriptEngine engine = new MockScriptEngine("mock", scripts, Collections.emptyMap());
115117
Map<String, ScriptEngine> engines = Collections.singletonMap(engine.getType(), engine);
116118
ScriptService scriptService = new ScriptService(baseSettings, engines, ScriptModule.CORE_CONTEXTS, () -> 1L);
117-
updateHelper = new UpdateHelper(scriptService);
119+
updateHelper = new UpdateHelper(scriptService, DocumentParsingProvider.EMPTY_INSTANCE);
118120
}
119121

120122
@SuppressWarnings("unchecked")
@@ -590,22 +592,22 @@ public void testNoopDetection() throws Exception {
590592
try (var parser = createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}"))) {
591593
request = new UpdateRequest("test", "1").fromXContent(parser);
592594
}
593-
594-
UpdateHelper.Result result = UpdateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
595+
UpdateHelper updateHelper = new UpdateHelper(mock(ScriptService.class), DocumentParsingProvider.EMPTY_INSTANCE);
596+
UpdateHelper.Result result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
595597

596598
assertThat(result.action(), instanceOf(UpdateResponse.class));
597599
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.NOOP));
598600

599601
// Try again, with detectNoop turned off
600-
result = UpdateHelper.prepareUpdateIndexRequest(shardId, request, getResult, false);
602+
result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, false);
601603
assertThat(result.action(), instanceOf(IndexRequest.class));
602604
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED));
603605
assertThat(result.updatedSourceAsMap().get("body").toString(), equalTo("foo"));
604606

605607
try (var parser = createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"bar\"}}"))) {
606608
// Change the request to be a different doc
607609
request = new UpdateRequest("test", "1").fromXContent(parser);
608-
result = UpdateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
610+
result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
609611

610612
assertThat(result.action(), instanceOf(IndexRequest.class));
611613
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED));

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2200,7 +2200,7 @@ protected void assertSnapshotOrGenericThread() {
22002200
threadPool,
22012201
shardStateAction,
22022202
mappingUpdatedAction,
2203-
new UpdateHelper(scriptService),
2203+
new UpdateHelper(scriptService, DocumentParsingProvider.EMPTY_INSTANCE),
22042204
actionFilters,
22052205
indexingMemoryLimits,
22062206
EmptySystemIndices.INSTANCE,

0 commit comments

Comments
 (0)