Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.io.PrintWriter;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -79,7 +85,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
protected final Sleeper sleeper;

private final Logger logger;
private final ExecutorService executor;
private final ScheduledExecutorService executor;
private final BackOff backoff;
private final CountDownLatch finishLatch;
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
Expand All @@ -89,6 +95,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final Function<StreamObserver<ResponseT>, TerminatingStreamObserver<RequestT>>
physicalStreamFactory;
protected final long physicalStreamDeadlineSeconds;
private final Duration halfClosePhysicalStreamAfter;
private final ResettableThrowingStreamObserver<RequestT> requestObserver;

private final StreamDebugMetrics debugMetrics;
Expand All @@ -106,6 +113,14 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
@GuardedBy("this")
protected @Nullable PhysicalStreamHandler currentPhysicalStream;

@GuardedBy("this")
@Nullable
Future<?> halfCloseFuture = null;

// Physical streams that have been half-closed and are waiting for responses or stream failure.
@GuardedBy("this")
protected final Set<PhysicalStreamHandler> closingPhysicalStreams;

// Generally the same as currentPhysicalStream, set under synchronization of this but can be read
// without.
private final AtomicReference<PhysicalStreamHandler> currentPhysicalStreamForDebug =
Expand All @@ -114,6 +129,9 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
@GuardedBy("this")
private boolean started;

// If halfClosePhysicalStream is non-zero, substreams created for the logical
// AbstractWindmillStream
// will be half-closed and a new physical stream will be created after this duraction.
protected AbstractWindmillStream(
Logger logger,
String debugStreamType,
Expand All @@ -122,13 +140,23 @@ protected AbstractWindmillStream(
StreamObserverFactory streamObserverFactory,
Set<AbstractWindmillStream<?, ?>> streamRegistry,
int logEveryNStreamFailures,
String backendWorkerToken) {
String backendWorkerToken,
Duration halfClosePhysicalStreamAfter) {
checkArgument(!halfClosePhysicalStreamAfter.isNegative());
this.backendWorkerToken = backendWorkerToken;
this.physicalStreamFactory =
(StreamObserver<ResponseT> observer) -> streamObserverFactory.from(clientFactory, observer);
this.physicalStreamDeadlineSeconds = streamObserverFactory.getDeadlineSeconds();
if (!halfClosePhysicalStreamAfter.isZero()
&& halfClosePhysicalStreamAfter.compareTo(Duration.ofSeconds(physicalStreamDeadlineSeconds))
>= 0) {
logger.debug("Not attempting to half-close cleanly as stream deadline is shorter.");
halfClosePhysicalStreamAfter = Duration.ZERO;
}
this.halfClosePhysicalStreamAfter = halfClosePhysicalStreamAfter;
this.closingPhysicalStreams = new HashSet<>();
this.executor =
Executors.newSingleThreadExecutor(
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(createThreadName(debugStreamType, backendWorkerToken))
Expand Down Expand Up @@ -182,7 +210,8 @@ protected abstract class PhysicalStreamHandler {

protected abstract PhysicalStreamHandler newResponseHandler();

protected abstract void onNewStream() throws WindmillStreamShutdownException;
protected abstract void onFlushPending(boolean isNewStream)
throws WindmillStreamShutdownException;

/** Try to send a request to the server. Returns true if the request was successfully sent. */
@CanIgnoreReturnValue
Expand Down Expand Up @@ -214,26 +243,34 @@ public final void start() {
}

if (shouldStartStream) {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
startStream();
}
}

/** Starts the underlying stream. */
private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
@NonNull PhysicalStreamHandler streamHandler = newResponseHandler();
try {
synchronized (this) {
checkState(currentPhysicalStream == null, "Overwriting existing physical stream");
checkState(halfCloseFuture == null, "Unexpected half-close future");
debugMetrics.recordStart();
streamHandler.streamDebugMetrics.recordStart();
currentPhysicalStream = streamHandler;
currentPhysicalStreamForDebug.set(currentPhysicalStream);
requestObserver.reset(physicalStreamFactory.apply(new ResponseObserver(streamHandler)));
onNewStream();
onFlushPending(true);
if (clientClosed) {
halfClose();
} else if (!halfClosePhysicalStreamAfter.isZero()) {
halfCloseFuture =
executor.schedule(
() -> onHalfClosePhysicalStreamTimeout(streamHandler),
halfClosePhysicalStreamAfter.getSeconds(),
TimeUnit.SECONDS);
}
return;
}
Expand All @@ -243,8 +280,17 @@ private void startStream() {
break;
} catch (Exception e) {
logger.error("Failed to create new stream, retrying: ", e);
synchronized (this) {
currentPhysicalStream = null;
if (halfCloseFuture != null) {
halfCloseFuture.cancel(false);
halfCloseFuture = null;
}
clearPhysicalStreamForDebug();
}
try {
long sleep = backoff.nextBackOffMillis();
debugMetrics.recordRestartReason("Failed to create new stream, retrying: " + e);
debugMetrics.recordSleep(sleep);
sleeper.sleep(sleep);
} catch (InterruptedException ie) {
Expand Down Expand Up @@ -375,7 +421,12 @@ public final void appendSummaryHtml(PrintWriter writer) {

@Override
public final synchronized void halfClose() {
// Synchronization of close and onCompleted necessary for correct retry logic in onNewStream.
if (clientClosed) {
logger.warn("Stream was previously closed.");
return;
}
// Synchronization of close and onCompleted necessary for correct retry logic in
// onPhysicalStreamCompleted.
debugMetrics.recordHalfClose();
clientClosed = true;
try {
Expand Down Expand Up @@ -423,24 +474,6 @@ public final void shutdown() {

protected synchronized void shutdownInternal() {}

/** Returns true if the stream was torn down and should not be restarted internally. */
private synchronized boolean maybeTearDownStream(PhysicalStreamHandler doneStream) {
if (clientClosed && !doneStream.hasPendingRequests()) {
shutdown();
}

if (isShutdown) {
// Once we have background closing physicalStreams we will need to improve this to wait for
// all of the work of the logical stream to be complete.
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
executor.shutdownNow();
return true;
}

return false;
}

private class ResponseObserver implements StreamObserver<ResponseT> {
private final PhysicalStreamHandler handler;

Expand Down Expand Up @@ -472,17 +505,74 @@ private void clearPhysicalStreamForDebug() {
currentPhysicalStreamForDebug.set(null);
}

private void onHalfClosePhysicalStreamTimeout(PhysicalStreamHandler handler) {
synchronized (this) {
if (currentPhysicalStream != handler || clientClosed || isShutdown) {
return;
}
try {
handler.streamDebugMetrics.recordHalfClose();
closingPhysicalStreams.add(handler);
currentPhysicalStream = null;
halfCloseFuture = null; // This is the currently running future.
clearPhysicalStreamForDebug();
requestObserver.onCompleted();
} catch (Exception e) {
// XXX figure out
}
try {
@NonNull PhysicalStreamHandler streamHandler = newResponseHandler();
Copy link
Contributor

@arunpandianp arunpandianp Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call startStream() here instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to do it in the synchronized block. we also want to do it after the handling of the error in case that will trigger new messages to be sent on the new stream.

streamHandler.streamDebugMetrics.recordStart();
currentPhysicalStream = streamHandler;
currentPhysicalStreamForDebug.set(currentPhysicalStream);
requestObserver.reset(physicalStreamFactory.apply(new ResponseObserver(streamHandler)));
onFlushPending(true);
} catch (Exception e) {
// XXX figure out
}
}
}

private void onPhysicalStreamCompletion(Status status, PhysicalStreamHandler handler) {
synchronized (this) {
final boolean wasActiveStream = currentPhysicalStream == handler;
if (currentPhysicalStream == handler) {
clearPhysicalStreamForDebug();
currentPhysicalStream = null;
if (halfCloseFuture != null) {
halfCloseFuture.cancel(false);
halfCloseFuture = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these steps are repeated in a few places, consider moving them to a function like clearPhysicalStream()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
} else {
checkState(closingPhysicalStreams.remove(handler));
}
handler.onDone(status);
if (wasActiveStream
&& clientClosed
&& !handler.hasPendingRequests()
&& closingPhysicalStreams.stream().noneMatch(PhysicalStreamHandler::hasPendingRequests)) {
shutdown();
}
if (isShutdown && currentPhysicalStream == null && closingPhysicalStreams.isEmpty()) {
logger.debug("Completing shutdown of stream after shutdown and all streams terminated.");
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
executor.shutdownNow();
return;
}
if (currentPhysicalStream != null) {
try {
onFlushPending(false);
} catch (WindmillStreamShutdownException e) {
// XXX figure out
}
return;
}
if (!wasActiveStream) {
return;
}
}
handler.onDone(status);
if (maybeTearDownStream(handler)) {
return;
}

// Backoff on errors.;
if (!status.isOk()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ public interface WindmillStream {
interface GetWorkStream extends WindmillStream {
/** Adjusts the {@link GetWorkBudget} for the stream. */
void setBudget(GetWorkBudget newBudget);

default void setBudget(long newItems, long newBytes) {
setBudget(GetWorkBudget.builder().setItems(newItems).setBytes(newBytes).build());
}
}

/** Interface for streaming GetDataRequests to Windmill. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.auto.value.AutoValue;
import java.io.PrintWriter;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -85,7 +86,8 @@ private GrpcCommitWorkStream(
int logEveryNStreamFailures,
JobHeader jobHeader,
AtomicLong idGenerator,
int streamingRpcBatchLimit) {
int streamingRpcBatchLimit,
Duration halfClosePhysicalStreamAfter) {
super(
LOG,
"CommitWorkStream",
Expand All @@ -94,7 +96,8 @@ private GrpcCommitWorkStream(
streamObserverFactory,
streamRegistry,
logEveryNStreamFailures,
backendWorkerToken);
backendWorkerToken,
halfClosePhysicalStreamAfter);
this.idGenerator = idGenerator;
this.jobHeader = jobHeader;
this.streamingRpcBatchLimit = streamingRpcBatchLimit;
Expand All @@ -110,7 +113,8 @@ static GrpcCommitWorkStream create(
int logEveryNStreamFailures,
JobHeader jobHeader,
AtomicLong idGenerator,
int streamingRpcBatchLimit) {
int streamingRpcBatchLimit,
Duration halfClosePhysicalStreamAfter) {
return new GrpcCommitWorkStream(
backendWorkerToken,
startCommitWorkRpcFn,
Expand All @@ -120,7 +124,8 @@ static GrpcCommitWorkStream create(
logEveryNStreamFailures,
jobHeader,
idGenerator,
streamingRpcBatchLimit);
streamingRpcBatchLimit,
halfClosePhysicalStreamAfter);
}

@Override
Expand All @@ -129,16 +134,21 @@ public void appendSpecificHtml(PrintWriter writer) {
}

@Override
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
trySend(StreamingCommitWorkRequest.newBuilder().setHeader(jobHeader).build());
protected synchronized void onFlushPending(boolean isNewStream)
throws WindmillStreamShutdownException {
if (isNewStream) {
trySend(StreamingCommitWorkRequest.newBuilder().setHeader(jobHeader).build());
}
// Flush all pending requests that are no longer on active streams.
try (Batcher resendBatcher = new Batcher()) {
for (Map.Entry<Long, StreamAndRequest> entry : pending.entrySet()) {
CommitWorkPhysicalStreamHandler requestHandler = entry.getValue().handler;
checkState(requestHandler != currentPhysicalStream);
// When we have streams closing in the background we should avoid retrying the requests
// active on those streams.

if (requestHandler != null && closingPhysicalStreams.contains(requestHandler)) {
LOG.debug(
"Not resending request that is active on background half-closing physical stream.");
continue;
}
long id = entry.getKey();
PendingRequest request = entry.getValue().request;
if (!resendBatcher.canAccept(request.getBytes())) {
Expand Down
Loading
Loading