Skip to content

Commit 6426f72

Browse files
authored
track GetWork from WindmillWorker to UserWorker in proxyless path (#34413)
1 parent 64b29f6 commit 6426f72

File tree

3 files changed

+219
-124
lines changed

3 files changed

+219
-124
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkTimingInfosTracker.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.HashMap;
2323
import java.util.Map;
2424
import javax.annotation.Nullable;
25+
import javax.annotation.concurrent.NotThreadSafe;
2526
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo;
2627
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo.Event;
2728
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
@@ -33,6 +34,7 @@
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

37+
@NotThreadSafe
3638
final class GetWorkTimingInfosTracker {
3739
private static final Logger LOG = LoggerFactory.getLogger(GetWorkTimingInfosTracker.class);
3840

@@ -46,8 +48,8 @@ final class GetWorkTimingInfosTracker {
4648
this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
4749
this.clock = clock;
4850
this.workItemCreationEndTime = Instant.EPOCH;
49-
workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
50-
workItemCreationLatency = null;
51+
this.workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
52+
this.workItemCreationLatency = null;
5153
}
5254

5355
void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
@@ -69,8 +71,9 @@ void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
6971

7072
// Record the difference between starting to get work and the first chunk being sent as the
7173
// work creation time.
74+
@Nullable
7275
Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
73-
Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
76+
@Nullable Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
7477
if (workItemCreationStart != null
7578
&& workItemCreationEnd != null
7679
&& workItemCreationLatency == null) {
@@ -90,39 +93,41 @@ void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
9093
Instant receivedByDispatcherTiming =
9194
getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
9295
if (workItemCreationEnd != null && receivedByDispatcherTiming != null) {
93-
Duration newDuration = new Duration(workItemCreationEnd, receivedByDispatcherTiming);
94-
aggregatedGetWorkStreamLatencies.compute(
96+
trackTimeInState(
9597
State.GET_WORK_IN_TRANSIT_TO_DISPATCHER,
96-
(stateKey, duration) -> {
97-
if (duration == null) {
98-
return new SumAndMaxDurations(newDuration, newDuration);
99-
}
100-
duration.max = newDuration.isLongerThan(duration.max) ? newDuration : duration.max;
101-
duration.sum = duration.sum.plus(newDuration);
102-
return duration;
103-
});
98+
new Duration(workItemCreationEnd, receivedByDispatcherTiming));
10499
}
105100

106-
// Record the latency of each chunk between send on dispatcher and arrival on worker.
101+
// Record the latency of each chunk between send on dispatcher or windmill worker and arrival on
102+
// the user worker.
103+
@Nullable
107104
Instant forwardedByDispatcherTiming =
108105
getWorkStreamTimings.get(Event.GET_WORK_FORWARDED_BY_DISPATCHER);
109106
Instant now = Instant.ofEpochMilli(clock.getMillis());
110107
if (forwardedByDispatcherTiming != null && now.isAfter(forwardedByDispatcherTiming)) {
111-
Duration newDuration = new Duration(forwardedByDispatcherTiming, now);
112-
aggregatedGetWorkStreamLatencies.compute(
113-
State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
114-
(stateKey, duration) -> {
115-
if (duration == null) {
116-
return new SumAndMaxDurations(newDuration, newDuration);
117-
}
118-
duration.max = newDuration.isLongerThan(duration.max) ? newDuration : duration.max;
119-
duration.sum = duration.sum.plus(newDuration);
120-
return duration;
121-
});
108+
trackTimeInState(
109+
State.GET_WORK_IN_TRANSIT_TO_USER_WORKER, new Duration(forwardedByDispatcherTiming, now));
110+
} else if (workItemCreationEnd != null && now.isAfter(workItemCreationEnd)) {
111+
trackTimeInState(
112+
State.GET_WORK_IN_TRANSIT_TO_USER_WORKER, new Duration(workItemCreationEnd, now));
122113
}
114+
123115
workItemLastChunkReceivedByWorkerTime = now;
124116
}
125117

118+
private void trackTimeInState(LatencyAttribution.State state, Duration newDuration) {
119+
aggregatedGetWorkStreamLatencies.compute(
120+
state,
121+
(stateKey, duration) -> {
122+
if (duration == null) {
123+
return new SumAndMaxDurations(newDuration, newDuration);
124+
}
125+
duration.max = newDuration.isLongerThan(duration.max) ? newDuration : duration.max;
126+
duration.sum = duration.sum.plus(newDuration);
127+
return duration;
128+
});
129+
}
130+
126131
ImmutableList<LatencyAttribution> getLatencyAttributions() {
127132
if (workItemCreationLatency == null && aggregatedGetWorkStreamLatencies.isEmpty()) {
128133
return ImmutableList.of();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
19+
20+
import static java.util.stream.Collectors.toMap;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertNull;
23+
24+
import java.util.ArrayList;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.function.Function;
29+
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
30+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
31+
import org.junit.Test;
32+
import org.junit.runner.RunWith;
33+
import org.junit.runners.JUnit4;
34+
35+
@RunWith(JUnit4.class)
36+
public class GetWorkTimingInfosTrackerTest {
37+
38+
@Test
39+
public void testGetWorkTimingInfosTracker_calculatesTransitToUserWorkerTimeFromWindmillWorker() {
40+
GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
41+
List<Windmill.GetWorkStreamTimingInfo> infos = new ArrayList<>();
42+
for (int i = 0; i <= 3; i++) {
43+
infos.add(
44+
Windmill.GetWorkStreamTimingInfo.newBuilder()
45+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_CREATION_START)
46+
.setTimestampUsec(0)
47+
.build());
48+
infos.add(
49+
Windmill.GetWorkStreamTimingInfo.newBuilder()
50+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_CREATION_END)
51+
.setTimestampUsec(10000)
52+
.build());
53+
tracker.addTimingInfo(infos);
54+
infos.clear();
55+
}
56+
// durations for each chunk:
57+
// GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
58+
// GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
59+
ImmutableList<Windmill.LatencyAttribution> attributions = tracker.getLatencyAttributions();
60+
assertEquals(2, attributions.size());
61+
Map<Windmill.LatencyAttribution.State, Windmill.LatencyAttribution> latencies =
62+
attributions.stream()
63+
.collect(toMap(Windmill.LatencyAttribution::getState, Function.identity()));
64+
65+
assertEquals(
66+
10L,
67+
latencies
68+
.get(Windmill.LatencyAttribution.State.GET_WORK_IN_WINDMILL_WORKER)
69+
.getTotalDurationMillis());
70+
71+
assertEquals(
72+
// Elapsed time from 10 -> 50.
73+
40,
74+
latencies
75+
.get(Windmill.LatencyAttribution.State.GET_WORK_IN_TRANSIT_TO_USER_WORKER)
76+
.getTotalDurationMillis());
77+
}
78+
79+
@Test
80+
public void testGetWorkTimingInfosTracker_calculatesTransitToUserWorkerTimeFromDispatcher() {
81+
GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
82+
List<Windmill.GetWorkStreamTimingInfo> infos = new ArrayList<>();
83+
for (int i = 0; i <= 3; i++) {
84+
infos.add(
85+
Windmill.GetWorkStreamTimingInfo.newBuilder()
86+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_CREATION_START)
87+
.setTimestampUsec(0)
88+
.build());
89+
infos.add(
90+
Windmill.GetWorkStreamTimingInfo.newBuilder()
91+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_CREATION_END)
92+
.setTimestampUsec(10000)
93+
.build());
94+
infos.add(
95+
Windmill.GetWorkStreamTimingInfo.newBuilder()
96+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_RECEIVED_BY_DISPATCHER)
97+
.setTimestampUsec((i + 11) * 1000)
98+
.build());
99+
infos.add(
100+
Windmill.GetWorkStreamTimingInfo.newBuilder()
101+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_FORWARDED_BY_DISPATCHER)
102+
.setTimestampUsec((i + 16) * 1000)
103+
.build());
104+
tracker.addTimingInfo(infos);
105+
infos.clear();
106+
}
107+
// durations for each chunk:
108+
// GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
109+
// GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
110+
// GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
111+
Map<Windmill.LatencyAttribution.State, Windmill.LatencyAttribution> latencies = new HashMap<>();
112+
ImmutableList<Windmill.LatencyAttribution> attributions = tracker.getLatencyAttributions();
113+
assertEquals(3, attributions.size());
114+
for (Windmill.LatencyAttribution attribution : attributions) {
115+
latencies.put(attribution.getState(), attribution);
116+
}
117+
assertEquals(
118+
10L,
119+
latencies
120+
.get(Windmill.LatencyAttribution.State.GET_WORK_IN_WINDMILL_WORKER)
121+
.getTotalDurationMillis());
122+
// elapsed time from 10 -> 50;
123+
long elapsedTime = 40;
124+
// sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
125+
long sumDurations = 140;
126+
assertEquals(
127+
Math.min(4, (long) (elapsedTime * (10.0 / sumDurations))),
128+
latencies
129+
.get(Windmill.LatencyAttribution.State.GET_WORK_IN_TRANSIT_TO_DISPATCHER)
130+
.getTotalDurationMillis());
131+
assertEquals(
132+
Math.min(34, (long) (elapsedTime * (130.0 / sumDurations))),
133+
latencies
134+
.get(Windmill.LatencyAttribution.State.GET_WORK_IN_TRANSIT_TO_USER_WORKER)
135+
.getTotalDurationMillis());
136+
}
137+
138+
@Test
139+
public void testGetWorkTimingInfosTracker_clockSkew() {
140+
int skewMicros = 50 * 1000;
141+
GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
142+
List<Windmill.GetWorkStreamTimingInfo> infos = new ArrayList<>();
143+
for (int i = 0; i <= 3; i++) {
144+
infos.add(
145+
Windmill.GetWorkStreamTimingInfo.newBuilder()
146+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_CREATION_START)
147+
.setTimestampUsec(skewMicros)
148+
.build());
149+
infos.add(
150+
Windmill.GetWorkStreamTimingInfo.newBuilder()
151+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_CREATION_END)
152+
.setTimestampUsec(10000 + skewMicros)
153+
.build());
154+
infos.add(
155+
Windmill.GetWorkStreamTimingInfo.newBuilder()
156+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_RECEIVED_BY_DISPATCHER)
157+
.setTimestampUsec((i + 11) * 1000 + skewMicros)
158+
.build());
159+
infos.add(
160+
Windmill.GetWorkStreamTimingInfo.newBuilder()
161+
.setEvent(Windmill.GetWorkStreamTimingInfo.Event.GET_WORK_FORWARDED_BY_DISPATCHER)
162+
.setTimestampUsec((i + 16) * 1000 + skewMicros)
163+
.build());
164+
tracker.addTimingInfo(infos);
165+
infos.clear();
166+
}
167+
// durations for each chunk:
168+
// GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
169+
// GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
170+
// GET_WORK_IN_TRANSIT_TO_USER_WORKER: not observed due to skew
171+
Map<Windmill.LatencyAttribution.State, Windmill.LatencyAttribution> latencies = new HashMap<>();
172+
ImmutableList<Windmill.LatencyAttribution> attributions = tracker.getLatencyAttributions();
173+
assertEquals(2, attributions.size());
174+
for (Windmill.LatencyAttribution attribution : attributions) {
175+
latencies.put(attribution.getState(), attribution);
176+
}
177+
assertEquals(
178+
10L,
179+
latencies
180+
.get(Windmill.LatencyAttribution.State.GET_WORK_IN_WINDMILL_WORKER)
181+
.getTotalDurationMillis());
182+
assertEquals(
183+
4L,
184+
latencies
185+
.get(Windmill.LatencyAttribution.State.GET_WORK_IN_TRANSIT_TO_DISPATCHER)
186+
.getTotalDurationMillis());
187+
assertNull(latencies.get(Windmill.LatencyAttribution.State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
188+
}
189+
}

0 commit comments

Comments
 (0)