22
22
import org .elasticsearch .common .settings .Setting ;
23
23
import org .elasticsearch .common .settings .Settings ;
24
24
import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
25
+ import org .elasticsearch .common .util .concurrent .ListenableFuture ;
25
26
import org .elasticsearch .core .Releasable ;
26
27
import org .elasticsearch .core .Releasables ;
27
28
import org .elasticsearch .core .TimeValue ;
@@ -219,12 +220,19 @@ public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletio
219
220
});
220
221
}
221
222
223
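+ // Placeholder listener stored when a connection attempt is requested without a completion listener (fire-and-forget); it ignores both outcomes and is recognised by identity in registerListener().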
224
+ private static final ActionListener <Void > NOOP = ActionListener .wrap (r -> {}, e -> {});
225
+
222
226
private class ConnectionTarget {
223
227
private final DiscoveryNode discoveryNode ;
224
228
225
229
private final AtomicInteger consecutiveFailureCount = new AtomicInteger ();
226
230
private final AtomicReference <Releasable > connectionRef = new AtomicReference <>();
227
231
232
+ // all access to these fields is synchronized
233
+ private ActionListener <Void > pendingListener ;
234
+ private boolean connectionInProgress ;
235
+
228
236
/**
 * Creates a connection target for the given node.
 *
 * @param discoveryNode the node this target refers to
 */
ConnectionTarget(DiscoveryNode discoveryNode) {
    this.discoveryNode = discoveryNode;
}
@@ -235,57 +243,101 @@ private void setConnectionRef(Releasable connectionReleasable) {
235
243
236
244
Runnable connect (ActionListener <Void > listener ) {
237
245
return () -> {
238
- final boolean alreadyConnected = transportService .nodeConnected (discoveryNode );
246
+ registerListener (listener );
247
+ doConnect ();
248
+ };
249
+ }
239
250
240
- if (alreadyConnected ) {
241
- logger .trace ("refreshing connection to {}" , discoveryNode );
242
- } else {
243
- logger .debug ("connecting to {}" , discoveryNode );
251
+ private synchronized void registerListener (ActionListener <Void > listener ) {
252
+ if (listener == null ) {
253
+ pendingListener = pendingListener == null ? NOOP : pendingListener ;
254
+ } else if (pendingListener == null || pendingListener == NOOP ) {
255
+ pendingListener = listener ;
256
+ } else if (pendingListener instanceof ListenableFuture <?>) {
257
+ ((ListenableFuture <Void >) pendingListener ).addListener (listener );
258
+ } else {
259
+ ListenableFuture <Void > wrapper = new ListenableFuture <Void >();
260
+ wrapper .addListener (pendingListener );
261
+ wrapper .addListener (listener );
262
+ pendingListener = wrapper ;
263
+ }
264
+ }
265
+
266
+ private synchronized ActionListener <Void > acquireListener () {
267
+ // Avoid concurrent connection attempts because they don't necessarily complete in order otherwise, and out-of-order completion
268
+ // might mean we end up disconnected from a node even though we triggered a call to connect() after all close() calls had
269
+ // finished.
270
+ if (connectionInProgress == false ) {
271
+ ActionListener <Void > listener = pendingListener ;
272
+ if (listener != null ) {
273
+ pendingListener = null ;
274
+ connectionInProgress = true ;
275
+ return listener ;
244
276
}
277
+ }
278
+ return null ;
279
+ }
280
+
281
+ private synchronized void releaseListener () {
282
+ assert connectionInProgress ;
283
+ connectionInProgress = false ;
284
+ }
285
+
286
+ private void doConnect () {
287
+ ActionListener <Void > listener = acquireListener ();
288
+ if (listener == null ) {
289
+ return ;
290
+ }
291
+
292
+ final boolean alreadyConnected = transportService .nodeConnected (discoveryNode );
293
+
294
+ if (alreadyConnected ) {
295
+ logger .trace ("refreshing connection to {}" , discoveryNode );
296
+ } else {
297
+ logger .debug ("connecting to {}" , discoveryNode );
298
+ }
245
299
246
- // It's possible that connectionRef is a reference to an older connection that closed out from under us, but that something
247
- // else has opened a fresh connection to the node. Therefore we always call connectToNode() and update connectionRef.
248
- transportService .connectToNode (discoveryNode , new ActionListener <Releasable >() {
249
- @ Override
250
- public void onResponse (Releasable connectionReleasable ) {
251
- if (alreadyConnected ) {
252
- logger .trace ("refreshed connection to {}" , discoveryNode );
253
- } else {
254
- logger .debug ("connected to {}" , discoveryNode );
255
- }
256
- consecutiveFailureCount .set (0 );
257
- setConnectionRef (connectionReleasable );
258
-
259
- final boolean isActive ;
260
- synchronized (mutex ) {
261
- isActive = targetsByNode .get (discoveryNode ) == ConnectionTarget .this ;
262
- }
263
- if (isActive == false ) {
264
- logger .debug ("connected to stale {} - releasing stale connection" , discoveryNode );
265
- setConnectionRef (null );
266
- }
267
- if (listener != null ) {
268
- listener .onResponse (null );
269
- }
300
+ // It's possible that connectionRef is a reference to an older connection that closed out from under us, but that something else
301
+ // has opened a fresh connection to the node. Therefore we always call connectToNode() and update connectionRef.
302
+ transportService .connectToNode (discoveryNode , ActionListener .runAfter (new ActionListener <Releasable >() {
303
+ @ Override
304
+ public void onResponse (Releasable connectionReleasable ) {
305
+ if (alreadyConnected ) {
306
+ logger .trace ("refreshed connection to {}" , discoveryNode );
307
+ } else {
308
+ logger .debug ("connected to {}" , discoveryNode );
270
309
}
310
+ consecutiveFailureCount .set (0 );
311
+ setConnectionRef (connectionReleasable );
271
312
272
- @ Override
273
- public void onFailure (Exception e ) {
274
- final int currentFailureCount = consecutiveFailureCount .incrementAndGet ();
275
- // only warn every 6th failure
276
- final Level level = currentFailureCount % 6 == 1 ? Level .WARN : Level .DEBUG ;
277
- logger .log (
278
- level ,
279
- new ParameterizedMessage ("failed to connect to {} (tried [{}] times)" , discoveryNode , currentFailureCount ),
280
- e
281
- );
313
+ final boolean isActive ;
314
+ synchronized (mutex ) {
315
+ isActive = targetsByNode .get (discoveryNode ) == ConnectionTarget .this ;
316
+ }
317
+ if (isActive == false ) {
318
+ logger .debug ("connected to stale {} - releasing stale connection" , discoveryNode );
282
319
setConnectionRef (null );
283
- if (listener != null ) {
284
- listener .onFailure (e );
285
- }
286
320
}
287
- });
288
- };
321
+ listener .onResponse (null );
322
+ }
323
+
324
+ @ Override
325
+ public void onFailure (Exception e ) {
326
+ final int currentFailureCount = consecutiveFailureCount .incrementAndGet ();
327
+ // only warn every 6th failure
328
+ final Level level = currentFailureCount % 6 == 1 ? Level .WARN : Level .DEBUG ;
329
+ logger .log (
330
+ level ,
331
+ new ParameterizedMessage ("failed to connect to {} (tried [{}] times)" , discoveryNode , currentFailureCount ),
332
+ e
333
+ );
334
+ setConnectionRef (null );
335
+ listener .onFailure (e );
336
+ }
337
+ }, () -> {
338
+ releaseListener ();
339
+ transportService .getThreadPool ().generic ().execute (this ::doConnect );
340
+ }));
289
341
}
290
342
291
343
void disconnect () {
0 commit comments