Skip to content
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsAlreadyExists");
private final Counter flushOperationsInvalidArgument =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsInvalidArgument");
private final Counter flushOperationsOffsetBeyondEnd =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsOffsetBeyondEnd");
private final Distribution flushLatencyDistribution =
Metrics.distribution(StorageApiFlushAndFinalizeDoFn.class, "flushOperationLatencyMs");
private final Counter finalizeOperationsSent =
Expand All @@ -70,6 +72,48 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
private final Counter finalizeOperationsFailed =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed");

/** Custom exception to indicate that a stream is invalid due to an offset error. */
public static class StreamOffsetBeyondEndException extends IOException {
public StreamOffsetBeyondEndException(String message, Throwable cause) {
super(message, cause);
}
}

/**
* Checks if the given throwable is or is caused by an ApiException indicating that an offset is
* beyond the end of a BigQuery stream.
*/
private boolean isOffsetBeyondEndOfStreamError(Throwable t) {
if (t == null) {
return false;
}
if (t instanceof ApiException) {
ApiException apiException = (ApiException) t;
if (apiException.getStatusCode().getCode() == Code.OUT_OF_RANGE) {
// Check if the cause is gRPC StatusRuntimeException for more specific message check
Throwable cause = apiException.getCause();
if (cause instanceof io.grpc.StatusRuntimeException) {
io.grpc.StatusRuntimeException grpcException = (io.grpc.StatusRuntimeException) cause;
return grpcException.getStatus().getCode() == io.grpc.Status.Code.OUT_OF_RANGE
&& grpcException.getMessage() != null
&& grpcException
.getMessage()
.toLowerCase()
.contains("is beyond the end of the stream");
}
// Fallback to checking the ApiException message directly if cause is not gRPC
return apiException.getMessage() != null
&& apiException.getMessage().toLowerCase().contains("is beyond the end of the stream");
}
}
// Recursively check the cause, as the specific exception might be wrapped.
Throwable cause = t.getCause();
if (cause == null) {
return false;
}
return isOffsetBeyondEndOfStreamError(cause);
}

@DefaultSchema(JavaFieldSchema.class)
static class Operation implements Comparable<Operation>, Serializable {
final long flushOffset;
Expand Down Expand Up @@ -170,6 +214,29 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
BigQuerySinkMetrics.reportFailedRPCMetrics(
failedContext, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);

if (error != null && isOffsetBeyondEndOfStreamError(error)) {
flushOperationsOffsetBeyondEnd.inc();
LOG.warn(
"Flush of stream {} to offset {} failed because the offset is beyond the end of the stream. "
+ "This typically means the stream was finalized or truncated by BQ. "
+ "The operation will not be retried on this stream. Error: {}",
streamId,
offset,
error.toString());
// This specific error is not retriable on the same stream.
// Throwing a runtime exception to break out of the RetryManager and signal
// to the Beam runner that the bundle should be retried, which will then
// allow an upstream DoFn to create a new stream.
throw new RuntimeException(
new StreamOffsetBeyondEndException(
"Offset "
+ offset
+ " is beyond the end of stream "
+ streamId
+ ". Stream is considered invalid for further appends.",
error));
}

if (error instanceof ApiException) {
Code statusCode = ((ApiException) error).getStatusCode().getCode();
if (statusCode.equals(Code.ALREADY_EXISTS)) {
Expand Down
Loading