Skip to content

Commit 4a2ced9

Browse files
authored
Fixed poller shutdown (#104)
1 parent 34cf70c commit 4a2ced9

File tree

6 files changed

+67
-52
lines changed

6 files changed

+67
-52
lines changed

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public final class ActivityWorker implements SuspendableWorker {
3737

3838
private static final Logger log = LoggerFactory.getLogger(ActivityWorker.class);
3939

40-
private static final String POLL_THREAD_NAME_PREFIX = "SWF Activity Poll ";
40+
private static final String POLL_THREAD_NAME_PREFIX = "Activity Poller ";
4141

4242
private Poller poller;
4343
private final ActivityTaskHandler handler;
@@ -63,7 +63,8 @@ public void start() {
6363
PollerOptions pollerOptions = options.getPollerOptions();
6464
if (pollerOptions.getPollThreadNamePrefix() == null) {
6565
pollerOptions = new PollerOptions.Builder(pollerOptions)
66-
.setPollThreadNamePrefix(POLL_THREAD_NAME_PREFIX)
66+
.setPollThreadNamePrefix(POLL_THREAD_NAME_PREFIX + "\"" + taskList +
67+
"\", domain=\"" + domain + "\", type=\"activity\"")
6768
.build();
6869
}
6970
Poller.ThrowingRunnable pollTask =

src/main/java/com/uber/cadence/internal/worker/ExecutorThreadFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class ExecutorThreadFactory implements ThreadFactory {
1818
@Override
1919
public Thread newThread(Runnable r) {
2020
Thread result = new Thread(r);
21-
result.setName(threadPrefix + (threadIndex.incrementAndGet()));
21+
result.setName(threadPrefix + ": " + (threadIndex.incrementAndGet()));
2222
result.setUncaughtExceptionHandler(uncaughtExceptionHandler);
2323
return result;
2424
}

src/main/java/com/uber/cadence/internal/worker/Poller.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@ private class PollServiceTask implements Runnable {
4646
@Override
4747
public void run() {
4848
try {
49-
if (log.isDebugEnabled()) {
50-
log.debug("poll task begin");
51-
}
52-
5349
if (pollExecutor.isTerminating()) {
5450
return;
5551
}
@@ -81,8 +77,10 @@ public void run() {
8177
}
8278
} finally {
8379
// Resubmit itself back to pollExecutor
84-
if (!pollExecutor.isShutdown()) {
80+
if (!pollExecutor.isTerminating()) {
8581
pollExecutor.execute(this);
82+
} else {
83+
log.info("poll loop done");
8684
}
8785
}
8886
}
@@ -139,9 +137,7 @@ private boolean isStarted() {
139137

140138
@Override
141139
public void shutdown() {
142-
if (log.isInfoEnabled()) {
143-
log.info("shutdown");
144-
}
140+
log.info("shutdown");
145141
if (!isStarted()) {
146142
return;
147143
}
@@ -150,9 +146,7 @@ public void shutdown() {
150146

151147
@Override
152148
public void shutdownNow() {
153-
if (log.isInfoEnabled()) {
154-
log.info("shutdownNow");
155-
}
149+
log.info("shutdownNow poller=" + this.options.getPollThreadNamePrefix());
156150
if (!isStarted()) {
157151
return;
158152
}
@@ -161,22 +155,25 @@ public void shutdownNow() {
161155

162156
@Override
163157
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
164-
long start = System.currentTimeMillis();
165158
if (pollExecutor == null) {
166159
// not started yet.
167160
return true;
168161
}
169-
return pollExecutor.awaitTermination(timeout, unit);
162+
boolean result = pollExecutor.awaitTermination(timeout, unit);
163+
log.info("awaitTermination done");
164+
return result;
170165
}
171166

172167
@Override
173168
public boolean shutdownAndAwaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
169+
log.info("shutdownAndAwaitTermination poller=" + this.options.getPollThreadNamePrefix());
174170
if (!isStarted()) {
175171
return true;
176172
}
177-
long start = System.currentTimeMillis();
178173
pollExecutor.shutdownNow();
179-
return pollExecutor.awaitTermination(timeout, unit);
174+
boolean result = pollExecutor.awaitTermination(timeout, unit);
175+
log.info("shutdownAndAwaitTermination done");
176+
return result;
180177
}
181178

182179

@@ -187,20 +184,23 @@ public boolean isRunning() {
187184

188185
@Override
189186
public void suspendPolling() {
190-
if (log.isInfoEnabled()) {
191-
log.info("suspendPolling");
192-
}
187+
log.info("suspendPolling");
193188
suspendLatch.set(new CountDownLatch(1));
194189
}
195190

196191
@Override
197192
public void resumePolling() {
198-
if (log.isInfoEnabled()) {
199-
log.info("resumePolling");
200-
}
193+
log.info("resumePolling");
201194
CountDownLatch existing = suspendLatch.getAndSet(null);
202195
if (existing != null) {
203196
existing.countDown();
204197
}
205198
}
199+
200+
@Override
201+
public String toString() {
202+
return "Poller{" +
203+
"options=" + options +
204+
'}';
205+
}
206206
}

src/main/java/com/uber/cadence/internal/worker/PollerOptions.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public final static class Builder {
2121

2222
private int pollThreadCount = 1;
2323

24-
private String pollThreadNamePrefix = "poller";
24+
private String pollThreadNamePrefix;
2525

2626
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
2727

@@ -154,4 +154,17 @@ public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
154154
public String getPollThreadNamePrefix() {
155155
return pollThreadNamePrefix;
156156
}
157+
158+
@Override
159+
public String toString() {
160+
return "PollerOptions{" +
161+
"maximumPollRateIntervalMilliseconds=" + maximumPollRateIntervalMilliseconds +
162+
", maximumPollRatePerSecond=" + maximumPollRatePerSecond +
163+
", pollBackoffCoefficient=" + pollBackoffCoefficient +
164+
", pollBackoffInitialInterval=" + pollBackoffInitialInterval +
165+
", pollBackoffMaximumInterval=" + pollBackoffMaximumInterval +
166+
", pollThreadCount=" + pollThreadCount +
167+
", pollThreadNamePrefix='" + pollThreadNamePrefix + '\'' +
168+
'}';
169+
}
157170
}

src/main/java/com/uber/cadence/internal/worker/Throttler.java

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,27 @@ final class Throttler {
2727
* Human readable name of the resource being throttled.
2828
* Used for logging only.
2929
*/
30-
private final String name_;
30+
private final String name;
3131

3232
/**
3333
* Used as a circular buffer
3434
*/
35-
private CircularLongBuffer checkPointTimes_;
35+
private CircularLongBuffer checkPointTimes;
3636

3737
/**
3838
* Used as an index to a circular buffer
3939
*/
40-
private long index_;
40+
private long index;
4141

4242
/**
4343
* Interval used to measure the rate. Shorter interval allows less spikey
4444
* rates.
4545
*/
46-
private long rateInterval_;
46+
private long rateInterval;
4747

48-
private long rateIntervalMilliseconds_;
48+
private final long rateIntervalMilliseconds;
4949

50-
private long overslept_;
50+
private long overslept;
5151

5252
/**
5353
* Construct throttler.
@@ -61,34 +61,34 @@ public Throttler(String name, double maxRatePerSecond, long rateIntervalMillisec
6161
if (null == name) {
6262
throw new IllegalArgumentException("null name");
6363
}
64-
name_ = name;
64+
this.name = name;
6565
if (maxRatePerSecond <= 0) {
6666
throw new IllegalArgumentException("0 or negative maxRatePerSecond");
6767
}
6868
if (rateIntervalMilliseconds <= 0) {
6969
throw new IllegalArgumentException("0 or negative rateIntervalMilliseconds");
7070
}
7171
synchronized (this) {
72-
rateIntervalMilliseconds_ = rateIntervalMilliseconds;
72+
this.rateIntervalMilliseconds = rateIntervalMilliseconds;
7373
setMaxRatePerSecond(maxRatePerSecond);
7474
}
7575
}
7676

7777
public synchronized void setMaxRatePerSecond(double maxRatePerSecond) {
78-
int maxMessagesPerRateInterval = (int) (maxRatePerSecond * rateIntervalMilliseconds_ / 1000);
78+
int maxMessagesPerRateInterval = (int) (maxRatePerSecond * rateIntervalMilliseconds / 1000);
7979
if (maxMessagesPerRateInterval == 0) {
8080
maxMessagesPerRateInterval = 1;
81-
rateInterval_ = (long) (1.0 / maxRatePerSecond * 1000.0);
81+
rateInterval = (long) (1.0 / maxRatePerSecond * 1000.0);
8282
} else {
83-
rateInterval_ = rateIntervalMilliseconds_;
83+
rateInterval = rateIntervalMilliseconds;
8484
}
85-
if (checkPointTimes_ != null) {
86-
int oldSize = checkPointTimes_.size();
87-
checkPointTimes_ = checkPointTimes_.copy(index_ - maxMessagesPerRateInterval, maxMessagesPerRateInterval);
88-
index_ = Math.min(checkPointTimes_.size(), oldSize);
85+
if (checkPointTimes != null) {
86+
int oldSize = checkPointTimes.size();
87+
checkPointTimes = checkPointTimes.copy(index - maxMessagesPerRateInterval, maxMessagesPerRateInterval);
88+
index = Math.min(checkPointTimes.size(), oldSize);
8989
} else {
90-
checkPointTimes_ = new CircularLongBuffer(maxMessagesPerRateInterval);
91-
index_ = 0;
90+
checkPointTimes = new CircularLongBuffer(maxMessagesPerRateInterval);
91+
index = 0;
9292
}
9393
log.debug("new rate=" + maxRatePerSecond + " (msg/sec)");
9494
}
@@ -106,20 +106,20 @@ public synchronized void throttle(int count) throws InterruptedException {
106106
*/
107107
public synchronized void throttle() throws InterruptedException {
108108
long now = System.currentTimeMillis();
109-
long checkPoint = checkPointTimes_.get(index_);
109+
long checkPoint = checkPointTimes.get(index);
110110
if (checkPoint > 0) {
111111
long elapsed = now - checkPoint;
112112

113113
// if the time for this window is less than the minimum per window
114-
if (elapsed >= 0 && elapsed < rateInterval_) {
115-
long sleepInterval = rateInterval_ - elapsed - overslept_;
116-
overslept_ = 0;
114+
if (elapsed >= 0 && elapsed < rateInterval) {
115+
long sleepInterval = rateInterval - elapsed - overslept;
116+
overslept = 0;
117117
if (sleepInterval > 0) {
118118
if (log.isTraceEnabled()) {
119119
log.debug("Throttling "
120-
+ name_
120+
+ name
121121
+ ": called "
122-
+ checkPointTimes_.size()
122+
+ checkPointTimes.size()
123123
+ " times in last "
124124
+ elapsed
125125
+ " milliseconds. Going to sleep for "
@@ -128,11 +128,11 @@ public synchronized void throttle() throws InterruptedException {
128128
}
129129
long t = System.currentTimeMillis();
130130
Thread.sleep(sleepInterval);
131-
overslept_ = System.currentTimeMillis() - t - sleepInterval;
131+
overslept = System.currentTimeMillis() - t - sleepInterval;
132132
}
133133
}
134134
}
135-
checkPointTimes_.set(index_++, System.currentTimeMillis());
135+
checkPointTimes.set(index++, System.currentTimeMillis());
136136
}
137137

138138
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public final class WorkflowWorker implements SuspendableWorker {
4646

4747
private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);
4848

49-
private static final String POLL_THREAD_NAME_PREFIX = "SWF Activity Poll ";
49+
private static final String POLL_THREAD_NAME_PREFIX = "Poller taskList=";
5050
private static final int MAXIMUM_PAGE_SIZE = 10000;
5151

5252
private Poller poller;
@@ -73,7 +73,8 @@ public void start() {
7373
PollerOptions pollerOptions = options.getPollerOptions();
7474
if (pollerOptions.getPollThreadNamePrefix() == null) {
7575
pollerOptions = new PollerOptions.Builder(pollerOptions)
76-
.setPollThreadNamePrefix(POLL_THREAD_NAME_PREFIX)
76+
.setPollThreadNamePrefix(POLL_THREAD_NAME_PREFIX + "\"" + taskList +
77+
"\", domain=\"" + domain + "\", type=\"workflow\"")
7778
.build();
7879
}
7980
Poller.ThrowingRunnable pollTask =

0 commit comments

Comments
 (0)