Skip to content

Commit 951cef0

Browse files
authored
Sticky metrics (#204)
1 parent bf2124a commit 951cef0

File tree

10 files changed

+226
-55
lines changed

10 files changed

+226
-55
lines changed

src/main/java/com/uber/cadence/internal/metrics/MetricsType.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,5 +134,10 @@ public class MetricsType {
134134

135135
public static final String STICKY_CACHE_HIT = CADENCE_METRICS_PREFIX + "sticky-cache-hit";
136136
public static final String STICKY_CACHE_MISS = CADENCE_METRICS_PREFIX + "sticky-cache-miss";
137+
public static final String STICKY_CACHE_TOTAL_FORCED_EVICTION =
138+
CADENCE_METRICS_PREFIX + "sticky-cache-total-forced-eviction";
139+
public static final String STICKY_CACHE_THREAD_FORCED_EVICTION =
140+
CADENCE_METRICS_PREFIX + "sticky-cache-thread-forced-eviction";
137141
public static final String STICKY_CACHE_STALL = CADENCE_METRICS_PREFIX + "sticky-cache-stall";
142+
public static final String STICKY_CACHE_SIZE = CADENCE_METRICS_PREFIX + "sticky-cache-size";
138143
}

src/main/java/com/uber/cadence/internal/replay/DeciderCache.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,21 @@
2424
import com.google.common.cache.Weigher;
2525
import com.uber.cadence.PollForDecisionTaskResponse;
2626
import com.uber.cadence.internal.common.ThrowableFunc1;
27+
import com.uber.cadence.internal.metrics.MetricsType;
28+
import com.uber.m3.tally.Scope;
29+
import java.util.Objects;
2730
import java.util.UUID;
2831

2932
public final class DeciderCache {
3033
private final String evictionEntryId = UUID.randomUUID().toString();
3134
private final int maxCacheSize;
35+
private final Scope metricsScope;
3236
private LoadingCache<String, WeightedCacheEntry<Decider>> cache;
3337

34-
public DeciderCache(int maxCacheSize) {
38+
public DeciderCache(int maxCacheSize, Scope scope) {
3539
Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0");
3640
this.maxCacheSize = maxCacheSize;
41+
this.metricsScope = Objects.requireNonNull(scope);
3742
this.cache =
3843
CacheBuilder.newBuilder()
3944
.maximumWeight(maxCacheSize)
@@ -60,6 +65,7 @@ public Decider getOrCreate(
6065
ThrowableFunc1<PollForDecisionTaskResponse, Decider, Exception> createReplayDecider)
6166
throws Exception {
6267
String runId = decisionTask.getWorkflowExecution().getRunId();
68+
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
6369
if (isFullHistory(decisionTask)) {
6470
cache.invalidate(runId);
6571
return cache.get(
@@ -71,17 +77,22 @@ public Decider getOrCreate(
7177

7278
public Decider getUnchecked(String runId) throws Exception {
7379
try {
74-
return cache.getUnchecked(runId).entry;
80+
Decider cachedDecider = cache.getUnchecked(runId).entry;
81+
metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
82+
return cachedDecider;
7583
} catch (CacheLoader.InvalidCacheLoadException e) {
84+
metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
7685
throw new EvictedException(runId);
7786
}
7887
}
7988

8089
public void evictNext() {
90+
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
8191
int remainingSpace = (int) (maxCacheSize - cache.size());
8292
// Force eviction to happen
8393
cache.put(evictionEntryId, new WeightedCacheEntry<>(null, remainingSpace + 1));
8494
cache.invalidate(evictionEntryId);
95+
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
8596
}
8697

8798
public void invalidate(PollForDecisionTaskResponse decisionTask) {

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.internal.sync;
1919

2020
import com.uber.cadence.internal.logging.LoggerTag;
21+
import com.uber.cadence.internal.metrics.MetricsType;
2122
import com.uber.cadence.internal.replay.DeciderCache;
2223
import com.uber.cadence.internal.replay.DecisionContext;
2324
import com.uber.cadence.workflow.Promise;
@@ -233,6 +234,10 @@ public void start() {
233234
taskFuture = threadPool.submit(task);
234235
return;
235236
} catch (RejectedExecutionException e) {
237+
getDecisionContext()
238+
.getMetricsScope()
239+
.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
240+
.inc(1);
236241
cache.evictNext();
237242
}
238243

src/main/java/com/uber/cadence/internal/worker/WorkflowPollTask.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,44 +17,50 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20-
import com.google.common.base.Preconditions;
21-
import com.uber.cadence.*;
20+
import com.uber.cadence.InternalServiceError;
21+
import com.uber.cadence.PollForDecisionTaskRequest;
22+
import com.uber.cadence.PollForDecisionTaskResponse;
23+
import com.uber.cadence.ServiceBusyError;
24+
import com.uber.cadence.TaskList;
2225
import com.uber.cadence.internal.metrics.MetricsType;
2326
import com.uber.cadence.serviceclient.IWorkflowService;
27+
import com.uber.m3.tally.Scope;
2428
import com.uber.m3.tally.Stopwatch;
29+
import java.util.Objects;
2530
import org.apache.thrift.TException;
2631
import org.slf4j.Logger;
2732
import org.slf4j.LoggerFactory;
2833

2934
final class WorkflowPollTask implements Poller.PollTask<PollForDecisionTaskResponse> {
3035

31-
private final SingleWorkerOptions options;
36+
private final Scope metricScope;
3237
private final IWorkflowService service;
3338
private final String domain;
3439
private final String taskList;
40+
private final String identity;
3541
private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);
3642

3743
WorkflowPollTask(
38-
IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) {
39-
Preconditions.checkNotNull(service, "service should not be null");
40-
Preconditions.checkNotNull(domain, "domain should not be null");
41-
Preconditions.checkNotNull(taskList, "taskList should not be null");
42-
Preconditions.checkNotNull(options, "options should not be null");
43-
44-
this.service = service;
45-
this.domain = domain;
46-
this.taskList = taskList;
47-
this.options = options;
44+
IWorkflowService service,
45+
String domain,
46+
String taskList,
47+
Scope metricScope,
48+
String identity) {
49+
this.identity = Objects.requireNonNull(identity);
50+
this.service = Objects.requireNonNull(service);
51+
this.domain = Objects.requireNonNull(domain);
52+
this.taskList = Objects.requireNonNull(taskList);
53+
this.metricScope = Objects.requireNonNull(metricScope);
4854
}
4955

5056
@Override
5157
public PollForDecisionTaskResponse poll() throws TException {
52-
options.getMetricsScope().counter(MetricsType.DECISION_POLL_COUNTER).inc(1);
53-
Stopwatch sw = options.getMetricsScope().timer(MetricsType.DECISION_POLL_LATENCY).start();
58+
metricScope.counter(MetricsType.DECISION_POLL_COUNTER).inc(1);
59+
Stopwatch sw = metricScope.timer(MetricsType.DECISION_POLL_LATENCY).start();
5460

5561
PollForDecisionTaskRequest pollRequest = new PollForDecisionTaskRequest();
5662
pollRequest.setDomain(domain);
57-
pollRequest.setIdentity(options.getIdentity());
63+
pollRequest.setIdentity(identity);
5864

5965
TaskList tl = new TaskList();
6066
tl.setName(taskList);
@@ -67,10 +73,10 @@ public PollForDecisionTaskResponse poll() throws TException {
6773
try {
6874
result = service.PollForDecisionTask(pollRequest);
6975
} catch (InternalServiceError | ServiceBusyError e) {
70-
options.getMetricsScope().counter(MetricsType.DECISION_POLL_TRANSIENT_FAILED_COUNTER).inc(1);
76+
metricScope.counter(MetricsType.DECISION_POLL_TRANSIENT_FAILED_COUNTER).inc(1);
7177
throw e;
7278
} catch (TException e) {
73-
options.getMetricsScope().counter(MetricsType.DECISION_POLL_FAILED_COUNTER).inc(1);
79+
metricScope.counter(MetricsType.DECISION_POLL_FAILED_COUNTER).inc(1);
7480
throw e;
7581
}
7682

@@ -90,11 +96,11 @@ public PollForDecisionTaskResponse poll() throws TException {
9096
}
9197

9298
if (result == null || result.getTaskToken() == null) {
93-
options.getMetricsScope().counter(MetricsType.DECISION_POLL_NO_TASK_COUNTER).inc(1);
99+
metricScope.counter(MetricsType.DECISION_POLL_NO_TASK_COUNTER).inc(1);
94100
return null;
95101
}
96102

97-
options.getMetricsScope().counter(MetricsType.DECISION_POLL_SUCCEED_COUNTER).inc(1);
103+
metricScope.counter(MetricsType.DECISION_POLL_SUCCEED_COUNTER).inc(1);
98104
sw.stop();
99105
return result;
100106
}

src/main/java/com/uber/cadence/internal/worker/WorkflowPollTaskFactory.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,34 @@
1919

2020
import com.uber.cadence.PollForDecisionTaskResponse;
2121
import com.uber.cadence.serviceclient.IWorkflowService;
22+
import com.uber.m3.tally.Scope;
2223
import java.util.Objects;
2324
import java.util.function.Supplier;
2425

2526
public class WorkflowPollTaskFactory
2627
implements Supplier<Poller.PollTask<PollForDecisionTaskResponse>> {
2728

28-
private final SingleWorkerOptions options;
2929
private final IWorkflowService service;
3030
private final String domain;
3131
private final String taskList;
32+
private final Scope metricScope;
33+
private final String identity;
3234

3335
public WorkflowPollTaskFactory(
34-
IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) {
36+
IWorkflowService service,
37+
String domain,
38+
String taskList,
39+
Scope metricScope,
40+
String identity) {
3541
this.service = Objects.requireNonNull(service, "service should not be null");
3642
this.domain = Objects.requireNonNull(domain, "domain should not be null");
3743
this.taskList = Objects.requireNonNull(taskList, "taskList should not be null");
38-
this.options = Objects.requireNonNull(options, "options should not be null");
44+
this.metricScope = Objects.requireNonNull(metricScope, "metricScope should not be null");
45+
this.identity = Objects.requireNonNull(identity, "identity should not be null");
3946
}
4047

4148
@Override
4249
public Poller.PollTask<PollForDecisionTaskResponse> get() {
43-
return new WorkflowPollTask(service, domain, taskList, options);
50+
return new WorkflowPollTask(service, domain, taskList, metricScope, identity);
4451
}
4552
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ public void start() {
9494
poller =
9595
new Poller<>(
9696
options.getIdentity(),
97-
new WorkflowPollTask(service, domain, taskList, options),
97+
new WorkflowPollTask(
98+
service, domain, taskList, options.getMetricsScope(), options.getIdentity()),
9899
pollTaskExecutor,
99100
pollerOptions,
100101
workerOptions.getMetricsScope());

src/main/java/com/uber/cadence/worker/Worker.java

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.uber.cadence.client.WorkflowClient;
2727
import com.uber.cadence.converter.DataConverter;
2828
import com.uber.cadence.internal.metrics.MetricsTag;
29+
import com.uber.cadence.internal.metrics.NoopScope;
2930
import com.uber.cadence.internal.replay.DeciderCache;
3031
import com.uber.cadence.internal.sync.SyncActivityWorker;
3132
import com.uber.cadence.internal.sync.SyncWorkflowWorker;
@@ -40,6 +41,7 @@
4041
import com.uber.cadence.worker.WorkerOptions.Builder;
4142
import com.uber.cadence.workflow.Functions.Func;
4243
import com.uber.cadence.workflow.WorkflowMethod;
44+
import com.uber.m3.tally.Scope;
4345
import com.uber.m3.util.ImmutableMap;
4446
import java.lang.reflect.Type;
4547
import java.net.InetAddress;
@@ -400,11 +402,13 @@ public Factory(IWorkflowService workflowService, String domain, FactoryOptions f
400402
return;
401403
}
402404

403-
this.cache = new DeciderCache(factoryOptions.cacheMaximumSize);
405+
factoryOptions.metricsScope.tagged(
406+
new ImmutableMap.Builder<String, String>(2)
407+
.put(MetricsTag.DOMAIN, domain)
408+
.put(MetricsTag.TASK_LIST, getStickyTaskListName())
409+
.build());
404410

405-
// TODO: expose configuring these through Factory options
406-
SingleWorkerOptions options = getDefaultSingleWorkerOptions();
407-
PollerOptions pollerOptions = getDefaultPollerOptions(options);
411+
this.cache = new DeciderCache(factoryOptions.cacheMaximumSize, factoryOptions.metricsScope);
408412

409413
dispatcher = new PollDecisionTaskDispatcherFactory(workflowService).create();
410414
stickyPoller =
@@ -414,11 +418,12 @@ public Factory(IWorkflowService workflowService, String domain, FactoryOptions f
414418
workflowService,
415419
domain,
416420
getStickyTaskListName(),
417-
getDefaultSingleWorkerOptions())
421+
factoryOptions.metricsScope,
422+
id.toString())
418423
.get(),
419424
dispatcher,
420-
pollerOptions,
421-
options.getMetricsScope());
425+
factoryOptions.stickyWorkflowPollerOptions,
426+
factoryOptions.metricsScope);
422427
}
423428

424429
public Worker newWorker(String taskList) {
@@ -534,6 +539,8 @@ public static class Builder {
534539
private int stickyDecisionScheduleToStartTimeoutInSeconds = 5;
535540
private int cacheMaximumSize = 600;
536541
private int maxWorkflowThreadCount = 600;
542+
private PollerOptions stickyWorkflowPollerOptions;
543+
private Scope metricScope;
537544

538545
public Builder setEnableStickyExecution(boolean enableStickyExecution) {
539546
this.enableStickyExecution = enableStickyExecution;
@@ -557,25 +564,41 @@ public Builder setStickyDecisionScheduleToStartTimeoutInSeconds(
557564
return this;
558565
}
559566

567+
public Builder setStickyWorkflowPollerOptions(PollerOptions stickyWorkflowPollerOptions) {
568+
this.stickyWorkflowPollerOptions = stickyWorkflowPollerOptions;
569+
return this;
570+
}
571+
572+
public Builder setMetricScope(Scope metricScope) {
573+
this.metricScope = metricScope;
574+
return this;
575+
}
576+
560577
public FactoryOptions Build() {
561578
return new FactoryOptions(
562579
enableStickyExecution,
563580
cacheMaximumSize,
564581
maxWorkflowThreadCount,
565-
stickyDecisionScheduleToStartTimeoutInSeconds);
582+
stickyDecisionScheduleToStartTimeoutInSeconds,
583+
stickyWorkflowPollerOptions,
584+
metricScope);
566585
}
567586
}
568587

569588
private final boolean enableStickyExecution;
570589
private final int cacheMaximumSize;
571590
private final int maxWorkflowThreadCount;
572591
private final int stickyDecisionScheduleToStartTimeoutInSeconds;
592+
private final PollerOptions stickyWorkflowPollerOptions;
593+
private final Scope metricsScope;
573594

574595
private FactoryOptions(
575596
boolean enableStickyExecution,
576597
int cacheMaximumSize,
577598
int maxWorkflowThreadCount,
578-
int stickyDecisionScheduleToStartTimeoutInSeconds) {
599+
int stickyDecisionScheduleToStartTimeoutInSeconds,
600+
PollerOptions stickyWorkflowPollerOptions,
601+
Scope metricsScope) {
579602
Preconditions.checkArgument(
580603
cacheMaximumSize > 0, "cacheMaximumSize should be greater than 0");
581604
Preconditions.checkArgument(
@@ -589,6 +612,23 @@ private FactoryOptions(
589612
this.maxWorkflowThreadCount = maxWorkflowThreadCount;
590613
this.stickyDecisionScheduleToStartTimeoutInSeconds =
591614
stickyDecisionScheduleToStartTimeoutInSeconds;
615+
616+
if (stickyWorkflowPollerOptions == null) {
617+
this.stickyWorkflowPollerOptions =
618+
new PollerOptions.Builder()
619+
.setPollBackoffInitialInterval(Duration.ofMillis(200))
620+
.setPollBackoffMaximumInterval(Duration.ofSeconds(20))
621+
.setPollThreadCount(1)
622+
.build();
623+
} else {
624+
this.stickyWorkflowPollerOptions = stickyWorkflowPollerOptions;
625+
}
626+
627+
if (metricsScope == null) {
628+
this.metricsScope = NoopScope.getInstance();
629+
} else {
630+
this.metricsScope = metricsScope;
631+
}
592632
}
593633
}
594634
}

0 commit comments

Comments
 (0)