 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
-import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
 import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
  * stream if it is broken. Subclasses are responsible for retrying requests that have been lost on a
  * broken stream.
  *
- * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle responses from the server,
- * and {@link #onNewStream()} to perform any work that must be done when a new stream is created,
- * such as sending headers or retrying requests.
+ * <p>Subclasses should override {@link #newResponseHandler()} to provide a handler for each
+ * physical stream connection, and {@link #onNewStream()} to perform any work that must be done
+ * when a new stream is created, such as sending headers or retrying requests.
  *
- * <p>{@link #trySend(RequestT)} and {@link #startStream()} should not be called from {@link
- * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
+ * <p>{@link #trySend(RequestT)} and {@link #startStream()} should not be called when handling
+ * responses; use {@link #executeSafely(Runnable)} instead.
  *
  * <p>Synchronization on this is used to synchronize the gRpc stream state and internal data
  * structures. Since grpc channel operations may block, synchronization on this stream may also
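To make the contract above concrete, here is a small, self-contained sketch of the executeSafely pattern the javadoc refers to: response callbacks never invoke the send path directly, they hand the follow-up work to the stream's single-threaded executor. The class name, the String request/response types, and the println output are invented for illustration; this models the idea only and is not code from this change.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/** Standalone model: response callbacks never send directly; they hand work to one executor. */
public final class ExecuteSafelyModel {
  private final ExecutorService streamExecutor = Executors.newSingleThreadExecutor();

  // Stands in for trySend(RequestT); synchronized like the real stream's send path.
  private synchronized void trySend(String request) {
    System.out.println("sending: " + request);
  }

  // Stands in for executeSafely(Runnable): tolerate rejection when the executor is shut down.
  private void executeSafely(Runnable work) {
    try {
      streamExecutor.execute(work);
    } catch (RejectedExecutionException e) {
      System.out.println("stream is shutting down, dropping work");
    }
  }

  // Called from the gRPC response thread; schedules the follow-up send instead of sending inline.
  public void onResponse(String response) {
    executeSafely(() -> trySend("ack:" + response));
  }

  public static void main(String[] args) throws InterruptedException {
    ExecuteSafelyModel model = new ExecuteSafelyModel();
    model.onResponse("chunk-1");
    model.streamExecutor.shutdown();
    model.streamExecutor.awaitTermination(1, TimeUnit.SECONDS);
  }
}

Keeping all sends on one executor avoids re-entering the synchronized send path from gRPC callback threads, which appears to be the hazard the surrounding javadoc warns about (gRPC channel operations may block while the stream lock is held).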
@@ -83,9 +85,12 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
   private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
   private final int logEveryNStreamFailures;
   private final String backendWorkerToken;
+
+  private final Function<StreamObserver<ResponseT>, TerminatingStreamObserver<RequestT>>
+      physicalStreamFactory;
+  protected final long physicalStreamDeadlineSeconds;
   private final ResettableThrowingStreamObserver<RequestT> requestObserver;
 
-  private final Supplier<TerminatingStreamObserver<RequestT>> requestObserverFactory;
   private final StreamDebugMetrics debugMetrics;
   private final AtomicBoolean isHealthCheckScheduled;
 
@@ -95,6 +100,17 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
   @GuardedBy("this")
   protected boolean isShutdown;
 
+  // The active physical grpc stream. trySend will send messages on the bi-directional stream
+  // associated with this handler. The instances are created by subclasses via newResponseHandler.
+  // Subclasses may wish to store additional per-physical stream state within the handler.
+  @GuardedBy("this")
+  protected @Nullable PhysicalStreamHandler currentPhysicalStream;
+
+  // Generally the same as currentPhysicalStream, set under synchronization of this but can be read
+  // without.
+  private final AtomicReference<PhysicalStreamHandler> currentPhysicalStreamForDebug =
+      new AtomicReference<>();
+
   @GuardedBy("this")
   private boolean started;
 
@@ -108,6 +124,9 @@ protected AbstractWindmillStream(
       int logEveryNStreamFailures,
       String backendWorkerToken) {
     this.backendWorkerToken = backendWorkerToken;
+    this.physicalStreamFactory =
+        (StreamObserver<ResponseT> observer) -> streamObserverFactory.from(clientFactory, observer);
+    this.physicalStreamDeadlineSeconds = streamObserverFactory.getDeadlineSeconds();
     this.executor =
         Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder()
@@ -124,10 +143,6 @@ protected AbstractWindmillStream(
     this.finishLatch = new CountDownLatch(1);
     this.logger = logger;
     this.requestObserver = new ResettableThrowingStreamObserver<>(logger);
-    this.requestObserverFactory =
-        () ->
-            streamObserverFactory.from(
-                clientFactory, new AbstractWindmillStream<RequestT, ResponseT>.ResponseObserver());
     this.sleeper = Sleeper.DEFAULT;
     this.debugMetrics = StreamDebugMetrics.create();
   }
@@ -138,19 +153,45 @@ private static String createThreadName(String streamType, String backendWorkerTo
         : String.format("%s-WindmillStream-thread", streamType);
   }
 
-  /** Called on each response from the server. */
-  protected abstract void onResponse(ResponseT response);
+  /** Represents a physical grpc stream that is part of the logical windmill stream. */
+  protected abstract class PhysicalStreamHandler {
 
-  /** Called when a new underlying stream to the server has been opened. */
-  protected abstract void onNewStream() throws WindmillStreamShutdownException;
+    /** Called on each response from the server. */
+    public abstract void onResponse(ResponseT response);
+
+    /** Returns whether there are any pending requests that should be retried on a stream break. */
+    public abstract boolean hasPendingRequests();
 
-  /** Returns whether there are any pending requests that should be retried on a stream break. */
-  protected abstract boolean hasPendingRequests();
+    /**
+     * Called when the physical stream has finished. For streams with requests that should be
+     * retried, requests should be moved to parent state so that they are captured by the next
+     * flushPendingToStream call.
+     */
+    public abstract void onDone(Status status);
+
+    /**
+     * Renders information useful for debugging as html.
+     *
+     * @implNote Don't require synchronization on AbstractWindmillStream.this, see the {@link
+     *     #appendSummaryHtml(PrintWriter)} comment.
+     */
+    public abstract void appendHtml(PrintWriter writer);
+
+    private final StreamDebugMetrics streamDebugMetrics = StreamDebugMetrics.create();
+  }
+
+  protected abstract PhysicalStreamHandler newResponseHandler();
+
+  protected abstract void onNewStream() throws WindmillStreamShutdownException;
 
   /** Try to send a request to the server. Returns true if the request was successfully sent. */
   @CanIgnoreReturnValue
   protected final synchronized boolean trySend(RequestT request)
       throws WindmillStreamShutdownException {
+    if (currentPhysicalStream == null) {
+      return false;
+    }
+    currentPhysicalStream.streamDebugMetrics.recordSend();
     debugMetrics.recordSend();
     try {
       requestObserver.onNext(request);
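As a rough, standalone model of the PhysicalStreamHandler lifecycle documented above — one handler per physical connection, with onDone() moving unanswered requests back to logical-stream state so the next connection's flushPendingToStream-style resend picks them up — consider the sketch below. All names here (PhysicalStreamModel, Handler, pendingToSend) are invented for illustration; only the shape of the contract mirrors the javadoc, and none of this is code from the change.

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Standalone model of the handler-per-physical-stream idea (names invented for illustration). */
public final class PhysicalStreamModel {
  // Logical-stream state: requests that still need to be (re)sent on some physical stream.
  private final Queue<String> pendingToSend = new ArrayDeque<>();

  // One instance per physical connection, loosely mirroring PhysicalStreamHandler.
  final class Handler {
    private final Map<Long, String> inFlight = new HashMap<>();
    private long nextId = 0;

    void send(String request) {
      // A real implementation would also write to the gRPC request observer here.
      inFlight.put(nextId++, request);
    }

    void onResponse(long id) {
      inFlight.remove(id); // the request completed; nothing to retry for it
    }

    boolean hasPendingRequests() {
      return !inFlight.isEmpty();
    }

    // On stream completion, push unanswered requests back to the logical stream for retry.
    void onDone() {
      pendingToSend.addAll(inFlight.values());
      inFlight.clear();
    }
  }

  // Called when a new physical stream opens: resend whatever the previous stream left behind.
  void flushPendingToStream(Handler handler) {
    while (!pendingToSend.isEmpty()) {
      handler.send(pendingToSend.poll());
    }
  }

  public static void main(String[] args) {
    PhysicalStreamModel logical = new PhysicalStreamModel();
    Handler first = logical.new Handler();
    first.send("get-work");
    first.onDone(); // the physical stream broke before a response arrived
    Handler second = logical.new Handler();
    logical.flushPendingToStream(second); // "get-work" is retried on the new physical stream
    System.out.println("retried on new stream: " + second.hasPendingRequests());
  }
}

Running main prints "retried on new stream: true", showing the broken stream's unanswered request being re-sent on the replacement stream.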
@@ -182,10 +223,14 @@ private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
+      @NonNull PhysicalStreamHandler streamHandler = newResponseHandler();
       try {
         synchronized (this) {
           debugMetrics.recordStart();
-          requestObserver.reset(requestObserverFactory.get());
+          streamHandler.streamDebugMetrics.recordStart();
+          currentPhysicalStream = streamHandler;
+          currentPhysicalStreamForDebug.set(currentPhysicalStream);
+          requestObserver.reset(physicalStreamFactory.apply(new ResponseObserver(streamHandler)));
           onNewStream();
           if (clientClosed) {
             halfClose();
@@ -272,6 +317,23 @@ public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
    */
   public final void appendSummaryHtml(PrintWriter writer) {
     appendSpecificHtml(writer);
+
+    @Nullable PhysicalStreamHandler currentHandler = currentPhysicalStreamForDebug.get();
+    if (currentHandler != null) {
+      writer.format("Physical stream: ");
+      currentHandler.appendHtml(writer);
+      StreamDebugMetrics.Snapshot summaryMetrics =
+          currentHandler.streamDebugMetrics.getSummaryMetrics();
+      if (summaryMetrics.isClientClosed()) {
+        writer.write(" client closed");
+      }
+      writer.format(
+          " current stream is %dms old, last send %dms, last response %dms\n",
+          summaryMetrics.streamAge(),
+          summaryMetrics.timeSinceLastSend(),
+          summaryMetrics.timeSinceLastResponse());
+    }
+
     StreamDebugMetrics.Snapshot summaryMetrics = debugMetrics.getSummaryMetrics();
     summaryMetrics
         .restartMetrics()
@@ -304,6 +366,8 @@ public final void appendSummaryHtml(PrintWriter writer) {
   }
 
   /**
+   * Adds debug state specific to the logical stream.
+   *
    * @implNote Don't require synchronization on stream, see the {@link
    *     #appendSummaryHtml(PrintWriter)} comment.
    */
@@ -315,6 +379,9 @@ public final synchronized void halfClose() {
     debugMetrics.recordHalfClose();
     clientClosed = true;
     try {
+      if (currentPhysicalStream != null) {
+        currentPhysicalStream.streamDebugMetrics.recordHalfClose();
+      }
       requestObserver.onCompleted();
     } catch (ResettableThrowingStreamObserver.StreamClosedException e) {
       logger.warn("Stream was previously closed.");
@@ -354,11 +421,17 @@ public final void shutdown() {
     }
   }
 
-  protected abstract void shutdownInternal();
+  protected synchronized void shutdownInternal() {}
 
   /** Returns true if the stream was torn down and should not be restarted internally. */
-  private synchronized boolean maybeTearDownStream() {
-    if (isShutdown || (clientClosed && !hasPendingRequests())) {
+  private synchronized boolean maybeTearDownStream(PhysicalStreamHandler doneStream) {
+    if (clientClosed && !doneStream.hasPendingRequests()) {
+      shutdown();
+    }
+
+    if (isShutdown) {
+      // Once physical streams are closed in the background, this will need to be improved to wait
+      // for all of the logical stream's work to complete.
       streamRegistry.remove(AbstractWindmillStream.this);
       finishLatch.countDown();
       executor.shutdownNow();
@@ -369,23 +442,49 @@ private synchronized boolean maybeTearDownStream() {
   }
 
   private class ResponseObserver implements StreamObserver<ResponseT> {
+    private final PhysicalStreamHandler handler;
+
+    ResponseObserver(PhysicalStreamHandler handler) {
+      this.handler = handler;
+    }
 
     @Override
     public void onNext(ResponseT response) {
       backoff.reset();
       debugMetrics.recordResponse();
-      onResponse(response);
+      handler.streamDebugMetrics.recordResponse();
+      handler.onResponse(response);
     }
 
     @Override
     public void onError(Throwable t) {
-      if (maybeTearDownStream()) {
-        return;
-      }
+      executeSafely(() -> onPhysicalStreamCompletion(Status.fromThrowable(t), handler));
+    }
 
-      Status errorStatus = Status.fromThrowable(t);
-      recordStreamStatus(errorStatus);
+    @Override
+    public void onCompleted() {
+      executeSafely(() -> onPhysicalStreamCompletion(OK_STATUS, handler));
+    }
+  }
+
+  @SuppressWarnings("nullness")
+  private void clearPhysicalStreamForDebug() {
+    currentPhysicalStreamForDebug.set(null);
+  }
 
+  private void onPhysicalStreamCompletion(Status status, PhysicalStreamHandler handler) {
+    synchronized (this) {
+      if (currentPhysicalStream == handler) {
+        clearPhysicalStreamForDebug();
+        currentPhysicalStream = null;
+      }
+    }
+    handler.onDone(status);
+    if (maybeTearDownStream(handler)) {
+      return;
+    }
+    // Backoff on errors.
+    if (!status.isOk()) {
       try {
         long sleep = backoff.nextBackOffMillis();
         debugMetrics.recordSleep(sleep);
@@ -394,54 +493,43 @@ public void onError(Throwable t) {
         Thread.currentThread().interrupt();
         return;
       }
-
-      executeSafely(AbstractWindmillStream.this::startStream);
-    }
-
-    @Override
-    public void onCompleted() {
-      if (maybeTearDownStream()) {
-        return;
-      }
-      recordStreamStatus(OK_STATUS);
-      executeSafely(AbstractWindmillStream.this::startStream);
     }
+    recordStreamRestart(status);
+    startStream();
+  }
 
-    private void recordStreamStatus(Status status) {
-      int currentRestartCount = debugMetrics.incrementAndGetRestarts();
-      if (status.isOk()) {
-        String restartReason =
-            "Stream completed successfully but did not complete requested operations, "
-                + "recreating";
-        logger.warn(restartReason);
-        debugMetrics.recordRestartReason(restartReason);
-      } else {
-        int currentErrorCount = debugMetrics.incrementAndGetErrors();
-        debugMetrics.recordRestartReason(status.toString());
-        Throwable t = status.getCause();
-        if (t instanceof StreamObserverCancelledException) {
-          logger.error(
-              "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}",
-              getClass(),
-              backendWorkerToken,
-              t.getStackTrace(),
-              t);
-        } else if (currentRestartCount % logEveryNStreamFailures == 0) {
-          // Don't log every restart since it will get noisy, and many errors transient.
-          long nowMillis = Instant.now().getMillis();
-          logger.debug(
-              "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
-                  + " with status: {}. created {}ms ago; {}. This is normal with autoscaling.",
-              AbstractWindmillStream.this.getClass(),
-              currentRestartCount,
-              currentErrorCount,
-              t,
-              status,
-              nowMillis - debugMetrics.getStartTimeMs(),
-              debugMetrics
-                  .responseDebugString(nowMillis)
-                  .orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING));
-        }
+  private void recordStreamRestart(Status status) {
+    int currentRestartCount = debugMetrics.incrementAndGetRestarts();
+    if (status.isOk()) {
+      String restartReason =
+          "Stream completed successfully but did not complete requested operations, "
+              + "recreating";
+      logger.warn(restartReason);
+      debugMetrics.recordRestartReason(restartReason);
+    } else {
+      int currentErrorCount = debugMetrics.incrementAndGetErrors();
+      debugMetrics.recordRestartReason(status.toString());
+      Throwable t = status.getCause();
+      if (t instanceof StreamObserverCancelledException) {
+        logger.error(
+            "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}",
+            getClass(),
+            backendWorkerToken,
+            t.getStackTrace(),
+            t);
+      } else if (currentRestartCount % logEveryNStreamFailures == 0) {
+        // Don't log every restart since it will get noisy, and many errors transient.
+        long nowMillis = Instant.now().getMillis();
+        logger.debug(
+            "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
+                + " with status: {}. created {}ms ago; {}. This is normal with autoscaling.",
+            AbstractWindmillStream.this.getClass(),
+            currentRestartCount,
+            currentErrorCount,
+            t,
+            status,
+            nowMillis - debugMetrics.getStartTimeMs(),
+            debugMetrics.responseDebugString(nowMillis).orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING));
       }
     }
   }