Skip to content

Commit 3893ef9

Browse files
author
Liang Mei
committed
expose metrics scope in workflow and dedup metrics in replay mode
1 parent 8f9e510 commit 3893ef9

17 files changed

+471
-31
lines changed

src/main/java/com/uber/cadence/internal/metrics/NoopScope.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.uber.m3.tally.Buckets;
2121
import com.uber.m3.tally.Capabilities;
22+
import com.uber.m3.tally.CapableOf;
2223
import com.uber.m3.tally.RootScopeBuilder;
2324
import com.uber.m3.tally.Scope;
2425
import com.uber.m3.tally.StatsReporter;
@@ -57,17 +58,7 @@ public void reportHistogramDurationSamples(
5758

5859
@Override
5960
public Capabilities capabilities() {
60-
return new Capabilities() {
61-
@Override
62-
public boolean reporting() {
63-
return true;
64-
}
65-
66-
@Override
67-
public boolean tagging() {
68-
return true;
69-
}
70-
};
61+
return CapableOf.REPORTING_TAGGING;
7162
}
7263

7364
@Override
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.metrics;
19+
20+
import com.uber.cadence.internal.replay.ReplayAware;
21+
import com.uber.m3.tally.Buckets;
22+
import com.uber.m3.tally.Capabilities;
23+
import com.uber.m3.tally.Counter;
24+
import com.uber.m3.tally.Gauge;
25+
import com.uber.m3.tally.Histogram;
26+
import com.uber.m3.tally.Scope;
27+
import com.uber.m3.tally.ScopeCloseException;
28+
import com.uber.m3.tally.Stopwatch;
29+
import com.uber.m3.tally.Timer;
30+
import com.uber.m3.util.Duration;
31+
import java.util.Map;
32+
import java.util.Objects;
33+
34+
public class ReplayAwareScope implements Scope {
35+
Scope scope;
36+
ReplayAware context;
37+
38+
public ReplayAwareScope(Scope scope, ReplayAware context) {
39+
this.scope = Objects.requireNonNull(scope);
40+
this.context = Objects.requireNonNull(context);
41+
}
42+
43+
private static class ReplayAwareCounter implements Counter {
44+
Counter counter;
45+
ReplayAware context;
46+
47+
public ReplayAwareCounter(Counter counter, ReplayAware context) {
48+
this.counter = Objects.requireNonNull(counter);
49+
this.context = Objects.requireNonNull(context);
50+
}
51+
52+
@Override
53+
public void inc(long delta) {
54+
if (context.isReplaying()) {
55+
return;
56+
}
57+
58+
counter.inc(delta);
59+
}
60+
}
61+
62+
private static class ReplayAwareGauge implements Gauge {
63+
Gauge gauge;
64+
ReplayAware context;
65+
66+
public ReplayAwareGauge(Gauge gauge, ReplayAware context) {
67+
this.gauge = Objects.requireNonNull(gauge);
68+
this.context = Objects.requireNonNull(context);
69+
}
70+
71+
@Override
72+
public void update(double value) {
73+
if (context.isReplaying()) {
74+
return;
75+
}
76+
77+
gauge.update(value);
78+
}
79+
}
80+
81+
private static class ReplayAwareTimer implements Timer {
82+
Timer timer;
83+
ReplayAware context;
84+
85+
public ReplayAwareTimer(Timer timer, ReplayAware context) {
86+
this.timer = Objects.requireNonNull(timer);
87+
this.context = Objects.requireNonNull(context);
88+
}
89+
90+
@Override
91+
public void record(Duration interval) {
92+
if (context.isReplaying()) {
93+
return;
94+
}
95+
96+
timer.record(interval);
97+
}
98+
99+
@Override
100+
public Stopwatch start() {
101+
// TODO: Stopwatch is a concrete class with non-public constructor so there's no
102+
// way to extend it. Need to fix this when the following issue is resolved.
103+
// https://github.com/uber-java/tally/issues/26
104+
return timer.start();
105+
}
106+
}
107+
108+
private static class ReplayAwareHistogram implements Histogram {
109+
Histogram histogram;
110+
ReplayAware context;
111+
112+
public ReplayAwareHistogram(Histogram histogram, ReplayAware context) {
113+
this.histogram = Objects.requireNonNull(histogram);
114+
this.context = Objects.requireNonNull(context);
115+
}
116+
117+
@Override
118+
public void recordValue(double value) {
119+
if (context.isReplaying()) {
120+
return;
121+
}
122+
123+
histogram.recordValue(value);
124+
}
125+
126+
@Override
127+
public void recordDuration(Duration value) {
128+
if (context.isReplaying()) {
129+
return;
130+
}
131+
132+
histogram.recordDuration(value);
133+
}
134+
135+
@Override
136+
public Stopwatch start() {
137+
// TODO: Stopwatch is a concrete class with non-public constructor so there's no
138+
// way to extend it. Need to fix this when the following issue is resolved.
139+
// https://github.com/uber-java/tally/issues/26
140+
return histogram.start();
141+
}
142+
}
143+
144+
@Override
145+
public Counter counter(String name) {
146+
return new ReplayAwareCounter(scope.counter(name), context);
147+
}
148+
149+
@Override
150+
public Gauge gauge(String name) {
151+
return new ReplayAwareGauge(scope.gauge(name), context);
152+
}
153+
154+
@Override
155+
public Timer timer(String name) {
156+
return new ReplayAwareTimer(scope.timer(name), context);
157+
}
158+
159+
@Override
160+
public Histogram histogram(String name, Buckets buckets) {
161+
return new ReplayAwareHistogram(scope.histogram(name, buckets), context);
162+
}
163+
164+
@Override
165+
public Scope tagged(Map<String, String> tags) {
166+
return new ReplayAwareScope(scope.tagged(tags), context);
167+
}
168+
169+
@Override
170+
public Scope subScope(String name) {
171+
return new ReplayAwareScope(scope.subScope(name), context);
172+
}
173+
174+
@Override
175+
public Capabilities capabilities() {
176+
return scope.capabilities();
177+
}
178+
179+
@Override
180+
public void close() throws ScopeCloseException {
181+
scope.close();
182+
}
183+
}

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.uber.cadence.WorkflowExecution;
2121
import com.uber.cadence.WorkflowType;
2222
import com.uber.cadence.workflow.Promise;
23+
import com.uber.m3.tally.Scope;
2324
import java.time.Duration;
2425
import java.util.function.BiConsumer;
2526
import java.util.function.Consumer;
@@ -28,7 +29,7 @@
2829
* Represents the context for decider. Should only be used within the scope of workflow definition
2930
* code, meaning any code which is not part of activity implementations.
3031
*/
31-
public interface DecisionContext {
32+
public interface DecisionContext extends ReplayAware {
3233

3334
WorkflowExecution getWorkflowExecution();
3435

@@ -101,13 +102,6 @@ Consumer<Exception> signalWorkflowExecution(
101102
*/
102103
long currentTimeMillis();
103104

104-
/**
105-
* <code>true</code> indicates if workflow is replaying already processed events to reconstruct it
106-
* state. <code>false</code> indicates that code is making forward process for the first time. For
107-
* example can be used to avoid duplicating log records due to replay.
108-
*/
109-
boolean isReplaying();
110-
111105
/**
112106
* Create a Value that becomes ready after the specified delay.
113107
*
@@ -117,4 +111,6 @@ Consumer<Exception> signalWorkflowExecution(
117111
* @return cancellation handle. Invoke {@link Consumer#accept(Object)} to cancel timer.
118112
*/
119113
Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback);
114+
115+
Scope getMetricsScope();
120116
}

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import com.uber.cadence.WorkflowExecution;
2525
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2626
import com.uber.cadence.WorkflowType;
27+
import com.uber.cadence.internal.metrics.ReplayAwareScope;
2728
import com.uber.cadence.workflow.Promise;
2829
import com.uber.cadence.workflow.Workflow;
30+
import com.uber.m3.tally.Scope;
2931
import java.time.Duration;
3032
import java.util.function.BiConsumer;
3133
import java.util.function.Consumer;
@@ -40,6 +42,8 @@ final class DecisionContextImpl implements DecisionContext, HistoryEventHandler
4042

4143
private final WorkflowContext workflowContext;
4244

45+
private Scope metricsScope;
46+
4347
DecisionContextImpl(
4448
DecisionsHelper decisionsHelper,
4549
String domain,
@@ -51,6 +55,15 @@ final class DecisionContextImpl implements DecisionContext, HistoryEventHandler
5155
this.workflowClock = new ClockDecisionContext(decisionsHelper);
5256
}
5357

58+
public void setMetricsScope(Scope metricsScope) {
59+
this.metricsScope = new ReplayAwareScope(metricsScope, this);
60+
}
61+
62+
@Override
63+
public Scope getMetricsScope() {
64+
return metricsScope;
65+
}
66+
5467
@Override
5568
public WorkflowExecution getWorkflowExecution() {
5669
return workflowContext.getWorkflowExecution();
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.replay;
19+
20+
public interface ReplayAware {
21+
/**
22+
* <code>true</code> indicates if workflow is replaying already processed events to reconstruct it
23+
* state. <code>false</code> indicates that code is making forward process for the first time. For
24+
* example can be used to avoid duplicating log records due to replay.
25+
*/
26+
boolean isReplaying();
27+
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class ReplayDecider {
8888
domain,
8989
decisionTask,
9090
historyHelper.getWorkflowExecutionStartedEventAttributes());
91+
context.setMetricsScope(metricsScope);
9192
}
9293

9394
public boolean isCancelRequested() {

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
2828
import com.uber.cadence.internal.replay.StartChildWorkflowExecutionParameters;
2929
import com.uber.cadence.workflow.Promise;
30+
import com.uber.m3.tally.Scope;
3031
import java.time.Duration;
3132
import java.util.ArrayDeque;
3233
import java.util.ArrayList;
@@ -516,5 +517,10 @@ public boolean isReplaying() {
516517
public Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback) {
517518
throw new UnsupportedOperationException("not implemented");
518519
}
520+
521+
@Override
522+
public Scope getMetricsScope() {
523+
throw new UnsupportedOperationException("not implemented");
524+
}
519525
}
520526
}

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public void setDataConverter(DataConverter dataConverter) {
176176
}
177177

178178
@Override
179-
public ReplayWorkflow getWorkflow(WorkflowType workflowType) throws Exception {
179+
public ReplayWorkflow getWorkflow(WorkflowType workflowType) {
180180
SyncWorkflowDefinition workflow = getWorkflowDefinition(workflowType);
181181
return new SyncWorkflow(workflow, dataConverter, threadPool, interceptorFactory);
182182
}

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.uber.cadence.workflow.SignalExternalWorkflowException;
4848
import com.uber.cadence.workflow.Workflow;
4949
import com.uber.cadence.workflow.WorkflowInterceptor;
50+
import com.uber.m3.tally.Scope;
5051
import java.time.Duration;
5152
import java.util.HashMap;
5253
import java.util.Map;
@@ -479,4 +480,8 @@ private RuntimeException mapSignalWorkflowException(Exception failure) {
479480
}
480481
return (SignalExternalWorkflowException) failure;
481482
}
483+
484+
public Scope getMetricsScope() {
485+
return context.getMetricsScope();
486+
}
482487
}

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public Worker newWorker(String taskList) {
117117
taskList,
118118
new WorkerOptions.Builder()
119119
.setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory())
120+
.setMetricsScope(testEnvironmentOptions.getMetricsScope())
120121
.build());
121122
workers.add(result);
122123
return result;
@@ -128,6 +129,7 @@ public WorkflowClient newWorkflowClient() {
128129
new WorkflowClientOptions.Builder()
129130
.setDataConverter(testEnvironmentOptions.getDataConverter())
130131
.setInterceptors(new TimeLockingInterceptor(service))
132+
.setMetricsScope(testEnvironmentOptions.getMetricsScope())
131133
.build();
132134
return WorkflowClientInternal.newInstance(service, testEnvironmentOptions.getDomain(), options);
133135
}

0 commit comments

Comments
 (0)