diff --git a/src/main/java/io/cdap/plugin/gcp/bigtable/common/BigtableErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/gcp/bigtable/common/BigtableErrorDetailsProvider.java new file mode 100644 index 0000000000..49579ada22 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigtable/common/BigtableErrorDetailsProvider.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.bigtable.common; + +import com.google.bigtable.repackaged.io.grpc.StatusRuntimeException; +import com.google.common.base.Throwables; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorContext; +import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider; +import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil; +import io.cdap.plugin.gcp.common.GCPUtils; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; + +import java.util.List; + + +/** + * A custom ErrorDetailsProvider for BigTable plugins. 
+ */ +public class BigtableErrorDetailsProvider extends GCPErrorDetailsProvider { + + @Override + protected String getExternalDocumentationLink() { + return GCPUtils.BIG_TABLE_SUPPORTED_DOC_URL; + } + + @Override + public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { + ProgramFailureException ex = super.getExceptionDetails(e, errorContext); + if (ex != null) { + return ex; + } + List causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof StatusRuntimeException) { + return getProgramFailureExceptionFromBigTableException((StatusRuntimeException) t); + } + // Some RPC exception may be wrapped in a RetriesExhaustedWithDetailsException + if (t instanceof RetriesExhaustedWithDetailsException) { + RetriesExhaustedWithDetailsException r = (RetriesExhaustedWithDetailsException) t; + List innerCauses = r.getCauses(); + for (Throwable innerCause : innerCauses) { + if (innerCause instanceof Exception) { + ProgramFailureException pfe = this.getExceptionDetails((Exception) innerCause, errorContext); + if (pfe != null) { + return pfe; + } + } + } + } + } + return null; + } + + private ProgramFailureException getProgramFailureExceptionFromBigTableException(StatusRuntimeException se) { + return GCPErrorDetailsProviderUtil.getProgramFailureExceptionByGrpcStatusCode(se.getStatus().getCode().value(), + se.getMessage(), se.getMessage(), GCPUtils.BIG_TABLE_SUPPORTED_DOC_URL, se); + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java b/src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java index 6a57bd6bc8..1301f0cc3d 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java +++ b/src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java @@ -34,8 +34,10 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import 
io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.gcp.bigtable.common.BigtableErrorDetailsProvider; import io.cdap.plugin.gcp.bigtable.common.HBaseColumn; import io.cdap.plugin.gcp.common.SourceOutputFormatProvider; import org.apache.hadoop.conf.Configuration; @@ -139,6 +141,8 @@ public void prepareRun(BatchSinkContext context) { // Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists. // We call emitLineage before since it creates the dataset with schema. emitLineage(context); + // set error details provider + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigtableErrorDetailsProvider.class.getName())); context.addOutput(Output.of(config.getReferenceName(), new SourceOutputFormatProvider(BigtableOutputFormat.class, conf))); } diff --git a/src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java b/src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java index e0026daefb..d1d05ee822 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java +++ b/src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java @@ -25,6 +25,9 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; @@ -32,9 +35,11 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.ConfigUtil; import 
io.cdap.plugin.common.LineageRecorder; import io.cdap.plugin.common.SourceInputFormatProvider; +import io.cdap.plugin.gcp.bigtable.common.BigtableErrorDetailsProvider; import io.cdap.plugin.gcp.bigtable.common.HBaseColumn; import org.apache.commons.lang3.ObjectUtils; import org.apache.hadoop.conf.Configuration; @@ -121,6 +126,8 @@ public void prepareRun(BatchSourceContext context) { // Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists. // We call emitLineage before since it creates the dataset with schema. emitLineage(context, configuredSchema); + // set error details provider + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigtableErrorDetailsProvider.class.getName())); context.setInput(Input.of(config.referenceName, new SourceInputFormatProvider(BigtableInputFormat.class, conf))); } @@ -149,7 +156,9 @@ public void transform(KeyValue input, Emitter GCP_GRPC_ERROR_CODE_HTTP_STATUS_CODE_MAP = Collections.unmodifiableMap( + new HashMap() {{ + put(3, 400); // INVALID_ARGUMENT <--> HTTP 400 (Bad Request) + put(4, 504); // DEADLINE_EXCEEDED <--> HTTP 504 (Gateway Timeout) + put(5, 404); // NOT_FOUND <--> HTTP 404 (Not Found) + put(6, 409); // ALREADY_EXISTS <--> HTTP 409 (Conflict) + put(7, 403); // PERMISSION_DENIED <--> HTTP 403 (Forbidden) + put(8, 429); // RESOURCE_EXHAUSTED <--> HTTP 429 (Too Many Requests) + put(9, 400); // FAILED_PRECONDITION <--> HTTP 400 (Bad Request) + put(10, 409); // ABORTED <--> HTTP 409 (Conflict) + put(11, 400); // OUT_OF_RANGE <--> HTTP 400 (Bad Request) + put(12, 501); // UNIMPLEMENTED <--> HTTP 501 (Not Implemented) + put(13, 500); // INTERNAL <--> HTTP 500 (Internal Server Error) + put(14, 503); // UNAVAILABLE <--> HTTP 503 (Service Unavailable) + put(15, 500); // DATA_LOSS <--> HTTP 500 (Internal Server Error) + put(16, 401); // UNAUTHENTICATED <--> HTTP 401 (Unauthorized) + }}); + /** * Get a ProgramFailureException with the given error * information from 
{@link HttpResponseException}. @@ -130,4 +153,39 @@ private static String getErrorMessage(GoogleJsonResponseException exception) { } return exception.getMessage(); } + + + /** + * Get the action/error pair (corrective action and error type) for a given gRPC error code, or null if the code has no HTTP mapping. + * + * @param grpcStatusCode the int value of the gRPC error code + */ + public static ErrorUtils.ActionErrorPair getActionErrorByGrpcStatusCode(int grpcStatusCode) { + if (!GCP_GRPC_ERROR_CODE_HTTP_STATUS_CODE_MAP.containsKey(grpcStatusCode)) { + return null; + } + return ErrorUtils.getActionErrorByStatusCode(GCP_GRPC_ERROR_CODE_HTTP_STATUS_CODE_MAP.get(grpcStatusCode)); + } + + public static ProgramFailureException getProgramFailureExceptionByGrpcStatusCode(int grpcErrorCodeValue, + String grpcErrorReason, String grpcErrorMessage, String supportedDocUrl, Exception se) { + int httpStatusCode = GCPErrorDetailsProviderUtil.GCP_GRPC_ERROR_CODE_HTTP_STATUS_CODE_MAP. + getOrDefault(grpcErrorCodeValue, 500); + ErrorUtils.ActionErrorPair actionErrorPair = GCPErrorDetailsProviderUtil.getActionErrorByGrpcStatusCode( + grpcErrorCodeValue); + String errorReason = grpcErrorReason; + if (actionErrorPair != null) { + errorReason = String.format("%s %s. %s", httpStatusCode, grpcErrorMessage, actionErrorPair.getCorrectiveAction()); + } + if (!errorReason.endsWith(".")) { + errorReason = errorReason + "."; + } + errorReason = String.format("%s For more details, see %s.", errorReason, supportedDocUrl); + + String errorMessageWithCode = String.format("[ErrorCode='%s'] %s", httpStatusCode, grpcErrorMessage); + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, + String.format("%s: %s", se.getClass().getName(), errorMessageWithCode), + actionErrorPair != null ? 
actionErrorPair.getErrorType() : ErrorType.UNKNOWN, true, ErrorCodeType.HTTP, + String.valueOf(httpStatusCode), supportedDocUrl, se); + } } diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java index 4af3ed1306..1d92d3553d 100644 --- a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java @@ -35,7 +35,6 @@ import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import com.google.gson.reflect.TypeToken; -import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import io.cdap.plugin.gcp.gcs.GCSPath; import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider; import org.apache.hadoop.conf.Configuration; @@ -90,6 +89,7 @@ public class GCPUtils { public static final String SPANNER_SUPPORTED_DOC_URL = "https://cloud.google.com/spanner/docs/error-codes"; public static final int BQ_DEFAULT_READ_TIMEOUT_SECONDS = 120; public static final String DATASTORE_SUPPORTED_DOC_URL = "https://cloud.google.com/datastore/docs/concepts/errors"; + public static final String BIG_TABLE_SUPPORTED_DOC_URL = "https://cloud.google.com/bigtable/docs/status-codes"; /** * Load a service account from the local file system. 
diff --git a/src/main/java/io/cdap/plugin/gcp/spanner/common/SpannerErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/gcp/spanner/common/SpannerErrorDetailsProvider.java index 1303d2859b..085e81e87d 100644 --- a/src/main/java/io/cdap/plugin/gcp/spanner/common/SpannerErrorDetailsProvider.java +++ b/src/main/java/io/cdap/plugin/gcp/spanner/common/SpannerErrorDetailsProvider.java @@ -16,47 +16,19 @@ package io.cdap.plugin.gcp.spanner.common; -import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerException; import com.google.common.base.Throwables; -import io.cdap.cdap.api.exception.ErrorCategory; -import io.cdap.cdap.api.exception.ErrorCodeType; -import io.cdap.cdap.api.exception.ErrorType; -import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.api.exception.ProgramFailureException; import io.cdap.cdap.etl.api.exception.ErrorContext; import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider; +import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil; import io.cdap.plugin.gcp.common.GCPUtils; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * A custom ErrorDetailsProvider for Spanner. */ public class SpannerErrorDetailsProvider extends GCPErrorDetailsProvider { - private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. 
Error message: %s"; - - static Map actionErrorMap = new HashMap<>(); - - static { - actionErrorMap.put(ErrorCode.CANCELLED, 499); - actionErrorMap.put(ErrorCode.UNKNOWN, 500); - actionErrorMap.put(ErrorCode.INVALID_ARGUMENT, 400); - actionErrorMap.put(ErrorCode.DEADLINE_EXCEEDED, 504); - actionErrorMap.put(ErrorCode.NOT_FOUND, 404); - actionErrorMap.put(ErrorCode.ALREADY_EXISTS, 409); - actionErrorMap.put(ErrorCode.PERMISSION_DENIED, 403); - actionErrorMap.put(ErrorCode.UNAUTHENTICATED, 401); - actionErrorMap.put(ErrorCode.RESOURCE_EXHAUSTED, 429); - actionErrorMap.put(ErrorCode.FAILED_PRECONDITION, 400); - actionErrorMap.put(ErrorCode.ABORTED, 409); - actionErrorMap.put(ErrorCode.OUT_OF_RANGE, 400); - actionErrorMap.put(ErrorCode.UNIMPLEMENTED, 501); - actionErrorMap.put(ErrorCode.INTERNAL, 500); - actionErrorMap.put(ErrorCode.UNAVAILABLE, 503); - actionErrorMap.put(ErrorCode.DATA_LOSS, 500); - } @Override protected String getExternalDocumentationLink() { @@ -79,23 +51,8 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err } private ProgramFailureException getProgramFailureExceptionFromSpannerException(SpannerException se) { - int httpStatusCode = actionErrorMap.get(se.getErrorCode()); - ErrorUtils.ActionErrorPair actionErrorPair = null; - String errorReason = se.getReason(); - String errorMessage = se.getMessage(); - if (actionErrorMap.containsKey(se.getErrorCode())) { - actionErrorPair = ErrorUtils.getActionErrorByStatusCode(httpStatusCode); - errorReason = String.format("%s %s. 
%s", httpStatusCode, errorMessage, actionErrorPair.getCorrectiveAction()); - } - if (!errorReason.endsWith(".")) { - errorReason = errorReason + "."; - } - errorReason = String.format("%s For more details, see %s.", errorReason, GCPUtils.SPANNER_SUPPORTED_DOC_URL); - - String errorMessageWithCode = String.format("[ErrorCode='%s'] %s", httpStatusCode, errorMessage); - return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), - errorReason, String.format("%s: %s", se.getClass().getName(), errorMessageWithCode), - actionErrorPair != null ? actionErrorPair.getErrorType() : ErrorType.UNKNOWN, true, ErrorCodeType.HTTP, - String.valueOf(httpStatusCode), GCPUtils.SPANNER_SUPPORTED_DOC_URL, se); + return GCPErrorDetailsProviderUtil.getProgramFailureExceptionByGrpcStatusCode( + se.getErrorCode().getGrpcStatusCode().value(), se.getReason(), se.getMessage(), + GCPUtils.SPANNER_SUPPORTED_DOC_URL, se); } }