2020import reactor .util .function .Tuple2 ;
2121
2222import java .net .InetSocketAddress ;
23+ import java .time .Duration ;
2324import java .util .ArrayList ;
2425import java .util .Collection ;
2526import java .util .Collections ;
2930import java .util .concurrent .ConcurrentHashMap ;
3031import java .util .function .Supplier ;
3132
33+ import org .slf4j .Logger ;
34+ import org .slf4j .LoggerFactory ;
3235import org .springframework .data .elasticsearch .client .ElasticsearchHost ;
3336import org .springframework .data .elasticsearch .client .ElasticsearchHost .State ;
3437import org .springframework .data .elasticsearch .client .NoReachableHostException ;
4750 */
4851class MultiNodeHostProvider implements HostProvider <MultiNodeHostProvider > {
4952
53+ private final static Logger LOG = LoggerFactory .getLogger (MultiNodeHostProvider .class );
54+
5055 private final WebClientProvider clientProvider ;
5156 private final Supplier <HttpHeaders > headersSupplier ;
5257 private final Map <InetSocketAddress , ElasticsearchHost > hosts ;
@@ -60,6 +65,8 @@ class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
6065 for (InetSocketAddress endpoint : endpoints ) {
6166 this .hosts .put (endpoint , new ElasticsearchHost (endpoint , State .UNKNOWN ));
6267 }
68+
69+ LOG .debug ("initialized with " + hosts );
6370 }
6471
6572 /*
@@ -68,7 +75,7 @@ class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
6875 */
6976 @ Override
7077 public Mono <ClusterInformation > clusterInfo () {
71- return nodes (null ).map (this ::updateNodeState ).buffer (hosts .size ())
78+ return checkNodes (null ).map (this ::updateNodeState ).buffer (hosts .size ())
7279 .then (Mono .just (new ClusterInformation (new LinkedHashSet <>(this .hosts .values ()))));
7380 }
7481
@@ -88,14 +95,19 @@ public WebClient createWebClient(InetSocketAddress endpoint) {
8895 @ Override
8996 public Mono <InetSocketAddress > lookupActiveHost (Verification verification ) {
9097
98+ LOG .trace ("lookupActiveHost " + verification + " from " + hosts ());
99+
91100 if (Verification .LAZY .equals (verification )) {
92101 for (ElasticsearchHost entry : hosts ()) {
93102 if (entry .isOnline ()) {
103+ LOG .trace ("lookupActiveHost returning " + entry );
94104 return Mono .just (entry .getEndpoint ());
95105 }
96106 }
107+ LOG .trace ("no online host found with LAZY" );
97108 }
98109
110+ LOG .trace ("searching for active host" );
99111 return findActiveHostInKnownActives () //
100112 .switchIfEmpty (findActiveHostInUnresolved ()) //
101113 .switchIfEmpty (findActiveHostInDead ()) //
@@ -107,20 +119,30 @@ Collection<ElasticsearchHost> getCachedHostState() {
107119 }
108120
109121 private Mono <InetSocketAddress > findActiveHostInKnownActives () {
110- return findActiveForSate (State .ONLINE );
122+ return findActiveForState (State .ONLINE );
111123 }
112124
113125 private Mono <InetSocketAddress > findActiveHostInUnresolved () {
114- return findActiveForSate (State .UNKNOWN );
126+ return findActiveForState (State .UNKNOWN );
115127 }
116128
117129 private Mono <InetSocketAddress > findActiveHostInDead () {
118- return findActiveForSate (State .OFFLINE );
130+ return findActiveForState (State .OFFLINE );
119131 }
120132
121- private Mono <InetSocketAddress > findActiveForSate (State state ) {
122- return nodes (state ).map (this ::updateNodeState ).filter (ElasticsearchHost ::isOnline )
123- .map (ElasticsearchHost ::getEndpoint ).next ();
133+ private Mono <InetSocketAddress > findActiveForState (State state ) {
134+
135+ LOG .trace ("findActiveForState state " + state + ", current hosts: " + hosts );
136+
137+ return checkNodes (state ) //
138+ .map (this ::updateNodeState ) //
139+ .filter (ElasticsearchHost ::isOnline ) //
140+ .map (elasticsearchHost -> {
141+ LOG .trace ("findActiveForState returning host " + elasticsearchHost );
142+ return elasticsearchHost ;
143+ }).map (ElasticsearchHost ::getEndpoint ) //
144+ .takeLast (1 ) //
145+ .next ();
124146 }
125147
126148 private ElasticsearchHost updateNodeState (Tuple2 <InetSocketAddress , State > tuple2 ) {
@@ -131,28 +153,36 @@ private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, State> tuple
131153 return elasticsearchHost ;
132154 }
133155
134- private Flux <Tuple2 <InetSocketAddress , State >> nodes (@ Nullable State state ) {
156+ private Flux <Tuple2 <InetSocketAddress , State >> checkNodes (@ Nullable State state ) {
157+
158+ LOG .trace ("checkNodes() with state " + state );
135159
136160 return Flux .fromIterable (hosts ()) //
137161 .filter (entry -> state == null || entry .getState ().equals (state )) //
138162 .map (ElasticsearchHost ::getEndpoint ) //
139- .flatMap (host -> {
163+ .concatMap (host -> {
164+
165+ LOG .trace ("checking host " + host );
140166
141167 Mono <ClientResponse > clientResponseMono = createWebClient (host ) //
142168 .head ().uri ("/" ) //
143169 .headers (httpHeaders -> httpHeaders .addAll (headersSupplier .get ())) //
144170 .exchangeToMono (Mono ::just ) //
171+ .timeout (Duration .ofSeconds (1 )) //
145172 .doOnError (throwable -> {
173+ LOG .trace ("error checking host " + host + ", " + throwable .getMessage ());
146174 hosts .put (host , new ElasticsearchHost (host , State .OFFLINE ));
147175 clientProvider .getErrorListener ().accept (throwable );
148176 });
149177
150178 return Mono .just (host ) //
151- .zipWith ( //
152- clientResponseMono .flatMap (it -> it .releaseBody () //
153- .thenReturn (it .statusCode ().isError () ? State .OFFLINE : State .ONLINE )));
179+ .zipWith (clientResponseMono .flatMap (it -> it .releaseBody () //
180+ .thenReturn (it .statusCode ().isError () ? State .OFFLINE : State .ONLINE )));
154181 }) //
155- .onErrorContinue ((throwable , o ) -> clientProvider .getErrorListener ().accept (throwable ));
182+ .map (tuple -> {
183+ LOG .trace ("check result " + tuple );
184+ return tuple ;
185+ }).onErrorContinue ((throwable , o ) -> clientProvider .getErrorListener ().accept (throwable ));
156186 }
157187
158188 private List <ElasticsearchHost > hosts () {
0 commit comments