11/*
2- * Copyright 2018-2020 the original author or authors.
2+ * Copyright 2018-2021 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
2121import reactor .util .function .Tuple2 ;
2222
2323import java .net .InetSocketAddress ;
24+ import java .time .Duration ;
2425import java .util .ArrayList ;
2526import java .util .Collection ;
2627import java .util .Collections ;
3031import java .util .concurrent .ConcurrentHashMap ;
3132import java .util .function .Supplier ;
3233
34+ import org .slf4j .Logger ;
35+ import org .slf4j .LoggerFactory ;
3336import org .springframework .data .elasticsearch .client .ElasticsearchHost ;
3437import org .springframework .data .elasticsearch .client .ElasticsearchHost .State ;
3538import org .springframework .data .elasticsearch .client .NoReachableHostException ;
4245 *
4346 * @author Christoph Strobl
4447 * @author Mark Paluch
48+ * @author Peter-Josef Meisch
4549 * @since 3.2
4650 */
47- class MultiNodeHostProvider implements HostProvider {
51+ class MultiNodeHostProvider implements HostProvider <MultiNodeHostProvider > {
52+
53+ private final static Logger LOG = LoggerFactory .getLogger (MultiNodeHostProvider .class );
4854
4955 private final WebClientProvider clientProvider ;
5056 private final Supplier <HttpHeaders > headersSupplier ;
5157 private final Map <InetSocketAddress , ElasticsearchHost > hosts ;
5258
53- MultiNodeHostProvider (WebClientProvider clientProvider , Supplier <HttpHeaders > headersSupplier , InetSocketAddress ... endpoints ) {
59+ MultiNodeHostProvider (WebClientProvider clientProvider , Supplier <HttpHeaders > headersSupplier ,
60+ InetSocketAddress ... endpoints ) {
5461
5562 this .clientProvider = clientProvider ;
5663 this .headersSupplier = headersSupplier ;
5764 this .hosts = new ConcurrentHashMap <>();
5865 for (InetSocketAddress endpoint : endpoints ) {
5966 this .hosts .put (endpoint , new ElasticsearchHost (endpoint , State .UNKNOWN ));
6067 }
68+
69+ LOG .debug ("initialized with " + hosts );
6170 }
6271
6372 /*
@@ -66,7 +75,7 @@ class MultiNodeHostProvider implements HostProvider {
6675 */
6776 @ Override
6877 public Mono <ClusterInformation > clusterInfo () {
69- return nodes (null ).map (this ::updateNodeState ).buffer (hosts .size ())
78+ return checkNodes (null ).map (this ::updateNodeState ).buffer (hosts .size ())
7079 .then (Mono .just (new ClusterInformation (new LinkedHashSet <>(this .hosts .values ()))));
7180 }
7281
@@ -86,14 +95,19 @@ public WebClient createWebClient(InetSocketAddress endpoint) {
8695 @ Override
8796 public Mono <InetSocketAddress > lookupActiveHost (Verification verification ) {
8897
98+ LOG .trace ("lookupActiveHost " + verification + " from " + hosts ());
99+
89100 if (Verification .LAZY .equals (verification )) {
90101 for (ElasticsearchHost entry : hosts ()) {
91102 if (entry .isOnline ()) {
103+ LOG .trace ("lookupActiveHost returning " + entry );
92104 return Mono .just (entry .getEndpoint ());
93105 }
94106 }
107+ LOG .trace ("no online host found with LAZY" );
95108 }
96109
110+ LOG .trace ("searching for active host" );
97111 return findActiveHostInKnownActives () //
98112 .switchIfEmpty (findActiveHostInUnresolved ()) //
99113 .switchIfEmpty (findActiveHostInDead ()) //
@@ -105,20 +119,30 @@ Collection<ElasticsearchHost> getCachedHostState() {
105119 }
106120
107121 private Mono <InetSocketAddress > findActiveHostInKnownActives () {
108- return findActiveForSate (State .ONLINE );
122+ return findActiveForState (State .ONLINE );
109123 }
110124
111125 private Mono <InetSocketAddress > findActiveHostInUnresolved () {
112- return findActiveForSate (State .UNKNOWN );
126+ return findActiveForState (State .UNKNOWN );
113127 }
114128
115129 private Mono <InetSocketAddress > findActiveHostInDead () {
116- return findActiveForSate (State .OFFLINE );
130+ return findActiveForState (State .OFFLINE );
117131 }
118132
119- private Mono <InetSocketAddress > findActiveForSate (State state ) {
120- return nodes (state ).map (this ::updateNodeState ).filter (ElasticsearchHost ::isOnline )
121- .map (ElasticsearchHost ::getEndpoint ).next ();
133+ private Mono <InetSocketAddress > findActiveForState (State state ) {
134+
135+ LOG .trace ("findActiveForState state " + state + ", current hosts: " + hosts );
136+
137+ return checkNodes (state ) //
138+ .map (this ::updateNodeState ) //
139+ .filter (ElasticsearchHost ::isOnline ) //
140+ .map (elasticsearchHost -> {
141+ LOG .trace ("findActiveForState returning host " + elasticsearchHost );
142+ return elasticsearchHost ;
143+ }).map (ElasticsearchHost ::getEndpoint ) //
144+ .takeLast (1 ) //
145+ .next ();
122146 }
123147
124148 private ElasticsearchHost updateNodeState (Tuple2 <InetSocketAddress , ClientResponse > tuple2 ) {
@@ -129,17 +153,19 @@ private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, ClientRespon
129153 return elasticsearchHost ;
130154 }
131155
132- private Flux <Tuple2 <InetSocketAddress , ClientResponse >> nodes (@ Nullable State state ) {
156+ private Flux <Tuple2 <InetSocketAddress , ClientResponse >> checkNodes (@ Nullable State state ) {
133157
134158 return Flux .fromIterable (hosts ()) //
135159 .filter (entry -> state == null || entry .getState ().equals (state )) //
136160 .map (ElasticsearchHost ::getEndpoint ) //
137- .flatMap (host -> {
161+ .concatMap (host -> {
138162
139163 Mono <ClientResponse > exchange = createWebClient (host ) //
140164 .head ().uri ("/" ) //
141165 .headers (httpHeaders -> httpHeaders .addAll (headersSupplier .get ())) //
142- .exchange ().doOnError (throwable -> {
166+ .exchange () //
167+ .timeout (Duration .ofSeconds (1 )) //
168+ .doOnError (throwable -> {
143169 hosts .put (host , new ElasticsearchHost (host , State .OFFLINE ));
144170 clientProvider .getErrorListener ().accept (throwable );
145171 });
0 commit comments