Skip to content

Commit cf93f86

Browse files
authored
Miscellaneous stability and debuggability fixes (#326)
* Fix executor thread pool names
* Fix logging and rate-limit the emission of thread metrics
1 parent 393d19f commit cf93f86

File tree

10 files changed

+86
-88
lines changed

10 files changed

+86
-88
lines changed

src/main/java/com/uber/cadence/converter/CustomThrowableTypeAdapter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,9 @@ private StackTraceElement[] parseStackTrace(JsonObject object) {
178178
}
179179
return result;
180180
} catch (Exception e) {
181-
log.warn("Failed to parse stack trace: " + stackTrace);
181+
if (log.isWarnEnabled()) {
182+
log.warn("Failed to parse stack trace: " + stackTrace);
183+
}
182184
return new StackTraceElement[0];
183185
}
184186
}

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,7 +766,8 @@ private static String prettyPrintObject(
766766
return new String((byte[]) object, UTF_8);
767767
}
768768
if (ByteBuffer.class.isAssignableFrom(clz)) {
769-
return new String(((ByteBuffer) object).array(), UTF_8);
769+
byte[] bytes = org.apache.thrift.TBaseHelper.byteBufferToByteArray((ByteBuffer) object);
770+
return new String(bytes, UTF_8);
770771
}
771772
if (clz.equals(Date.class)) {
772773
return String.valueOf(object);

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,9 @@ void handleMarkerRecorded(HistoryEvent event) {
217217
} else if (LOCAL_ACTIVITY_MARKER_NAME.equals(name)) {
218218
handleLocalActivityMarker(attributes);
219219
} else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME.equals(name) && !VERSION_MARKER_NAME.equals(name)) {
220-
log.warn("Unexpected marker: " + event);
220+
if (log.isWarnEnabled()) {
221+
log.warn("Unexpected marker: " + event);
222+
}
221223
}
222224
}
223225

src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,9 @@
2929
import java.util.NoSuchElementException;
3030
import java.util.Optional;
3131
import java.util.concurrent.TimeUnit;
32-
import org.slf4j.Logger;
33-
import org.slf4j.LoggerFactory;
3432

3533
class HistoryHelper {
3634

37-
private static final Logger log = LoggerFactory.getLogger(HistoryHelper.class);
38-
3935
/**
4036
* Events of a single decision. It includes all new events in the history since the last decision
4137
* as events. It doesn't include events that are decision events of the previous decision. The
@@ -242,7 +238,6 @@ public DecisionEvents next() {
242238
replay,
243239
replayCurrentTimeMilliseconds,
244240
nextDecisionEventId);
245-
log.debug("DecisionEventsIterator next=" + result);
246241
return result;
247242
}
248243
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20+
import com.google.common.util.concurrent.RateLimiter;
2021
import com.uber.cadence.internal.logging.LoggerTag;
2122
import com.uber.cadence.internal.metrics.MetricsType;
2223
import com.uber.cadence.internal.replay.DeciderCache;
@@ -35,6 +36,7 @@
3536
import org.slf4j.MDC;
3637

3738
class WorkflowThreadImpl implements WorkflowThread {
39+
private static final RateLimiter metricsRateLimiter = RateLimiter.create(1);
3840

3941
/**
4042
* Runnable passed to the thread that wraps a runnable passed to the WorkflowThreadImpl
@@ -100,7 +102,9 @@ public void run() {
100102
if (!isCancelRequested()) {
101103
threadContext.setUnhandledException(e);
102104
}
103-
log.debug(String.format("Workflow thread \"%s\" run cancelled", name));
105+
if (log.isDebugEnabled()) {
106+
log.debug(String.format("Workflow thread \"%s\" run cancelled", name));
107+
}
104108
} catch (Throwable e) {
105109
if (log.isWarnEnabled() && !root) {
106110
StringWriter sw = new StringWriter();
@@ -231,10 +235,12 @@ public void start() {
231235
}
232236
context.setStatus(Status.RUNNING);
233237

234-
getDecisionContext()
235-
.getMetricsScope()
236-
.gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
237-
.update(((ThreadPoolExecutor) threadPool).getActiveCount());
238+
if (metricsRateLimiter.tryAcquire(1)) {
239+
getDecisionContext()
240+
.getMetricsScope()
241+
.gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
242+
.update(((ThreadPoolExecutor) threadPool).getActiveCount());
243+
}
238244

239245
while (true) {
240246
try {

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939
public final class ActivityWorker implements SuspendableWorker {
4040

41-
private static final String POLL_THREAD_NAME_PREFIX = "Poller taskList=";
41+
private static final String POLL_THREAD_NAME_PREFIX = "Activity Poller taskList=";
4242

4343
private SuspendableWorker poller = new NoopSuspendableWorker();
4444
private final ActivityTaskHandler handler;
@@ -53,38 +53,31 @@ public ActivityWorker(
5353
String taskList,
5454
SingleWorkerOptions options,
5555
ActivityTaskHandler handler) {
56-
Objects.requireNonNull(service);
57-
Objects.requireNonNull(domain);
58-
Objects.requireNonNull(taskList);
59-
this.service = service;
60-
this.domain = domain;
61-
this.taskList = taskList;
62-
this.options = options;
56+
this.service = Objects.requireNonNull(service);
57+
this.domain = Objects.requireNonNull(domain);
58+
this.taskList = Objects.requireNonNull(taskList);
6359
this.handler = handler;
60+
61+
PollerOptions pollerOptions = options.getPollerOptions();
62+
if (pollerOptions.getPollThreadNamePrefix() == null) {
63+
pollerOptions =
64+
new PollerOptions.Builder(pollerOptions)
65+
.setPollThreadNamePrefix(
66+
POLL_THREAD_NAME_PREFIX + "\"" + taskList + "\", domain=\"" + domain + "\"")
67+
.build();
68+
}
69+
this.options = new SingleWorkerOptions.Builder(options).setPollerOptions(pollerOptions).build();
6470
}
6571

6672
@Override
6773
public void start() {
6874
if (handler.isAnyTypeSupported()) {
69-
PollerOptions pollerOptions = options.getPollerOptions();
70-
if (pollerOptions.getPollThreadNamePrefix() == null) {
71-
pollerOptions =
72-
new PollerOptions.Builder(pollerOptions)
73-
.setPollThreadNamePrefix(
74-
POLL_THREAD_NAME_PREFIX
75-
+ "\""
76-
+ taskList
77-
+ "\", domain=\""
78-
+ domain
79-
+ "\", type=\"activity\"")
80-
.build();
81-
}
8275
poller =
8376
new Poller<>(
8477
options.getIdentity(),
8578
new ActivityPollTask(service, domain, taskList, options),
8679
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
87-
pollerOptions,
80+
options.getPollerOptions(),
8881
options.getMetricsScope());
8982
poller.start();
9083
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);

src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,33 +48,29 @@ public LocalActivityWorker(
4848
String domain, String taskList, SingleWorkerOptions options, ActivityTaskHandler handler) {
4949
this.domain = Objects.requireNonNull(domain);
5050
this.taskList = Objects.requireNonNull(taskList);
51-
this.options = options;
5251
this.handler = handler;
5352
this.laPollTask = new LocalActivityPollTask();
53+
54+
PollerOptions pollerOptions = options.getPollerOptions();
55+
if (pollerOptions.getPollThreadNamePrefix() == null) {
56+
pollerOptions =
57+
new PollerOptions.Builder(pollerOptions)
58+
.setPollThreadNamePrefix(
59+
POLL_THREAD_NAME_PREFIX + "\"" + taskList + "\", domain=\"" + domain + "\"")
60+
.build();
61+
}
62+
this.options = new SingleWorkerOptions.Builder(options).setPollerOptions(pollerOptions).build();
5463
}
5564

5665
@Override
5766
public void start() {
5867
if (handler.isAnyTypeSupported()) {
59-
PollerOptions pollerOptions = options.getPollerOptions();
60-
if (pollerOptions.getPollThreadNamePrefix() == null) {
61-
pollerOptions =
62-
new PollerOptions.Builder(pollerOptions)
63-
.setPollThreadNamePrefix(
64-
POLL_THREAD_NAME_PREFIX
65-
+ "\""
66-
+ taskList
67-
+ "\", domain=\""
68-
+ domain
69-
+ "\", type=\"activity\"")
70-
.build();
71-
}
7268
poller =
7369
new Poller<>(
7470
options.getIdentity(),
7571
laPollTask,
7672
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
77-
pollerOptions,
73+
options.getPollerOptions(),
7874
options.getMetricsScope());
7975
poller.start();
8076
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);

src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public interface TaskHandler<TT> {
5656
new SynchronousQueue<>());
5757
taskExecutor.setThreadFactory(
5858
new ExecutorThreadFactory(
59-
options.getPollerOptions().getPollThreadNamePrefix() + " " + taskList + " ",
59+
options.getPollerOptions().getPollThreadNamePrefix().replaceFirst("Poller", "Executor"),
6060
options.getPollerOptions().getUncaughtExceptionHandler()));
6161
taskExecutor.setRejectedExecutionHandler(new BlockCallerPolicy());
6262
}

src/main/java/com/uber/cadence/internal/worker/Poller.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,12 @@ public boolean isStarted() {
118118

119119
@Override
120120
public boolean isShutdown() {
121-
return pollExecutor.isShutdown();
121+
return pollExecutor.isShutdown() && taskExecutor.isShutdown();
122122
}
123123

124124
@Override
125125
public boolean isTerminated() {
126-
return pollExecutor.isTerminated();
126+
return pollExecutor.isTerminated() && taskExecutor.isTerminated();
127127
}
128128

129129
@Override
@@ -144,7 +144,9 @@ public void shutdown() {
144144

145145
@Override
146146
public void shutdownNow() {
147-
log.info("shutdownNow poller=" + this.pollerOptions.getPollThreadNamePrefix());
147+
if (log.isInfoEnabled()) {
148+
log.info("shutdownNow poller=" + this.pollerOptions.getPollThreadNamePrefix());
149+
}
148150
if (!isStarted()) {
149151
return;
150152
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,22 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20-
import com.uber.cadence.*;
20+
import com.uber.cadence.BadRequestError;
21+
import com.uber.cadence.DomainNotActiveError;
22+
import com.uber.cadence.EntityNotExistsError;
23+
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
24+
import com.uber.cadence.History;
25+
import com.uber.cadence.HistoryEvent;
26+
import com.uber.cadence.PollForDecisionTaskResponse;
27+
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
28+
import com.uber.cadence.RespondDecisionTaskFailedRequest;
29+
import com.uber.cadence.RespondQueryTaskCompletedRequest;
30+
import com.uber.cadence.WorkflowExecution;
31+
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
32+
import com.uber.cadence.WorkflowQuery;
33+
import com.uber.cadence.WorkflowType;
2134
import com.uber.cadence.common.RetryOptions;
2235
import com.uber.cadence.common.WorkflowExecutionHistory;
23-
import com.uber.cadence.internal.common.InternalUtils;
2436
import com.uber.cadence.internal.common.Retryer;
2537
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
2638
import com.uber.cadence.internal.logging.LoggerTag;
@@ -44,7 +56,7 @@ public final class WorkflowWorker
4456
private static final String POLL_THREAD_NAME_PREFIX = "Workflow Poller taskList=";
4557

4658
private SuspendableWorker poller = new NoopSuspendableWorker();
47-
private final PollTaskExecutor<PollForDecisionTaskResponse> pollTaskExecutor;
59+
private PollTaskExecutor<PollForDecisionTaskResponse> pollTaskExecutor;
4860
private final DecisionTaskHandler handler;
4961
private final IWorkflowService service;
5062
private final String domain;
@@ -57,45 +69,35 @@ public WorkflowWorker(
5769
String taskList,
5870
SingleWorkerOptions options,
5971
DecisionTaskHandler handler) {
60-
Objects.requireNonNull(service);
61-
Objects.requireNonNull(domain);
62-
Objects.requireNonNull(taskList);
63-
this.service = service;
64-
this.domain = domain;
65-
this.taskList = taskList;
66-
this.options = options;
72+
this.service = Objects.requireNonNull(service);
73+
this.domain = Objects.requireNonNull(domain);
74+
this.taskList = Objects.requireNonNull(taskList);
6775
this.handler = handler;
68-
pollTaskExecutor =
69-
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler));
76+
77+
PollerOptions pollerOptions = options.getPollerOptions();
78+
if (pollerOptions.getPollThreadNamePrefix() == null) {
79+
pollerOptions =
80+
new PollerOptions.Builder(pollerOptions)
81+
.setPollThreadNamePrefix(
82+
POLL_THREAD_NAME_PREFIX + "\"" + taskList + "\", domain=\"" + domain + "\"")
83+
.build();
84+
}
85+
this.options = new SingleWorkerOptions.Builder(options).setPollerOptions(pollerOptions).build();
7086
}
7187

7288
@Override
7389
public void start() {
7490
if (handler.isAnyTypeSupported()) {
75-
PollerOptions pollerOptions = options.getPollerOptions();
76-
if (pollerOptions.getPollThreadNamePrefix() == null) {
77-
pollerOptions =
78-
new PollerOptions.Builder(pollerOptions)
79-
.setPollThreadNamePrefix(
80-
POLL_THREAD_NAME_PREFIX
81-
+ "\""
82-
+ taskList
83-
+ "\", domain=\""
84-
+ domain
85-
+ "\", type=\"workflow\"")
86-
.build();
87-
}
88-
SingleWorkerOptions workerOptions =
89-
new SingleWorkerOptions.Builder(options).setPollerOptions(pollerOptions).build();
90-
91+
pollTaskExecutor =
92+
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler));
9193
poller =
9294
new Poller<>(
9395
options.getIdentity(),
9496
new WorkflowPollTask(
9597
service, domain, taskList, options.getMetricsScope(), options.getIdentity()),
9698
pollTaskExecutor,
97-
pollerOptions,
98-
workerOptions.getMetricsScope());
99+
options.getPollerOptions(),
100+
options.getMetricsScope());
99101
poller.start();
100102
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
101103
}
@@ -113,7 +115,7 @@ public boolean isShutdown() {
113115

114116
@Override
115117
public boolean isTerminated() {
116-
return pollTaskExecutor.isTerminated() && poller.isTerminated();
118+
return poller.isTerminated();
117119
}
118120

119121
public byte[] queryWorkflowExecution(WorkflowExecution exec, String queryType, byte[] args)
@@ -195,9 +197,8 @@ public void awaitTermination(long timeout, TimeUnit unit) {
195197
if (!poller.isStarted()) {
196198
return;
197199
}
198-
long timeoutMillis = unit.toMillis(timeout);
199-
timeoutMillis = InternalUtils.awaitTermination(poller, timeoutMillis);
200-
InternalUtils.awaitTermination(pollTaskExecutor, timeoutMillis);
200+
201+
poller.awaitTermination(timeout, unit);
201202
}
202203

203204
@Override

0 commit comments

Comments
 (0)