Skip to content

Commit aef5c27

Browse files
authored
Initial implementation of random sampling (#135660)
1 parent b353381 commit aef5c27

File tree

6 files changed

+959
-51
lines changed

6 files changed

+959
-51
lines changed

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
5353
import org.elasticsearch.common.Priority;
5454
import org.elasticsearch.common.TriConsumer;
55-
import org.elasticsearch.common.bytes.BytesReference;
5655
import org.elasticsearch.common.collect.ImmutableOpenMap;
5756
import org.elasticsearch.common.logging.DeprecationCategory;
5857
import org.elasticsearch.common.logging.DeprecationLogger;
@@ -1391,45 +1390,11 @@ private void attemptToSampleData(
13911390
* We need both the original document and the fully updated document for sampling, so we make a copy of the original
13921391
* before overwriting it here. We can discard it after sampling.
13931392
*/
1394-
samplingService.maybeSample(projectMetadata, indexRequest.index(), () -> {
1395-
IndexRequest original = copyIndexRequestForSampling(indexRequest);
1396-
updateIndexRequestMetadata(original, originalDocumentMetadata);
1397-
return original;
1398-
}, ingestDocument);
1393+
samplingService.maybeSample(projectMetadata, originalDocumentMetadata.getIndex(), indexRequest, ingestDocument);
13991394

14001395
}
14011396
}
14021397

1403-
/**
1404-
* Creates a copy of an IndexRequest to be used by random sampling.
1405-
* @param original The IndexRequest to be copied
1406-
* @return A copy of the IndexRequest
1407-
*/
1408-
private IndexRequest copyIndexRequestForSampling(IndexRequest original) {
1409-
IndexRequest clonedRequest = new IndexRequest(original.index());
1410-
clonedRequest.id(original.id());
1411-
clonedRequest.routing(original.routing());
1412-
clonedRequest.version(original.version());
1413-
clonedRequest.versionType(original.versionType());
1414-
clonedRequest.setPipeline(original.getPipeline());
1415-
clonedRequest.setFinalPipeline(original.getFinalPipeline());
1416-
clonedRequest.setIfSeqNo(original.ifSeqNo());
1417-
clonedRequest.setIfPrimaryTerm(original.ifPrimaryTerm());
1418-
clonedRequest.setRefreshPolicy(original.getRefreshPolicy());
1419-
clonedRequest.waitForActiveShards(original.waitForActiveShards());
1420-
clonedRequest.timeout(original.timeout());
1421-
clonedRequest.opType(original.opType());
1422-
clonedRequest.setParentTask(original.getParentTask());
1423-
clonedRequest.setRequireDataStream(original.isRequireDataStream());
1424-
clonedRequest.setRequireAlias(original.isRequireAlias());
1425-
clonedRequest.setIncludeSourceOnError(original.getIncludeSourceOnError());
1426-
BytesReference source = original.source();
1427-
if (source != null) {
1428-
clonedRequest.source(source, original.getContentType());
1429-
}
1430-
return clonedRequest;
1431-
}
1432-
14331398
private static void executePipeline(
14341399
final IngestDocument ingestDocument,
14351400
final Pipeline pipeline,

0 commit comments

Comments
 (0)