Skip to content
Merged
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsAlreadyExists");
private final Counter flushOperationsInvalidArgument =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsInvalidArgument");
private final Counter flushOperationsOffsetBeyondEnd =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsOffsetBeyondEnd");
private final Distribution flushLatencyDistribution =
Metrics.distribution(StorageApiFlushAndFinalizeDoFn.class, "flushOperationLatencyMs");
private final Counter finalizeOperationsSent =
Expand All @@ -70,6 +72,59 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
private final Counter finalizeOperationsFailed =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed");

/** Custom exception to indicate that a stream is invalid due to an offset error. */
public static class StreamOffsetBeyondEndException extends IOException {
public StreamOffsetBeyondEndException(String message, Throwable cause) {
super(message, cause);
}
}

/**
* Checks if the given throwable indicates that an offset is beyond the end of a BigQuery stream.
* It primarily uses {@code io.grpc.Status.fromThrowable} to determine the gRPC status code and
* then checks for specific message content.
*/
private boolean isOffsetBeyondEndOfStreamError(Throwable t) {
if (t == null) {
return false;
}

// Status.fromThrowable() searches the cause chain for the most specific gRPC status.
io.grpc.Status grpcStatus = io.grpc.Status.fromThrowable(t);

// Check if grpcStatus is valid and the code is OUT_OF_RANGE
if (grpcStatus != null && grpcStatus.getCode() == io.grpc.Status.Code.OUT_OF_RANGE) {
// The gRPC status is OUT_OF_RANGE.
// Now, verify the message content for the specific "is beyond the end of the stream" text.
// This text might be in the grpcStatus's description, or in the message of the original
// throwable 't', or one of its causes.

// Check the description from the derived gRPC status first.
// grpcStatus is confirmed not null here.
String description = grpcStatus.getDescription();
if (description != null
&& description.toLowerCase().contains("is beyond the end of the stream")) {
return true;
}

// If the description didn't match, iterate through the exception chain of 't'
// to find a message that confirms the "offset beyond end of stream" scenario.
Throwable currentThrowable = t;
while (currentThrowable != null) {
String message = currentThrowable.getMessage();
if (message != null && message.toLowerCase().contains("is beyond the end of the stream")) {
// If any exception in the chain has this message, and the overall gRPC status
// (determined by Status.fromThrowable(t)) is OUT_OF_RANGE, we consider it a match.
return true;
}
currentThrowable = currentThrowable.getCause();
}
}
// If grpcStatus was null, or the gRPC status code was not OUT_OF_RANGE,
// or if it was OUT_OF_RANGE but no matching message was found.
return false;
}

@DefaultSchema(JavaFieldSchema.class)
static class Operation implements Comparable<Operation>, Serializable {
final long flushOffset;
Expand Down Expand Up @@ -186,6 +241,20 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
if (statusCode.equals(Code.NOT_FOUND)) {
return RetryType.DONT_RETRY;
}

// check the offset beyond the end of the stream
if (isOffsetBeyondEndOfStreamError(error)) {
flushOperationsOffsetBeyondEnd.inc();
LOG.warn(
"Flush of stream {} to offset {} failed because the offset is beyond the end of the stream. "
+ "This typically means the stream was finalized or truncated by BQ. "
+ "The operation will not be retried on this stream. Error: {}",
streamId,
offset,
error.toString());
// This specific error is not retriable on the same stream.
return RetryType.DONT_RETRY;
}
}
return RetryType.RETRY_ALL_OPERATIONS;
},
Expand Down
Loading