Skip to content

Commit cf03c66

Browse files
authored
Infrastructure to meter updates by script for ra-s nontimeseries (#108910)
this commit refactors the metering for billing api so that we can hide the implementation details of DocumentSizeObserver creation and adds additional field `originatesFromScript` on IndexRequest There will no longer need to have a code checking if the request was already parsed in ingest service or updatehelper. This logic will be hidden in the implementation.
1 parent cada2ea commit cf03c66

File tree

11 files changed

+82
-63
lines changed

11 files changed

+82
-63
lines changed

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverWithPipelinesIT.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.plugins.internal;
1010

11+
import org.elasticsearch.action.DocWriteRequest;
1112
import org.elasticsearch.action.index.IndexRequest;
1213
import org.elasticsearch.action.ingest.PutPipelineRequest;
1314
import org.elasticsearch.common.bytes.BytesArray;
@@ -89,13 +90,12 @@ public DocumentParsingProvider getDocumentParsingProvider() {
8990
// returns a static instance, because we want to assert that the wrapping is called only once
9091
return new DocumentParsingProvider() {
9192
@Override
92-
public DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed) {
93-
providedFixedSize.set(normalisedBytesParsed);
94-
return new TestDocumentSizeObserver(normalisedBytesParsed);
95-
}
96-
97-
@Override
98-
public DocumentSizeObserver newDocumentSizeObserver() {
93+
public <T> DocumentSizeObserver newDocumentSizeObserver(DocWriteRequest<T> request) {
94+
if (request instanceof IndexRequest indexRequest && indexRequest.getNormalisedBytesParsed() > 0) {
95+
long normalisedBytesParsed = indexRequest.getNormalisedBytesParsed();
96+
providedFixedSize.set(normalisedBytesParsed);
97+
return new TestDocumentSizeObserver(normalisedBytesParsed);
98+
}
9999
return new TestDocumentSizeObserver(0L);
100100
}
101101

@@ -137,6 +137,7 @@ public Map<String, Object> map() throws IOException {
137137
public long normalisedBytesParsed() {
138138
return mapCounter;
139139
}
140+
140141
}
141142

142143
}

server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverIT.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.plugins.internal;
1010

11+
import org.elasticsearch.action.DocWriteRequest;
1112
import org.elasticsearch.action.index.IndexRequest;
1213
import org.elasticsearch.index.IndexSettings;
1314
import org.elasticsearch.index.engine.EngineFactory;
@@ -107,7 +108,8 @@ public IndexResult index(Index index) throws IOException {
107108
config().getMapperService(),
108109
DocumentSizeAccumulator.EMPTY_INSTANCE
109110
);
110-
documentParsingReporter.onIndexingCompleted(index.parsedDoc());
111+
ParsedDocument parsedDocument = index.parsedDoc();
112+
documentParsingReporter.onIndexingCompleted(parsedDocument);
111113

112114
return result;
113115
}
@@ -122,15 +124,9 @@ public TestDocumentParsingProviderPlugin() {}
122124
@Override
123125
public DocumentParsingProvider getDocumentParsingProvider() {
124126
return new DocumentParsingProvider() {
125-
126-
@Override
127-
public DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed) {
128-
return new TestDocumentSizeObserver();
129-
}
130-
131127
@Override
132-
public DocumentSizeObserver newDocumentSizeObserver() {
133-
return new TestDocumentSizeObserver();
128+
public <T> DocumentSizeObserver newDocumentSizeObserver(DocWriteRequest<T> request) {
129+
return new TestDocumentSizeObserver(0L);
134130
}
135131

136132
@Override
@@ -155,19 +151,23 @@ public TestDocumentSizeReporter(String indexName) {
155151

156152
@Override
157153
public void onIndexingCompleted(ParsedDocument parsedDocument) {
158-
DocumentSizeObserver documentSizeObserver = parsedDocument.getDocumentSizeObserver();
159-
COUNTER.addAndGet(documentSizeObserver.normalisedBytesParsed());
154+
COUNTER.addAndGet(parsedDocument.getDocumentSizeObserver().normalisedBytesParsed());
160155
assertThat(indexName, equalTo(TEST_INDEX_NAME));
161156
}
162157
}
163158

164159
public static class TestDocumentSizeObserver implements DocumentSizeObserver {
165160
long counter = 0;
166161

162+
public TestDocumentSizeObserver(long counter) {
163+
this.counter = counter;
164+
}
165+
167166
@Override
168167
public XContentParser wrapParser(XContentParser xContentParser) {
169168
hasWrappedParser = true;
170169
return new FilterXContentParserWrapper(xContentParser) {
170+
171171
@Override
172172
public Token nextToken() throws IOException {
173173
counter++;
@@ -180,5 +180,6 @@ public Token nextToken() throws IOException {
180180
public long normalisedBytesParsed() {
181181
return counter;
182182
}
183+
183184
}
184185
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ static TransportVersion def(int id) {
211211
public static final TransportVersion ML_INFERENCE_AMAZON_BEDROCK_ADDED = def(8_702_00_0);
212212
public static final TransportVersion ML_INFERENCE_DONT_DELETE_WHEN_SEMANTIC_TEXT_EXISTS = def(8_703_00_0);
213213
public static final TransportVersion INFERENCE_ADAPTIVE_ALLOCATIONS = def(8_704_00_0);
214+
public static final TransportVersion INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN = def(8_705_00_0);
214215

215216
/*
216217
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,8 @@ static boolean executeBulkItemRequest(
362362
);
363363
} else {
364364
final IndexRequest request = context.getRequestToExecute();
365-
DocumentSizeObserver documentSizeObserver = getDocumentSizeObserver(documentParsingProvider, request);
365+
366+
DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver(request);
366367

367368
context.setDocumentSizeObserver(documentSizeObserver);
368369
final SourceToParse sourceToParse = new SourceToParse(
@@ -458,25 +459,6 @@ public void onFailure(Exception e) {
458459
return false;
459460
}
460461

461-
/**
462-
* Creates a new document size observer
463-
* @param documentParsingProvider a provider to create a new observer.
464-
* @param request an index request to provide information about bytes being already parsed.
465-
* @return a Fixed version of DocumentSizeObserver if parsing already happened (in IngestService, UpdateHelper)
466-
* and there is a value to be reported >0
467-
* It would be pre-populated with information about how many bytes were already parsed
468-
* or a noop instance if parsed bytes in IngestService/UpdateHelper was 0 (like when empty doc or script in update)
469-
* or return a new DocumentSizeObserver that will be used when parsing.
470-
*/
471-
private static DocumentSizeObserver getDocumentSizeObserver(DocumentParsingProvider documentParsingProvider, IndexRequest request) {
472-
if (request.getNormalisedBytesParsed() > 0) {
473-
return documentParsingProvider.newFixedSizeDocumentObserver(request.getNormalisedBytesParsed());
474-
} else if (request.getNormalisedBytesParsed() == 0) {
475-
return DocumentSizeObserver.EMPTY_INSTANCE;
476-
} // request.getNormalisedBytesParsed() -1, meaning normalisedBytesParsed isn't set as parsing wasn't done yet
477-
return documentParsingProvider.newDocumentSizeObserver();
478-
}
479-
480462
private static Engine.Result exceptionToResult(Exception e, IndexShard primary, boolean isDelete, long version, String id) {
481463
assert id != null;
482464
return isDelete ? primary.getFailedDeleteResult(e, version, id) : primary.getFailedIndexResult(e, version, id);

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
146146
*/
147147
private Object rawTimestamp;
148148
private long normalisedBytesParsed = -1;
149+
private boolean originatesFromUpdateByScript;
149150

150151
public IndexRequest(StreamInput in) throws IOException {
151152
this(null, in);
@@ -197,6 +198,12 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
197198
} else {
198199
requireDataStream = false;
199200
}
201+
202+
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
203+
originatesFromUpdateByScript = in.readBoolean();
204+
} else {
205+
originatesFromUpdateByScript = false;
206+
}
200207
}
201208

202209
public IndexRequest() {
@@ -757,6 +764,10 @@ private void writeBody(StreamOutput out) throws IOException {
757764
out.writeBoolean(requireDataStream);
758765
out.writeZLong(normalisedBytesParsed);
759766
}
767+
768+
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
769+
out.writeBoolean(originatesFromUpdateByScript);
770+
}
760771
}
761772

762773
@Override
@@ -959,4 +970,13 @@ public List<String> getExecutedPipelines() {
959970
return Collections.unmodifiableList(executedPipelines);
960971
}
961972
}
973+
974+
public IndexRequest setOriginatesFromUpdateByScript(boolean originatesFromUpdateByScript) {
975+
this.originatesFromUpdateByScript = originatesFromUpdateByScript;
976+
return this;
977+
}
978+
979+
public boolean originatesFromUpdateByScript() {
980+
return this.originatesFromUpdateByScript;
981+
}
962982
}

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ static String calculateRouting(GetResult getResult, @Nullable IndexRequest updat
181181
Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
182182
final IndexRequest currentRequest = request.doc();
183183
final String routing = calculateRouting(getResult, currentRequest);
184-
final DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver();
184+
final DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver(request);
185185
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
186186
final XContentType updateSourceContentType = sourceAndContent.v1();
187187
final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();
@@ -218,7 +218,7 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
218218
return new Result(update, DocWriteResponse.Result.NOOP, updatedSourceAsMap, updateSourceContentType);
219219
} else {
220220
String index = request.index();
221-
final IndexRequest finalIndexRequest = new IndexRequest(index).id(request.id())
221+
IndexRequest finalIndexRequest = new IndexRequest(index).id(request.id())
222222
.routing(routing)
223223
.source(updatedSourceAsMap, updateSourceContentType)
224224
.setIfSeqNo(getResult.getSeqNo())
@@ -227,6 +227,7 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
227227
.timeout(request.timeout())
228228
.setRefreshPolicy(request.getRefreshPolicy())
229229
.setNormalisedBytesParsed(documentSizeObserver.normalisedBytesParsed());
230+
230231
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
231232
}
232233
}
@@ -261,15 +262,15 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
261262
switch (operation) {
262263
case INDEX -> {
263264
String index = request.index();
264-
final IndexRequest indexRequest = new IndexRequest(index).id(request.id())
265+
IndexRequest indexRequest = new IndexRequest(index).id(request.id())
265266
.routing(routing)
266267
.source(updatedSourceAsMap, updateSourceContentType)
267268
.setIfSeqNo(getResult.getSeqNo())
268269
.setIfPrimaryTerm(getResult.getPrimaryTerm())
269270
.waitForActiveShards(request.waitForActiveShards())
270271
.timeout(request.timeout())
271272
.setRefreshPolicy(request.getRefreshPolicy())
272-
.noParsedBytesToReport();
273+
.setOriginatesFromUpdateByScript(true);
273274
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
274275
}
275276
case DELETE -> {

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,7 @@ protected void doRun() {
745745
}
746746
final int slot = i;
747747
final Releasable ref = refs.acquire();
748-
final DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver();
748+
final DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver(indexRequest);
749749
final IngestDocument ingestDocument = newIngestDocument(indexRequest, documentSizeObserver);
750750
final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
751751
// the document listener gives us three-way logic: a document can fail processing (1), or it can

server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProvider.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.plugins.internal;
1010

11+
import org.elasticsearch.action.DocWriteRequest;
1112
import org.elasticsearch.index.mapper.MapperService;
1213

1314
/**
@@ -17,20 +18,6 @@ public interface DocumentParsingProvider {
1718
DocumentParsingProvider EMPTY_INSTANCE = new DocumentParsingProvider() {
1819
};
1920

20-
/**
21-
* @return a new 'empty' observer to use when observing parsing
22-
*/
23-
default DocumentSizeObserver newDocumentSizeObserver() {
24-
return DocumentSizeObserver.EMPTY_INSTANCE;
25-
}
26-
27-
/**
28-
* @return an observer with a previously observed value (fixed to this value, not continuing)
29-
*/
30-
default DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed) {
31-
return DocumentSizeObserver.EMPTY_INSTANCE;
32-
}
33-
3421
/**
3522
* @return an instance of a reporter to use when parsing has been completed and indexing successful
3623
*/
@@ -49,4 +36,10 @@ default DocumentSizeAccumulator createDocumentSizeAccumulator() {
4936
return DocumentSizeAccumulator.EMPTY_INSTANCE;
5037
}
5138

39+
/**
40+
* @return an observer
41+
*/
42+
default <T> DocumentSizeObserver newDocumentSizeObserver(DocWriteRequest<T> request) {
43+
return DocumentSizeObserver.EMPTY_INSTANCE;
44+
}
5245
}

server/src/main/java/org/elasticsearch/plugins/internal/DocumentSizeObserver.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public XContentParser wrapParser(XContentParser xContentParser) {
2828
public long normalisedBytesParsed() {
2929
return 0;
3030
}
31+
3132
};
3233

3334
/**
@@ -40,7 +41,17 @@ public long normalisedBytesParsed() {
4041

4142
/**
4243
* Returns the state gathered during parsing
44+
*
4345
* @return a number representing a state parsed
4446
*/
4547
long normalisedBytesParsed();
48+
49+
/**
50+
* Indicates if an observer was used on an update request with script
51+
*
52+
* @return true if update was done by script, false otherwise
53+
*/
54+
default boolean isUpdateByScript() {
55+
return false;
56+
}
4657
}

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.threadpool.TestThreadPool;
5252
import org.elasticsearch.threadpool.ThreadPool;
5353
import org.elasticsearch.threadpool.ThreadPool.Names;
54+
import org.mockito.ArgumentCaptor;
5455

5556
import java.io.IOException;
5657
import java.util.Collections;
@@ -588,6 +589,7 @@ public void testUpdateRequestWithFailure() throws Exception {
588589
assertThat(failure.getStatus(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
589590
}
590591

592+
@SuppressWarnings("unchecked")
591593
public void testUpdateRequestWithConflictFailure() throws Exception {
592594
IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY);
593595
int retries = randomInt(4);
@@ -651,11 +653,14 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
651653
assertThat(failure.getCause(), equalTo(err));
652654
assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT));
653655

654-
// we have set noParsedBytesToReport on the IndexRequest, like it happens with updates by script.
655-
verify(documentParsingProvider, times(0)).newDocumentSizeObserver();
656-
verify(documentParsingProvider, times(0)).newFixedSizeDocumentObserver(any(Integer.class));
656+
// we have set 0 value on normalisedBytesParsed on the IndexRequest, like it happens with updates by script.
657+
ArgumentCaptor<IndexRequest> argument = ArgumentCaptor.forClass(IndexRequest.class);
658+
verify(documentParsingProvider, times(retries + 1)).newDocumentSizeObserver(argument.capture());
659+
IndexRequest value = argument.getValue();
660+
assertThat(value.getNormalisedBytesParsed(), equalTo(0L));
657661
}
658662

663+
@SuppressWarnings("unchecked")
659664
public void testUpdateRequestWithSuccess() throws Exception {
660665
IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY);
661666
DocWriteRequest<UpdateRequest> writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
@@ -715,8 +720,11 @@ public void testUpdateRequestWithSuccess() throws Exception {
715720
DocWriteResponse response = primaryResponse.getResponse();
716721
assertThat(response.status(), equalTo(created ? RestStatus.CREATED : RestStatus.OK));
717722
assertThat(response.getSeqNo(), equalTo(13L));
718-
verify(documentParsingProvider, times(0)).newDocumentSizeObserver();
719-
verify(documentParsingProvider, times(1)).newFixedSizeDocumentObserver(eq(100L));
723+
724+
ArgumentCaptor<IndexRequest> argument = ArgumentCaptor.forClass(IndexRequest.class);
725+
verify(documentParsingProvider, times(1)).newDocumentSizeObserver(argument.capture());
726+
IndexRequest value = argument.getValue();
727+
assertThat(value.getNormalisedBytesParsed(), equalTo(100L));
720728
}
721729

722730
public void testUpdateWithDelete() throws Exception {

0 commit comments

Comments
 (0)