Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
Expand All @@ -37,6 +38,7 @@
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -48,12 +50,12 @@
* stream if it is broken. Subclasses are responsible for retrying requests that have been lost on a
* broken stream.
*
* <p>Subclasses should override {@link #onResponse(ResponseT)} to handle responses from the server,
* and {@link #onNewStream()} to perform any work that must be done when a new stream is created,
* such as sending headers or retrying requests.
* <p>Subclasses should override {@link #newResponseHandler()} to supply a handler for each physical
* stream connection, and {@link #onNewStream()} to perform any work that must be done when a new
* stream is created, such as sending headers or retrying requests.
*
* <p>{@link #trySend(RequestT)} and {@link #startStream()} should not be called from {@link
* #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
* <p>{@link #trySend(RequestT)} and {@link #startStream()} should not be called when handling
* responses; use {@link #executeSafely(Runnable)} instead.
*
* <p>Synchronization on this is used to synchronize the gRpc stream state and internal data
* structures. Since grpc channel operations may block, synchronization on this stream may also
Expand Down Expand Up @@ -83,9 +85,12 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
private final int logEveryNStreamFailures;
private final String backendWorkerToken;

private final Function<StreamObserver<ResponseT>, TerminatingStreamObserver<RequestT>>
physicalStreamFactory;
protected final long physicalStreamDeadlineSeconds;
private final ResettableThrowingStreamObserver<RequestT> requestObserver;

private final Supplier<TerminatingStreamObserver<RequestT>> requestObserverFactory;
private final StreamDebugMetrics debugMetrics;
private final AtomicBoolean isHealthCheckScheduled;

Expand All @@ -95,6 +100,17 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
@GuardedBy("this")
protected boolean isShutdown;

// The active physical grpc stream. trySend will send messages on the bi-directional stream
// associated with this handler. The instances are created by subclasses via newResponseHandler.
// Subclasses may wish to store additional per-physical stream state within the handler.
@GuardedBy("this")
protected @Nullable PhysicalStreamHandler currentPhysicalStream;

// Generally the same as currentPhysicalStream. It is set while synchronized on this, but can be
// read without synchronization.
private final AtomicReference<PhysicalStreamHandler> currentPhysicalStreamForDebug =
new AtomicReference<>();

@GuardedBy("this")
private boolean started;

Expand All @@ -108,6 +124,9 @@ protected AbstractWindmillStream(
int logEveryNStreamFailures,
String backendWorkerToken) {
this.backendWorkerToken = backendWorkerToken;
this.physicalStreamFactory =
(StreamObserver<ResponseT> observer) -> streamObserverFactory.from(clientFactory, observer);
this.physicalStreamDeadlineSeconds = streamObserverFactory.getDeadlineSeconds();
this.executor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
Expand All @@ -124,10 +143,6 @@ protected AbstractWindmillStream(
this.finishLatch = new CountDownLatch(1);
this.logger = logger;
this.requestObserver = new ResettableThrowingStreamObserver<>(logger);
this.requestObserverFactory =
() ->
streamObserverFactory.from(
clientFactory, new AbstractWindmillStream<RequestT, ResponseT>.ResponseObserver());
this.sleeper = Sleeper.DEFAULT;
this.debugMetrics = StreamDebugMetrics.create();
}
Expand All @@ -138,19 +153,45 @@ private static String createThreadName(String streamType, String backendWorkerTo
: String.format("%s-WindmillStream-thread", streamType);
}

/** Called on each response from the server. */
protected abstract void onResponse(ResponseT response);
/** Represents a physical grpc stream that is part of the logical windmill stream. */
protected abstract class PhysicalStreamHandler {

/** Called when a new underlying stream to the server has been opened. */
protected abstract void onNewStream() throws WindmillStreamShutdownException;
/** Called on each response from the server. */
public abstract void onResponse(ResponseT response);

/** Returns whether there are any pending requests that should be retried on a stream break. */
public abstract boolean hasPendingRequests();

/** Returns whether there are any pending requests that should be retried on a stream break. */
protected abstract boolean hasPendingRequests();
/**
* Called when the physical stream has finished. For streams with requests that should be
* retried, requests should be moved to parent state so that it is captured by the next
* flushPendingToStream call.
*/
public abstract void onDone(Status status);

/**
* Renders information useful for debugging as html.
*
* @implNote Don't require synchronization on AbstractWindmillStream.this, see the {@link
* #appendSummaryHtml(PrintWriter)} comment.
*/
public abstract void appendHtml(PrintWriter writer);

private final StreamDebugMetrics streamDebugMetrics = StreamDebugMetrics.create();
}

/**
 * Creates the handler for a new physical stream.
 *
 * <p>{@code startStream()} calls this at the top of each iteration of its start loop, before
 * the request observer is reset onto a new physical stream. The returned handler is installed as
 * {@link #currentPhysicalStream} and receives the responses and completion of that physical
 * stream. The call site treats the result as non-null.
 *
 * @return a handler for the physical stream that is about to be created
 */
protected abstract PhysicalStreamHandler newResponseHandler();

/**
 * Called when a new underlying stream to the server has been opened, for work such as sending
 * headers or retrying requests.
 *
 * <p>{@code startStream()} invokes this while synchronized on this stream, immediately after
 * the request observer has been reset onto the new physical stream.
 *
 * @throws WindmillStreamShutdownException NOTE(review): presumably when the stream was shut down
 *     while the new stream was being set up; confirm against implementations.
 */
protected abstract void onNewStream() throws WindmillStreamShutdownException;

/** Try to send a request to the server. Returns true if the request was successfully sent. */
@CanIgnoreReturnValue
protected final synchronized boolean trySend(RequestT request)
throws WindmillStreamShutdownException {
if (currentPhysicalStream == null) {
return false;
}
currentPhysicalStream.streamDebugMetrics.recordSend();
debugMetrics.recordSend();
try {
requestObserver.onNext(request);
Expand Down Expand Up @@ -182,10 +223,14 @@ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
@NonNull PhysicalStreamHandler streamHandler = newResponseHandler();
try {
synchronized (this) {
debugMetrics.recordStart();
requestObserver.reset(requestObserverFactory.get());
streamHandler.streamDebugMetrics.recordStart();
currentPhysicalStream = streamHandler;
currentPhysicalStreamForDebug.set(currentPhysicalStream);
requestObserver.reset(physicalStreamFactory.apply(new ResponseObserver(streamHandler)));
onNewStream();
if (clientClosed) {
halfClose();
Expand Down Expand Up @@ -272,6 +317,23 @@ public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
*/
public final void appendSummaryHtml(PrintWriter writer) {
appendSpecificHtml(writer);

@Nullable PhysicalStreamHandler currentHandler = currentPhysicalStreamForDebug.get();
if (currentHandler != null) {
writer.format("Physical stream: ");
currentHandler.appendHtml(writer);
StreamDebugMetrics.Snapshot summaryMetrics =
currentHandler.streamDebugMetrics.getSummaryMetrics();
if (summaryMetrics.isClientClosed()) {
writer.write(" client closed");
}
writer.format(
" current stream is %dms old, last send %dms, last response %dms\n",
summaryMetrics.streamAge(),
summaryMetrics.timeSinceLastSend(),
summaryMetrics.timeSinceLastResponse());
}

StreamDebugMetrics.Snapshot summaryMetrics = debugMetrics.getSummaryMetrics();
summaryMetrics
.restartMetrics()
Expand Down Expand Up @@ -304,6 +366,8 @@ public final void appendSummaryHtml(PrintWriter writer) {
}

/**
* Add specific debug state for the logical stream.
*
* @implNote Don't require synchronization on stream, see the {@link
* #appendSummaryHtml(PrintWriter)} comment.
*/
Expand All @@ -315,6 +379,9 @@ public final synchronized void halfClose() {
debugMetrics.recordHalfClose();
clientClosed = true;
try {
if (currentPhysicalStream != null) {
currentPhysicalStream.streamDebugMetrics.recordHalfClose();
}
requestObserver.onCompleted();
} catch (ResettableThrowingStreamObserver.StreamClosedException e) {
logger.warn("Stream was previously closed.");
Expand Down Expand Up @@ -354,11 +421,17 @@ public final void shutdown() {
}
}

protected abstract void shutdownInternal();
/**
 * Hook for subclass-specific shutdown work. The default implementation does nothing and is
 * declared {@code synchronized} on this stream.
 *
 * <p>NOTE(review): presumably invoked from {@code shutdown()}; the call site is not visible in
 * this part of the file, so confirm before relying on it.
 */
protected synchronized void shutdownInternal() {}

/** Returns true if the stream was torn down and should not be restarted internally. */
private synchronized boolean maybeTearDownStream() {
if (isShutdown || (clientClosed && !hasPendingRequests())) {
private synchronized boolean maybeTearDownStream(PhysicalStreamHandler doneStream) {
if (clientClosed && !doneStream.hasPendingRequests()) {
shutdown();
}

if (isShutdown) {
// Once we have background closing physicalStreams we will need to improve this to wait for
// all of the work of the logical stream to be complete.
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
executor.shutdownNow();
Expand All @@ -369,23 +442,49 @@ private synchronized boolean maybeTearDownStream() {
}

private class ResponseObserver implements StreamObserver<ResponseT> {
private final PhysicalStreamHandler handler;

ResponseObserver(PhysicalStreamHandler handler) {
this.handler = handler;
}

@Override
public void onNext(ResponseT response) {
backoff.reset();
debugMetrics.recordResponse();
onResponse(response);
handler.streamDebugMetrics.recordResponse();
handler.onResponse(response);
}

@Override
public void onError(Throwable t) {
if (maybeTearDownStream()) {
return;
}
executeSafely(() -> onPhysicalStreamCompletion(Status.fromThrowable(t), handler));
}

Status errorStatus = Status.fromThrowable(t);
recordStreamStatus(errorStatus);
@Override
public void onCompleted() {
executeSafely(() -> onPhysicalStreamCompletion(OK_STATUS, handler));
}
}

/**
 * Clears the debug-only reference to the current physical stream, so that readers of
 * {@code currentPhysicalStreamForDebug} observe {@code null} afterwards.
 *
 * <p>Kept as its own method so the nullness suppression covers only this single
 * {@code set(null)} call. NOTE(review): presumably required because the reference's type
 * argument is not declared {@code @Nullable}; confirm with the nullness checker.
 */
@SuppressWarnings("nullness")
private void clearPhysicalStreamForDebug() {
currentPhysicalStreamForDebug.set(null);
}

private void onPhysicalStreamCompletion(Status status, PhysicalStreamHandler handler) {
synchronized (this) {
if (currentPhysicalStream == handler) {
clearPhysicalStreamForDebug();
currentPhysicalStream = null;
}
}
handler.onDone(status);
if (maybeTearDownStream(handler)) {
return;
}
// Back off on errors.
if (!status.isOk()) {
try {
long sleep = backoff.nextBackOffMillis();
debugMetrics.recordSleep(sleep);
Expand All @@ -394,54 +493,43 @@ public void onError(Throwable t) {
Thread.currentThread().interrupt();
return;
}

executeSafely(AbstractWindmillStream.this::startStream);
}

@Override
public void onCompleted() {
if (maybeTearDownStream()) {
return;
}
recordStreamStatus(OK_STATUS);
executeSafely(AbstractWindmillStream.this::startStream);
}
recordStreamRestart(status);
startStream();
}

private void recordStreamStatus(Status status) {
int currentRestartCount = debugMetrics.incrementAndGetRestarts();
if (status.isOk()) {
String restartReason =
"Stream completed successfully but did not complete requested operations, "
+ "recreating";
logger.warn(restartReason);
debugMetrics.recordRestartReason(restartReason);
} else {
int currentErrorCount = debugMetrics.incrementAndGetErrors();
debugMetrics.recordRestartReason(status.toString());
Throwable t = status.getCause();
if (t instanceof StreamObserverCancelledException) {
logger.error(
"StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}",
getClass(),
backendWorkerToken,
t.getStackTrace(),
t);
} else if (currentRestartCount % logEveryNStreamFailures == 0) {
// Don't log every restart since it will get noisy, and many errors are transient.
long nowMillis = Instant.now().getMillis();
logger.debug(
"{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
+ " with status: {}. created {}ms ago; {}. This is normal with autoscaling.",
AbstractWindmillStream.this.getClass(),
currentRestartCount,
currentErrorCount,
t,
status,
nowMillis - debugMetrics.getStartTimeMs(),
debugMetrics
.responseDebugString(nowMillis)
.orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING));
}
/**
 * Updates the restart metrics for the logical stream and logs why the physical stream ended.
 *
 * <p>The restart counter is always incremented. For an OK status a fixed reason is logged at
 * warn level and recorded. For any other status the error counter is incremented and the status
 * string is recorded as the restart reason; an unexpected observer cancellation is logged at
 * error level, and other errors are logged at debug level only on every
 * {@code logEveryNStreamFailures}-th restart.
 *
 * @param status the terminal status of the physical stream that just finished
 */
private void recordStreamRestart(Status status) {
  final int restartCount = debugMetrics.incrementAndGetRestarts();

  // Clean completion: the server closed the stream although work remained.
  if (status.isOk()) {
    final String reason =
        "Stream completed successfully but did not complete requested operations, "
            + "recreating";
    logger.warn(reason);
    debugMetrics.recordRestartReason(reason);
    return;
  }

  // Error completion: always count it and remember the status as the restart reason.
  final int errorCount = debugMetrics.incrementAndGetErrors();
  debugMetrics.recordRestartReason(status.toString());
  final Throwable cause = status.getCause();

  if (cause instanceof StreamObserverCancelledException) {
    logger.error(
        "StreamObserver was unexpectedly cancelled for stream={}, worker={}. stacktrace={}",
        getClass(),
        backendWorkerToken,
        cause.getStackTrace(),
        cause);
    return;
  }

  // Logging every restart would get noisy, and many errors are transient, so sample them.
  if (restartCount % logEveryNStreamFailures != 0) {
    return;
  }

  final long nowMillis = Instant.now().getMillis();
  final long streamAgeMillis = nowMillis - debugMetrics.getStartTimeMs();
  final String lastResponse =
      debugMetrics.responseDebugString(nowMillis).orElse(NEVER_RECEIVED_RESPONSE_LOG_STRING);
  logger.debug(
      "{} has been restarted {} times. Streaming Windmill RPC Error Count: {}; last was: {}"
          + " with status: {}. created {}ms ago; {}. This is normal with autoscaling.",
      AbstractWindmillStream.this.getClass(),
      restartCount,
      errorCount,
      cause,
      status,
      streamAgeMillis,
      lastResponse);
}
Expand Down
Loading
Loading