@@ -427,14 +427,11 @@ private void removeIndicesAndShards(final ClusterChangedEvent event) {
427
427
* @param state new cluster state
428
428
*/
429
429
private void createIndicesAndUpdateShards (final ClusterState state ) {
430
- DiscoveryNodes nodes = state .nodes ();
431
- RoutingNode localRoutingNode = state .getRoutingNodes ().node (nodes .getLocalNodeId ());
430
+ RoutingNode localRoutingNode = state .getRoutingNodes ().node (state .nodes ().getLocalNodeId ());
432
431
if (localRoutingNode == null ) {
433
432
return ;
434
433
}
435
434
436
- RoutingTable routingTable = state .routingTable ();
437
-
438
435
// create map of indices to create with shards to fail if index creation fails or create or update shards if an existing index
439
436
// service is found
440
437
final Map <Index , List <ShardRouting >> indicesToCreate = new HashMap <>();
@@ -455,7 +452,7 @@ private void createIndicesAndUpdateShards(final ClusterState state) {
455
452
if (indexService == null ) {
456
453
indicesToCreate .computeIfAbsent (index , k -> new ArrayList <>()).add (shardRouting );
457
454
} else {
458
- createOrUpdateShard (state , nodes , routingTable , shardRouting , indexService );
455
+ createOrUpdateShard (state , shardRouting , indexService );
459
456
}
460
457
}
461
458
}
@@ -485,24 +482,18 @@ private void createIndicesAndUpdateShards(final ClusterState state) {
485
482
}
486
483
// we succeeded in creating the index service, so now we can create the missing shards assigned to this node
487
484
for (ShardRouting shardRouting : entry .getValue ()) {
488
- createOrUpdateShard (state , nodes , routingTable , shardRouting , indexService );
485
+ createOrUpdateShard (state , shardRouting , indexService );
489
486
}
490
487
}
491
488
}
492
489
493
- private void createOrUpdateShard (
494
- ClusterState state ,
495
- DiscoveryNodes nodes ,
496
- RoutingTable routingTable ,
497
- ShardRouting shardRouting ,
498
- AllocatedIndex <? extends Shard > indexService
499
- ) {
490
+ private void createOrUpdateShard (ClusterState state , ShardRouting shardRouting , AllocatedIndex <? extends Shard > indexService ) {
500
491
Shard shard = indexService .getShardOrNull (shardRouting .shardId ().id ());
501
492
if (shard == null ) {
502
493
assert shardRouting .initializing () : shardRouting + " should have been removed by failMissingShards" ;
503
- createShard (nodes , routingTable , shardRouting , state );
494
+ createShard (shardRouting , state );
504
495
} else {
505
- updateShard (nodes , shardRouting , shard , routingTable , state );
496
+ updateShard (shardRouting , shard , state );
506
497
}
507
498
}
508
499
@@ -546,19 +537,21 @@ private void updateIndices(ClusterChangedEvent event) {
546
537
}
547
538
}
548
539
549
- private void createShard (DiscoveryNodes nodes , RoutingTable routingTable , ShardRouting shardRouting , ClusterState state ) {
540
+ private void createShard (ShardRouting shardRouting , ClusterState state ) {
550
541
assert shardRouting .initializing () : "only allow shard creation for initializing shard but was " + shardRouting ;
551
542
552
- DiscoveryNode sourceNode = null ;
553
- if (shardRouting .recoverySource ().getType () == Type .PEER ) {
554
- sourceNode = findSourceNodeForPeerRecovery (routingTable , nodes , shardRouting );
555
- if (sourceNode == null ) {
556
- logger .trace ("ignoring initializing shard {} - no source node can be found." , shardRouting .shardId ());
557
- return ;
543
+ try {
544
+ final DiscoveryNode sourceNode ;
545
+ if (shardRouting .recoverySource ().getType () == Type .PEER ) {
546
+ sourceNode = findSourceNodeForPeerRecovery (state .routingTable (), state .nodes (), shardRouting );
547
+ if (sourceNode == null ) {
548
+ logger .trace ("ignoring initializing shard {} - no source node can be found." , shardRouting .shardId ());
549
+ return ;
550
+ }
551
+ } else {
552
+ sourceNode = null ;
558
553
}
559
- }
560
554
561
- try {
562
555
final long primaryTerm = state .metadata ().index (shardRouting .index ()).primaryTerm (shardRouting .id ());
563
556
logger .debug ("{} creating shard with primary term [{}]" , shardRouting .shardId (), primaryTerm );
564
557
indicesService .createShard (
@@ -569,21 +562,15 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
569
562
failedShardHandler ,
570
563
this ::updateGlobalCheckpointForShard ,
571
564
retentionLeaseSyncer ,
572
- nodes .getLocalNode (),
565
+ state . nodes () .getLocalNode (),
573
566
sourceNode
574
567
);
575
568
} catch (Exception e ) {
576
569
failAndRemoveShard (shardRouting , true , "failed to create shard" , e , state );
577
570
}
578
571
}
579
572
580
- private void updateShard (
581
- DiscoveryNodes nodes ,
582
- ShardRouting shardRouting ,
583
- Shard shard ,
584
- RoutingTable routingTable ,
585
- ClusterState clusterState
586
- ) {
573
+ private void updateShard (ShardRouting shardRouting , Shard shard , ClusterState clusterState ) {
587
574
final ShardRouting currentRoutingEntry = shard .routingEntry ();
588
575
assert currentRoutingEntry .isSameAllocation (shardRouting )
589
576
: "local shard has a different allocation id but wasn't cleaned by removeShards. "
@@ -597,7 +584,7 @@ private void updateShard(
597
584
final IndexMetadata indexMetadata = clusterState .metadata ().index (shard .shardId ().getIndex ());
598
585
primaryTerm = indexMetadata .primaryTerm (shard .shardId ().id ());
599
586
final Set <String > inSyncIds = indexMetadata .inSyncAllocationIds (shard .shardId ().id ());
600
- final IndexShardRoutingTable indexShardRoutingTable = routingTable .shardRoutingTable (shardRouting .shardId ());
587
+ final IndexShardRoutingTable indexShardRoutingTable = clusterState . routingTable () .shardRoutingTable (shardRouting .shardId ());
601
588
shard .updateShardState (
602
589
shardRouting ,
603
590
primaryTerm ,
@@ -621,15 +608,15 @@ private void updateShard(
621
608
"{} master marked shard as initializing, but shard has state [{}], resending shard started to {}" ,
622
609
shardRouting .shardId (),
623
610
state ,
624
- nodes .getMasterNode ()
611
+ clusterState . nodes () .getMasterNode ()
625
612
);
626
613
}
627
- if (nodes .getMasterNode () != null ) {
614
+ if (clusterState . nodes () .getMasterNode () != null ) {
628
615
shardStateAction .shardStarted (
629
616
shardRouting ,
630
617
primaryTerm ,
631
618
"master "
632
- + nodes .getMasterNode ()
619
+ + clusterState . nodes () .getMasterNode ()
633
620
+ " marked shard as initializing, but shard state is ["
634
621
+ state
635
622
+ "], mark shard as started" ,
0 commit comments