Skip to content

Commit 8a6e125

Browse files
authored
DATAES-796 - Add method returning Mono<SearchPage<T>>.
Original PR: #540
1 parent 23ac6d7 commit 8a6e125

File tree

8 files changed

+267
-89
lines changed

8 files changed

+267
-89
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.netty.handler.timeout.WriteTimeoutHandler;
2525
import reactor.core.publisher.Flux;
2626
import reactor.core.publisher.Mono;
27-
import reactor.core.publisher.Sinks;
2827
import reactor.netty.http.client.HttpClient;
2928
import reactor.netty.transport.ProxyProvider;
3029

@@ -104,9 +103,7 @@
104103
import org.elasticsearch.search.aggregations.Aggregation;
105104
import org.elasticsearch.search.suggest.Suggest;
106105
import org.reactivestreams.Publisher;
107-
import org.reactivestreams.Subscriber;
108-
import org.reactivestreams.Subscription;
109-
106+
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
110107
import org.springframework.data.elasticsearch.client.ClientConfiguration;
111108
import org.springframework.data.elasticsearch.client.ClientLogger;
112109
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
@@ -428,6 +425,11 @@ public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest)
428425
.flatMap(Flux::fromIterable);
429426
}
430427

428+
@Override
429+
public Mono<SearchResponse> searchForResponse(HttpHeaders headers, SearchRequest searchRequest) {
430+
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers).next();
431+
}
432+
431433
@Override
432434
public Flux<Suggest> suggest(HttpHeaders headers, SearchRequest searchRequest) {
433435
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
@@ -468,21 +470,19 @@ public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest)
468470

469471
return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
470472

471-
state -> {
472-
473-
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers)
474-
.expand(searchResponse -> {
473+
state -> sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers)
474+
.expand(searchResponse -> {
475475

476-
state.updateScrollId(searchResponse.getScrollId());
477-
if (isEmpty(searchResponse.getHits())) {
478-
return Mono.empty();
479-
}
476+
state.updateScrollId(searchResponse.getScrollId());
477+
if (isEmpty(searchResponse.getHits())) {
478+
return Mono.empty();
479+
}
480480

481-
return sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout),
482-
requestCreator.scroll(), SearchResponse.class, headers);
481+
return sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout),
482+
requestCreator.scroll(), SearchResponse.class, headers);
483483

484-
});
485-
}, state -> cleanupScroll(headers, state), //
484+
}),
485+
state -> cleanupScroll(headers, state), //
486486
(state, ex) -> cleanupScroll(headers, state), //
487487
state -> cleanupScroll(headers, state)) //
488488
.filter(it -> !isEmpty(it.getHits())) //
@@ -776,6 +776,10 @@ private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseTy
776776

777777
Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class);
778778

779+
if (fromXContent == null) {
780+
return Mono.error(new UncategorizedElasticsearchException(
781+
"No method named fromXContent found in " + responseType.getCanonicalName()));
782+
}
779783
return Mono.justOrEmpty(responseType
780784
.cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content))));
781785

@@ -925,34 +929,5 @@ public Collection<ElasticsearchHost> hosts() {
925929
}
926930
}
927931

928-
private static class SinkSubscriber implements Subscriber<SearchResponse> {
929-
930-
private final Sinks.Many<SearchResponse> inbound;
931-
932-
public SinkSubscriber(Sinks.Many<SearchResponse> inbound) {
933-
this.inbound = inbound;
934-
}
935-
936-
@Override
937-
public void onSubscribe(Subscription s) {
938-
s.request(Long.MAX_VALUE);
939-
}
940-
941-
@Override
942-
public void onNext(SearchResponse searchResponse) {
943-
inbound.emitNext(searchResponse, Sinks.EmitFailureHandler.FAIL_FAST);
944-
}
945-
946-
@Override
947-
public void onError(Throwable t) {
948-
inbound.emitError(t, Sinks.EmitFailureHandler.FAIL_FAST);
949-
}
950-
951-
@Override
952-
public void onComplete() {
953-
inbound.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
954-
}
955-
}
956-
957932
// endregion
958933
}

src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,31 @@ default Flux<SearchHit> search(SearchRequest searchRequest) {
427427
*/
428428
Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest);
429429

430+
/**
431+
* Execute the given {@link SearchRequest} against the {@literal search} API returning the whole response in one Mono.
432+
*
433+
* @param searchRequest must not be {@literal null}.
434+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
435+
* elastic.co</a>
436+
* @return the {@link Mono} emitting the whole {@link SearchResponse}.
437+
* @since 4.1
438+
*/
439+
default Mono<SearchResponse> searchForResponse(SearchRequest searchRequest) {
440+
return searchForResponse(HttpHeaders.EMPTY, searchRequest);
441+
}
442+
443+
/**
444+
* Execute the given {@link SearchRequest} against the {@literal search} API returning the whole response in one Mono.
445+
*
446+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
447+
* @param searchRequest must not be {@literal null}.
448+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
449+
* elastic.co</a>
450+
* @return the {@link Mono} emitting the whole {@link SearchResponse}.
451+
* @since 4.1
452+
*/
453+
Mono<SearchResponse> searchForResponse(HttpHeaders headers, SearchRequest searchRequest);
454+
430455
/**
431456
* Execute the given {@link SearchRequest} against the {@literal search} API.
432457
*

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.springframework.data.elasticsearch.core.document.Document;
6363
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
6464
import org.springframework.data.elasticsearch.core.document.SearchDocument;
65+
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
6566
import org.springframework.data.elasticsearch.core.event.ReactiveAfterConvertCallback;
6667
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
6768
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
@@ -296,7 +297,7 @@ public <T> Flux<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index)
296297

297298
MultiGetRequest request = requestFactory.multiGetRequest(query, clazz, index);
298299
return Flux.from(execute(client -> client.multiGet(request))) //
299-
.concatMap(result -> callback.doWith(DocumentAdapters.from(result)));
300+
.concatMap(result -> callback.toEntity(DocumentAdapters.from(result)));
300301
}
301302

302303
@Override
@@ -444,7 +445,7 @@ public <T> Mono<T> get(String id, Class<T> entityType, IndexCoordinates index) {
444445

445446
DocumentCallback<T> callback = new ReadDocumentCallback<>(converter, entityType, index);
446447

447-
return doGet(id, index).flatMap(it -> callback.doWith(DocumentAdapters.from(it)));
448+
return doGet(id, index).flatMap(it -> callback.toEntity(DocumentAdapters.from(it)));
448449
}
449450

450451
private Mono<GetResult> doGet(String id, IndexCoordinates index) {
@@ -656,14 +657,34 @@ protected <R extends WriteRequest<R>> R prepareWriteRequest(R request) {
656657
@Override
657658
public <T> Flux<SearchHit<T>> search(Query query, Class<?> entityType, Class<T> resultType, IndexCoordinates index) {
658659
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
659-
return doFind(query, entityType, index).concatMap(callback::doWith);
660+
return doFind(query, entityType, index).concatMap(callback::toSearchHit);
660661
}
661662

662663
@Override
663664
public <T> Flux<SearchHit<T>> search(Query query, Class<?> entityType, Class<T> returnType) {
664665
return search(query, entityType, returnType, getIndexCoordinatesFor(entityType));
665666
}
666667

668+
@Override
669+
public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType) {
670+
return searchForPage(query, entityType, resultType, getIndexCoordinatesFor(entityType));
671+
}
672+
673+
@Override
674+
public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, Class<T> resultType,
675+
IndexCoordinates index) {
676+
677+
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);
678+
679+
return doFindForResponse(query, entityType, index) //
680+
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
681+
.flatMap(callback::toEntity) //
682+
.collectList() //
683+
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
684+
.mapHits(searchDocumentResponse, entities))) //
685+
.map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable()));
686+
}
687+
667688
private Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index) {
668689

669690
return Flux.defer(() -> {
@@ -678,6 +699,15 @@ private Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinate
678699
});
679700
}
680701

702+
private Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz, IndexCoordinates index) {
703+
704+
return Mono.defer(() -> {
705+
SearchRequest request = requestFactory.searchRequest(query, clazz, index);
706+
request = prepareSearchRequest(request);
707+
return doFindForResponse(request);
708+
});
709+
}
710+
681711
@Override
682712
public Flux<Aggregation> aggregate(Query query, Class<?> entityType) {
683713
return aggregate(query, entityType, getIndexCoordinatesFor(entityType));
@@ -748,6 +778,21 @@ protected Flux<SearchDocument> doFind(SearchRequest request) {
748778
.onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
749779
}
750780

781+
/**
782+
* Customization hook on the actual execution result {@link Mono}. <br />
783+
*
784+
* @param request the already prepared {@link SearchRequest} ready to be executed.
785+
* @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}.
786+
*/
787+
protected Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request) {
788+
789+
if (QUERY_LOGGER.isDebugEnabled()) {
790+
QUERY_LOGGER.debug("Executing doFindForResponse: {}", request);
791+
}
792+
793+
return Mono.from(execute(client1 -> client1.searchForResponse(request))).map(SearchDocumentResponse::from);
794+
}
795+
751796
/**
752797
* Customization hook on the actual execution result {@link Publisher}. <br />
753798
*
@@ -935,7 +980,7 @@ protected <T> Mono<T> maybeCallAfterConvert(T entity, Document document, IndexCo
935980
protected interface DocumentCallback<T> {
936981

937982
@NonNull
938-
Mono<T> doWith(@Nullable Document document);
983+
Mono<T> toEntity(@Nullable Document document);
939984
}
940985

941986
protected class ReadDocumentCallback<T> implements DocumentCallback<T> {
@@ -953,7 +998,7 @@ public ReadDocumentCallback(EntityReader<? super T, Document> reader, Class<T> t
953998
}
954999

9551000
@NonNull
956-
public Mono<T> doWith(@Nullable Document document) {
1001+
public Mono<T> toEntity(@Nullable Document document) {
9571002
if (document == null) {
9581003
return Mono.empty();
9591004
}
@@ -966,7 +1011,10 @@ public Mono<T> doWith(@Nullable Document document) {
9661011
protected interface SearchDocumentCallback<T> {
9671012

9681013
@NonNull
969-
Mono<SearchHit<T>> doWith(@NonNull SearchDocument response);
1014+
Mono<T> toEntity(@NonNull SearchDocument response);
1015+
1016+
@NonNull
1017+
Mono<SearchHit<T>> toSearchHit(@NonNull SearchDocument response);
9701018
}
9711019

9721020
protected class ReadSearchDocumentCallback<T> implements SearchDocumentCallback<T> {
@@ -981,9 +1029,13 @@ public ReadSearchDocumentCallback(Class<T> type, IndexCoordinates index) {
9811029
}
9821030

9831031
@Override
984-
public Mono<SearchHit<T>> doWith(SearchDocument response) {
985-
return delegate.doWith(response)
986-
.map(entity -> SearchHitMapping.mappingFor(type, converter).mapHit(response, entity));
1032+
public Mono<T> toEntity(SearchDocument response) {
1033+
return delegate.toEntity(response);
1034+
}
1035+
1036+
@Override
1037+
public Mono<SearchHit<T>> toSearchHit(SearchDocument response) {
1038+
return toEntity(response).map(entity -> SearchHitMapping.mappingFor(type, converter).mapHit(response, entity));
9871039
}
9881040
}
9891041

0 commit comments

Comments
 (0)