Skip to content

Commit 36eff8c

Browse files
committed
fix retries of GetDataStream
1 parent bcdac59 commit 36eff8c

File tree

6 files changed

+30
-28
lines changed

6 files changed

+30
-28
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -261,26 +261,8 @@ protected PhysicalStreamHandler newResponseHandler() {
261261
@Override
262262
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
263263
trySend(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
264-
while (!batches.isEmpty()) {
265-
QueuedBatch batch = batches.peekFirst();
266-
verify(!batch.isEmpty());
267-
if (batch.isFinalized()) {
268-
try {
269-
((GetDataPhysicalStreamHandler) currentPhysicalStream).sendBatch(batch);
270-
verify(
271-
batch == batches.pollFirst(),
272-
"Sent GetDataStream request batch removed before send() was complete.");
273-
// Notify all waiters with requests in this batch as well as the sender
274-
// of the next batch (if one exists).
275-
batch.notifySent();
276-
} catch (Exception e) {
277-
LOG.debug("Batch failed to send on new stream", e);
278-
// Free waiters if the send() failed.
279-
batch.notifyFailed();
280-
throw e;
281-
}
282-
}
283-
}
264+
// Any requests still in batches are not resent here: each will either finish being sent by the
265+
// queuing thread, or encounter an error because its input stream was cancelled and then retry.
284266
}
285267

286268
private long uniqueId() {
@@ -500,11 +482,9 @@ private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamS
500482

501483
try {
502484
verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
485+
verify(batch == batches.pollFirst());
503486
verify(!batch.isEmpty());
504487
((GetDataPhysicalStreamHandler) currentPhysicalStream).sendBatch(batch);
505-
verify(
506-
batch == batches.pollFirst(),
507-
"Sent GetDataStream request batch removed before send() was complete.");
508488
// Notify all waiters with requests in this batch as well as the sender
509489
// of the next batch (if one exists).
510490
batch.notifySent();

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamPoolTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ public class WindmillStreamPoolTest {
4343
private final ConcurrentHashMap<
4444
TestWindmillStream, WindmillStreamPool.StreamData<TestWindmillStream>>
4545
holds = new ConcurrentHashMap<>();
46-
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
46+
47+
@Rule
48+
public transient Timeout globalTimeout =
49+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
50+
4751
private List<WindmillStreamPool.@Nullable StreamData<TestWindmillStream>> streams;
4852

4953
@Before

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Set;
2929
import java.util.concurrent.CountDownLatch;
3030
import java.util.concurrent.ExecutionException;
31+
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.atomic.AtomicBoolean;
3233
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
3334
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -63,7 +64,11 @@ public class GrpcCommitWorkStreamTest {
6364

6465
@Rule public final ErrorCollector errorCollector = new ErrorCollector();
6566
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
66-
@Rule public transient Timeout globalTimeout = Timeout.seconds(60);
67+
68+
@Rule
69+
public transient Timeout globalTimeout =
70+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
71+
6772
private final FakeWindmillGrpcService fakeService = new FakeWindmillGrpcService(errorCollector);
6873
private ManagedChannel inProcessChannel;
6974
private Server inProcessServer;

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@ public class GrpcDirectGetWorkStreamTest {
8181
private static final String FAKE_SERVER_NAME = "Fake server for GrpcDirectGetWorkStreamTest";
8282
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
8383
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
84-
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
84+
85+
@Rule
86+
public transient Timeout globalTimeout =
87+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
88+
8589
private ManagedChannel inProcessChannel;
8690
private GrpcDirectGetWorkStream stream;
8791

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.ExecutionException;
3131
import java.util.concurrent.ExecutorService;
3232
import java.util.concurrent.Executors;
33+
import java.util.concurrent.TimeUnit;
3334
import java.util.stream.Collectors;
3435
import java.util.stream.IntStream;
3536
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
@@ -63,7 +64,11 @@ public class GrpcGetDataStreamTest {
6364

6465
@Rule public final ErrorCollector errorCollector = new ErrorCollector();
6566
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
66-
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
67+
68+
@Rule
69+
public transient Timeout globalTimeout =
70+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
71+
6772
private final FakeWindmillGrpcService fakeService = new FakeWindmillGrpcService(errorCollector);
6873
private ManagedChannel inProcessChannel;
6974
private Server inProcessServer;

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ public class GrpcWindmillServerTest {
117117
private final long clientId = 10L;
118118
private final Set<ManagedChannel> openedChannels = new HashSet<>();
119119
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
120-
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
120+
121+
@Rule
122+
public transient Timeout globalTimeout =
123+
Timeout.builder().withTimeout(10, TimeUnit.MINUTES).withLookingForStuckThread(true).build();
124+
121125
@Rule public GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
122126
@Rule public ErrorCollector errorCollector = new ErrorCollector();
123127
private Server server;

0 commit comments

Comments
 (0)