Skip to content

Commit c53cd0d

Browse files
committed
address review comments
1 parent 67e3eac commit c53cd0d

File tree

3 files changed

+81
-9
lines changed

3 files changed

+81
-9
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,8 @@ public void onResponse(StreamingCommitResponse response) {
177177
continue;
178178
}
179179

180-
// From windmill.proto: Indices must line up with the request_id field, but trailing OKs
181-
// may be omitted.
180+
// From windmill.proto: Indices must line up with the request_id field, but trailing OKs may
181+
// be omitted.
182182
CommitStatus commitStatus =
183183
i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK;
184184

@@ -194,8 +194,7 @@ public void onResponse(StreamingCommitResponse response) {
194194
try {
195195
pendingRequest.completeWithStatus(commitStatus);
196196
} catch (RuntimeException e) {
197-
// Catch possible exceptions to ensure that an exception for one commit does not
198-
// prevent
197+
// Catch possible exceptions to ensure that an exception for one commit does not prevent
199198
// other commits from being processed. Aggregate all the failures to throw after
200199
// processing the response if they exist.
201200
LOG.warn("Exception while processing commit response.", e);
@@ -273,6 +272,7 @@ private void issueSingleRequest(long id, PendingRequest pendingRequest)
273272
synchronized (this) {
274273
if (isShutdown) {
275274
pendingRequest.abort();
275+
return;
276276
}
277277
pending.put(
278278
id,

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,8 @@ private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamS
504504
try {
505505
// Peek first to ensure we don't pull off if the wrong batch.
506506
verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
507-
// Pull off before we send, the sending threads in issueRequest will be notified if there is an error and will
507+
// Pull off before we send, the sending threads in issueRequest will be notified if there is
508+
// an error and will
508509
// resend requests (possibly with new batching).
509510
verify(batch == batches.pollFirst());
510511
verify(!batch.isEmpty());

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
1919

2020
import static com.google.common.truth.Truth.assertThat;
21+
import static org.hamcrest.Matchers.*;
22+
import static org.hamcrest.Matchers.equalTo;
2123
import static org.junit.Assert.assertEquals;
2224
import static org.junit.Assert.assertNotNull;
2325
import static org.junit.Assert.assertNull;
@@ -40,7 +42,7 @@
4042
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessChannelBuilder;
4143
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder;
4244
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.testing.GrpcCleanupRule;
43-
import org.hamcrest.Matchers;
45+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
4446
import org.junit.After;
4547
import org.junit.Before;
4648
import org.junit.Rule;
@@ -62,6 +64,10 @@ public class GrpcCommitWorkStreamTest {
6264
.build();
6365
private static final String COMPUTATION_ID = "computationId";
6466

67+
@SuppressWarnings("InlineMeInliner") // inline `Strings.repeat()` - Java 11+ API only
68+
private static final ByteString LARGE_BYTE_STRING =
69+
ByteString.copyFromUtf8(Strings.repeat("a", 2 * 1024 * 1024));
70+
6571
@Rule public final ErrorCollector errorCollector = new ErrorCollector();
6672
@Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
6773

@@ -328,7 +334,36 @@ public void testCommitWorkItem_retryOnNewStreamHalfClose() throws Exception {
328334
}
329335

330336
@Test
331-
public void testSend_notCalledAfterShutdown() throws ExecutionException, InterruptedException {
337+
public void testSend_notCalledAfterShutdown_Single()
338+
throws ExecutionException, InterruptedException {
339+
int numCommits = 1;
340+
CountDownLatch commitProcessed = new CountDownLatch(numCommits);
341+
GrpcCommitWorkStream commitWorkStream = createCommitWorkStream();
342+
FakeWindmillGrpcService.CommitStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
343+
344+
try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) {
345+
assertTrue(
346+
batcher.commitWorkItem(
347+
COMPUTATION_ID,
348+
workItemCommitRequest(0),
349+
commitStatus -> {
350+
errorCollector.checkThat(commitStatus, equalTo(Windmill.CommitStatus.ABORTED));
351+
errorCollector.checkThat(commitProcessed.getCount(), greaterThan(0L));
352+
commitProcessed.countDown();
353+
}));
354+
// Shutdown the stream before we exit the try-with-resources block which will try to send()
355+
// the batched request.
356+
commitWorkStream.shutdown();
357+
}
358+
commitProcessed.await();
359+
360+
assertNotNull(streamInfo.onDone.get());
361+
assertThat(streamInfo.requests).isEmpty();
362+
}
363+
364+
@Test
365+
public void testSend_notCalledAfterShutdown_Batch()
366+
throws ExecutionException, InterruptedException {
332367
int numCommits = 2;
333368
CountDownLatch commitProcessed = new CountDownLatch(numCommits);
334369
GrpcCommitWorkStream commitWorkStream = createCommitWorkStream();
@@ -340,12 +375,48 @@ public void testSend_notCalledAfterShutdown() throws ExecutionException, Interru
340375
batcher.commitWorkItem(
341376
COMPUTATION_ID,
342377
workItemCommitRequest(i),
343-
commitStatus -> commitProcessed.countDown()));
378+
commitStatus -> {
379+
errorCollector.checkThat(commitStatus, equalTo(Windmill.CommitStatus.ABORTED));
380+
errorCollector.checkThat(commitProcessed.getCount(), greaterThan(0L));
381+
commitProcessed.countDown();
382+
}));
344383
}
345384
// Shutdown the stream before we exit the try-with-resources block which will try to send()
346385
// the batched request.
347386
commitWorkStream.shutdown();
348387
}
388+
commitProcessed.await();
389+
390+
assertNotNull(streamInfo.onDone.get());
391+
assertThat(streamInfo.requests).isEmpty();
392+
}
393+
394+
@Test
395+
public void testSend_notCalledAfterShutdown_Multichunk()
396+
throws ExecutionException, InterruptedException {
397+
int numCommits = 1;
398+
CountDownLatch commitProcessed = new CountDownLatch(numCommits);
399+
GrpcCommitWorkStream commitWorkStream = createCommitWorkStream();
400+
FakeWindmillGrpcService.CommitStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
401+
402+
try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) {
403+
assertTrue(
404+
batcher.commitWorkItem(
405+
COMPUTATION_ID,
406+
workItemCommitRequest(0)
407+
.toBuilder()
408+
.addBagUpdates(Windmill.TagBag.newBuilder().setTag(LARGE_BYTE_STRING).build())
409+
.build(),
410+
commitStatus -> {
411+
errorCollector.checkThat(commitStatus, equalTo(Windmill.CommitStatus.ABORTED));
412+
errorCollector.checkThat(commitProcessed.getCount(), greaterThan(0L));
413+
commitProcessed.countDown();
414+
}));
415+
// Shutdown the stream before we exit the try-with-resources block which will try to send()
416+
// the batched request.
417+
commitWorkStream.shutdown();
418+
}
419+
commitProcessed.await();
349420
assertNotNull(streamInfo.onDone.get());
350421
assertThat(streamInfo.requests).isEmpty();
351422
}
@@ -354,7 +425,7 @@ private FakeWindmillGrpcService.CommitStreamInfo waitForConnectionAndConsumeHead
354425
try {
355426
FakeWindmillGrpcService.CommitStreamInfo info = fakeService.waitForConnectedCommitStream();
356427
Windmill.StreamingCommitWorkRequest request = info.requests.take();
357-
errorCollector.checkThat(request.getHeader(), Matchers.is(TEST_JOB_HEADER));
428+
errorCollector.checkThat(request.getHeader(), is(TEST_JOB_HEADER));
358429
assertEquals(0, request.getCommitChunkCount());
359430
return info;
360431
} catch (InterruptedException e) {

0 commit comments

Comments
 (0)