3535import org .elasticsearch .index .reindex .AbstractBulkByScrollRequest ;
3636import org .elasticsearch .index .reindex .BulkByScrollResponse ;
3737import org .elasticsearch .index .reindex .BulkByScrollTask ;
38- import org .elasticsearch .index .reindex .ClientScrollableHitSource ;
38+ import org .elasticsearch .index .reindex .ClientScrollablePaginatedHitSource ;
39+ import org .elasticsearch .index .reindex .PaginatedHitSource ;
40+ import org .elasticsearch .index .reindex .PaginatedHitSource .SearchFailure ;
3941import org .elasticsearch .index .reindex .ResumeInfo .WorkerResumeInfo ;
40- import org .elasticsearch .index .reindex .ScrollableHitSource ;
41- import org .elasticsearch .index .reindex .ScrollableHitSource .SearchFailure ;
4242import org .elasticsearch .index .reindex .WorkerBulkByScrollTaskState ;
4343import org .elasticsearch .script .CtxMap ;
4444import org .elasticsearch .script .Metadata ;
@@ -99,14 +99,14 @@ public abstract class AbstractAsyncBulkByScrollAction<
9999 private final ParentTaskAssigningClient bulkClient ;
100100 private final ActionListener <BulkByScrollResponse > listener ;
101101 private final Retry bulkRetry ;
102- private final ScrollableHitSource scrollSource ;
102+ private final PaginatedHitSource paginatedHitSource ;
103103
104104 /**
105105 * This BiFunction is used to apply various changes depending of the Reindex action and the search hit,
106106 * from copying search hit metadata (parent, routing, etc) to potentially transforming the
107107 * {@link RequestWrapper} completely.
108108 */
109- private final BiFunction <RequestWrapper <?>, ScrollableHitSource .Hit , RequestWrapper <?>> scriptApplier ;
109+ private final BiFunction <RequestWrapper <?>, PaginatedHitSource .Hit , RequestWrapper <?>> scriptApplier ;
110110 private int lastBatchSize ;
111111 /**
112112 * Keeps track of the total number of bulk operations performed
@@ -176,15 +176,15 @@ public abstract class AbstractAsyncBulkByScrollAction<
176176 this .listener = listener ;
177177 BackoffPolicy backoffPolicy = buildBackoffPolicy ();
178178 bulkRetry = new Retry (BackoffPolicy .wrap (backoffPolicy , worker ::countBulkRetry ), threadPool );
179- scrollSource = buildScrollableResultSource (
179+ paginatedHitSource = buildScrollableResultSource (
180180 backoffPolicy ,
181181 prepareSearchRequest (mainRequest , needsSourceDocumentVersions , needsSourceDocumentSeqNoAndPrimaryTerm , needsVectors )
182182 );
183183 scriptApplier = Objects .requireNonNull (buildScriptApplier (), "script applier must not be null" );
184184 }
185185
186186 /**
187- * Prepares a search request to be used in a ScrollableHitSource .
187+ * Prepares a search request to be used in a {@link PaginatedHitSource} .
188188 * Preparation might set a sort order (if not set already) and disable scroll if max docs is small enough.
189189 */
190190 // Visible for testing
@@ -238,7 +238,7 @@ static <Request extends AbstractBulkByScrollRequest<Request>> SearchRequest prep
238238 *
239239 * Public for testings....
240240 */
241- public BiFunction <RequestWrapper <?>, ScrollableHitSource .Hit , RequestWrapper <?>> buildScriptApplier () {
241+ public BiFunction <RequestWrapper <?>, PaginatedHitSource .Hit , RequestWrapper <?>> buildScriptApplier () {
242242 // The default script applier executes a no-op
243243 return (request , searchHit ) -> request ;
244244 }
@@ -248,12 +248,12 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
248248 * metadata or scripting. That will be handled by copyMetadata and
249249 * apply functions that can be overridden.
250250 */
251- protected abstract RequestWrapper <?> buildRequest (ScrollableHitSource .Hit doc );
251+ protected abstract RequestWrapper <?> buildRequest (PaginatedHitSource .Hit doc );
252252
253253 /**
254254 * Copies the metadata from a hit to the request.
255255 */
256- protected RequestWrapper <?> copyMetadata (RequestWrapper <?> request , ScrollableHitSource .Hit doc ) {
256+ protected RequestWrapper <?> copyMetadata (RequestWrapper <?> request , PaginatedHitSource .Hit doc ) {
257257 copyRouting (request , doc .getRouting ());
258258 return request ;
259259 }
@@ -270,7 +270,7 @@ protected void copyRouting(RequestWrapper<?> request, String routing) {
270270 * from the bulk request. It is also where we fail on invalid search hits, like
271271 * when the document has no source but it's required.
272272 */
273- protected boolean accept (ScrollableHitSource .Hit doc ) {
273+ protected boolean accept (PaginatedHitSource .Hit doc ) {
274274 if (doc .getSource () == null ) {
275275 /*
276276 * Either the document didn't store _source or we didn't fetch it for some reason. Since we don't allow the user to
@@ -282,9 +282,9 @@ protected boolean accept(ScrollableHitSource.Hit doc) {
282282 return true ;
283283 }
284284
285- protected BulkRequest buildBulk (Iterable <? extends ScrollableHitSource .Hit > docs ) {
285+ protected BulkRequest buildBulk (Iterable <? extends PaginatedHitSource .Hit > docs ) {
286286 BulkRequest bulkRequest = new BulkRequest ();
287- for (ScrollableHitSource .Hit doc : docs ) {
287+ for (PaginatedHitSource .Hit doc : docs ) {
288288 if (accept (doc )) {
289289 RequestWrapper <?> request = scriptApplier .apply (copyMetadata (buildRequest (doc ), doc ), doc );
290290 if (request != null ) {
@@ -295,8 +295,8 @@ protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs
295295 return bulkRequest ;
296296 }
297297
298- protected ScrollableHitSource buildScrollableResultSource (BackoffPolicy backoffPolicy , SearchRequest searchRequest ) {
299- return new ClientScrollableHitSource (
298+ protected PaginatedHitSource buildScrollableResultSource (BackoffPolicy backoffPolicy , SearchRequest searchRequest ) {
299+ return new ClientScrollablePaginatedHitSource (
300300 logger ,
301301 backoffPolicy ,
302302 threadPool ,
@@ -338,17 +338,17 @@ public void start() {
338338 WorkerResumeInfo workerResumeInfo = resumeInfo .getWorker ().get ();
339339 startTime .set (workerResumeInfo .startTime ());
340340 worker .restoreState (workerResumeInfo .status ());
341- scrollSource .resume (workerResumeInfo );
341+ paginatedHitSource .resume (workerResumeInfo );
342342 } else {
343343 startTime .set (System .nanoTime ());
344- scrollSource .start ();
344+ paginatedHitSource .start ();
345345 }
346346 } catch (Exception e ) {
347347 finishHim (e );
348348 }
349349 }
350350
351- void onScrollResponse (ScrollableHitSource .AsyncResponse asyncResponse ) {
351+ void onScrollResponse (PaginatedHitSource .AsyncResponse asyncResponse ) {
352352 onScrollResponse (new ScrollConsumableHitsResponse (asyncResponse ));
353353 }
354354
@@ -362,10 +362,10 @@ void onScrollResponse(ScrollConsumableHitsResponse asyncResponse) {
362362 * Process a scroll response.
363363 * @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay.
364364 * @param lastBatchSizeToUse the size of the last batch. Used to calculate the throttling delay.
365- * @param asyncResponse the response to process from ScrollableHitSource
365+ * @param asyncResponse the response to process from {@link PaginatedHitSource}
366366 */
367367 void onScrollResponse (long lastBatchStartTimeNS , int lastBatchSizeToUse , ScrollConsumableHitsResponse asyncResponse ) {
368- ScrollableHitSource .Response response = asyncResponse .response ();
368+ PaginatedHitSource .Response response = asyncResponse .response ();
369369 logger .debug ("[{}]: got scroll response with [{}] hits" , task .getId (), asyncResponse .remainingHits ());
370370 if (task .isCancelled ()) {
371371 logger .debug ("[{}]: finishing early because the task was cancelled" , task .getId ());
@@ -420,7 +420,7 @@ void prepareBulkRequest(long thisBatchStartTimeNS, ScrollConsumableHitsResponse
420420 return ;
421421 }
422422 worker .countBatch ();
423- final List <? extends ScrollableHitSource .Hit > hits ;
423+ final List <? extends PaginatedHitSource .Hit > hits ;
424424
425425 if (mainRequest .getMaxDocs () != MAX_DOCS_ALL_MATCHES ) {
426426 // Truncate the hits if we have more than the request max docs
@@ -528,7 +528,7 @@ void onBulkResponse(BulkResponse response, Runnable onSuccess) {
528528 return ;
529529 }
530530
531- if (scrollSource .hasScroll () == false ) {
531+ if (paginatedHitSource .hasScroll () == false ) {
532532 // Index contains fewer matching docs than max_docs (found < max_docs <= scroll size)
533533 refreshAndFinish (emptyList (), emptyList (), false );
534534 return ;
@@ -611,7 +611,7 @@ protected void finishHim(Exception failure) {
611611 */
612612 protected void finishHim (Exception failure , List <Failure > indexingFailures , List <SearchFailure > searchFailures , boolean timedOut ) {
613613 logger .debug ("[{}]: finishing without any catastrophic failures" , task .getId ());
614- scrollSource .close (threadPool .getThreadContext ().preserveContext (() -> {
614+ paginatedHitSource .close (threadPool .getThreadContext ().preserveContext (() -> {
615615 if (failure == null ) {
616616 BulkByScrollResponse response = buildResponse (
617617 timeValueNanos (System .nanoTime () - startTime .get ()),
@@ -645,7 +645,7 @@ void addDestinationIndices(Collection<String> indices) {
645645 * Set the last returned scrollId. Exists entirely for testing.
646646 */
647647 void setScroll (String scroll ) {
648- scrollSource .setScroll (scroll );
648+ paginatedHitSource .setScroll (scroll );
649649 }
650650
651651 /**
@@ -841,7 +841,7 @@ public static RequestWrapper<DeleteRequest> wrap(DeleteRequest request) {
841841 */
842842 public abstract static class ScriptApplier <T extends Metadata >
843843 implements
844- BiFunction <RequestWrapper <?>, ScrollableHitSource .Hit , RequestWrapper <?>> {
844+ BiFunction <RequestWrapper <?>, PaginatedHitSource .Hit , RequestWrapper <?>> {
845845
846846 // "index" is the default operation
847847 protected static final String INDEX = "index" ;
@@ -867,7 +867,7 @@ public ScriptApplier(
867867 }
868868
869869 @ Override
870- public RequestWrapper <?> apply (RequestWrapper <?> request , ScrollableHitSource .Hit doc ) {
870+ public RequestWrapper <?> apply (RequestWrapper <?> request , PaginatedHitSource .Hit doc ) {
871871 if (script == null ) {
872872 return request ;
873873 }
@@ -883,7 +883,7 @@ public RequestWrapper<?> apply(RequestWrapper<?> request, ScrollableHitSource.Hi
883883 return requestFromOp (request , metadata .getOp ());
884884 }
885885
886- protected abstract CtxMap <T > execute (ScrollableHitSource .Hit doc , Map <String , Object > source );
886+ protected abstract CtxMap <T > execute (PaginatedHitSource .Hit doc , Map <String , Object > source );
887887
888888 protected abstract void updateRequest (RequestWrapper <?> request , T metadata );
889889
@@ -909,24 +909,24 @@ protected RequestWrapper<?> requestFromOp(RequestWrapper<?> request, String op)
909909 }
910910
911911 static class ScrollConsumableHitsResponse {
912- private final ScrollableHitSource .AsyncResponse asyncResponse ;
913- private final List <? extends ScrollableHitSource .Hit > hits ;
912+ private final PaginatedHitSource .AsyncResponse asyncResponse ;
913+ private final List <? extends PaginatedHitSource .Hit > hits ;
914914 private int consumedOffset = 0 ;
915915
916- ScrollConsumableHitsResponse (ScrollableHitSource .AsyncResponse asyncResponse ) {
916+ ScrollConsumableHitsResponse (PaginatedHitSource .AsyncResponse asyncResponse ) {
917917 this .asyncResponse = asyncResponse ;
918918 this .hits = asyncResponse .response ().getHits ();
919919 }
920920
921- ScrollableHitSource .Response response () {
921+ PaginatedHitSource .Response response () {
922922 return asyncResponse .response ();
923923 }
924924
925- List <? extends ScrollableHitSource .Hit > consumeRemainingHits () {
925+ List <? extends PaginatedHitSource .Hit > consumeRemainingHits () {
926926 return consumeHits (remainingHits ());
927927 }
928928
929- List <? extends ScrollableHitSource .Hit > consumeHits (int numberOfHits ) {
929+ List <? extends PaginatedHitSource .Hit > consumeHits (int numberOfHits ) {
930930 if (numberOfHits < 0 ) {
931931 throw new IllegalArgumentException ("Invalid number of hits to consume [" + numberOfHits + "]" );
932932 }
0 commit comments