39
39
import org .elasticsearch .tasks .Task ;
40
40
import org .elasticsearch .threadpool .ThreadPool ;
41
41
import org .elasticsearch .transport .BytesTransportRequest ;
42
+ import org .elasticsearch .transport .NodeNotConnectedException ;
43
+ import org .elasticsearch .transport .Transport ;
42
44
import org .elasticsearch .transport .TransportException ;
43
45
import org .elasticsearch .transport .TransportRequestOptions ;
44
46
import org .elasticsearch .transport .TransportService ;
@@ -225,8 +227,7 @@ public PublicationContext newPublicationContext(ClusterStatePublicationEvent clu
225
227
}
226
228
}
227
229
228
- private ReleasableBytesReference serializeFullClusterState (ClusterState clusterState , DiscoveryNode node ) {
229
- final TransportVersion serializeVersion = node .getVersion ().transportVersion ;
230
+ private ReleasableBytesReference serializeFullClusterState (ClusterState clusterState , DiscoveryNode node , TransportVersion version ) {
230
231
final RecyclerBytesStreamOutput bytesStream = transportService .newNetworkBytesStream ();
231
232
boolean success = false ;
232
233
try {
@@ -236,7 +237,7 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
236
237
CompressorFactory .COMPRESSOR .threadLocalOutputStream (Streams .flushOnCloseStream (bytesStream ))
237
238
)
238
239
) {
239
- stream .setTransportVersion (serializeVersion );
240
+ stream .setTransportVersion (version );
240
241
stream .writeBoolean (true );
241
242
clusterState .writeTo (stream );
242
243
uncompressedBytes = stream .position ();
@@ -248,7 +249,7 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
248
249
logger .trace (
249
250
"serialized full cluster state version [{}] using transport version [{}] with size [{}]" ,
250
251
clusterState .version (),
251
- serializeVersion ,
252
+ version ,
252
253
result .length ()
253
254
);
254
255
success = true ;
@@ -260,9 +261,13 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
260
261
}
261
262
}
262
263
263
- private ReleasableBytesReference serializeDiffClusterState (ClusterState newState , Diff <ClusterState > diff , DiscoveryNode node ) {
264
+ private ReleasableBytesReference serializeDiffClusterState (
265
+ ClusterState newState ,
266
+ Diff <ClusterState > diff ,
267
+ DiscoveryNode node ,
268
+ TransportVersion version
269
+ ) {
264
270
final long clusterStateVersion = newState .version ();
265
- final TransportVersion serializeVersion = node .getVersion ().transportVersion ;
266
271
final RecyclerBytesStreamOutput bytesStream = transportService .newNetworkBytesStream ();
267
272
boolean success = false ;
268
273
try {
@@ -272,10 +277,10 @@ private ReleasableBytesReference serializeDiffClusterState(ClusterState newState
272
277
CompressorFactory .COMPRESSOR .threadLocalOutputStream (Streams .flushOnCloseStream (bytesStream ))
273
278
)
274
279
) {
275
- stream .setTransportVersion (serializeVersion );
280
+ stream .setTransportVersion (version );
276
281
stream .writeBoolean (false );
277
282
diff .writeTo (stream );
278
- if (serializeVersion .onOrAfter (INCLUDES_LAST_COMMITTED_DATA_VERSION )) {
283
+ if (version .onOrAfter (INCLUDES_LAST_COMMITTED_DATA_VERSION )) {
279
284
stream .writeBoolean (newState .metadata ().clusterUUIDCommitted ());
280
285
newState .getLastCommittedConfiguration ().writeTo (stream );
281
286
}
@@ -288,7 +293,7 @@ private ReleasableBytesReference serializeDiffClusterState(ClusterState newState
288
293
logger .trace (
289
294
"serialized cluster state diff for version [{}] using transport version [{}] with size [{}]" ,
290
295
clusterStateVersion ,
291
- serializeVersion ,
296
+ version ,
292
297
result .length ()
293
298
);
294
299
success = true ;
@@ -315,6 +320,7 @@ public class PublicationContext extends AbstractRefCounted {
315
320
private final Task task ;
316
321
private final boolean sendFullVersion ;
317
322
323
+ private final Map <DiscoveryNode , Transport .Connection > nodeConnections = new HashMap <>();
318
324
// All the values of these maps have one ref for the context (while it's open) and one for each in-flight message.
319
325
private final Map <TransportVersion , ReleasableBytesReference > serializedStates = new ConcurrentHashMap <>();
320
326
private final Map <TransportVersion , ReleasableBytesReference > serializedDiffs = new HashMap <>();
@@ -337,12 +343,23 @@ void buildDiffAndSerializeStates() {
337
343
// publication to local node bypasses any serialization
338
344
continue ;
339
345
}
346
+
347
+ Transport .Connection connection ;
348
+ try {
349
+ connection = transportService .getConnection (node );
350
+ } catch (NodeNotConnectedException e ) {
351
+ // can't send to this node, don't need to serialize anything for it
352
+ logger .debug (() -> format ("No connection to [%s] available, skipping serialization" , node ), e );
353
+ continue ;
354
+ }
355
+
356
+ nodeConnections .put (node , connection );
340
357
if (sendFullVersion || previousState .nodes ().nodeExists (node ) == false ) {
341
- serializedStates .computeIfAbsent (node . getVersion (). transportVersion , v -> serializeFullClusterState (newState , node ));
358
+ serializedStates .computeIfAbsent (connection . getTransportVersion () , v -> serializeFullClusterState (newState , node , v ));
342
359
} else {
343
360
serializedDiffs .computeIfAbsent (
344
- node . getVersion (). transportVersion ,
345
- v -> serializeDiffClusterState (newState , diffSupplier .getOrCompute (), node )
361
+ connection . getTransportVersion () ,
362
+ v -> serializeDiffClusterState (newState , diffSupplier .getOrCompute (), node , v )
346
363
);
347
364
}
348
365
}
@@ -405,34 +422,46 @@ public String toString() {
405
422
406
423
private void sendFullClusterState (DiscoveryNode destination , ActionListener <PublishWithJoinResponse > listener ) {
407
424
assert refCount () > 0 ;
408
- ReleasableBytesReference bytes = serializedStates .get (destination .getVersion ().transportVersion );
425
+ Transport .Connection connection = nodeConnections .get (destination );
426
+ if (connection == null ) {
427
+ logger .debug ("No connection to [{}] available, skipping send" , destination );
428
+ listener .onFailure (new NodeNotConnectedException (destination , "No connection available" ));
429
+ return ;
430
+ }
431
+
432
+ var version = connection .getTransportVersion ();
433
+ ReleasableBytesReference bytes = serializedStates .get (version );
409
434
if (bytes == null ) {
410
435
try {
411
- bytes = serializedStates .computeIfAbsent (
412
- destination .getVersion ().transportVersion ,
413
- v -> serializeFullClusterState (newState , destination )
414
- );
436
+ bytes = serializedStates .computeIfAbsent (version , v -> serializeFullClusterState (newState , destination , v ));
415
437
} catch (Exception e ) {
416
438
logger .warn (() -> format ("failed to serialize cluster state before publishing it to node %s" , destination ), e );
417
439
listener .onFailure (e );
418
440
return ;
419
441
}
420
442
}
421
- sendClusterState (destination , bytes , listener );
443
+ sendClusterState (connection , bytes , listener );
422
444
}
423
445
424
446
private void sendClusterStateDiff (DiscoveryNode destination , ActionListener <PublishWithJoinResponse > listener ) {
425
- final ReleasableBytesReference bytes = serializedDiffs .get (destination .getVersion ().transportVersion );
447
+ Transport .Connection connection = nodeConnections .get (destination );
448
+ if (connection == null ) {
449
+ logger .debug ("No connection to [{}] available, skipping send" , destination );
450
+ listener .onFailure (new NodeNotConnectedException (destination , "No connection available" ));
451
+ return ;
452
+ }
453
+
454
+ final ReleasableBytesReference bytes = serializedDiffs .get (connection .getTransportVersion ());
426
455
assert bytes != null
427
- : "failed to find serialized diff for node " + destination + " of version [" + destination . getVersion () + "]" ;
456
+ : "failed to find serialized diff for node " + destination + " of version [" + connection . getTransportVersion () + "]" ;
428
457
429
458
// acquire a ref to the context just in case we need to try again with the full cluster state
430
459
if (tryIncRef () == false ) {
431
460
assert false ;
432
461
listener .onFailure (new IllegalStateException ("publication context released before transmission" ));
433
462
return ;
434
463
}
435
- sendClusterState (destination , bytes , ActionListener .runAfter (listener .delegateResponse ((delegate , e ) -> {
464
+ sendClusterState (connection , bytes , ActionListener .runAfter (listener .delegateResponse ((delegate , e ) -> {
436
465
if (e instanceof final TransportException transportException ) {
437
466
if (transportException .unwrapCause () instanceof IncompatibleClusterStateVersionException ) {
438
467
logger .debug (
@@ -453,7 +482,7 @@ private void sendClusterStateDiff(DiscoveryNode destination, ActionListener<Publ
453
482
}
454
483
455
484
private void sendClusterState (
456
- DiscoveryNode destination ,
485
+ Transport . Connection connection ,
457
486
ReleasableBytesReference bytes ,
458
487
ActionListener <PublishWithJoinResponse > listener
459
488
) {
@@ -464,9 +493,9 @@ private void sendClusterState(
464
493
return ;
465
494
}
466
495
transportService .sendChildRequest (
467
- destination ,
496
+ connection ,
468
497
PUBLISH_STATE_ACTION_NAME ,
469
- new BytesTransportRequest (bytes , destination . getVersion (). transportVersion ),
498
+ new BytesTransportRequest (bytes , connection . getTransportVersion () ),
470
499
task ,
471
500
STATE_REQUEST_OPTIONS ,
472
501
new CleanableResponseHandler <>(listener , PublishWithJoinResponse ::new , ThreadPool .Names .CLUSTER_COORDINATION , bytes ::decRef )
0 commit comments