Skip to content

Commit c34ba48

Browse files
committed
fix nullness, change retries
1 parent 3c53e12 commit c34ba48

File tree

8 files changed

+101
-56
lines changed

8 files changed

+101
-56
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,10 +467,15 @@ public void onCompleted() {
467467
}
468468
}
469469

470+
@SuppressWarnings("nullness")
471+
private void clearPhysicalStreamForDebug() {
472+
currentPhysicalStreamForDebug.set(null);
473+
}
474+
470475
private void onPhysicalStreamCompletion(Status status, PhysicalStreamHandler handler) {
471476
synchronized (this) {
472477
if (currentPhysicalStream == handler) {
473-
currentPhysicalStreamForDebug.set(null);
478+
clearPhysicalStreamForDebug();
474479
currentPhysicalStream = null;
475480
}
476481
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
4545
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
4646
import org.apache.beam.sdk.util.BackOff;
47-
import org.apache.beam.sdk.values.KV;
4847
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
4948
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
5049
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
@@ -60,8 +59,17 @@ final class GrpcCommitWorkStream
6059

6160
private static final long HEARTBEAT_REQUEST_ID = Long.MAX_VALUE;
6261

63-
private final ConcurrentMap<Long, KV<CommitWorkPhysicalStreamHandler, PendingRequest>> pending =
64-
new ConcurrentHashMap<>();
62+
private static class StreamAndRequest {
63+
StreamAndRequest(@Nullable CommitWorkPhysicalStreamHandler handler, PendingRequest request) {
64+
this.handler = handler;
65+
this.request = request;
66+
}
67+
68+
final @Nullable CommitWorkPhysicalStreamHandler handler;
69+
final PendingRequest request;
70+
}
71+
72+
private final ConcurrentMap<Long, StreamAndRequest> pending = new ConcurrentHashMap<>();
6573

6674
private final AtomicLong idGenerator;
6775
private final JobHeader jobHeader;
@@ -125,15 +133,14 @@ protected synchronized void onNewStream() throws WindmillStreamShutdownException
125133
trySend(StreamingCommitWorkRequest.newBuilder().setHeader(jobHeader).build());
126134
// Flush all pending requests that are no longer on active streams.
127135
try (Batcher resendBatcher = new Batcher()) {
128-
for (Map.Entry<Long, KV<CommitWorkPhysicalStreamHandler, PendingRequest>> entry :
129-
pending.entrySet()) {
130-
CommitWorkPhysicalStreamHandler requestHandler = entry.getValue().getKey();
136+
for (Map.Entry<Long, StreamAndRequest> entry : pending.entrySet()) {
137+
CommitWorkPhysicalStreamHandler requestHandler = entry.getValue().handler;
131138
checkState(requestHandler != currentPhysicalStream);
132139
// When we have streams closing in the background we should avoid retrying the requests
133140
// active on those streams.
134141

135142
long id = entry.getKey();
136-
PendingRequest request = entry.getValue().getValue();
143+
PendingRequest request = entry.getValue().request;
137144
if (!resendBatcher.canAccept(request.getBytes())) {
138145
resendBatcher.flush();
139146
}
@@ -153,7 +160,7 @@ public CommitWorkStream.RequestBatcher batcher() {
153160

154161
@Override
155162
protected synchronized void sendHealthCheck() throws WindmillStreamShutdownException {
156-
if (currentPhysicalStream.hasPendingRequests()) {
163+
if (currentPhysicalStream != null && currentPhysicalStream.hasPendingRequests()) {
157164
StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder();
158165
builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID);
159166
trySend(builder.build());
@@ -175,16 +182,15 @@ public void onResponse(StreamingCommitResponse response) {
175182
CommitStatus commitStatus =
176183
i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK;
177184

178-
@Nullable
179-
KV<CommitWorkPhysicalStreamHandler, PendingRequest> entry = pending.remove(requestId);
185+
@Nullable StreamAndRequest entry = pending.remove(requestId);
180186
if (entry == null) {
181187
LOG.error("Got unknown commit request ID: {}", requestId);
182188
continue;
183189
}
184-
if (entry.getKey() != this) {
190+
if (entry.handler != this) {
185191
LOG.error("Got commit request id {} on unexpected stream", requestId);
186192
}
187-
PendingRequest pendingRequest = entry.getValue();
193+
PendingRequest pendingRequest = entry.request;
188194
try {
189195
pendingRequest.completeWithStatus(commitStatus);
190196
} catch (RuntimeException e) {
@@ -202,7 +208,7 @@ public void onResponse(StreamingCommitResponse response) {
202208

203209
@Override
204210
public boolean hasPendingRequests() {
205-
return pending.entrySet().stream().anyMatch(e -> e.getValue().getKey() == this);
211+
return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
206212
}
207213

208214
@Override
@@ -216,7 +222,7 @@ public void onDone(Status status) {
216222
public void appendHtml(PrintWriter writer) {
217223
writer.format(
218224
"CommitWorkStream: %d pending",
219-
pending.entrySet().stream().filter(e -> e.getValue().getKey() == this).count());
225+
pending.entrySet().stream().filter(e -> e.getValue().handler == this).count());
220226
}
221227
}
222228

@@ -227,10 +233,9 @@ protected PhysicalStreamHandler newResponseHandler() {
227233

228234
@Override
229235
protected synchronized void shutdownInternal() {
230-
Iterator<KV<CommitWorkPhysicalStreamHandler, PendingRequest>> pendingRequests =
231-
pending.values().iterator();
236+
Iterator<StreamAndRequest> pendingRequests = pending.values().iterator();
232237
while (pendingRequests.hasNext()) {
233-
PendingRequest pendingRequest = pendingRequests.next().getValue();
238+
PendingRequest pendingRequest = pendingRequests.next().request;
234239
pendingRequest.abort();
235240
pendingRequests.remove();
236241
}
@@ -270,7 +275,9 @@ private void issueSingleRequest(long id, PendingRequest pendingRequest)
270275
pendingRequest.abort();
271276
}
272277
pending.put(
273-
id, KV.of((CommitWorkPhysicalStreamHandler) currentPhysicalStream, pendingRequest));
278+
id,
279+
new StreamAndRequest(
280+
(CommitWorkPhysicalStreamHandler) currentPhysicalStream, pendingRequest));
274281
trySend(chunk);
275282
}
276283
}
@@ -300,7 +307,8 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests)
300307
for (Map.Entry<Long, PendingRequest> entry : requests.entrySet()) {
301308
pending.put(
302309
entry.getKey(),
303-
KV.of((CommitWorkPhysicalStreamHandler) currentPhysicalStream, entry.getValue()));
310+
new StreamAndRequest(
311+
(CommitWorkPhysicalStreamHandler) currentPhysicalStream, entry.getValue()));
304312
}
305313
trySend(request);
306314
}
@@ -317,7 +325,9 @@ private void issueMultiChunkRequest(long id, PendingRequest pendingRequest)
317325
}
318326

319327
pending.put(
320-
id, KV.of((CommitWorkPhysicalStreamHandler) currentPhysicalStream, pendingRequest));
328+
id,
329+
new StreamAndRequest(
330+
(CommitWorkPhysicalStreamHandler) currentPhysicalStream, pendingRequest));
321331
for (int i = 0;
322332
i < serializedCommit.size();
323333
i += AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
1919

2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
21+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2122
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify;
2223
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verifyNotNull;
2324

@@ -202,10 +203,10 @@ public void onResponse(StreamingGetDataResponse chunk) {
202203
for (int i = 0; i < chunk.getRequestIdCount(); ++i) {
203204
long requestId = chunk.getRequestId(i);
204205
boolean completeResponse = chunk.getRemainingBytesForResponse() == 0;
205-
@Nullable
206206
AppendableInputStream responseStream =
207-
completeResponse ? pending.remove(requestId) : pending.get(requestId);
208-
verifyNotNull(responseStream, "No pending response stream");
207+
verifyNotNull(
208+
completeResponse ? pending.remove(requestId) : pending.get(requestId),
209+
"No pending response stream");
209210
responseStream.append(chunk.getSerializedResponse(i).newInput());
210211
if (completeResponse) {
211212
responseStream.complete();
@@ -262,23 +263,22 @@ protected PhysicalStreamHandler newResponseHandler() {
262263
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
263264
trySend(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
264265
while (!batches.isEmpty()) {
265-
QueuedBatch batch = batches.peekFirst();
266+
QueuedBatch batch = checkNotNull(batches.peekFirst());
266267
verify(!batch.isEmpty());
267-
if (batch.isFinalized()) {
268-
try {
269-
((GetDataPhysicalStreamHandler) currentPhysicalStream).sendBatch(batch);
270-
verify(
271-
batch == batches.pollFirst(),
272-
"Sent GetDataStream request batch removed before send() was complete.");
273-
// Notify all waiters with requests in this batch as well as the sender
274-
// of the next batch (if one exists).
275-
batch.notifySent();
276-
} catch (Exception e) {
277-
LOG.debug("Batch failed to send on new stream", e);
278-
// Free waiters if the send() failed.
279-
batch.notifyFailed();
280-
throw e;
281-
}
268+
if (!batch.isFinalized()) break;
269+
try {
270+
verify(
271+
batch == batches.pollFirst(),
272+
"Sent GetDataStream request batch removed before send() was complete.");
273+
checkNotNull((GetDataPhysicalStreamHandler) currentPhysicalStream).sendBatch(batch);
274+
// Notify all waiters with requests in this batch as well as the sender
275+
// of the next batch (if one exists).
276+
batch.notifySent();
277+
} catch (Exception e) {
278+
LOG.debug("Batch failed to send on new stream", e);
279+
// Free waiters if the send() failed.
280+
batch.notifyFailed();
281+
throw e;
282282
}
283283
}
284284
}
@@ -375,7 +375,7 @@ public void onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
375375

376376
@Override
377377
protected synchronized void sendHealthCheck() throws WindmillStreamShutdownException {
378-
if (currentPhysicalStream.hasPendingRequests()) {
378+
if (currentPhysicalStream != null && currentPhysicalStream.hasPendingRequests()) {
379379
trySend(HEALTH_CHECK_REQUEST);
380380
}
381381
}
@@ -384,12 +384,13 @@ protected synchronized void sendHealthCheck() throws WindmillStreamShutdownExcep
384384
protected synchronized void shutdownInternal() {
385385
// Stream has been explicitly closed. Drain pending input streams and request batches.
386386
// Future calls to send RPCs will fail.
387-
if (currentPhysicalStream != null) {
388-
for (AppendableInputStream ais :
389-
((GetDataPhysicalStreamHandler) currentPhysicalStream).pending.values()) {
387+
final @Nullable GetDataPhysicalStreamHandler currentGetDataStream =
388+
(GetDataPhysicalStreamHandler) currentPhysicalStream;
389+
if (currentGetDataStream != null) {
390+
for (AppendableInputStream ais : currentGetDataStream.pending.values()) {
390391
ais.cancel();
391392
}
392-
((GetDataPhysicalStreamHandler) currentPhysicalStream).pending.clear();
393+
currentGetDataStream.pending.clear();
393394
}
394395
batches.forEach(
395396
batch -> {
@@ -492,19 +493,19 @@ private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamS
492493
batch.notifyFailed();
493494
throw shutdownExceptionFor(batch);
494495
}
495-
if (currentPhysicalStream == null) {
496+
final @Nullable GetDataPhysicalStreamHandler currentGetDataPhysicalStream =
497+
(GetDataPhysicalStreamHandler) currentPhysicalStream;
498+
if (currentGetDataPhysicalStream == null) {
496499
// Leave the batch finalized but in the batches queue. Finalized batches will be sent on the
497500
// new stream in onNewStream.
498501
return;
499502
}
500503

501504
try {
502505
verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
506+
verify(batch == batches.pollFirst());
503507
verify(!batch.isEmpty());
504-
((GetDataPhysicalStreamHandler) currentPhysicalStream).sendBatch(batch);
505-
verify(
506-
batch == batches.pollFirst(),
507-
"Sent GetDataStream request batch removed before send() was complete.");
508+
currentGetDataPhysicalStream.sendBatch(batch);
508509
// Notify all waiters with requests in this batch as well as the sender
509510
// of the next batch (if one exists).
510511
batch.notifySent();

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ public class WindmillStreamPoolTest {
4343
private final ConcurrentHashMap<
4444
TestWindmillStream, WindmillStreamPool.StreamData<TestWindmillStream>>
4545
holds = new ConcurrentHashMap<>();
46-
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
46+
47+
@Rule
48+
public transient Timeout globalTimeout =
49+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
50+
4751
private List<WindmillStreamPool.@Nullable StreamData<TestWindmillStream>> streams;
4852

4953
@Before

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Set;
2929
import java.util.concurrent.CountDownLatch;
3030
import java.util.concurrent.ExecutionException;
31+
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.atomic.AtomicBoolean;
3233
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
3334
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -63,7 +64,11 @@ public class GrpcCommitWorkStreamTest {
6364

6465
@Rule public final ErrorCollector errorCollector = new ErrorCollector();
6566
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
66-
@Rule public transient Timeout globalTimeout = Timeout.seconds(60);
67+
68+
@Rule
69+
public transient Timeout globalTimeout =
70+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
71+
6772
private final FakeWindmillGrpcService fakeService = new FakeWindmillGrpcService(errorCollector);
6873
private ManagedChannel inProcessChannel;
6974
private Server inProcessServer;

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@ public class GrpcDirectGetWorkStreamTest {
8181
private static final String FAKE_SERVER_NAME = "Fake server for GrpcDirectGetWorkStreamTest";
8282
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
8383
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
84-
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
84+
85+
@Rule
86+
public transient Timeout globalTimeout =
87+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
88+
8589
private ManagedChannel inProcessChannel;
8690
private GrpcDirectGetWorkStream stream;
8791

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.assertNull;
2323
import static org.junit.Assert.assertThrows;
2424
import static org.junit.Assert.assertTrue;
25+
import static org.junit.Assert.fail;
2526

2627
import java.io.IOException;
2728
import java.util.List;
@@ -30,6 +31,7 @@
3031
import java.util.concurrent.ExecutionException;
3132
import java.util.concurrent.ExecutorService;
3233
import java.util.concurrent.Executors;
34+
import java.util.concurrent.TimeUnit;
3335
import java.util.stream.Collectors;
3436
import java.util.stream.IntStream;
3537
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
@@ -63,7 +65,11 @@ public class GrpcGetDataStreamTest {
6365

6466
@Rule public final ErrorCollector errorCollector = new ErrorCollector();
6567
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
66-
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
68+
69+
@Rule
70+
public transient Timeout globalTimeout =
71+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
72+
6773
private final FakeWindmillGrpcService fakeService = new FakeWindmillGrpcService(errorCollector);
6874
private ManagedChannel inProcessChannel;
6975
private Server inProcessServer;
@@ -237,7 +243,13 @@ public void testRequestKeyedData_reconnectOnStreamError() throws InterruptedExce
237243
streamInfo.responseObserver.onError(new IOException("test error"));
238244

239245
streamInfo = waitForConnectionAndConsumeHeader();
240-
request = streamInfo.requests.take();
246+
while (true) {
247+
request = streamInfo.requests.poll(5, TimeUnit.SECONDS);
248+
if (request != null) break;
249+
if (sendFuture.isDone()) {
250+
fail("Unexpected send completion " + sendFuture);
251+
}
252+
}
241253
assertThat(request.getRequestIdList()).containsExactly(1L);
242254
assertEquals(keyedGetDataRequest, request.getStateRequest(0).getRequests(0));
243255

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ public class GrpcWindmillServerTest {
117117
private final long clientId = 10L;
118118
private final Set<ManagedChannel> openedChannels = new HashSet<>();
119119
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
120-
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
120+
121+
@Rule
122+
public transient Timeout globalTimeout =
123+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
124+
121125
@Rule public GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
122126
@Rule public ErrorCollector errorCollector = new ErrorCollector();
123127
private Server server;

0 commit comments

Comments
 (0)