2727import java .util .concurrent .RejectedExecutionException ;
2828import java .util .concurrent .TimeUnit ;
2929import java .util .concurrent .atomic .AtomicBoolean ;
30+ import java .util .concurrent .atomic .AtomicReference ;
3031import java .util .function .Function ;
31- import java . util . function . Supplier ;
32+ import javax . annotation . Nullable ;
3233import javax .annotation .concurrent .GuardedBy ;
3334import org .apache .beam .runners .dataflow .worker .windmill .client .grpc .observers .StreamObserverCancelledException ;
3435import org .apache .beam .runners .dataflow .worker .windmill .client .grpc .observers .StreamObserverFactory ;
3839import org .apache .beam .vendor .grpc .v1p69p0 .io .grpc .Status ;
3940import org .apache .beam .vendor .grpc .v1p69p0 .io .grpc .stub .StreamObserver ;
4041import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .ThreadFactoryBuilder ;
42+ import org .checkerframework .checker .nullness .qual .NonNull ;
4143import org .joda .time .DateTime ;
4244import org .joda .time .Instant ;
4345import org .slf4j .Logger ;
4951 * stream if it is broken. Subclasses are responsible for retrying requests that have been lost on a
5052 * broken stream.
5153 *
52- * <p>Subclasses should override {@link #onResponse(ResponseT )} to handle responses from the server,
53- * and {@link #onNewStream()} to perform any work that must be done when a new stream is created,
54- * such as sending headers or retrying requests.
54+ * <p>Subclasses should override {@link #newResponseHandler()} to provide the handler for each
55+ * physical stream connection, and {@link #onNewStream()} to perform any work that must be done
56+ * when a new stream is created, such as sending headers or retrying requests.
5557 *
56- * <p>{@link #trySend(RequestT)} and {@link #startStream()} should not be called from {@link
57- * #onResponse(ResponseT)} ; use {@link #executeSafely(Runnable)} instead.
58+ * <p>{@link #trySend(RequestT)} and {@link #startStream()} should not be called when handling
59+ * responses ; use {@link #executeSafely(Runnable)} instead.
5860 *
5961 * <p>Synchronization on this is used to synchronize the gRpc stream state and internal data
6062 * structures. Since grpc channel operations may block, synchronization on this stream may also
@@ -84,9 +86,12 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
8486 private final Set <AbstractWindmillStream <?, ?>> streamRegistry ;
8587 private final int logEveryNStreamFailures ;
8688 private final String backendWorkerToken ;
89+
90+ private final Function <StreamObserver <ResponseT >, TerminatingStreamObserver <RequestT >>
91+ physicalStreamFactory ;
92+ protected final long physicalStreamDeadlineSeconds ;
8793 private final ResettableThrowingStreamObserver <RequestT > requestObserver ;
8894
89- private final Supplier <TerminatingStreamObserver <RequestT >> requestObserverFactory ;
9095 private final StreamDebugMetrics debugMetrics ;
9196 private final AtomicBoolean isHealthCheckScheduled ;
9297
@@ -96,6 +101,17 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
96101 @ GuardedBy ("this" )
97102 protected boolean isShutdown ;
98103
104+ // The active physical grpc stream. trySend will send messages on the bi-directional stream
105+ // associated with this handler. The instances are created by subclasses via newResponseHandler.
106+ // Subclasses may wish to store additional per-physical stream state within the handler.
107+ @ GuardedBy ("this" )
108+ protected @ Nullable PhysicalStreamHandler currentPhysicalStream ;
109+
110+ // Generally mirrors currentPhysicalStream. It is written while synchronized on this, but may be
111+ // read without synchronization (e.g. when rendering debug output).
112+ private final AtomicReference <PhysicalStreamHandler > currentPhysicalStreamForDebug =
113+ new AtomicReference <>();
114+
99115 @ GuardedBy ("this" )
100116 private boolean started ;
101117
@@ -109,6 +125,9 @@ protected AbstractWindmillStream(
109125 int logEveryNStreamFailures ,
110126 String backendWorkerToken ) {
111127 this .backendWorkerToken = backendWorkerToken ;
128+ this .physicalStreamFactory =
129+ (StreamObserver <ResponseT > observer ) -> streamObserverFactory .from (clientFactory , observer );
130+ this .physicalStreamDeadlineSeconds = streamObserverFactory .getDeadlineSeconds ();
112131 this .executor =
113132 Executors .newSingleThreadExecutor (
114133 new ThreadFactoryBuilder ()
@@ -125,10 +144,6 @@ protected AbstractWindmillStream(
125144 this .finishLatch = new CountDownLatch (1 );
126145 this .logger = logger ;
127146 this .requestObserver = new ResettableThrowingStreamObserver <>(logger );
128- this .requestObserverFactory =
129- () ->
130- streamObserverFactory .from (
131- clientFactory , new AbstractWindmillStream <RequestT , ResponseT >.ResponseObserver ());
132147 this .sleeper = Sleeper .DEFAULT ;
133148 this .debugMetrics = StreamDebugMetrics .create ();
134149 }
@@ -139,19 +154,45 @@ private static String createThreadName(String streamType, String backendWorkerTo
139154 : String .format ("%s-WindmillStream-thread" , streamType );
140155 }
141156
142- /** Called on each response from the server . */
143- protected abstract void onResponse ( ResponseT response );
157+ /** Represents a physical grpc stream that is part of the logical windmill stream . */
158+ protected abstract class PhysicalStreamHandler {
144159
145- /** Called when a new underlying stream to the server has been opened. */
146- protected abstract void onNewStream () throws WindmillStreamShutdownException ;
160+ /** Called on each response from the server. */
161+ public abstract void onResponse (ResponseT response );
162+
163+ /** Returns whether there are any pending requests that should be retried on a stream break. */
164+ public abstract boolean hasPendingRequests ();
165+
166+ /**
167+ * Called when the physical stream has finished. For streams with requests that should be
168+ * retried, the requests should be moved to parent state so that they are captured by the next
169+ * flushPendingToStream call.
170+ */
171+ public abstract void onDone (Status status );
172+
173+ /**
174+ * Renders information useful for debugging as html.
175+ *
176+ * @implNote Don't require synchronization on AbstractWindmillStream.this, see the {@link
177+ * #appendSummaryHtml(PrintWriter)} comment.
178+ */
179+ public abstract void appendHtml (PrintWriter writer );
180+
181+ private final StreamDebugMetrics streamDebugMetrics = StreamDebugMetrics .create ();
182+ }
183+
184+ protected abstract PhysicalStreamHandler newResponseHandler ();
147185
148- /** Returns whether there are any pending requests that should be retried on a stream break. */
149- protected abstract boolean hasPendingRequests ();
186+ protected abstract void onNewStream () throws WindmillStreamShutdownException ;
150187
151188 /** Try to send a request to the server. Returns true if the request was successfully sent. */
152189 @ CanIgnoreReturnValue
153190 protected final synchronized boolean trySend (RequestT request )
154191 throws WindmillStreamShutdownException {
192+ if (currentPhysicalStream == null ) {
193+ return false ;
194+ }
195+ currentPhysicalStream .streamDebugMetrics .recordSend ();
155196 debugMetrics .recordSend ();
156197 try {
157198 requestObserver .onNext (request );
@@ -183,10 +224,14 @@ private void startStream() {
183224 // Add the stream to the registry after it has been fully constructed.
184225 streamRegistry .add (this );
185226 while (true ) {
227+ @ NonNull PhysicalStreamHandler streamHandler = newResponseHandler ();
186228 try {
187229 synchronized (this ) {
188230 debugMetrics .recordStart ();
189- requestObserver .reset (requestObserverFactory .get ());
231+ streamHandler .streamDebugMetrics .recordStart ();
232+ currentPhysicalStream = streamHandler ;
233+ currentPhysicalStreamForDebug .set (currentPhysicalStream );
234+ requestObserver .reset (physicalStreamFactory .apply (new ResponseObserver (streamHandler )));
190235 onNewStream ();
191236 if (clientClosed ) {
192237 halfClose ();
@@ -275,6 +320,23 @@ public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
275320 */
276321 public final void appendSummaryHtml (PrintWriter writer ) {
277322 appendSpecificHtml (writer );
323+
324+ @ Nullable PhysicalStreamHandler currentHandler = currentPhysicalStreamForDebug .get ();
325+ if (currentHandler != null ) {
326+ writer .format ("Physical stream: " );
327+ currentHandler .appendHtml (writer );
328+ StreamDebugMetrics .Snapshot summaryMetrics =
329+ currentHandler .streamDebugMetrics .getSummaryMetrics ();
330+ if (summaryMetrics .isClientClosed ()) {
331+ writer .write (" client closed" );
332+ }
333+ writer .format (
334+ " current stream is %dms old, last send %dms, last response %dms\n " ,
335+ summaryMetrics .streamAge (),
336+ summaryMetrics .timeSinceLastSend (),
337+ summaryMetrics .timeSinceLastResponse ());
338+ }
339+
278340 StreamDebugMetrics .Snapshot summaryMetrics = debugMetrics .getSummaryMetrics ();
279341 summaryMetrics
280342 .restartMetrics ()
@@ -307,6 +369,8 @@ public final void appendSummaryHtml(PrintWriter writer) {
307369 }
308370
309371 /**
372+ * Add specific debug state for the logical stream.
373+ *
310374 * @implNote Don't require synchronization on stream, see the {@link
311375 * #appendSummaryHtml(PrintWriter)} comment.
312376 */
@@ -318,6 +382,9 @@ public final synchronized void halfClose() {
318382 debugMetrics .recordHalfClose ();
319383 clientClosed = true ;
320384 try {
385+ if (currentPhysicalStream != null ) {
386+ currentPhysicalStream .streamDebugMetrics .recordHalfClose ();
387+ }
321388 requestObserver .onCompleted ();
322389 } catch (ResettableThrowingStreamObserver .StreamClosedException e ) {
323390 logger .warn ("Stream was previously closed." );
@@ -357,11 +424,17 @@ public final void shutdown() {
357424 }
358425 }
359426
360- protected abstract void shutdownInternal ();
427+ protected synchronized void shutdownInternal () {}
361428
362429 /** Returns true if the stream was torn down and should not be restarted internally. */
363- private synchronized boolean maybeTearDownStream () {
364- if (isShutdown || (clientClosed && !hasPendingRequests ())) {
430+ private synchronized boolean maybeTearDownStream (PhysicalStreamHandler doneStream ) {
431+ if (clientClosed && !doneStream .hasPendingRequests ()) {
432+ shutdown ();
433+ }
434+
435+ if (isShutdown ) {
436+ // Once physical streams can be closed in the background, this will need to be improved to wait
437+ // for all of the work of the logical stream to complete.
365438 streamRegistry .remove (AbstractWindmillStream .this );
366439 finishLatch .countDown ();
367440 executor .shutdownNow ();
@@ -372,6 +445,11 @@ private synchronized boolean maybeTearDownStream() {
372445 }
373446
374447 private class ResponseObserver implements StreamObserver <ResponseT > {
448+ private final PhysicalStreamHandler handler ;
449+
450+ ResponseObserver (PhysicalStreamHandler handler ) {
451+ this .handler = handler ;
452+ }
375453
376454 @ Override
377455 public void onNext (ResponseT response ) {
@@ -381,18 +459,34 @@ public void onNext(ResponseT response) {
381459 // Ignore.
382460 }
383461 debugMetrics .recordResponse ();
384- onResponse (response );
462+ handler .streamDebugMetrics .recordResponse ();
463+ handler .onResponse (response );
385464 }
386465
387466 @ Override
388467 public void onError (Throwable t ) {
389- if (maybeTearDownStream ()) {
390- return ;
391- }
468+ executeSafely (() -> onPhysicalStreamCompletion (Status .fromThrowable (t ), handler ));
469+ }
392470
393- Status errorStatus = Status .fromThrowable (t );
394- recordStreamStatus (errorStatus );
471+ @ Override
472+ public void onCompleted () {
473+ executeSafely (() -> onPhysicalStreamCompletion (OK_STATUS , handler ));
474+ }
475+ }
395476
477+ private void onPhysicalStreamCompletion (Status status , PhysicalStreamHandler handler ) {
478+ synchronized (this ) {
479+ if (currentPhysicalStream == handler ) {
480+ currentPhysicalStreamForDebug .set (null );
481+ currentPhysicalStream = null ;
482+ }
483+ }
484+ handler .onDone (status );
485+ if (maybeTearDownStream (handler )) {
486+ return ;
487+ }
488+ // Back off on errors.
489+ if (!status .isOk ()) {
396490 try {
397491 long sleep = backoff .nextBackOffMillis ();
398492 debugMetrics .recordSleep (sleep );
@@ -403,54 +497,43 @@ public void onError(Throwable t) {
403497 } catch (IOException e ) {
404498 // Ignore.
405499 }
406-
407- executeSafely (AbstractWindmillStream .this ::startStream );
408- }
409-
410- @ Override
411- public void onCompleted () {
412- if (maybeTearDownStream ()) {
413- return ;
414- }
415- recordStreamStatus (OK_STATUS );
416- executeSafely (AbstractWindmillStream .this ::startStream );
417500 }
501+ recordStreamRestart (status );
502+ startStream ();
503+ }
418504
419- private void recordStreamStatus (Status status ) {
420- int currentRestartCount = debugMetrics .incrementAndGetRestarts ();
421- if (status .isOk ()) {
422- String restartReason =
423- "Stream completed successfully but did not complete requested operations, "
424- + "recreating" ;
425- logger .warn (restartReason );
426- debugMetrics .recordRestartReason (restartReason );
427- } else {
428- int currentErrorCount = debugMetrics .incrementAndGetErrors ();
429- debugMetrics .recordRestartReason (status .toString ());
430- Throwable t = status .getCause ();
431- if (t instanceof StreamObserverCancelledException ) {
432- logger .error (
433- "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}" ,
434- getClass (),
435- backendWorkerToken ,
436- t .getStackTrace (),
437- t );
438- } else if (currentRestartCount % logEveryNStreamFailures == 0 ) {
439- // Don't log every restart since it will get noisy, and many errors transient.
440- long nowMillis = Instant .now ().getMillis ();
441- logger .debug (
442- "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
443- + " with status: {}. created {}ms ago; {}. This is normal with autoscaling." ,
444- AbstractWindmillStream .this .getClass (),
445- currentRestartCount ,
446- currentErrorCount ,
447- t ,
448- status ,
449- nowMillis - debugMetrics .getStartTimeMs (),
450- debugMetrics
451- .responseDebugString (nowMillis )
452- .orElse (NEVER_RECEIVED_RESPONSE_LOG_STRING ));
453- }
505+ private void recordStreamRestart (Status status ) {
506+ int currentRestartCount = debugMetrics .incrementAndGetRestarts ();
507+ if (status .isOk ()) {
508+ String restartReason =
509+ "Stream completed successfully but did not complete requested operations, "
510+ + "recreating" ;
511+ logger .warn (restartReason );
512+ debugMetrics .recordRestartReason (restartReason );
513+ } else {
514+ int currentErrorCount = debugMetrics .incrementAndGetErrors ();
515+ debugMetrics .recordRestartReason (status .toString ());
516+ Throwable t = status .getCause ();
517+ if (t instanceof StreamObserverCancelledException ) {
518+ logger .error (
519+ "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}" ,
520+ getClass (),
521+ backendWorkerToken ,
522+ t .getStackTrace (),
523+ t );
524+ } else if (currentRestartCount % logEveryNStreamFailures == 0 ) {
526+ // Don't log every restart since it will get noisy, and many errors are transient.
526+ long nowMillis = Instant .now ().getMillis ();
527+ logger .debug (
528+ "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
529+ + " with status: {}. created {}ms ago; {}. This is normal with autoscaling." ,
530+ AbstractWindmillStream .this .getClass (),
531+ currentRestartCount ,
532+ currentErrorCount ,
533+ t ,
534+ status ,
535+ nowMillis - debugMetrics .getStartTimeMs (),
536+ debugMetrics .responseDebugString (nowMillis ).orElse (NEVER_RECEIVED_RESPONSE_LOG_STRING ));
454537 }
455538 }
456539 }
0 commit comments