Skip to content

Commit 9917088

Browse files
authored
Merge pull request #149 from meiliang86/metrics2
expose metrics scope in workflow and de-dup metrics in replay mode
2 parents 8f9e510 + 6f84971 commit 9917088

18 files changed

+588
-32
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ dependencies {
4242
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
4343
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
4444
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
45-
compile group: 'com.uber.m3', name: 'tally-core', version: '0.2.0'
45+
compile group: 'com.uber.m3', name: 'tally-core', version: '0.2.1'
4646
testCompile group: 'junit', name: 'junit', version: '4.12'
4747
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
4848
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

src/main/java/com/uber/cadence/internal/metrics/NoopScope.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.uber.m3.tally.Buckets;
2121
import com.uber.m3.tally.Capabilities;
22+
import com.uber.m3.tally.CapableOf;
2223
import com.uber.m3.tally.RootScopeBuilder;
2324
import com.uber.m3.tally.Scope;
2425
import com.uber.m3.tally.StatsReporter;
@@ -57,17 +58,7 @@ public void reportHistogramDurationSamples(
5758

5859
@Override
5960
public Capabilities capabilities() {
60-
return new Capabilities() {
61-
@Override
62-
public boolean reporting() {
63-
return true;
64-
}
65-
66-
@Override
67-
public boolean tagging() {
68-
return true;
69-
}
70-
};
61+
return CapableOf.REPORTING_TAGGING;
7162
}
7263

7364
@Override
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.metrics;
19+
20+
import com.uber.cadence.internal.replay.ReplayAware;
21+
import com.uber.m3.tally.Buckets;
22+
import com.uber.m3.tally.Capabilities;
23+
import com.uber.m3.tally.Counter;
24+
import com.uber.m3.tally.Gauge;
25+
import com.uber.m3.tally.Histogram;
26+
import com.uber.m3.tally.Scope;
27+
import com.uber.m3.tally.ScopeCloseException;
28+
import com.uber.m3.tally.Stopwatch;
29+
import com.uber.m3.tally.StopwatchRecorder;
30+
import com.uber.m3.tally.Timer;
31+
import com.uber.m3.util.Duration;
32+
import java.util.Map;
33+
import java.util.Objects;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.function.Supplier;
36+
37+
public class ReplayAwareScope implements Scope {
38+
private Scope scope;
39+
private ReplayAware context;
40+
private Supplier<Long> clock;
41+
42+
public ReplayAwareScope(Scope scope, ReplayAware context, Supplier<Long> clock) {
43+
this.scope = Objects.requireNonNull(scope);
44+
this.context = Objects.requireNonNull(context);
45+
this.clock = Objects.requireNonNull(clock);
46+
}
47+
48+
private class ReplayAwareCounter implements Counter {
49+
Counter counter;
50+
51+
ReplayAwareCounter(Counter counter) {
52+
this.counter = Objects.requireNonNull(counter);
53+
}
54+
55+
@Override
56+
public void inc(long delta) {
57+
if (context.isReplaying()) {
58+
return;
59+
}
60+
61+
counter.inc(delta);
62+
}
63+
}
64+
65+
private class ReplayAwareGauge implements Gauge {
66+
Gauge gauge;
67+
68+
ReplayAwareGauge(Gauge gauge) {
69+
this.gauge = Objects.requireNonNull(gauge);
70+
}
71+
72+
@Override
73+
public void update(double value) {
74+
if (context.isReplaying()) {
75+
return;
76+
}
77+
78+
gauge.update(value);
79+
}
80+
}
81+
82+
private class ReplayAwareTimer implements Timer, DurationRecorder {
83+
Timer timer;
84+
85+
ReplayAwareTimer(Timer timer) {
86+
this.timer = Objects.requireNonNull(timer);
87+
}
88+
89+
@Override
90+
public void record(Duration interval) {
91+
if (context.isReplaying()) {
92+
return;
93+
}
94+
95+
timer.record(interval);
96+
}
97+
98+
@Override
99+
public Stopwatch start() {
100+
long startNanos = TimeUnit.MILLISECONDS.toNanos(clock.get());
101+
return new Stopwatch(startNanos, new ReplayAwareStopwatchRecorder(this));
102+
}
103+
104+
@Override
105+
public void recordDuration(Duration interval) {
106+
record(interval);
107+
}
108+
}
109+
110+
interface DurationRecorder {
111+
void recordDuration(Duration interval);
112+
}
113+
114+
private class ReplayAwareStopwatchRecorder implements StopwatchRecorder {
115+
DurationRecorder recorder;
116+
117+
ReplayAwareStopwatchRecorder(DurationRecorder recorder) {
118+
this.recorder = recorder;
119+
}
120+
121+
@Override
122+
public void recordStopwatch(long startNanos) {
123+
long endNanos = TimeUnit.MILLISECONDS.toNanos(clock.get());
124+
recorder.recordDuration(Duration.between(startNanos, endNanos));
125+
}
126+
}
127+
128+
private class ReplayAwareHistogram implements Histogram, DurationRecorder {
129+
Histogram histogram;
130+
131+
ReplayAwareHistogram(Histogram histogram) {
132+
this.histogram = Objects.requireNonNull(histogram);
133+
}
134+
135+
@Override
136+
public void recordValue(double value) {
137+
if (context.isReplaying()) {
138+
return;
139+
}
140+
141+
histogram.recordValue(value);
142+
}
143+
144+
@Override
145+
public void recordDuration(Duration value) {
146+
if (context.isReplaying()) {
147+
return;
148+
}
149+
150+
histogram.recordDuration(value);
151+
}
152+
153+
@Override
154+
public Stopwatch start() {
155+
long startNanos = TimeUnit.MILLISECONDS.toNanos(clock.get());
156+
return new Stopwatch(startNanos, new ReplayAwareStopwatchRecorder(this));
157+
}
158+
}
159+
160+
@Override
161+
public Counter counter(String name) {
162+
return new ReplayAwareCounter(scope.counter(name));
163+
}
164+
165+
@Override
166+
public Gauge gauge(String name) {
167+
return new ReplayAwareGauge(scope.gauge(name));
168+
}
169+
170+
@Override
171+
public Timer timer(String name) {
172+
return new ReplayAwareTimer(scope.timer(name));
173+
}
174+
175+
@Override
176+
public Histogram histogram(String name, Buckets buckets) {
177+
return new ReplayAwareHistogram(scope.histogram(name, buckets));
178+
}
179+
180+
@Override
181+
public Scope tagged(Map<String, String> tags) {
182+
return new ReplayAwareScope(scope.tagged(tags), context, clock);
183+
}
184+
185+
@Override
186+
public Scope subScope(String name) {
187+
return new ReplayAwareScope(scope.subScope(name), context, clock);
188+
}
189+
190+
@Override
191+
public Capabilities capabilities() {
192+
return scope.capabilities();
193+
}
194+
195+
@Override
196+
public void close() throws ScopeCloseException {
197+
scope.close();
198+
}
199+
}

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.uber.cadence.WorkflowExecution;
2121
import com.uber.cadence.WorkflowType;
2222
import com.uber.cadence.workflow.Promise;
23+
import com.uber.m3.tally.Scope;
2324
import java.time.Duration;
2425
import java.util.function.BiConsumer;
2526
import java.util.function.Consumer;
@@ -28,7 +29,7 @@
2829
* Represents the context for decider. Should only be used within the scope of workflow definition
2930
* code, meaning any code which is not part of activity implementations.
3031
*/
31-
public interface DecisionContext {
32+
public interface DecisionContext extends ReplayAware {
3233

3334
WorkflowExecution getWorkflowExecution();
3435

@@ -101,13 +102,6 @@ Consumer<Exception> signalWorkflowExecution(
101102
*/
102103
long currentTimeMillis();
103104

104-
/**
105-
* <code>true</code> indicates if workflow is replaying already processed events to reconstruct it
106-
* state. <code>false</code> indicates that code is making forward process for the first time. For
107-
* example can be used to avoid duplicating log records due to replay.
108-
*/
109-
boolean isReplaying();
110-
111105
/**
112106
* Create a Value that becomes ready after the specified delay.
113107
*
@@ -117,4 +111,6 @@ Consumer<Exception> signalWorkflowExecution(
117111
* @return cancellation handle. Invoke {@link Consumer#accept(Object)} to cancel timer.
118112
*/
119113
Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback);
114+
115+
Scope getMetricsScope();
120116
}

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import com.uber.cadence.WorkflowExecution;
2525
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2626
import com.uber.cadence.WorkflowType;
27+
import com.uber.cadence.internal.metrics.ReplayAwareScope;
2728
import com.uber.cadence.workflow.Promise;
2829
import com.uber.cadence.workflow.Workflow;
30+
import com.uber.m3.tally.Scope;
2931
import java.time.Duration;
3032
import java.util.function.BiConsumer;
3133
import java.util.function.Consumer;
@@ -40,6 +42,8 @@ final class DecisionContextImpl implements DecisionContext, HistoryEventHandler
4042

4143
private final WorkflowContext workflowContext;
4244

45+
private Scope metricsScope;
46+
4347
DecisionContextImpl(
4448
DecisionsHelper decisionsHelper,
4549
String domain,
@@ -51,6 +55,15 @@ final class DecisionContextImpl implements DecisionContext, HistoryEventHandler
5155
this.workflowClock = new ClockDecisionContext(decisionsHelper);
5256
}
5357

58+
public void setMetricsScope(Scope metricsScope) {
59+
this.metricsScope = new ReplayAwareScope(metricsScope, this, workflowClock::currentTimeMillis);
60+
}
61+
62+
@Override
63+
public Scope getMetricsScope() {
64+
return metricsScope;
65+
}
66+
5467
@Override
5568
public WorkflowExecution getWorkflowExecution() {
5669
return workflowContext.getWorkflowExecution();
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.replay;
19+
20+
public interface ReplayAware {
21+
/**
22+
* <code>true</code> indicates if workflow is replaying already processed events to reconstruct it
23+
* state. <code>false</code> indicates that code is making forward process for the first time. For
24+
* example can be used to avoid duplicating log records due to replay.
25+
*/
26+
boolean isReplaying();
27+
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class ReplayDecider {
8888
domain,
8989
decisionTask,
9090
historyHelper.getWorkflowExecutionStartedEventAttributes());
91+
context.setMetricsScope(metricsScope);
9192
}
9293

9394
public boolean isCancelRequested() {

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
2828
import com.uber.cadence.internal.replay.StartChildWorkflowExecutionParameters;
2929
import com.uber.cadence.workflow.Promise;
30+
import com.uber.m3.tally.Scope;
3031
import java.time.Duration;
3132
import java.util.ArrayDeque;
3233
import java.util.ArrayList;
@@ -516,5 +517,10 @@ public boolean isReplaying() {
516517
public Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback) {
517518
throw new UnsupportedOperationException("not implemented");
518519
}
520+
521+
@Override
522+
public Scope getMetricsScope() {
523+
throw new UnsupportedOperationException("not implemented");
524+
}
519525
}
520526
}

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public void setDataConverter(DataConverter dataConverter) {
176176
}
177177

178178
@Override
179-
public ReplayWorkflow getWorkflow(WorkflowType workflowType) throws Exception {
179+
public ReplayWorkflow getWorkflow(WorkflowType workflowType) {
180180
SyncWorkflowDefinition workflow = getWorkflowDefinition(workflowType);
181181
return new SyncWorkflow(workflow, dataConverter, threadPool, interceptorFactory);
182182
}

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.uber.cadence.workflow.SignalExternalWorkflowException;
4848
import com.uber.cadence.workflow.Workflow;
4949
import com.uber.cadence.workflow.WorkflowInterceptor;
50+
import com.uber.m3.tally.Scope;
5051
import java.time.Duration;
5152
import java.util.HashMap;
5253
import java.util.Map;
@@ -479,4 +480,8 @@ private RuntimeException mapSignalWorkflowException(Exception failure) {
479480
}
480481
return (SignalExternalWorkflowException) failure;
481482
}
483+
484+
public Scope getMetricsScope() {
485+
return context.getMetricsScope();
486+
}
482487
}

0 commit comments

Comments
 (0)