Skip to content

Commit 67e3eac

Browse files
committed
comments
1 parent c34ba48 commit 67e3eac

File tree

1 file changed

+4
-1
lines changed
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc

1 file changed

+4
-1
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ private void queueRequestAndWait(QueuedRequest request)
486486

487487
private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamShutdownException {
488488
// Finalize the batch so that no additional requests will be added. Leave the batch in the
489-
// queue so that a subsequent batch will wait for its completion.
489+
// queue so that a subsequent batch will wait for it to be sent.
490490
batch.markFinalized();
491491

492492
if (isShutdown) {
@@ -502,7 +502,10 @@ private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamS
502502
}
503503

504504
try {
505+
// Peek first to ensure we don't pull off if the wrong batch.
505506
verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
507+
// Pull off before we send, the sending threads in issueRequest will be notified if there is an error and will
508+
// resend requests (possibly with new batching).
506509
verify(batch == batches.pollFirst());
507510
verify(!batch.isEmpty());
508511
currentGetDataPhysicalStream.sendBatch(batch);

0 commit comments

Comments
 (0)