Skip to content

Commit b9a298f

Browse files
committed
comments and lint warnings
1 parent 296ac57 commit b9a298f

File tree

5 files changed

+22
-39
lines changed

5 files changed

+22
-39
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,7 @@ private void startStream() {
262262
checkState(halfCloseFuture == null, "Unexpected half-close future");
263263
if (isShutdown) {
264264
// No need to start the stream. shutdown() or onPhysicalStreamCompletion will be
265-
// responsible for completing
266-
// shutdown.
265+
// responsible for completing shutdown.
267266
return;
268267
}
269268
debugMetrics.recordStart();
@@ -465,7 +464,7 @@ public final boolean awaitTermination(int time, TimeUnit unit) throws Interrupte
465464

466465
@Override
467466
public final Instant startTime() {
468-
return new Instant(debugMetrics.getStartTimeMs());
467+
return Instant.ofEpochMilli(debugMetrics.getStartTimeMs());
469468
}
470469

471470
@Override
@@ -539,7 +538,7 @@ private void onHalfClosePhysicalStreamTimeout(PhysicalStreamHandler handler) {
539538
requestObserver.onCompleted();
540539
} catch (Exception e) {
541540
logger.debug(
542-
"Exception while half-closing handler, onPhysicalStreamCompletion will for the stream",
541+
"Exception while half-closing handler, onPhysicalStreamCompletion will be called for the stream",
543542
e);
544543
}
545544
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ public void appendSpecificHtml(PrintWriter writer) {
423423
if (batches > 0) {
424424
writer.format("GetDataStream: %d queued batches ", batches);
425425
} else {
426-
writer.append("GetDataStream: no queued ");
426+
writer.append("GetDataStream: no queued batches ");
427427
}
428428
}
429429

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/TriggeredScheduledExecutorService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,13 @@ private FakeScheduledFuture(Runnable r, Duration delay) {
8888
this.delay = delay;
8989
}
9090

91-
public void triggerRun() {
91+
void triggerRun() {
9292
TriggeredScheduledExecutorService.this.execute(
9393
() -> {
9494
try {
9595
r.run();
9696
delegateFuture.complete(null);
97-
} catch (Exception e) {
97+
} catch (RuntimeException e) {
9898
delegateFuture.completeExceptionally(e);
9999
}
100100
});

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,8 @@ private Windmill.WorkItemCommitRequest createTestCommit(int id) {
503503

504504
@Test
505505
public void testCommitWorkItem_multiplePhysicalStreams() throws Exception {
506+
// A special executor that allows triggering scheduled futures (of which the handover is the
507+
// only such future).
506508
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
507509
GrpcCommitWorkStream commitWorkStream =
508510
createCommitWorkStreamWithPhysicalStreamHandover(triggeredExecutor);
@@ -524,7 +526,8 @@ public void testCommitWorkItem_multiplePhysicalStreams() throws Exception {
524526
request.getCommitChunk(0).getSerializedWorkItemCommit());
525527
assertThat(parsedRequest).isEqualTo(workItemCommitRequest);
526528

527-
// Trigger a new stream to be created due to handover.
529+
// Trigger a new stream to be created by forcing the scheduled halfCloseFuture scheduled within
530+
// AbstractWindmillStream to run.
528531
assertTrue(triggeredExecutor.unblockNextFuture());
529532
FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = waitForConnectionAndConsumeHeader();
530533
fakeService.expectNoMoreStreams();
@@ -571,7 +574,7 @@ public void testCommitWorkItem_multiplePhysicalStreams() throws Exception {
571574
}
572575

573576
@Test
574-
public void testCommitWorkItem_multiplePhysicalStreams_OldStreamFails() throws Exception {
577+
public void testCommitWorkItem_multiplePhysicalStreams_oldStreamFails() throws Exception {
575578
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
576579
GrpcCommitWorkStream commitWorkStream =
577580
createCommitWorkStreamWithPhysicalStreamHandover(triggeredExecutor);
@@ -881,7 +884,7 @@ public void testCommitWorkItem_multiplePhysicalStreams_multipleHandovers() throw
881884
}
882885

883886
@Test
884-
public void testCommitWorkItem_multiplePhysicalStreams_OldStreamFailsWhileNewStreamInBackoff()
887+
public void testCommitWorkItem_multiplePhysicalStreams_oldStreamFailsWhileNewStreamInBackoff()
885888
throws Exception {
886889
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
887890
GrpcCommitWorkStream commitWorkStream =
@@ -1130,17 +1133,6 @@ public void testCommitWorkItem_multiplePhysicalStreams_multipleHandovers_halfClo
11301133
assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS));
11311134
}
11321135

1133-
// XXX add handover tests needed such as:
1134-
// - simple physical half close and new stream starting
1135-
// - when half-closed background stream fails and retries need to occur on new stream
1136-
// - when active stream fails with a background stream, new stream needs to be created and should
1137-
// just get
1138-
// requests from failed stream
1139-
// - creation of current stream is in backoff due to start failure and background stream fails,
1140-
// make sure requests eventually retried
1141-
// - logical halfclose with background streams
1142-
// - shutdown with background streams
1143-
11441136
private FakeWindmillGrpcService.CommitStreamInfo waitForConnectionAndConsumeHeader() {
11451137
try {
11461138
FakeWindmillGrpcService.CommitStreamInfo info = fakeService.waitForConnectedCommitStream();

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,7 @@ public void testRequestKeyedData_multiplePhysicalStreams()
362362
throws InterruptedException, ExecutionException {
363363
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
364364
GrpcGetDataStream getDataStream =
365-
createGetDataStreamWithPhysicalStreamHandover(
366-
java.time.Duration.ofSeconds(60), triggeredExecutor);
365+
createGetDataStreamWithPhysicalStreamHandover(Duration.ofSeconds(60), triggeredExecutor);
367366
FakeWindmillGrpcService.GetDataStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
368367

369368
// These will block until they are successfully sent.
@@ -438,12 +437,11 @@ public void testRequestKeyedData_multiplePhysicalStreams()
438437
}
439438

440439
@Test
441-
public void testRequestKeyedData_multiplePhysicalStreams_OldStreamFails()
440+
public void testRequestKeyedData_multiplePhysicalStreams_oldStreamFails()
442441
throws InterruptedException, ExecutionException {
443442
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
444443
GrpcGetDataStream getDataStream =
445-
createGetDataStreamWithPhysicalStreamHandover(
446-
java.time.Duration.ofSeconds(60), triggeredExecutor);
444+
createGetDataStreamWithPhysicalStreamHandover(Duration.ofSeconds(60), triggeredExecutor);
447445
FakeWindmillGrpcService.GetDataStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
448446

449447
// These will block until they are successfully sent.
@@ -525,8 +523,7 @@ public void testRequestKeyedData_multiplePhysicalStreams_newStreamFailsWhileEmpt
525523
throws InterruptedException, ExecutionException {
526524
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
527525
GrpcGetDataStream getDataStream =
528-
createGetDataStreamWithPhysicalStreamHandover(
529-
java.time.Duration.ofSeconds(60), triggeredExecutor);
526+
createGetDataStreamWithPhysicalStreamHandover(Duration.ofSeconds(60), triggeredExecutor);
530527
FakeWindmillGrpcService.GetDataStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
531528

532529
// These will block until they are successfully sent.
@@ -602,8 +599,7 @@ public void testRequestKeyedData_multiplePhysicalStreams_newStreamFailsWithReque
602599
throws InterruptedException, ExecutionException {
603600
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
604601
GrpcGetDataStream getDataStream =
605-
createGetDataStreamWithPhysicalStreamHandover(
606-
java.time.Duration.ofSeconds(60), triggeredExecutor);
602+
createGetDataStreamWithPhysicalStreamHandover(Duration.ofSeconds(60), triggeredExecutor);
607603
FakeWindmillGrpcService.GetDataStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
608604

609605
// These will block until they are successfully sent.
@@ -683,8 +679,7 @@ public void testRequestKeyedData_multiplePhysicalStreams_multipleHandovers_allRe
683679
throws InterruptedException, ExecutionException {
684680
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
685681
GrpcGetDataStream getDataStream =
686-
createGetDataStreamWithPhysicalStreamHandover(
687-
java.time.Duration.ofSeconds(60), triggeredExecutor);
682+
createGetDataStreamWithPhysicalStreamHandover(Duration.ofSeconds(60), triggeredExecutor);
688683
FakeWindmillGrpcService.GetDataStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
689684

690685
// Request 1, Stream 1
@@ -781,12 +776,11 @@ public void testRequestKeyedData_multiplePhysicalStreams_multipleHandovers_allRe
781776
}
782777

783778
@Test
784-
public void testRequestKeyedData_multiplePhysicalStreams_OldStreamFailsWhileNewStreamInBackoff()
779+
public void testRequestKeyedData_multiplePhysicalStreams_oldStreamFailsWhileNewStreamInBackoff()
785780
throws InterruptedException, ExecutionException {
786781
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
787782
GrpcGetDataStream getDataStream =
788-
createGetDataStreamWithPhysicalStreamHandover(
789-
java.time.Duration.ofSeconds(60), triggeredExecutor);
783+
createGetDataStreamWithPhysicalStreamHandover(Duration.ofSeconds(60), triggeredExecutor);
790784
FakeWindmillGrpcService.GetDataStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
791785

792786
Windmill.KeyedGetDataRequest keyedGetDataRequest = createTestRequest(1);
@@ -849,8 +843,7 @@ public void testRequestKeyedData_multiplePhysicalStreams_multipleHandovers_shutd
849843
throws InterruptedException, ExecutionException {
850844
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
851845
GrpcGetDataStream getDataStream =
852-
createGetDataStreamWithPhysicalStreamHandover(
853-
java.time.Duration.ofSeconds(60), triggeredExecutor);
846+
createGetDataStreamWithPhysicalStreamHandover(Duration.ofSeconds(60), triggeredExecutor);
854847
FakeWindmillGrpcService.GetDataStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
855848

856849
// Request 1, Stream 1
@@ -932,8 +925,7 @@ public void testRequestKeyedData_multiplePhysicalStreams_multipleHandovers_halfC
932925
throws InterruptedException, ExecutionException {
933926
TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService();
934927
GrpcGetDataStream getDataStream =
935-
createGetDataStreamWithPhysicalStreamHandover(
936-
java.time.Duration.ofSeconds(60), triggeredExecutor);
928+
createGetDataStreamWithPhysicalStreamHandover(Duration.ofSeconds(60), triggeredExecutor);
937929
FakeWindmillGrpcService.GetDataStreamInfo streamInfo = waitForConnectionAndConsumeHeader();
938930

939931
// Request 1, Stream 1

0 commit comments

Comments
 (0)