Skip to content

Commit 56ba518

Browse files
scwhittle authored and parveensania committed
[Java Dataflow Streaming] Add support to AbstractWindmillStream to transition between physical streams within the same logical stream (apache#35523)
* fix stuck get data if no current stream
* improve html
1 parent e9d9e88 commit 56ba518

File tree

15 files changed: +2118 additions and -210 deletions (lines changed)

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java

Lines changed: 213 additions & 101 deletions
Large diffs are not rendered by default.

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,6 @@ public interface WindmillStream {
6464
interface GetWorkStream extends WindmillStream {
6565
/** Adjusts the {@link GetWorkBudget} for the stream. */
6666
void setBudget(GetWorkBudget newBudget);
67-
68-
default void setBudget(long newItems, long newBytes) {
69-
setBudget(GetWorkBudget.builder().setItems(newItems).setBytes(newBytes).build());
70-
}
7167
}
7268

7369
/** Interface for streaming GetDataRequests to Windmill. */

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222

2323
import com.google.auto.value.AutoValue;
2424
import java.io.PrintWriter;
25+
import java.time.Duration;
2526
import java.util.HashMap;
2627
import java.util.Iterator;
2728
import java.util.Map;
2829
import java.util.Set;
2930
import java.util.concurrent.ConcurrentHashMap;
3031
import java.util.concurrent.ConcurrentMap;
32+
import java.util.concurrent.ScheduledExecutorService;
3133
import java.util.concurrent.atomic.AtomicLong;
3234
import java.util.function.Consumer;
3335
import java.util.function.Function;
@@ -85,16 +87,19 @@ private GrpcCommitWorkStream(
8587
int logEveryNStreamFailures,
8688
JobHeader jobHeader,
8789
AtomicLong idGenerator,
88-
int streamingRpcBatchLimit) {
90+
int streamingRpcBatchLimit,
91+
Duration halfClosePhysicalStreamAfter,
92+
ScheduledExecutorService executor) {
8993
super(
9094
LOG,
91-
"CommitWorkStream",
9295
startCommitWorkRpcFn,
9396
backoff,
9497
streamObserverFactory,
9598
streamRegistry,
9699
logEveryNStreamFailures,
97-
backendWorkerToken);
100+
backendWorkerToken,
101+
halfClosePhysicalStreamAfter,
102+
executor);
98103
this.idGenerator = idGenerator;
99104
this.jobHeader = jobHeader;
100105
this.streamingRpcBatchLimit = streamingRpcBatchLimit;
@@ -110,7 +115,9 @@ static GrpcCommitWorkStream create(
110115
int logEveryNStreamFailures,
111116
JobHeader jobHeader,
112117
AtomicLong idGenerator,
113-
int streamingRpcBatchLimit) {
118+
int streamingRpcBatchLimit,
119+
Duration halfClosePhysicalStreamAfter,
120+
ScheduledExecutorService executor) {
114121
return new GrpcCommitWorkStream(
115122
backendWorkerToken,
116123
startCommitWorkRpcFn,
@@ -120,25 +127,33 @@ static GrpcCommitWorkStream create(
120127
logEveryNStreamFailures,
121128
jobHeader,
122129
idGenerator,
123-
streamingRpcBatchLimit);
130+
streamingRpcBatchLimit,
131+
halfClosePhysicalStreamAfter,
132+
executor);
124133
}
125134

126135
@Override
127136
public void appendSpecificHtml(PrintWriter writer) {
128-
writer.format("CommitWorkStream: %d pending", pending.size());
137+
writer.format("CommitWorkStream: %d pending ", pending.size());
129138
}
130139

131140
@Override
132-
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
133-
trySend(StreamingCommitWorkRequest.newBuilder().setHeader(jobHeader).build());
141+
@SuppressWarnings("ReferenceEquality")
142+
protected synchronized void onFlushPending(boolean isNewStream)
143+
throws WindmillStreamShutdownException {
144+
if (isNewStream) {
145+
trySend(StreamingCommitWorkRequest.newBuilder().setHeader(jobHeader).build());
146+
}
134147
// Flush all pending requests that are no longer on active streams.
135148
try (Batcher resendBatcher = new Batcher()) {
136149
for (Map.Entry<Long, StreamAndRequest> entry : pending.entrySet()) {
137150
CommitWorkPhysicalStreamHandler requestHandler = entry.getValue().handler;
138151
checkState(requestHandler != currentPhysicalStream);
139-
// When we have streams closing in the background we should avoid retrying the requests
140-
// active on those streams.
141-
152+
if (requestHandler != null && closingPhysicalStreams.contains(requestHandler)) {
153+
LOG.debug(
154+
"Not resending request that is active on background half-closing physical stream.");
155+
continue;
156+
}
142157
long id = entry.getKey();
143158
PendingRequest request = entry.getValue().request;
144159
if (!resendBatcher.canAccept(request.getBytes())) {
@@ -169,6 +184,7 @@ protected synchronized void sendHealthCheck() throws WindmillStreamShutdownExcep
169184

170185
private class CommitWorkPhysicalStreamHandler extends PhysicalStreamHandler {
171186
@Override
187+
@SuppressWarnings("ReferenceEquality")
172188
public void onResponse(StreamingCommitResponse response) {
173189
CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler();
174190
for (int i = 0; i < response.getRequestIdCount(); ++i) {
@@ -206,6 +222,7 @@ public void onResponse(StreamingCommitResponse response) {
206222
}
207223

208224
@Override
225+
@SuppressWarnings("ReferenceEquality")
209226
public boolean hasPendingRequests() {
210227
return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
211228
}
@@ -218,6 +235,7 @@ public void onDone(Status status) {
218235
}
219236

220237
@Override
238+
@SuppressWarnings("ReferenceEquality")
221239
public void appendHtml(PrintWriter writer) {
222240
writer.format(
223241
"CommitWorkStream: %d pending",

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2121

2222
import java.io.PrintWriter;
23+
import java.time.Duration;
2324
import java.util.Set;
2425
import java.util.concurrent.ConcurrentHashMap;
2526
import java.util.concurrent.ConcurrentMap;
27+
import java.util.concurrent.ScheduledExecutorService;
2628
import java.util.concurrent.atomic.AtomicReference;
2729
import java.util.function.Function;
2830
import javax.annotation.concurrent.GuardedBy;
@@ -98,16 +100,19 @@ private GrpcDirectGetWorkStream(
98100
HeartbeatSender heartbeatSender,
99101
GetDataClient getDataClient,
100102
WorkCommitter workCommitter,
101-
WorkItemScheduler workItemScheduler) {
103+
WorkItemScheduler workItemScheduler,
104+
Duration halfClosePhysicalStreamAfter,
105+
ScheduledExecutorService executorService) {
102106
super(
103107
LOG,
104-
"GetWorkStream",
105108
startGetWorkRpcFn,
106109
backoff,
107110
streamObserverFactory,
108111
streamRegistry,
109112
logEveryNStreamFailures,
110-
backendWorkerToken);
113+
backendWorkerToken,
114+
halfClosePhysicalStreamAfter,
115+
executorService);
111116
this.requestHeader = requestHeader;
112117
this.workItemScheduler = workItemScheduler;
113118
this.heartbeatSender = heartbeatSender;
@@ -138,7 +143,9 @@ static GrpcDirectGetWorkStream create(
138143
HeartbeatSender heartbeatSender,
139144
GetDataClient getDataClient,
140145
WorkCommitter workCommitter,
141-
WorkItemScheduler workItemScheduler) {
146+
WorkItemScheduler workItemScheduler,
147+
Duration halfClosePhysicalStreamAfter,
148+
ScheduledExecutorService executor) {
142149
return new GrpcDirectGetWorkStream(
143150
backendWorkerToken,
144151
startGetWorkRpcFn,
@@ -151,7 +158,9 @@ static GrpcDirectGetWorkStream create(
151158
heartbeatSender,
152159
getDataClient,
153160
workCommitter,
154-
workItemScheduler);
161+
workItemScheduler,
162+
halfClosePhysicalStreamAfter,
163+
executor);
155164
}
156165

157166
private static Watermarks createWatermarks(
@@ -230,7 +239,11 @@ protected PhysicalStreamHandler newResponseHandler() {
230239
}
231240

232241
@Override
233-
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
242+
protected synchronized void onFlushPending(boolean isNewStream)
243+
throws WindmillStreamShutdownException {
244+
if (!isNewStream) {
245+
return;
246+
}
234247
budgetTracker.reset();
235248
GetWorkBudget initialGetWorkBudget = budgetTracker.computeBudgetExtension();
236249
StreamingGetWorkRequest request =

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.CancellationException;
3434
import java.util.concurrent.ConcurrentHashMap;
3535
import java.util.concurrent.ConcurrentLinkedDeque;
36+
import java.util.concurrent.ScheduledExecutorService;
3637
import java.util.concurrent.atomic.AtomicLong;
3738
import java.util.function.Consumer;
3839
import java.util.function.Function;
@@ -112,16 +113,19 @@ private GrpcGetDataStream(
112113
AtomicLong idGenerator,
113114
int streamingRpcBatchLimit,
114115
boolean sendKeyedGetDataRequests,
115-
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
116+
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses,
117+
java.time.Duration halfClosePhysicalStreamAfter,
118+
ScheduledExecutorService executorService) {
116119
super(
117120
LOG,
118-
"GetDataStream",
119121
startGetDataRpcFn,
120122
backoff,
121123
streamObserverFactory,
122124
streamRegistry,
123125
logEveryNStreamFailures,
124-
backendWorkerToken);
126+
backendWorkerToken,
127+
halfClosePhysicalStreamAfter,
128+
executorService);
125129
this.idGenerator = idGenerator;
126130
this.jobHeader = jobHeader;
127131
this.streamingRpcBatchLimit = streamingRpcBatchLimit;
@@ -146,7 +150,9 @@ static GrpcGetDataStream create(
146150
AtomicLong idGenerator,
147151
int streamingRpcBatchLimit,
148152
boolean sendKeyedGetDataRequests,
149-
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
153+
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses,
154+
java.time.Duration halfClosePhysicalStreamAfter,
155+
ScheduledExecutorService executor) {
150156
return new GrpcGetDataStream(
151157
backendWorkerToken,
152158
startGetDataRpcFn,
@@ -158,7 +164,9 @@ static GrpcGetDataStream create(
158164
idGenerator,
159165
streamingRpcBatchLimit,
160166
sendKeyedGetDataRequests,
161-
processHeartbeatResponses);
167+
processHeartbeatResponses,
168+
halfClosePhysicalStreamAfter,
169+
executor);
162170
}
163171

164172
private static WindmillStreamShutdownException shutdownExceptionFor(QueuedBatch batch) {
@@ -189,7 +197,7 @@ public void sendBatch(QueuedBatch batch) throws WindmillStreamShutdownException
189197
}
190198

191199
if (!trySend(batch.asGetDataRequest())) {
192-
// The stream broke before this call went through; onNewStream will retry the fetch.
200+
// The stream broke before this call went through; onFlushPending will retry the fetch.
193201
LOG.debug("GetData stream broke before call started.");
194202
}
195203
}
@@ -260,8 +268,11 @@ protected PhysicalStreamHandler newResponseHandler() {
260268
}
261269

262270
@Override
263-
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
264-
trySend(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
271+
protected synchronized void onFlushPending(boolean isNewStream)
272+
throws WindmillStreamShutdownException {
273+
if (isNewStream) {
274+
trySend(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
275+
}
265276
while (!batches.isEmpty()) {
266277
QueuedBatch batch = checkNotNull(batches.peekFirst());
267278
verify(!batch.isEmpty());
@@ -392,6 +403,12 @@ protected synchronized void shutdownInternal() {
392403
}
393404
currentGetDataStream.pending.clear();
394405
}
406+
for (PhysicalStreamHandler handler : closingPhysicalStreams) {
407+
for (AppendableInputStream ais : ((GetDataPhysicalStreamHandler) handler).pending.values()) {
408+
ais.cancel();
409+
}
410+
((GetDataPhysicalStreamHandler) handler).pending.clear();
411+
}
395412
batches.forEach(
396413
batch -> {
397414
batch.markFinalized();
@@ -402,7 +419,12 @@ protected synchronized void shutdownInternal() {
402419

403420
@Override
404421
public void appendSpecificHtml(PrintWriter writer) {
405-
writer.format("GetDataStream: %d queued batches", batchesDebugSizeSupplier.get());
422+
int batches = batchesDebugSizeSupplier.get();
423+
if (batches > 0) {
424+
writer.format("GetDataStream: %d queued batches ", batches);
425+
} else {
426+
writer.append("GetDataStream: no queued batches ");
427+
}
406428
}
407429

408430
private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn)
@@ -476,10 +498,11 @@ private void queueRequestAndWait(QueuedRequest request)
476498
prevBatch.waitForSendOrFailNotification();
477499
}
478500
trySendBatch(batch);
479-
} else {
480-
// Wait for this batch to be sent before parsing the response.
481-
batch.waitForSendOrFailNotification();
501+
// Since the above send may not succeed, we fall through to block on sending or failure.
482502
}
503+
504+
// Wait for this batch to be sent before parsing the response.
505+
batch.waitForSendOrFailNotification();
483506
}
484507

485508
private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamShutdownException {
@@ -494,8 +517,8 @@ private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamS
494517
final @Nullable GetDataPhysicalStreamHandler currentGetDataPhysicalStream =
495518
(GetDataPhysicalStreamHandler) currentPhysicalStream;
496519
if (currentGetDataPhysicalStream == null) {
497-
// Leave the batch finalized but in the batches queue. Finalized batches will be sent on the
498-
// new stream in onNewStream.
520+
// Leave the batch finalized but in the batches queue. Finalized batches will be sent on a
521+
// new stream in onFlushPending.
499522
return;
500523
}
501524

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
1919

2020
import java.io.PrintWriter;
21+
import java.time.Duration;
2122
import java.util.Set;
2223
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.ScheduledExecutorService;
2325
import java.util.concurrent.atomic.AtomicLong;
2426
import java.util.function.Function;
2527
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
@@ -68,16 +70,19 @@ private GrpcGetWorkStream(
6870
Set<AbstractWindmillStream<?, ?>> streamRegistry,
6971
int logEveryNStreamFailures,
7072
boolean requestBatchedGetWorkResponse,
71-
WorkItemReceiver receiver) {
73+
WorkItemReceiver receiver,
74+
Duration halfClosePhysicalStreamAfter,
75+
ScheduledExecutorService executor) {
7276
super(
7377
LOG,
74-
"GetWorkStream",
7578
startGetWorkRpcFn,
7679
backoff,
7780
streamObserverFactory,
7881
streamRegistry,
7982
logEveryNStreamFailures,
80-
backendWorkerToken);
83+
backendWorkerToken,
84+
halfClosePhysicalStreamAfter,
85+
executor);
8186
this.request = request;
8287
this.receiver = receiver;
8388
this.inflightMessages = new AtomicLong();
@@ -97,7 +102,9 @@ public static GrpcGetWorkStream create(
97102
Set<AbstractWindmillStream<?, ?>> streamRegistry,
98103
int logEveryNStreamFailures,
99104
boolean requestBatchedGetWorkResponse,
100-
WorkItemReceiver receiver) {
105+
WorkItemReceiver receiver,
106+
Duration halfClosePhysicalStreamAfter,
107+
ScheduledExecutorService executor) {
101108
return new GrpcGetWorkStream(
102109
backendWorkerToken,
103110
startGetWorkRpcFn,
@@ -107,7 +114,9 @@ public static GrpcGetWorkStream create(
107114
streamRegistry,
108115
logEveryNStreamFailures,
109116
requestBatchedGetWorkResponse,
110-
receiver);
117+
receiver,
118+
halfClosePhysicalStreamAfter,
119+
executor);
111120
}
112121

113122
private void sendRequestExtension(long moreItems, long moreBytes) {
@@ -163,7 +172,11 @@ protected PhysicalStreamHandler newResponseHandler() {
163172
}
164173

165174
@Override
166-
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
175+
protected synchronized void onFlushPending(boolean isNewStream)
176+
throws WindmillStreamShutdownException {
177+
if (!isNewStream) {
178+
return;
179+
}
167180
inflightMessages.set(request.getMaxItems());
168181
inflightBytes.set(request.getMaxBytes());
169182
trySend(

0 commit comments

Comments
 (0)