Skip to content

Commit 5d658c1

Browse files
authored
[Dataflow Streaming] Improve logs (#37152)
1 parent c0fe932 commit 5d658c1

File tree

3 files changed

+15
-14
lines changed

3 files changed

+15
-14
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,8 @@ public void invalidateCache() {
409409
try {
410410
activeReader.close();
411411
} catch (IOException e) {
412-
LOG.warn("Failed to close reader for {}-{}", computationId, key.toStringUtf8(), e);
412+
LOG.warn(
413+
"Failed to close reader for {}-{}", computationId, getWorkItem().getShardingKey(), e);
413414
}
414415
}
415416
activeReader = null;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ public static KeyCommitTooLargeException causedBy(
2626
StringBuilder message = new StringBuilder();
2727
message.append("Commit request for stage ");
2828
message.append(computationId);
29-
message.append(" and key ");
30-
message.append(request.getKey().toStringUtf8());
29+
message.append(" and sharding key ");
30+
message.append(request.getShardingKey());
3131
if (request.getSerializedSize() > 0) {
3232
message.append(
3333
" has size "

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,13 @@ private boolean shouldRetryLocally(String computationId, Work work, Throwable t)
136136
Throwable parsedException = (t instanceof UserCodeException && cause != null) ? cause : t;
137137
if (KeyTokenInvalidException.isKeyTokenInvalidException(parsedException)) {
138138
LOG.debug(
139-
"Execution of work for computation '{}' on key '{}' failed due to token expiration. "
139+
"Execution of work for computation '{}' on sharding key '{}' failed due to token expiration. "
140140
+ "Work will not be retried locally.",
141141
computationId,
142-
work.getWorkItem().getKey().toStringUtf8());
142+
work.getWorkItem().getShardingKey());
143143
} else if (WorkItemCancelledException.isWorkItemCancelledException(parsedException)) {
144144
LOG.debug(
145-
"Execution of work for computation '{}' on key '{}' failed. "
145+
"Execution of work for computation '{}' on sharding key '{}' failed. "
146146
+ "Work will not be retried locally.",
147147
computationId,
148148
work.getWorkItem().getShardingKey());
@@ -152,36 +152,36 @@ private boolean shouldRetryLocally(String computationId, Work work, Throwable t)
152152
Duration elapsedTimeSinceStart = new Duration(work.getStartTime(), clock.get());
153153
if (!failureTracker.trackFailure(computationId, work.getWorkItem(), parsedException)) {
154154
LOG.error(
155-
"Execution of work for computation '{}' on key '{}' failed with uncaught exception, "
155+
"Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception, "
156156
+ "and Windmill indicated not to retry locally.",
157157
computationId,
158-
work.getWorkItem().getKey().toStringUtf8(),
158+
work.getWorkItem().getShardingKey(),
159159
parsedException);
160160
} else if (isOutOfMemoryError(parsedException)) {
161161
String heapDump = tryToDumpHeap();
162162
LOG.error(
163-
"Execution of work for computation '{}' for key '{}' failed with out-of-memory. "
163+
"Execution of work for computation '{}' for sharding key '{}' failed with out-of-memory. "
164164
+ "Work will not be retried locally. Heap dump {}.",
165165
computationId,
166-
work.getWorkItem().getKey().toStringUtf8(),
166+
work.getWorkItem().getShardingKey(),
167167
heapDump,
168168
parsedException);
169169
} else if (elapsedTimeSinceStart.isLongerThan(MAX_LOCAL_PROCESSING_RETRY_DURATION)) {
170170
LOG.error(
171-
"Execution of work for computation '{}' for key '{}' failed with uncaught exception, "
171+
"Execution of work for computation '{}' for sharding key '{}' failed with uncaught exception, "
172172
+ "and it will not be retried locally because the elapsed time since start {} "
173173
+ "exceeds {}.",
174174
computationId,
175-
work.getWorkItem().getKey().toStringUtf8(),
175+
work.getWorkItem().getShardingKey(),
176176
elapsedTimeSinceStart,
177177
MAX_LOCAL_PROCESSING_RETRY_DURATION,
178178
parsedException);
179179
} else {
180180
LOG.error(
181-
"Execution of work for computation '{}' on key '{}' failed with uncaught exception. "
181+
"Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception. "
182182
+ "Work will be retried locally.",
183183
computationId,
184-
work.getWorkItem().getKey().toStringUtf8(),
184+
work.getWorkItem().getShardingKey(),
185185
parsedException);
186186
return true;
187187
}

0 commit comments

Comments
 (0)