Skip to content

Commit 6f84971

Browse files
author
Liang Mei
committed
Use workflow clock for timer metrics calculation
1 parent 3893ef9 commit 6f84971

File tree

6 files changed

+153
-37
lines changed

6 files changed

+153
-37
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ dependencies {
4242
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
4343
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
4444
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
45-
compile group: 'com.uber.m3', name: 'tally-core', version: '0.2.0'
45+
compile group: 'com.uber.m3', name: 'tally-core', version: '0.2.1'
4646
testCompile group: 'junit', name: 'junit', version: '4.12'
4747
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
4848
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

src/main/java/com/uber/cadence/internal/metrics/ReplayAwareScope.java

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,30 @@
2626
import com.uber.m3.tally.Scope;
2727
import com.uber.m3.tally.ScopeCloseException;
2828
import com.uber.m3.tally.Stopwatch;
29+
import com.uber.m3.tally.StopwatchRecorder;
2930
import com.uber.m3.tally.Timer;
3031
import com.uber.m3.util.Duration;
3132
import java.util.Map;
3233
import java.util.Objects;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.function.Supplier;
3336

3437
public class ReplayAwareScope implements Scope {
35-
Scope scope;
36-
ReplayAware context;
38+
private Scope scope;
39+
private ReplayAware context;
40+
private Supplier<Long> clock;
3741

38-
public ReplayAwareScope(Scope scope, ReplayAware context) {
42+
public ReplayAwareScope(Scope scope, ReplayAware context, Supplier<Long> clock) {
3943
this.scope = Objects.requireNonNull(scope);
4044
this.context = Objects.requireNonNull(context);
45+
this.clock = Objects.requireNonNull(clock);
4146
}
4247

43-
private static class ReplayAwareCounter implements Counter {
48+
private class ReplayAwareCounter implements Counter {
4449
Counter counter;
45-
ReplayAware context;
4650

47-
public ReplayAwareCounter(Counter counter, ReplayAware context) {
51+
ReplayAwareCounter(Counter counter) {
4852
this.counter = Objects.requireNonNull(counter);
49-
this.context = Objects.requireNonNull(context);
5053
}
5154

5255
@Override
@@ -59,13 +62,11 @@ public void inc(long delta) {
5962
}
6063
}
6164

62-
private static class ReplayAwareGauge implements Gauge {
65+
private class ReplayAwareGauge implements Gauge {
6366
Gauge gauge;
64-
ReplayAware context;
6567

66-
public ReplayAwareGauge(Gauge gauge, ReplayAware context) {
68+
ReplayAwareGauge(Gauge gauge) {
6769
this.gauge = Objects.requireNonNull(gauge);
68-
this.context = Objects.requireNonNull(context);
6970
}
7071

7172
@Override
@@ -78,13 +79,11 @@ public void update(double value) {
7879
}
7980
}
8081

81-
private static class ReplayAwareTimer implements Timer {
82+
private class ReplayAwareTimer implements Timer, DurationRecorder {
8283
Timer timer;
83-
ReplayAware context;
8484

85-
public ReplayAwareTimer(Timer timer, ReplayAware context) {
85+
ReplayAwareTimer(Timer timer) {
8686
this.timer = Objects.requireNonNull(timer);
87-
this.context = Objects.requireNonNull(context);
8887
}
8988

9089
@Override
@@ -98,20 +97,39 @@ public void record(Duration interval) {
9897

9998
@Override
10099
public Stopwatch start() {
101-
// TODO: Stopwatch is a concrete class with non-public constructor so there's no
102-
// way to extend it. Need to fix this when the following issue is resolved.
103-
// https://github.com/uber-java/tally/issues/26
104-
return timer.start();
100+
long startNanos = TimeUnit.MILLISECONDS.toNanos(clock.get());
101+
return new Stopwatch(startNanos, new ReplayAwareStopwatchRecorder(this));
102+
}
103+
104+
@Override
105+
public void recordDuration(Duration interval) {
106+
record(interval);
107+
}
108+
}
109+
110+
interface DurationRecorder {
111+
void recordDuration(Duration interval);
112+
}
113+
114+
private class ReplayAwareStopwatchRecorder implements StopwatchRecorder {
115+
DurationRecorder recorder;
116+
117+
ReplayAwareStopwatchRecorder(DurationRecorder recorder) {
118+
this.recorder = recorder;
119+
}
120+
121+
@Override
122+
public void recordStopwatch(long startNanos) {
123+
long endNanos = TimeUnit.MILLISECONDS.toNanos(clock.get());
124+
recorder.recordDuration(Duration.between(startNanos, endNanos));
105125
}
106126
}
107127

108-
private static class ReplayAwareHistogram implements Histogram {
128+
private class ReplayAwareHistogram implements Histogram, DurationRecorder {
109129
Histogram histogram;
110-
ReplayAware context;
111130

112-
public ReplayAwareHistogram(Histogram histogram, ReplayAware context) {
131+
ReplayAwareHistogram(Histogram histogram) {
113132
this.histogram = Objects.requireNonNull(histogram);
114-
this.context = Objects.requireNonNull(context);
115133
}
116134

117135
@Override
@@ -134,41 +152,39 @@ public void recordDuration(Duration value) {
134152

135153
@Override
136154
public Stopwatch start() {
137-
// TODO: Stopwatch is a concrete class with non-public constructor so there's no
138-
// way to extend it. Need to fix this when the following issue is resolved.
139-
// https://github.com/uber-java/tally/issues/26
140-
return histogram.start();
155+
long startNanos = TimeUnit.MILLISECONDS.toNanos(clock.get());
156+
return new Stopwatch(startNanos, new ReplayAwareStopwatchRecorder(this));
141157
}
142158
}
143159

144160
@Override
145161
public Counter counter(String name) {
146-
return new ReplayAwareCounter(scope.counter(name), context);
162+
return new ReplayAwareCounter(scope.counter(name));
147163
}
148164

149165
@Override
150166
public Gauge gauge(String name) {
151-
return new ReplayAwareGauge(scope.gauge(name), context);
167+
return new ReplayAwareGauge(scope.gauge(name));
152168
}
153169

154170
@Override
155171
public Timer timer(String name) {
156-
return new ReplayAwareTimer(scope.timer(name), context);
172+
return new ReplayAwareTimer(scope.timer(name));
157173
}
158174

159175
@Override
160176
public Histogram histogram(String name, Buckets buckets) {
161-
return new ReplayAwareHistogram(scope.histogram(name, buckets), context);
177+
return new ReplayAwareHistogram(scope.histogram(name, buckets));
162178
}
163179

164180
@Override
165181
public Scope tagged(Map<String, String> tags) {
166-
return new ReplayAwareScope(scope.tagged(tags), context);
182+
return new ReplayAwareScope(scope.tagged(tags), context, clock);
167183
}
168184

169185
@Override
170186
public Scope subScope(String name) {
171-
return new ReplayAwareScope(scope.subScope(name), context);
187+
return new ReplayAwareScope(scope.subScope(name), context, clock);
172188
}
173189

174190
@Override

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ final class DecisionContextImpl implements DecisionContext, HistoryEventHandler
5656
}
5757

5858
public void setMetricsScope(Scope metricsScope) {
59-
this.metricsScope = new ReplayAwareScope(metricsScope, this);
59+
this.metricsScope = new ReplayAwareScope(metricsScope, this, workflowClock::currentTimeMillis);
6060
}
6161

6262
@Override

src/main/java/com/uber/cadence/workflow/Workflow.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.uber.cadence.activity.ActivityOptions;
2222
import com.uber.cadence.common.RetryOptions;
2323
import com.uber.cadence.internal.sync.WorkflowInternal;
24+
import com.uber.cadence.worker.WorkerOptions;
2425
import com.uber.cadence.workflow.Functions.Func;
2526
import com.uber.cadence.workflow.Functions.Func1;
2627
import com.uber.cadence.workflow.Functions.Func2;
@@ -709,6 +710,12 @@ public static boolean isReplaying() {
709710
return WorkflowInternal.isReplaying();
710711
}
711712

713+
/**
714+
* Get scope for reporting business metrics in workflow logic. This should be used instead of
715+
* creating new metrics scopes as it is able to dedup metrics during replay.
716+
*
717+
* <p>The original metrics scope is set through {@link WorkerOptions} when a worker starts up.
718+
*/
712719
public static Scope getMetricsScope() {
713720
return WorkflowInternal.getMetricsScope();
714721
}

src/test/java/com/uber/cadence/metrics/ReplayAwareScopeTest.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
118
package com.uber.cadence.metrics;
219

320
import static org.mockito.Mockito.mock;
@@ -13,9 +30,11 @@
1330
import com.uber.m3.tally.Gauge;
1431
import com.uber.m3.tally.Histogram;
1532
import com.uber.m3.tally.Scope;
33+
import com.uber.m3.tally.Stopwatch;
1634
import com.uber.m3.tally.Timer;
1735
import com.uber.m3.tally.ValueBuckets;
1836
import com.uber.m3.util.Duration;
37+
import java.util.function.Supplier;
1938
import org.junit.Test;
2039

2140
public class ReplayAwareScopeTest {
@@ -48,7 +67,7 @@ public void testReplayAwareScopeReplaying() {
4867
when(scope.histogram("test-histogram", buckets)).thenReturn(histogram);
4968

5069
TestContext context = new TestContext(true);
51-
Scope replayAwareScope = new ReplayAwareScope(scope, context);
70+
Scope replayAwareScope = new ReplayAwareScope(scope, context, System::currentTimeMillis);
5271

5372
replayAwareScope.counter("test-counter").inc(1);
5473
replayAwareScope.gauge("test-gauge").update(100.0);
@@ -78,7 +97,7 @@ public void testReplayAwareScopeNotReplaying() {
7897
when(scope.histogram("test-histogram", buckets)).thenReturn(histogram);
7998

8099
TestContext context = new TestContext(false);
81-
Scope replayAwareScope = new ReplayAwareScope(scope, context);
100+
Scope replayAwareScope = new ReplayAwareScope(scope, context, System::currentTimeMillis);
82101

83102
replayAwareScope.counter("test-counter").inc(1);
84103
replayAwareScope.gauge("test-gauge").update(100.0);
@@ -92,4 +111,43 @@ public void testReplayAwareScopeNotReplaying() {
92111
verify(histogram, times(1)).recordValue(10);
93112
verify(histogram, times(1)).recordDuration(Duration.ofHours(1));
94113
}
114+
115+
static class TestClock implements Supplier<Long> {
116+
private long currTime;
117+
118+
@Override
119+
public Long get() {
120+
return currTime;
121+
}
122+
123+
void setTime(long currTime) {
124+
this.currTime = currTime;
125+
}
126+
}
127+
128+
@Test
129+
public void testCustomClockForTimer() {
130+
Scope scope = mock(Scope.class);
131+
Timer timer = mock(Timer.class);
132+
Histogram histogram = mock(Histogram.class);
133+
134+
Buckets buckets = ValueBuckets.linear(0, 10, 10);
135+
when(scope.timer("test-timer")).thenReturn(timer);
136+
when(scope.histogram("test-histogram", buckets)).thenReturn(histogram);
137+
138+
TestContext context = new TestContext(false);
139+
TestClock clock = new TestClock();
140+
clock.setTime(0);
141+
Scope replayAwareScope = new ReplayAwareScope(scope, context, clock);
142+
Stopwatch sw = replayAwareScope.timer("test-timer").start();
143+
clock.setTime(100);
144+
sw.stop();
145+
146+
sw = replayAwareScope.histogram("test-histogram", buckets).start();
147+
clock.setTime(150);
148+
sw.stop();
149+
150+
verify(timer, times(1)).record(Duration.ofMillis(100));
151+
verify(histogram, times(1)).recordDuration(Duration.ofMillis(50));
152+
}
95153
}

src/test/java/com/uber/cadence/workflow/MetricsTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,25 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
118
package com.uber.cadence.workflow;
219

20+
import static org.junit.Assert.assertTrue;
21+
import static org.mockito.Matchers.any;
22+
import static org.mockito.Matchers.eq;
323
import static org.mockito.Mockito.mock;
424
import static org.mockito.Mockito.times;
525
import static org.mockito.Mockito.verify;
@@ -13,8 +33,10 @@
1333
import com.uber.m3.tally.RootScopeBuilder;
1434
import com.uber.m3.tally.Scope;
1535
import com.uber.m3.tally.StatsReporter;
36+
import com.uber.m3.tally.Stopwatch;
1637
import java.time.Duration;
1738
import org.junit.Test;
39+
import org.mockito.ArgumentCaptor;
1840

1941
public class MetricsTest {
2042
private static final String taskList = "metrics-test";
@@ -48,6 +70,11 @@ public static class TestMetricsInChildWorkflow implements TestChildWorkflow {
4870
@Override
4971
public void executeChild() {
5072
Workflow.getMetricsScope().counter("test-child-started").inc(1);
73+
74+
Stopwatch sw = Workflow.getMetricsScope().timer("test-timer").start();
75+
Workflow.sleep(3000);
76+
sw.stop();
77+
5178
Workflow.getMetricsScope().counter("test-child-done").inc(1);
5279
}
5380
}
@@ -83,5 +110,13 @@ public void testWorkflowMetrics() throws InterruptedException {
83110
verify(reporter, times(1)).reportCounter("test-done", null, 1);
84111
verify(reporter, times(1)).reportCounter("test-child-started", null, 1);
85112
verify(reporter, times(1)).reportCounter("test-child-done", null, 1);
113+
114+
ArgumentCaptor<com.uber.m3.util.Duration> sleepDurationCaptor =
115+
ArgumentCaptor.forClass(com.uber.m3.util.Duration.class);
116+
verify(reporter, times(1)).reportTimer(eq("test-timer"), any(), sleepDurationCaptor.capture());
117+
118+
com.uber.m3.util.Duration sleepDuration = sleepDurationCaptor.getValue();
119+
assertTrue(sleepDuration.compareTo(com.uber.m3.util.Duration.ofSeconds(3)) > 0);
120+
assertTrue(sleepDuration.compareTo(com.uber.m3.util.Duration.ofMillis(3100)) < 0);
86121
}
87122
}

0 commit comments

Comments
 (0)