2222import io .netty .handler .ssl .JdkSslContext ;
2323import io .netty .handler .timeout .ReadTimeoutHandler ;
2424import io .netty .handler .timeout .WriteTimeoutHandler ;
25- import reactor .core .Exceptions ;
2625import reactor .core .publisher .Flux ;
27- import reactor .core .publisher .FluxProcessor ;
2826import reactor .core .publisher .Mono ;
2927import reactor .core .publisher .Sinks ;
3028import reactor .netty .http .client .HttpClient ;
107105import org .elasticsearch .search .aggregations .Aggregation ;
108106import org .elasticsearch .search .suggest .Suggest ;
109107import org .reactivestreams .Publisher ;
108+ import org .reactivestreams .Subscriber ;
109+ import org .reactivestreams .Subscription ;
110+
110111import org .springframework .data .elasticsearch .client .ClientConfiguration ;
111112import org .springframework .data .elasticsearch .client .ClientLogger ;
112113import org .springframework .data .elasticsearch .client .ElasticsearchHost ;
@@ -467,9 +468,7 @@ public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest)
467468 }
468469
469470 Sinks .Many <ActionRequest > requests = Sinks .many ().unicast ().onBackpressureBuffer ();
470-
471- FluxProcessor <SearchResponse , SearchResponse > inbound = FluxProcessor
472- .fromSink (Sinks .many ().unicast ().onBackpressureBuffer ());
471+ Sinks .Many <SearchResponse > inbound = Sinks .many ().unicast ().onBackpressureBuffer ();
473472
474473 Flux <SearchResponse > exchange = requests .asFlux ().flatMap (it -> {
475474
@@ -490,12 +489,12 @@ public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest)
490489
491490 scrollState -> {
492491
493- Flux <SearchHit > searchHits = inbound .<SearchResponse > handle ((searchResponse , sink ) -> {
492+ Flux <SearchHit > searchHits = inbound .asFlux (). <SearchResponse > handle ((searchResponse , sink ) -> {
494493
495494 scrollState .updateScrollId (searchResponse .getScrollId ());
496495 if (isEmpty (searchResponse .getHits ())) {
497496
498- inbound .onComplete ();
497+ inbound .tryEmitComplete ();
499498 requests .tryEmitComplete ();
500499
501500 } else {
@@ -504,30 +503,22 @@ public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest)
504503
505504 SearchScrollRequest searchScrollRequest = new SearchScrollRequest (scrollState .getScrollId ())
506505 .scroll (scrollTimeout );
507- tryEmitNext ( requests , searchScrollRequest );
506+ requests . emitNext ( searchScrollRequest , Sinks . EmitFailureHandler . FAIL_FAST );
508507 }
509508
510509 }).map (SearchResponse ::getHits ) //
511510 .flatMap (Flux ::fromIterable );
512511
513512 return searchHits .doOnSubscribe (ignore -> {
514- exchange .subscribe (inbound );
515- tryEmitNext ( requests , searchRequest );
513+ exchange .subscribe (new SinkSubscriber ( inbound ) );
514+ requests . emitNext ( searchRequest , Sinks . EmitFailureHandler . FAIL_FAST );
516515 });
517516
518517 }, state -> cleanupScroll (headers , state ), //
519518 (state , ex ) -> cleanupScroll (headers , state ), //
520519 state -> cleanupScroll (headers , state )); //
521520 }
522521
523- private void tryEmitNext (Sinks .Many <ActionRequest > sink , ActionRequest request ) {
524-
525- Sinks .Emission emission = sink .tryEmitNext (request );
526-
527- if (emission == Sinks .Emission .FAIL_OVERFLOW ) {
528- sink .tryEmitError (Exceptions .failWithOverflow ("Backpressure overflow during Sinks.Many#emitNext" ));
529- }
530- }
531522
532523 private static boolean isEmpty (@ Nullable SearchHits hits ) {
533524 return hits != null && hits .getHits () != null && hits .getHits ().length == 0 ;
@@ -964,5 +955,34 @@ public Collection<ElasticsearchHost> hosts() {
964955 }
965956 }
966957
958+ private static class SinkSubscriber implements Subscriber <SearchResponse > {
959+
960+ private final Sinks .Many <SearchResponse > inbound ;
961+
962+ public SinkSubscriber (Sinks .Many <SearchResponse > inbound ) {
963+ this .inbound = inbound ;
964+ }
965+
966+ @ Override
967+ public void onSubscribe (Subscription s ) {
968+ s .request (Long .MAX_VALUE );
969+ }
970+
971+ @ Override
972+ public void onNext (SearchResponse searchResponse ) {
973+ inbound .emitNext (searchResponse , Sinks .EmitFailureHandler .FAIL_FAST );
974+ }
975+
976+ @ Override
977+ public void onError (Throwable t ) {
978+ inbound .emitError (t , Sinks .EmitFailureHandler .FAIL_FAST );
979+ }
980+
981+ @ Override
982+ public void onComplete () {
983+ inbound .emitComplete (Sinks .EmitFailureHandler .FAIL_FAST );
984+ }
985+ }
986+
967987 // endregion
968988}
0 commit comments