@@ -65,9 +65,8 @@ private ThreadPoolMergeQueue(ThreadPool threadPool) {
6565 void submitMergeTask (MergeTask mergeTask ) {
6666 enqueueMergeTask (mergeTask );
6767 if (mergeTask .supportsIOThrottling ()) {
68- // count submitted merge tasks that support IO auto throttling
69- activeIOThrottledMergeTasksCount .incrementAndGet ();
70- maybeUpdateTargetMBPerSec ();
68+ // count submitted merge tasks that support IO auto throttling, and maybe adjust IO rate for all
69+ maybeUpdateIORateBytesPerSec (activeIOThrottledMergeTasksCount .incrementAndGet ());
7170 }
7271 executeSmallestMergeTask ();
7372 }
@@ -121,28 +120,36 @@ private void runMergeTask(MergeTask mergeTask) {
121120 assert removed : "completed merge task [" + mergeTask + "] not registered as running" ;
122121 if (mergeTask .supportsIOThrottling ()) {
123122 activeIOThrottledMergeTasksCount .decrementAndGet ();
124- maybeUpdateTargetMBPerSec ();
125123 }
126124 }
127125 }
128126
129- private void maybeUpdateTargetMBPerSec () {
130- long currentTargetIORateBytesPerSec = targetIORateBytesPerSec .get ();
131- long newTargetIORateBytesPerSec = newTargetIORateBytesPerSec (
132- currentTargetIORateBytesPerSec ,
133- activeIOThrottledMergeTasksCount .get (),
134- maxConcurrentMerges
135- );
136- if (currentTargetIORateBytesPerSec != newTargetIORateBytesPerSec
137- && targetIORateBytesPerSec .compareAndSet (currentTargetIORateBytesPerSec , newTargetIORateBytesPerSec )) {
138- // it's OK to have this method update merge tasks concurrently, with different targetMBPerSec values,
139- // as it's not important that all merge tasks are throttled to the same IO rate at all time.
140- // For performance reasons, we don't synchronize the updates to targetMBPerSec values with the update of running merges.
141- currentlyRunningMergeTasks .forEach (mergeTask -> {
142- if (mergeTask .supportsIOThrottling ()) {
143- mergeTask .setIORateLimit (ByteSizeValue .ofBytes (newTargetIORateBytesPerSec ).getMbFrac ());
127+ private void maybeUpdateIORateBytesPerSec (int activeIOThrottledMergeTasks ) {
128+ long currentTargetIORateBytesPerSec = targetIORateBytesPerSec .get (), newTargetIORateBytesPerSec = 0L ;
129+ for (boolean isNewComputed = false ;;) {
130+ if (isNewComputed == false ) {
131+ newTargetIORateBytesPerSec = newTargetIORateBytesPerSec (
132+ currentTargetIORateBytesPerSec ,
133+ activeIOThrottledMergeTasks ,
134+ maxConcurrentMerges
135+ );
136+ if (newTargetIORateBytesPerSec == currentTargetIORateBytesPerSec ) {
137+ break ;
144138 }
145- });
139+ }
140+ if (targetIORateBytesPerSec .weakCompareAndSetVolatile (currentTargetIORateBytesPerSec , newTargetIORateBytesPerSec )) {
141+ // it's OK to have this method update merge tasks concurrently, with different targetMBPerSec values,
142+ // as it's not important that all merge tasks are throttled to the same IO rate at all time.
143+ // For performance reasons, we don't synchronize the updates to targetMBPerSec values with the update of running merges.
144+ final long finalNewTargetIORateBytesPerSec = newTargetIORateBytesPerSec ;
145+ currentlyRunningMergeTasks .forEach (mergeTask -> {
146+ if (mergeTask .supportsIOThrottling ()) {
147+ mergeTask .setIORateLimit (ByteSizeValue .ofBytes (finalNewTargetIORateBytesPerSec ).getMbFrac ());
148+ }
149+ });
150+ break ;
151+ }
152+ isNewComputed = (currentTargetIORateBytesPerSec == (currentTargetIORateBytesPerSec = targetIORateBytesPerSec .get ()));
146153 }
147154 }
148155
0 commit comments