Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ public void invalidateCache() {
try {
activeReader.close();
} catch (IOException e) {
LOG.warn("Failed to close reader for {}-{}", computationId, key.toStringUtf8(), e);
LOG.warn(
"Failed to close reader for {}-{}", computationId, getWorkItem().getShardingKey(), e);
}
}
activeReader = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public static KeyCommitTooLargeException causedBy(
StringBuilder message = new StringBuilder();
message.append("Commit request for stage ");
message.append(computationId);
message.append(" and key ");
message.append(request.getKey().toStringUtf8());
message.append(" and sharding key ");
message.append(request.getShardingKey());
if (request.getSerializedSize() > 0) {
message.append(
" has size "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,13 @@ private boolean shouldRetryLocally(String computationId, Work work, Throwable t)
Throwable parsedException = (t instanceof UserCodeException && cause != null) ? cause : t;
if (KeyTokenInvalidException.isKeyTokenInvalidException(parsedException)) {
LOG.debug(
"Execution of work for computation '{}' on key '{}' failed due to token expiration. "
"Execution of work for computation '{}' on sharding key '{}' failed due to token expiration. "
+ "Work will not be retried locally.",
computationId,
work.getWorkItem().getKey().toStringUtf8());
work.getWorkItem().getShardingKey());
} else if (WorkItemCancelledException.isWorkItemCancelledException(parsedException)) {
LOG.debug(
"Execution of work for computation '{}' on key '{}' failed. "
"Execution of work for computation '{}' on sharding key '{}' failed. "
+ "Work will not be retried locally.",
computationId,
work.getWorkItem().getShardingKey());
Expand All @@ -152,36 +152,36 @@ private boolean shouldRetryLocally(String computationId, Work work, Throwable t)
Duration elapsedTimeSinceStart = new Duration(work.getStartTime(), clock.get());
if (!failureTracker.trackFailure(computationId, work.getWorkItem(), parsedException)) {
LOG.error(
"Execution of work for computation '{}' on key '{}' failed with uncaught exception, "
"Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception, "
+ "and Windmill indicated not to retry locally.",
computationId,
work.getWorkItem().getKey().toStringUtf8(),
work.getWorkItem().getShardingKey(),
parsedException);
} else if (isOutOfMemoryError(parsedException)) {
String heapDump = tryToDumpHeap();
LOG.error(
"Execution of work for computation '{}' for key '{}' failed with out-of-memory. "
"Execution of work for computation '{}' for sharding key '{}' failed with out-of-memory. "
+ "Work will not be retried locally. Heap dump {}.",
computationId,
work.getWorkItem().getKey().toStringUtf8(),
work.getWorkItem().getShardingKey(),
heapDump,
parsedException);
} else if (elapsedTimeSinceStart.isLongerThan(MAX_LOCAL_PROCESSING_RETRY_DURATION)) {
LOG.error(
"Execution of work for computation '{}' for key '{}' failed with uncaught exception, "
"Execution of work for computation '{}' for sharding key '{}' failed with uncaught exception, "
+ "and it will not be retried locally because the elapsed time since start {} "
+ "exceeds {}.",
computationId,
work.getWorkItem().getKey().toStringUtf8(),
work.getWorkItem().getShardingKey(),
elapsedTimeSinceStart,
MAX_LOCAL_PROCESSING_RETRY_DURATION,
parsedException);
} else {
LOG.error(
"Execution of work for computation '{}' on key '{}' failed with uncaught exception. "
"Execution of work for computation '{}' on sharding key '{}' failed with uncaught exception. "
+ "Work will be retried locally.",
computationId,
work.getWorkItem().getKey().toStringUtf8(),
work.getWorkItem().getShardingKey(),
parsedException);
return true;
}
Expand Down
Loading