Skip to content

Commit c46c85c

Browse files
Adding poller autoscaler, to dynamically resize number of pollers. (#761)
Adding poller autoscaler, to dynamically resize number of pollers.
1 parent 2270c9b commit c46c85c

File tree

9 files changed

+505
-0
lines changed

9 files changed

+505
-0
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker.autoscaler;
17+
18+
import java.time.Duration;
19+
import java.util.concurrent.Executors;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
public class PollerAutoScaler {
24+
25+
private static final Logger LOGGER = LoggerFactory.getLogger(PollerAutoScaler.class);
26+
27+
private final Duration coolDownTime;
28+
private final PollerUsageEstimator pollerUsageEstimator;
29+
private final Recommender recommender;
30+
private final ResizableSemaphore semaphore;
31+
private int semaphoreSize;
32+
private boolean shuttingDown;
33+
34+
public PollerAutoScaler(
35+
Duration coolDownTime, PollerUsageEstimator pollerUsageEstimator, Recommender recommender) {
36+
this.coolDownTime = coolDownTime;
37+
this.pollerUsageEstimator = pollerUsageEstimator;
38+
this.recommender = recommender;
39+
this.semaphore = new ResizableSemaphore(recommender.getUpperValue());
40+
this.semaphoreSize = recommender.getUpperValue();
41+
}
42+
43+
public void start() {
44+
Executors.newSingleThreadExecutor()
45+
.submit(
46+
new Runnable() {
47+
@Override
48+
public void run() {
49+
while (!shuttingDown) {
50+
try {
51+
Thread.sleep(coolDownTime.toMillis());
52+
if (!shuttingDown) {
53+
resizePollers();
54+
}
55+
} catch (InterruptedException e) {
56+
LOGGER.info("interrupted wait for next poller scaling");
57+
}
58+
}
59+
}
60+
});
61+
}
62+
63+
public void stop() {
64+
LOGGER.info("shutting down poller autoscaler");
65+
shuttingDown = true;
66+
}
67+
68+
protected void resizePollers() {
69+
PollerUsage pollerUsage = pollerUsageEstimator.estimate();
70+
int pollerCount =
71+
recommender.recommend(this.semaphoreSize, pollerUsage.getPollerUtilizationRate());
72+
73+
int diff = this.semaphoreSize - pollerCount;
74+
if (diff < 0) {
75+
semaphore.release(diff * -1);
76+
} else {
77+
semaphore.decreasePermits(diff);
78+
}
79+
80+
LOGGER.info(String.format("resized pollers to: %d", pollerCount));
81+
this.semaphoreSize = pollerCount;
82+
}
83+
84+
public void acquire() throws InterruptedException {
85+
semaphore.acquire();
86+
}
87+
88+
public void release() {
89+
semaphore.release();
90+
}
91+
92+
// For testing
93+
protected int getSemaphoreSize() {
94+
return semaphoreSize;
95+
}
96+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker.autoscaler;
17+
18+
public class PollerUsage {
19+
20+
private final float pollerUtilizationRate;
21+
22+
public PollerUsage(float pollerUtilizationRate) {
23+
this.pollerUtilizationRate = pollerUtilizationRate;
24+
}
25+
26+
public float getPollerUtilizationRate() {
27+
return pollerUtilizationRate;
28+
}
29+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker.autoscaler;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
public class PollerUsageEstimator {
21+
22+
private AtomicInteger noopTaskCount = new AtomicInteger();
23+
private AtomicInteger actionableTaskCount = new AtomicInteger();
24+
25+
public void increaseNoopTaskCount() {
26+
noopTaskCount.addAndGet(1);
27+
}
28+
29+
public void increaseActionableTaskCount() {
30+
actionableTaskCount.addAndGet(1);
31+
}
32+
33+
public PollerUsage estimate() {
34+
int actionableTasks = actionableTaskCount.get();
35+
int noopTasks = noopTaskCount.get();
36+
if (noopTasks + actionableTasks == 0) {
37+
return new PollerUsage(0);
38+
}
39+
PollerUsage result = new PollerUsage((actionableTasks * 1f) / (noopTasks + actionableTasks));
40+
reset();
41+
return result;
42+
}
43+
44+
private void reset() {
45+
noopTaskCount.set(0);
46+
actionableTaskCount.set(0);
47+
}
48+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker.autoscaler;
17+
18+
public class Recommender {
19+
20+
private final float targetPollerUtilRate;
21+
private final int upperValue;
22+
private final int lowerValue;
23+
24+
public Recommender(float targetPollerUtilRate, int upperValue, int lowerValue) {
25+
this.targetPollerUtilRate = targetPollerUtilRate;
26+
this.upperValue = upperValue;
27+
this.lowerValue = lowerValue;
28+
}
29+
30+
public int recommend(int currentPollers, float pollerUtilizationRate) {
31+
if (pollerUtilizationRate == 1) {
32+
return upperValue;
33+
}
34+
35+
float r = currentPollers * pollerUtilizationRate / targetPollerUtilRate;
36+
return Math.round(Math.min(upperValue, Math.max(lowerValue, r)));
37+
}
38+
39+
public int getUpperValue() {
40+
return upperValue;
41+
}
42+
43+
public int getLowerValue() {
44+
return lowerValue;
45+
}
46+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker.autoscaler;
17+
18+
import java.util.concurrent.Semaphore;
19+
20+
public class ResizableSemaphore extends Semaphore {
21+
public ResizableSemaphore(int permits) {
22+
super(permits);
23+
}
24+
25+
public void decreasePermits(int reduction) {
26+
reducePermits(reduction);
27+
}
28+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker.autoscaler;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.time.Duration;
21+
import org.junit.Test;
22+
import org.junit.runner.RunWith;
23+
import org.mockito.runners.MockitoJUnitRunner;
24+
25+
@RunWith(MockitoJUnitRunner.class)
26+
public class PollerAutoScalerTest {
27+
28+
@Test
29+
public void testAutoScalerScalesPollers() {
30+
PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator();
31+
Recommender recommender = new Recommender(0.5f, 100, 10);
32+
PollerAutoScaler pollerAutoScaler =
33+
new PollerAutoScaler(Duration.ofSeconds(1), pollerUsageEstimator, recommender);
34+
35+
assertEquals(100, pollerAutoScaler.getSemaphoreSize());
36+
37+
pollerUsageEstimator.increaseActionableTaskCount();
38+
pollerAutoScaler.resizePollers();
39+
40+
assertEquals(100, pollerAutoScaler.getSemaphoreSize());
41+
42+
pollerUsageEstimator.increaseNoopTaskCount();
43+
44+
pollerAutoScaler.resizePollers();
45+
46+
assertEquals(10, pollerAutoScaler.getSemaphoreSize());
47+
}
48+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker.autoscaler;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import org.junit.Test;
23+
import org.junit.runner.RunWith;
24+
import org.mockito.runners.MockitoJUnitRunner;
25+
26+
@RunWith(MockitoJUnitRunner.class)
27+
public class PollerUsageEstimatorTest {
28+
29+
@Test
30+
public void testUsageIsZero() {
31+
PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator();
32+
PollerUsage pollerUsage = pollerUsageEstimator.estimate();
33+
assertEquals(0f, pollerUsage.getPollerUtilizationRate(), 0);
34+
}
35+
36+
@Test
37+
public void testUsagesIs100Percent() {
38+
PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator();
39+
pollerUsageEstimator.increaseActionableTaskCount();
40+
pollerUsageEstimator.increaseActionableTaskCount();
41+
PollerUsage pollerUsage = pollerUsageEstimator.estimate();
42+
assertEquals(1f, pollerUsage.getPollerUtilizationRate(), 0);
43+
}
44+
45+
@Test
46+
public void testUsageCalculatedCorrectly() {
47+
PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator();
48+
pollerUsageEstimator.increaseNoopTaskCount();
49+
pollerUsageEstimator.increaseActionableTaskCount();
50+
pollerUsageEstimator.increaseNoopTaskCount();
51+
pollerUsageEstimator.increaseNoopTaskCount();
52+
53+
PollerUsage pollerUsage = pollerUsageEstimator.estimate();
54+
assertEquals(0.25f, pollerUsage.getPollerUtilizationRate(), 0);
55+
}
56+
57+
@Test
58+
public void estimationIsReset() {
59+
PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator();
60+
pollerUsageEstimator.increaseActionableTaskCount();
61+
62+
PollerUsage pollerUsage = pollerUsageEstimator.estimate();
63+
assertEquals(1f, pollerUsage.getPollerUtilizationRate(), 0);
64+
65+
pollerUsage = pollerUsageEstimator.estimate();
66+
assertEquals(0f, pollerUsage.getPollerUtilizationRate(), 0);
67+
}
68+
69+
@Test
70+
public void testThreadSafety() throws Exception {
71+
PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator();
72+
List<Thread> threadList = new ArrayList<>();
73+
for (int i = 0; i < 100; i++) {
74+
threadList.add(
75+
new Thread(
76+
new Runnable() {
77+
@Override
78+
public void run() {
79+
for (int i = 0; i < 100; i++) {
80+
pollerUsageEstimator.increaseActionableTaskCount();
81+
}
82+
}
83+
}));
84+
}
85+
86+
threadList.forEach(Thread::start);
87+
for (Thread thread : threadList) {
88+
thread.join();
89+
}
90+
91+
pollerUsageEstimator.increaseNoopTaskCount();
92+
assertEquals(10000f / 10001, pollerUsageEstimator.estimate().getPollerUtilizationRate(), 0);
93+
}
94+
}

0 commit comments

Comments
 (0)