@@ -122,13 +122,9 @@ protected void beforeMerge(OnGoingMerge merge) {}
122122 */
123123 protected void afterMerge (OnGoingMerge merge ) {}
124124
125- protected void activateThrottling (int numActiveMerges ) {}
125+ protected void activateThrottling (int numRunningMerges , int numQueuedMerges , int configuredMaxMergeCount ) {}
126126
127- protected void deactivateThrottling (int numActiveMerges ) {}
128-
129- public int getMaxMergeCount () {
130- return config .getMaxMergeCount ();
131- }
127+ protected void deactivateThrottling (int numRunningMerges , int numQueuedMerges , int configuredMaxMergeCount ) {}
132128
133129 @ Override
134130 public MergeScheduler clone () {
@@ -178,18 +174,22 @@ private void maybeExecuteNextMerge() {
178174 }
179175
180176 private void maybeActivateThrottling () {
177+ int numRunningMerges = activeMergeTasksExecutingOnLocalSchedulerList .size ();
178+ int numQueuedMerges = activeMergeTasksLocalSchedulerQueue .size ();
179+ int configuredMaxMergeCount = config .getMaxMergeCount ();
181180 // both currently running and enqueued count as "active" for throttling purposes
182- int numActiveMerges = activeMergeTasksExecutingOnLocalSchedulerList .size () + activeMergeTasksLocalSchedulerQueue .size ();
183- if (numActiveMerges > config .getMaxMergeCount () && isThrottling .getAndSet (true ) == false ) {
184- activateThrottling (numActiveMerges );
181+ if (numRunningMerges + numQueuedMerges > configuredMaxMergeCount && isThrottling .getAndSet (true ) == false ) {
182+ activateThrottling (numRunningMerges , numQueuedMerges , configuredMaxMergeCount );
185183 }
186184 }
187185
188186 private void maybeDeactivateThrottling () {
187+ int numRunningMerges = activeMergeTasksExecutingOnLocalSchedulerList .size ();
188+ int numQueuedMerges = activeMergeTasksLocalSchedulerQueue .size ();
189+ int configuredMaxMergeCount = config .getMaxMergeCount ();
189190 // both currently running and enqueued count as "active" for throttling purposes
190- int numActiveMerges = activeMergeTasksExecutingOnLocalSchedulerList .size () + activeMergeTasksLocalSchedulerQueue .size ();
191- if (numActiveMerges <= config .getMaxMergeCount () && isThrottling .getAndSet (false )) {
192- deactivateThrottling (numActiveMerges );
191+ if (numRunningMerges + numQueuedMerges <= configuredMaxMergeCount && isThrottling .getAndSet (false )) {
192+ deactivateThrottling (numRunningMerges , numQueuedMerges , configuredMaxMergeCount );
193193 }
194194 }
195195
0 commit comments