Skip to content

Commit bee7dbf

Browse files
committed
DATAES-947 - Use Flux.expand(…) for recursive reactive paging.
We now use Flux.expand(…) to recursively fetch search results (SearchRequest followed by multiple SearchScrollRequests) until consuming all search hits. Previously we used inbound/outbound sinks to mimic a continuations by sending a request once the previous request finished. The recursive operator allows now for a simplified operator chain along with improved readability.
1 parent d80a4bd commit bee7dbf

File tree

1 file changed

+14
-44
lines changed

1 file changed

+14
-44
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.http.util.EntityUtils;
4747
import org.elasticsearch.ElasticsearchException;
4848
import org.elasticsearch.ElasticsearchStatusException;
49-
import org.elasticsearch.action.ActionRequest;
5049
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
5150
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
5251
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -467,59 +466,30 @@ public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest)
467466
searchRequest.scroll(scrollTimeout);
468467
}
469468

470-
Sinks.Many<ActionRequest> requests = Sinks.many().unicast().onBackpressureBuffer();
471-
Sinks.Many<SearchResponse> inbound = Sinks.many().unicast().onBackpressureBuffer();
472-
473-
Flux<SearchResponse> exchange = requests.asFlux().flatMap(it -> {
474-
475-
if (it instanceof SearchRequest) {
476-
return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers);
477-
} else if (it instanceof SearchScrollRequest) {
478-
return sendRequest((SearchScrollRequest) it, requestCreator.scroll(), SearchResponse.class, headers);
479-
} else if (it instanceof ClearScrollRequest) {
480-
return sendRequest((ClearScrollRequest) it, requestCreator.clearScroll(), ClearScrollResponse.class, headers)
481-
.flatMap(discard -> Flux.empty());
482-
}
483-
484-
return Flux.error(new IllegalArgumentException(String
485-
.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it)));
486-
});
487-
488469
return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
489470

490-
scrollState -> {
491-
492-
Flux<SearchHit> searchHits = inbound.asFlux().<SearchResponse> handle((searchResponse, sink) -> {
471+
state -> {
493472

494-
scrollState.updateScrollId(searchResponse.getScrollId());
495-
if (isEmpty(searchResponse.getHits())) {
473+
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers)
474+
.expand(searchResponse -> {
496475

497-
inbound.tryEmitComplete();
498-
requests.tryEmitComplete();
476+
state.updateScrollId(searchResponse.getScrollId());
477+
if (isEmpty(searchResponse.getHits())) {
478+
return Mono.empty();
479+
}
499480

500-
} else {
501-
502-
sink.next(searchResponse);
503-
504-
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
505-
.scroll(scrollTimeout);
506-
requests.emitNext(searchScrollRequest, Sinks.EmitFailureHandler.FAIL_FAST);
507-
}
508-
509-
}).map(SearchResponse::getHits) //
510-
.flatMap(Flux::fromIterable);
511-
512-
return searchHits.doOnSubscribe(ignore -> {
513-
exchange.subscribe(new SinkSubscriber(inbound));
514-
requests.emitNext(searchRequest, Sinks.EmitFailureHandler.FAIL_FAST);
515-
});
481+
return sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout),
482+
requestCreator.scroll(), SearchResponse.class, headers);
516483

484+
});
517485
}, state -> cleanupScroll(headers, state), //
518486
(state, ex) -> cleanupScroll(headers, state), //
519-
state -> cleanupScroll(headers, state)); //
487+
state -> cleanupScroll(headers, state)) //
488+
.filter(it -> !isEmpty(it.getHits())) //
489+
.map(SearchResponse::getHits) //
490+
.flatMapIterable(Function.identity()); //
520491
}
521492

522-
523493
private static boolean isEmpty(@Nullable SearchHits hits) {
524494
return hits != null && hits.getHits() != null && hits.getHits().length == 0;
525495
}

0 commit comments

Comments
 (0)