Skip to content

Commit f59a45e

Browse files
authored
[Dataflow Java Streaming] Reduce log spam for direct path mode and improve status page. (#35880)
1 parent a6f63ac commit f59a45e

File tree

4 files changed

+27
-15
lines changed

4 files changed

+27
-15
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,7 @@ private static class StreamAndRequest {
7676
private final AtomicLong idGenerator;
7777
private final JobHeader jobHeader;
7878
private final int streamingRpcBatchLimit;
79+
private volatile boolean logMissingResponse = true;
7980

8081
private GrpcCommitWorkStream(
8182
String backendWorkerToken,
@@ -200,7 +201,9 @@ public void onResponse(StreamingCommitResponse response) {
200201

201202
@Nullable StreamAndRequest entry = pending.remove(requestId);
202203
if (entry == null) {
203-
LOG.error("Got unknown commit request ID: {}", requestId);
204+
if (logMissingResponse) {
205+
LOG.error("Got unknown commit request ID: {}", requestId);
206+
}
204207
continue;
205208
}
206209
if (entry.handler != this) {
@@ -250,6 +253,7 @@ protected PhysicalStreamHandler newResponseHandler() {
250253

251254
@Override
252255
protected synchronized void shutdownInternal() {
256+
logMissingResponse = false;
253257
Iterator<StreamAndRequest> pendingRequests = pending.values().iterator();
254258
while (pendingRequests.hasNext()) {
255259
PendingRequest pendingRequest = pendingRequests.next().request;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java

Lines changed: 14 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,8 @@
2121
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.localhostChannel;
2222

2323
import com.google.auto.value.AutoValue;
24+
import java.time.Duration;
25+
import java.time.Instant;
2426
import java.util.List;
2527
import java.util.Random;
2628
import java.util.Set;
@@ -132,25 +134,26 @@ ImmutableSet<HostAndPort> getDispatcherEndpoints() {
132134

133135
/** Will block the calling thread until the initial endpoints are present. */
134136
public CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStubBlocking() {
135-
boolean initialized = false;
136-
long secondsWaited = 0;
137-
while (!initialized) {
138-
LOG.info(
139-
"Blocking until Windmill Service endpoint has been set. "
140-
+ "Currently waited for [{}] seconds.",
141-
secondsWaited);
137+
Instant startTime = Instant.now();
138+
while (true) {
142139
try {
143-
initialized = onInitializedEndpoints.await(10, TimeUnit.SECONDS);
144-
secondsWaited += 10;
140+
if (onInitializedEndpoints.await(10, TimeUnit.SECONDS)) {
141+
break;
142+
}
143+
LOG.info(
144+
"Blocking until Windmill Service endpoint has been set. " + "Currently waited for {}.",
145+
Duration.between(startTime, Instant.now()));
145146
} catch (InterruptedException e) {
146147
LOG.error(
147148
"Interrupted while waiting for initial Windmill Service endpoints. "
148149
+ "These endpoints are required to do any pipeline processing.",
149150
e);
150151
}
151152
}
152-
153-
LOG.info("Windmill Service endpoint initialized after {} seconds.", secondsWaited);
153+
Duration elapsed = Duration.between(startTime, Instant.now());
154+
if (elapsed.getSeconds() >= 5) {
155+
LOG.info("Windmill Service endpoint initialized after {}.", elapsed);
156+
}
154157

155158
ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> windmillMetadataServiceStubs =
156159
dispatcherStubs.get().windmillMetadataServiceStubs();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
import com.google.errorprone.annotations.concurrent.GuardedBy;
2121
import java.io.PrintWriter;
2222
import java.time.Duration;
23+
import java.time.Instant;
2324
import java.util.Optional;
2425
import java.util.Set;
2526
import java.util.concurrent.ScheduledExecutorService;
@@ -52,6 +53,9 @@ public final class GrpcGetWorkerMetadataStream
5253
@GuardedBy("metadataLock")
5354
private WorkerMetadataResponse latestResponse;
5455

56+
@GuardedBy("metadataLock")
57+
private Instant latestResponseReceived = Instant.EPOCH;
58+
5559
private GrpcGetWorkerMetadataStream(
5660
Function<StreamObserver<WorkerMetadataResponse>, StreamObserver<WorkerMetadataRequest>>
5761
startGetWorkerMetadataRpcFn,
@@ -112,6 +116,7 @@ private Optional<WindmillEndpoints> extractWindmillEndpointsFrom(
112116
synchronized (metadataLock) {
113117
if (response.getMetadataVersion() > latestResponse.getMetadataVersion()) {
114118
this.latestResponse = response;
119+
this.latestResponseReceived = Instant.now();
115120
return Optional.of(WindmillEndpoints.from(response));
116121
} else {
117122
// If the currentMetadataVersion is greater than or equal to one in the response, the
@@ -165,8 +170,8 @@ protected void sendHealthCheck() throws WindmillStreamShutdownException {
165170
protected void appendSpecificHtml(PrintWriter writer) {
166171
synchronized (metadataLock) {
167172
writer.format(
168-
"GetWorkerMetadataStream: job_header=[%s], current_metadata=[%s] ",
169-
workerMetadataRequest.getHeader(), latestResponse);
173+
"GetWorkerMetadataStream: job_header=[%s], current_metadata=[%s] received_at=[%s]",
174+
workerMetadataRequest.getHeader(), latestResponse, latestResponseReceived);
170175
}
171176
}
172177
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -425,7 +425,7 @@ private ExecuteWorkResult executeWork(
425425
// If processing failed due to a thrown exception, close the executionState. Do not
426426
// return/release the executionState back to computationState as that will lead to this
427427
// executionState instance being reused.
428-
LOG.info("Invalidating executor after work item {} failed with Exception:", key, t);
428+
LOG.debug("Invalidating executor after work item {} failed", workItem.getWorkToken(), t);
429429
computationWorkExecutor.invalidate();
430430

431431
// Re-throw the exception, it will be caught and handled by workFailureProcessor downstream.

0 commit comments

Comments
 (0)