@@ -81,34 +81,42 @@ public PrepareBalancerAndLoadQueues(
8181 @ Override
8282 public DruidCoordinatorRuntimeParams run (DruidCoordinatorRuntimeParams params )
8383 {
84- List <ImmutableDruidServer > currentServers = prepareCurrentServers ();
85- taskMaster .resetPeonsForNewServers (currentServers );
86-
87- final CoordinatorDynamicConfig dynamicConfig = params .getCoordinatorDynamicConfig ();
88- final SegmentLoadingConfig segmentLoadingConfig
89- = SegmentLoadingConfig .create (dynamicConfig , params .getUsedSegmentCount ());
90-
91- final DruidCluster cluster = prepareCluster (dynamicConfig , segmentLoadingConfig , currentServers );
92- cancelLoadsOnDecommissioningServers (cluster );
93-
94- final CoordinatorRunStats stats = params .getCoordinatorStats ();
95- collectHistoricalStats (cluster , stats );
96- collectUsedSegmentStats (params , stats );
97- collectDebugStats (segmentLoadingConfig , stats );
98-
99- final int numBalancerThreads = segmentLoadingConfig .getBalancerComputeThreads ();
100- final BalancerStrategy balancerStrategy = balancerStrategyFactory .createBalancerStrategy (numBalancerThreads );
101- log .debug (
102- "Using balancer strategy[%s] with [%d] threads." ,
103- balancerStrategy .getClass ().getSimpleName (), numBalancerThreads
104- );
105-
106- return params .buildFromExisting ()
107- .withDruidCluster (cluster )
108- .withBalancerStrategy (balancerStrategy )
109- .withSegmentLoadingConfig (segmentLoadingConfig )
110- .withSegmentAssignerUsing (loadQueueManager )
111- .build ();
84+ // Prevent callbacks from firing while balancer/load queue accounting is taking place.
85+ // This ensures snapshot validity at the expense of potentially stale snapshots (in some cases).
86+ taskMaster .getCallbackLock ().writeLock ().lock ();
87+ try {
88+ List <ImmutableDruidServer > currentServers = prepareCurrentServers ();
89+ taskMaster .resetPeonsForNewServers (currentServers );
90+
91+ final CoordinatorDynamicConfig dynamicConfig = params .getCoordinatorDynamicConfig ();
92+ final SegmentLoadingConfig segmentLoadingConfig
93+ = SegmentLoadingConfig .create (dynamicConfig , params .getUsedSegmentCount ());
94+
95+ final DruidCluster cluster = prepareCluster (dynamicConfig , segmentLoadingConfig , currentServers );
96+ cancelLoadsOnDecommissioningServers (cluster );
97+
98+ final CoordinatorRunStats stats = params .getCoordinatorStats ();
99+ collectHistoricalStats (cluster , stats );
100+ collectUsedSegmentStats (params , stats );
101+ collectDebugStats (segmentLoadingConfig , stats );
102+
103+ final int numBalancerThreads = segmentLoadingConfig .getBalancerComputeThreads ();
104+ final BalancerStrategy balancerStrategy = balancerStrategyFactory .createBalancerStrategy (numBalancerThreads );
105+ log .debug (
106+ "Using balancer strategy[%s] with [%d] threads." ,
107+ balancerStrategy .getClass ().getSimpleName (), numBalancerThreads
108+ );
109+
110+ return params .buildFromExisting ()
111+ .withDruidCluster (cluster )
112+ .withBalancerStrategy (balancerStrategy )
113+ .withSegmentLoadingConfig (segmentLoadingConfig )
114+ .withSegmentAssignerUsing (loadQueueManager )
115+ .build ();
116+ }
117+ finally {
118+ taskMaster .getCallbackLock ().writeLock ().unlock ();
119+ }
112120 }
113121
114122 /**
0 commit comments