diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java index dec86c3360b0..fd3853d15e0f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java @@ -61,6 +61,8 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn, Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsAlreadyExists"); private final Counter flushOperationsInvalidArgument = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsInvalidArgument"); + private final Counter flushOperationsOffsetBeyondEnd = + Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsOffsetBeyondEnd"); private final Distribution flushLatencyDistribution = Metrics.distribution(StorageApiFlushAndFinalizeDoFn.class, "flushOperationLatencyMs"); private final Counter finalizeOperationsSent = @@ -70,6 +72,52 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn, private final Counter finalizeOperationsFailed = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed"); + /** + * Checks if the given throwable indicates that an offset is beyond the end of a BigQuery stream. + * It primarily uses {@code io.grpc.Status.fromThrowable} to determine the gRPC status code and + * then checks for specific message content. + */ + private boolean isOffsetBeyondEndOfStreamError(Throwable t) { + if (t == null) { + return false; + } + + // Status.fromThrowable() searches the cause chain for the most specific gRPC status. + io.grpc.Status grpcStatus = io.grpc.Status.fromThrowable(t); + + // Check if grpcStatus is valid and the code is OUT_OF_RANGE + if (grpcStatus != null && grpcStatus.getCode() == io.grpc.Status.Code.OUT_OF_RANGE) { + // The gRPC status is OUT_OF_RANGE. + // Now, verify the message content for the specific "is beyond the end of the stream" text. + // This text might be in the grpcStatus's description, or in the message of the original + // throwable 't', or one of its causes. + + // Check the description from the derived gRPC status first. + // grpcStatus is confirmed not null here. + String description = grpcStatus.getDescription(); + if (description != null + && description.toLowerCase().contains("is beyond the end of the stream")) { + return true; + } + + // If the description didn't match, iterate through the exception chain of 't' + // to find a message that confirms the "offset beyond end of stream" scenario. + Throwable currentThrowable = t; + while (currentThrowable != null) { + String message = currentThrowable.getMessage(); + if (message != null && message.toLowerCase().contains("is beyond the end of the stream")) { + // If any exception in the chain has this message, and the overall gRPC status + // (determined by Status.fromThrowable(t)) is OUT_OF_RANGE, we consider it a match. + return true; + } + currentThrowable = currentThrowable.getCause(); + } + } + // If grpcStatus was null, or the gRPC status code was not OUT_OF_RANGE, + // or if it was OUT_OF_RANGE but no matching message was found. + return false; + } + @DefaultSchema(JavaFieldSchema.class) static class Operation implements Comparable, Serializable { final long flushOffset; @@ -186,6 +234,20 @@ public void process(PipelineOptions pipelineOptions, @Element KV