Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ public interface WindmillStream {
interface GetWorkStream extends WindmillStream {
/** Adjusts the {@link GetWorkBudget} for the stream. */
void setBudget(GetWorkBudget newBudget);

default void setBudget(long newItems, long newBytes) {
setBudget(GetWorkBudget.builder().setItems(newItems).setBytes(newBytes).build());
}
}

/** Interface for streaming GetDataRequests to Windmill. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

import com.google.auto.value.AutoValue;
import java.io.PrintWriter;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -85,16 +87,19 @@ private GrpcCommitWorkStream(
int logEveryNStreamFailures,
JobHeader jobHeader,
AtomicLong idGenerator,
int streamingRpcBatchLimit) {
int streamingRpcBatchLimit,
Duration halfClosePhysicalStreamAfter,
ScheduledExecutorService executor) {
super(
LOG,
"CommitWorkStream",
startCommitWorkRpcFn,
backoff,
streamObserverFactory,
streamRegistry,
logEveryNStreamFailures,
backendWorkerToken);
backendWorkerToken,
halfClosePhysicalStreamAfter,
executor);
this.idGenerator = idGenerator;
this.jobHeader = jobHeader;
this.streamingRpcBatchLimit = streamingRpcBatchLimit;
Expand All @@ -110,7 +115,9 @@ static GrpcCommitWorkStream create(
int logEveryNStreamFailures,
JobHeader jobHeader,
AtomicLong idGenerator,
int streamingRpcBatchLimit) {
int streamingRpcBatchLimit,
Duration halfClosePhysicalStreamAfter,
ScheduledExecutorService executor) {
return new GrpcCommitWorkStream(
backendWorkerToken,
startCommitWorkRpcFn,
Expand All @@ -120,25 +127,33 @@ static GrpcCommitWorkStream create(
logEveryNStreamFailures,
jobHeader,
idGenerator,
streamingRpcBatchLimit);
streamingRpcBatchLimit,
halfClosePhysicalStreamAfter,
executor);
}

@Override
public void appendSpecificHtml(PrintWriter writer) {
writer.format("CommitWorkStream: %d pending", pending.size());
writer.format("CommitWorkStream: %d pending ", pending.size());
}

@Override
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
trySend(StreamingCommitWorkRequest.newBuilder().setHeader(jobHeader).build());
@SuppressWarnings("ReferenceEquality")
protected synchronized void onFlushPending(boolean isNewStream)
throws WindmillStreamShutdownException {
if (isNewStream) {
trySend(StreamingCommitWorkRequest.newBuilder().setHeader(jobHeader).build());
}
// Flush all pending requests that are no longer on active streams.
try (Batcher resendBatcher = new Batcher()) {
for (Map.Entry<Long, StreamAndRequest> entry : pending.entrySet()) {
CommitWorkPhysicalStreamHandler requestHandler = entry.getValue().handler;
checkState(requestHandler != currentPhysicalStream);
// When we have streams closing in the background we should avoid retrying the requests
// active on those streams.

if (requestHandler != null && closingPhysicalStreams.contains(requestHandler)) {
LOG.debug(
"Not resending request that is active on background half-closing physical stream.");
continue;
}
long id = entry.getKey();
PendingRequest request = entry.getValue().request;
if (!resendBatcher.canAccept(request.getBytes())) {
Expand Down Expand Up @@ -169,6 +184,7 @@ protected synchronized void sendHealthCheck() throws WindmillStreamShutdownExcep

private class CommitWorkPhysicalStreamHandler extends PhysicalStreamHandler {
@Override
@SuppressWarnings("ReferenceEquality")
public void onResponse(StreamingCommitResponse response) {
CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler();
for (int i = 0; i < response.getRequestIdCount(); ++i) {
Expand Down Expand Up @@ -206,6 +222,7 @@ public void onResponse(StreamingCommitResponse response) {
}

@Override
@SuppressWarnings("ReferenceEquality")
public boolean hasPendingRequests() {
return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
}
Expand All @@ -218,6 +235,7 @@ public void onDone(Status status) {
}

@Override
@SuppressWarnings("ReferenceEquality")
public void appendHtml(PrintWriter writer) {
writer.format(
"CommitWorkStream: %d pending",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.io.PrintWriter;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -98,16 +100,19 @@ private GrpcDirectGetWorkStream(
HeartbeatSender heartbeatSender,
GetDataClient getDataClient,
WorkCommitter workCommitter,
WorkItemScheduler workItemScheduler) {
WorkItemScheduler workItemScheduler,
Duration halfClosePhysicalStreamAfter,
ScheduledExecutorService executorService) {
super(
LOG,
"GetWorkStream",
startGetWorkRpcFn,
backoff,
streamObserverFactory,
streamRegistry,
logEveryNStreamFailures,
backendWorkerToken);
backendWorkerToken,
halfClosePhysicalStreamAfter,
executorService);
this.requestHeader = requestHeader;
this.workItemScheduler = workItemScheduler;
this.heartbeatSender = heartbeatSender;
Expand Down Expand Up @@ -138,7 +143,9 @@ static GrpcDirectGetWorkStream create(
HeartbeatSender heartbeatSender,
GetDataClient getDataClient,
WorkCommitter workCommitter,
WorkItemScheduler workItemScheduler) {
WorkItemScheduler workItemScheduler,
Duration halfClosePhysicalStreamAfter,
ScheduledExecutorService executor) {
return new GrpcDirectGetWorkStream(
backendWorkerToken,
startGetWorkRpcFn,
Expand All @@ -151,7 +158,9 @@ static GrpcDirectGetWorkStream create(
heartbeatSender,
getDataClient,
workCommitter,
workItemScheduler);
workItemScheduler,
halfClosePhysicalStreamAfter,
executor);
}

private static Watermarks createWatermarks(
Expand Down Expand Up @@ -230,7 +239,11 @@ protected PhysicalStreamHandler newResponseHandler() {
}

@Override
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
protected synchronized void onFlushPending(boolean isNewStream)
throws WindmillStreamShutdownException {
if (!isNewStream) {
return;
}
budgetTracker.reset();
GetWorkBudget initialGetWorkBudget = budgetTracker.computeBudgetExtension();
StreamingGetWorkRequest request =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -112,16 +113,19 @@ private GrpcGetDataStream(
AtomicLong idGenerator,
int streamingRpcBatchLimit,
boolean sendKeyedGetDataRequests,
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses,
java.time.Duration halfClosePhysicalStreamAfter,
ScheduledExecutorService executorService) {
super(
LOG,
"GetDataStream",
startGetDataRpcFn,
backoff,
streamObserverFactory,
streamRegistry,
logEveryNStreamFailures,
backendWorkerToken);
backendWorkerToken,
halfClosePhysicalStreamAfter,
executorService);
this.idGenerator = idGenerator;
this.jobHeader = jobHeader;
this.streamingRpcBatchLimit = streamingRpcBatchLimit;
Expand All @@ -146,7 +150,9 @@ static GrpcGetDataStream create(
AtomicLong idGenerator,
int streamingRpcBatchLimit,
boolean sendKeyedGetDataRequests,
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses,
java.time.Duration halfClosePhysicalStreamAfter,
ScheduledExecutorService executor) {
return new GrpcGetDataStream(
backendWorkerToken,
startGetDataRpcFn,
Expand All @@ -158,7 +164,9 @@ static GrpcGetDataStream create(
idGenerator,
streamingRpcBatchLimit,
sendKeyedGetDataRequests,
processHeartbeatResponses);
processHeartbeatResponses,
halfClosePhysicalStreamAfter,
executor);
}

private static WindmillStreamShutdownException shutdownExceptionFor(QueuedBatch batch) {
Expand Down Expand Up @@ -189,7 +197,7 @@ public void sendBatch(QueuedBatch batch) throws WindmillStreamShutdownException
}

if (!trySend(batch.asGetDataRequest())) {
// The stream broke before this call went through; onNewStream will retry the fetch.
// The stream broke before this call went through; onFlushPending will retry the fetch.
LOG.debug("GetData stream broke before call started.");
}
}
Expand Down Expand Up @@ -260,8 +268,11 @@ protected PhysicalStreamHandler newResponseHandler() {
}

@Override
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
trySend(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
protected synchronized void onFlushPending(boolean isNewStream)
throws WindmillStreamShutdownException {
if (isNewStream) {
trySend(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
}
while (!batches.isEmpty()) {
QueuedBatch batch = checkNotNull(batches.peekFirst());
verify(!batch.isEmpty());
Expand Down Expand Up @@ -392,6 +403,12 @@ protected synchronized void shutdownInternal() {
}
currentGetDataStream.pending.clear();
}
for (PhysicalStreamHandler handler : closingPhysicalStreams) {
for (AppendableInputStream ais : ((GetDataPhysicalStreamHandler) handler).pending.values()) {
ais.cancel();
}
((GetDataPhysicalStreamHandler) handler).pending.clear();
}
batches.forEach(
batch -> {
batch.markFinalized();
Expand All @@ -402,7 +419,12 @@ protected synchronized void shutdownInternal() {

@Override
public void appendSpecificHtml(PrintWriter writer) {
writer.format("GetDataStream: %d queued batches", batchesDebugSizeSupplier.get());
int batches = batchesDebugSizeSupplier.get();
if (batches > 0) {
writer.format("GetDataStream: %d queued batches ", batches);
} else {
writer.append("GetDataStream: no queued ");
}
}

private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn)
Expand Down Expand Up @@ -476,10 +498,11 @@ private void queueRequestAndWait(QueuedRequest request)
prevBatch.waitForSendOrFailNotification();
}
trySendBatch(batch);
} else {
// Wait for this batch to be sent before parsing the response.
batch.waitForSendOrFailNotification();
// Since the above send may not succeed, we fall through to block on sending or failure.
}

// Wait for this batch to be sent before parsing the response.
batch.waitForSendOrFailNotification();
}

private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamShutdownException {
Expand All @@ -494,8 +517,8 @@ private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamS
final @Nullable GetDataPhysicalStreamHandler currentGetDataPhysicalStream =
(GetDataPhysicalStreamHandler) currentPhysicalStream;
if (currentGetDataPhysicalStream == null) {
// Leave the batch finalized but in the batches queue. Finalized batches will be sent on the
// new stream in onNewStream.
// Leave the batch finalized but in the batches queue. Finalized batches will be sent on a
// new stream in onFlushPending.
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;

import java.io.PrintWriter;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
Expand Down Expand Up @@ -68,16 +70,19 @@ private GrpcGetWorkStream(
Set<AbstractWindmillStream<?, ?>> streamRegistry,
int logEveryNStreamFailures,
boolean requestBatchedGetWorkResponse,
WorkItemReceiver receiver) {
WorkItemReceiver receiver,
Duration halfClosePhysicalStreamAfter,
ScheduledExecutorService executor) {
super(
LOG,
"GetWorkStream",
startGetWorkRpcFn,
backoff,
streamObserverFactory,
streamRegistry,
logEveryNStreamFailures,
backendWorkerToken);
backendWorkerToken,
halfClosePhysicalStreamAfter,
executor);
this.request = request;
this.receiver = receiver;
this.inflightMessages = new AtomicLong();
Expand All @@ -97,7 +102,9 @@ public static GrpcGetWorkStream create(
Set<AbstractWindmillStream<?, ?>> streamRegistry,
int logEveryNStreamFailures,
boolean requestBatchedGetWorkResponse,
WorkItemReceiver receiver) {
WorkItemReceiver receiver,
Duration halfClosePhysicalStreamAfter,
ScheduledExecutorService executor) {
return new GrpcGetWorkStream(
backendWorkerToken,
startGetWorkRpcFn,
Expand All @@ -107,7 +114,9 @@ public static GrpcGetWorkStream create(
streamRegistry,
logEveryNStreamFailures,
requestBatchedGetWorkResponse,
receiver);
receiver,
halfClosePhysicalStreamAfter,
executor);
}

private void sendRequestExtension(long moreItems, long moreBytes) {
Expand Down Expand Up @@ -163,7 +172,11 @@ protected PhysicalStreamHandler newResponseHandler() {
}

@Override
protected synchronized void onNewStream() throws WindmillStreamShutdownException {
protected synchronized void onFlushPending(boolean isNewStream)
throws WindmillStreamShutdownException {
if (!isNewStream) {
return;
}
inflightMessages.set(request.getMaxItems());
inflightBytes.set(request.getMaxBytes());
trySend(
Expand Down
Loading
Loading