Skip to content

Commit d991ce7

Browse files
authored
Merge pull request data-integrations#1526 from cloudsufi/fem/bigtable
[PLUGIN-1870] Add BigtableErrorDetailsProvider
2 parents 0f7bab8 + 57664da commit d991ce7

File tree

7 files changed

+161
-71
lines changed

7 files changed

+161
-71
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.bigtable.common;
18+
19+
import com.google.bigtable.repackaged.io.grpc.StatusRuntimeException;
20+
import com.google.common.base.Throwables;
21+
import io.cdap.cdap.api.exception.ProgramFailureException;
22+
import io.cdap.cdap.etl.api.exception.ErrorContext;
23+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
24+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
25+
import io.cdap.plugin.gcp.common.GCPUtils;
26+
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
27+
28+
import java.util.List;
29+
30+
31+
/**
32+
* A custom ErrorDetailsProvider for BigTable plugins.
33+
*/
34+
public class BigtableErrorDetailsProvider extends GCPErrorDetailsProvider {
35+
36+
@Override
37+
protected String getExternalDocumentationLink() {
38+
return GCPUtils.BIG_TABLE_SUPPORTED_DOC_URL;
39+
}
40+
41+
@Override
42+
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
43+
ProgramFailureException ex = super.getExceptionDetails(e, errorContext);
44+
if (ex != null) {
45+
return ex;
46+
}
47+
List<Throwable> causalChain = Throwables.getCausalChain(e);
48+
for (Throwable t : causalChain) {
49+
if (t instanceof StatusRuntimeException) {
50+
return getProgramFailureExceptionFromBigTableException((StatusRuntimeException) t);
51+
}
52+
// Some RPC exception may be wrapped in a RetriesExhaustedWithDetailsException
53+
if (t instanceof RetriesExhaustedWithDetailsException) {
54+
RetriesExhaustedWithDetailsException r = (RetriesExhaustedWithDetailsException) t;
55+
List<Throwable> innerCauses = r.getCauses();
56+
for (Throwable innerCause : innerCauses) {
57+
if (innerCause instanceof Exception) {
58+
ProgramFailureException pfe = this.getExceptionDetails((Exception) innerCause, errorContext);
59+
if (pfe != null) {
60+
return pfe;
61+
}
62+
}
63+
}
64+
}
65+
}
66+
return null;
67+
}
68+
69+
private ProgramFailureException getProgramFailureExceptionFromBigTableException(StatusRuntimeException se) {
70+
return GCPErrorDetailsProviderUtil.getProgramFailureExceptionByGrpcStatusCode(se.getStatus().getCode().value(),
71+
se.getMessage(), se.getMessage(), GCPUtils.BIG_TABLE_SUPPORTED_DOC_URL, se);
72+
}
73+
}

src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3535
import io.cdap.cdap.etl.api.batch.BatchSink;
3636
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
37+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3738
import io.cdap.plugin.common.ConfigUtil;
3839
import io.cdap.plugin.common.LineageRecorder;
40+
import io.cdap.plugin.gcp.bigtable.common.BigtableErrorDetailsProvider;
3941
import io.cdap.plugin.gcp.bigtable.common.HBaseColumn;
4042
import io.cdap.plugin.gcp.common.SourceOutputFormatProvider;
4143
import org.apache.hadoop.conf.Configuration;
@@ -139,6 +141,8 @@ public void prepareRun(BatchSinkContext context) {
139141
// Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists.
140142
// We call emitLineage before since it creates the dataset with schema.
141143
emitLineage(context);
144+
// set error details provider
145+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigtableErrorDetailsProvider.class.getName()));
142146
context.addOutput(Output.of(config.getReferenceName(),
143147
new SourceOutputFormatProvider(BigtableOutputFormat.class, conf)));
144148
}

src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,21 @@
2525
import io.cdap.cdap.api.data.format.StructuredRecord;
2626
import io.cdap.cdap.api.data.schema.Schema;
2727
import io.cdap.cdap.api.dataset.lib.KeyValue;
28+
import io.cdap.cdap.api.exception.ErrorCategory;
29+
import io.cdap.cdap.api.exception.ErrorType;
30+
import io.cdap.cdap.api.exception.ErrorUtils;
2831
import io.cdap.cdap.etl.api.Emitter;
2932
import io.cdap.cdap.etl.api.FailureCollector;
3033
import io.cdap.cdap.etl.api.PipelineConfigurer;
3134
import io.cdap.cdap.etl.api.StageConfigurer;
3235
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3336
import io.cdap.cdap.etl.api.batch.BatchSource;
3437
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
38+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3539
import io.cdap.plugin.common.ConfigUtil;
3640
import io.cdap.plugin.common.LineageRecorder;
3741
import io.cdap.plugin.common.SourceInputFormatProvider;
42+
import io.cdap.plugin.gcp.bigtable.common.BigtableErrorDetailsProvider;
3843
import io.cdap.plugin.gcp.bigtable.common.HBaseColumn;
3944
import org.apache.commons.lang3.ObjectUtils;
4045
import org.apache.hadoop.conf.Configuration;
@@ -121,6 +126,8 @@ public void prepareRun(BatchSourceContext context) {
121126
// Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists.
122127
// We call emitLineage before since it creates the dataset with schema.
123128
emitLineage(context, configuredSchema);
129+
// set error details provider
130+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigtableErrorDetailsProvider.class.getName()));
124131
context.setInput(Input.of(config.referenceName, new SourceInputFormatProvider(BigtableInputFormat.class, conf)));
125132
}
126133

@@ -149,7 +156,9 @@ public void transform(KeyValue<ImmutableBytesWritable, Result> input, Emitter<St
149156
LOG.warn("Failed to process message, skipping it", e);
150157
break;
151158
case FAIL_PIPELINE:
152-
throw new RuntimeException("Failed to process message", e);
159+
String error = String.format("Failed to process message: %s", e.getMessage());
160+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
161+
error, error, ErrorType.USER, false, e);
153162
default:
154163
// this should never happen because it is validated at configure and prepare time
155164
throw new IllegalStateException(String.format("Unknown error handling strategy '%s'",

src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.api.client.http.HttpResponseException;
2020
import com.google.api.gax.rpc.ApiException;
2121
import com.google.common.base.Throwables;
22+
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
2223
import io.cdap.cdap.api.exception.ErrorCategory;
2324
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
2425
import io.cdap.cdap.api.exception.ErrorType;
@@ -63,43 +64,31 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
6364
getExternalDocumentationLink(), errorContext);
6465
}
6566
if (t instanceof IllegalArgumentException) {
66-
return getProgramFailureException((IllegalArgumentException) t, errorContext);
67+
return getProgramFailureException((IllegalArgumentException) t, errorContext, ErrorType.USER);
6768
}
6869
if (t instanceof IllegalStateException) {
69-
return getProgramFailureException((IllegalStateException) t, errorContext);
70+
return getProgramFailureException((IllegalStateException) t, errorContext, ErrorType.SYSTEM);
71+
}
72+
if (t instanceof UnexpectedFormatException) {
73+
return getProgramFailureException((UnexpectedFormatException) t, errorContext, ErrorType.USER);
7074
}
7175
}
7276
return null;
7377
}
7478

7579
/**
7680
* Get a ProgramFailureException with the given error
77-
* information from {@link IllegalArgumentException}.
78-
*
79-
* @param e The IllegalArgumentException to get the error information from.
80-
* @return A ProgramFailureException with the given error information.
81-
*/
82-
private ProgramFailureException getProgramFailureException(IllegalArgumentException e,
83-
ErrorContext errorContext) {
84-
String errorMessage = e.getMessage();
85-
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
86-
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
87-
e.getClass().getName(), errorMessage), ErrorType.USER, false, e);
88-
}
89-
90-
/**
91-
* Get a ProgramFailureException with the given error
92-
* information from {@link IllegalStateException}.
81+
* information from {@link IllegalArgumentException}, {@link UnexpectedFormatException}.
9382
*
94-
* @param e The IllegalStateException to get the error information from.
83+
* @param e The IllegalArgumentException or UnexpectedFormatException to get the error information from.
9584
* @return A ProgramFailureException with the given error information.
9685
*/
97-
private ProgramFailureException getProgramFailureException(IllegalStateException e,
98-
ErrorContext errorContext) {
86+
private ProgramFailureException getProgramFailureException(Exception e,
87+
ErrorContext errorContext, ErrorType errorType) {
9988
String errorMessage = e.getMessage();
10089
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
10190
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
102-
e.getClass().getName(), errorMessage), ErrorType.SYSTEM, false, e);
91+
e.getClass().getName(), errorMessage), errorType, false, e);
10392
}
10493

10594
/**

src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProviderUtil.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.gax.rpc.ApiException;
2323
import com.google.common.base.Strings;
2424
import com.google.common.base.Throwables;
25+
import com.google.common.collect.ImmutableMap;
2526
import io.cdap.cdap.api.exception.ErrorCategory;
2627
import io.cdap.cdap.api.exception.ErrorCodeType;
2728
import io.cdap.cdap.api.exception.ErrorType;
@@ -30,14 +31,36 @@
3031
import io.cdap.cdap.etl.api.exception.ErrorContext;
3132

3233
import java.io.IOException;
34+
import java.util.Collections;
35+
import java.util.HashMap;
3336
import java.util.List;
37+
import java.util.Map;
3438
import javax.annotation.Nullable;
3539

3640
/**
3741
* Common functions for GCP error details provider related functionalities.
3842
*/
3943
public final class GCPErrorDetailsProviderUtil {
4044

45+
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
46+
public static final Map<Integer, Integer> GCP_GRPC_ERROR_CODE_HTTP_STATUS_CODE_MAP = Collections.unmodifiableMap(
47+
new HashMap<Integer, Integer>() {{
48+
put(3, 400); // INVALID_ARGUMENT <--> HTTP 400 (Bad Request)
49+
put(4, 504); // DEADLINE_EXCEEDED <--> HTTP 504 (Gateway Timeout)
50+
put(5, 404); // NOT_FOUND <--> HTTP 404 (Not Found)
51+
put(6, 409); // ALREADY_EXISTS <--> HTTP 409 (Conflict)
52+
put(7, 403); // PERMISSION_DENIED <--> HTTP 403 (Forbidden)
53+
put(8, 429); // RESOURCE_EXHAUSTED <--> HTTP 429 (Too Many Requests)
54+
put(9, 400); // FAILED_PRECONDITION <--> HTTP 400 (Bad Request)
55+
put(10, 409); // ABORTED <--> HTTP 409 (Conflict)
56+
put(11, 400); // OUT_OF_RANGE <--> HTTP 400 (Bad Request)
57+
put(12, 501); // UNIMPLEMENTED <--> HTTP 501 (Not Implemented)
58+
put(13, 500); // INTERNAL <--> HTTP 500 (Internal Server Error)
59+
put(14, 503); // UNAVAILABLE <--> HTTP 503 (Service Unavailable)
60+
put(15, 500); // DATA_LOSS <--> HTTP 500 (Internal Server Error)
61+
put(16, 401); // UNAUTHENTICATED <--> HTTP 401 (Unauthorized)
62+
}});
63+
4164
/**
4265
* Get a ProgramFailureException with the given error
4366
* information from {@link HttpResponseException}.
@@ -130,4 +153,39 @@ private static String getErrorMessage(GoogleJsonResponseException exception) {
130153
}
131154
return exception.getMessage();
132155
}
156+
157+
158+
/**
159+
* Get the HTTP status code for a given gRPC error code.
160+
*
161+
* @param grpcStatusCode the int value of the gRPC error code
162+
*/
163+
public static ErrorUtils.ActionErrorPair getActionErrorByGrpcStatusCode(int grpcStatusCode) {
164+
if (!GCP_GRPC_ERROR_CODE_HTTP_STATUS_CODE_MAP.containsKey(grpcStatusCode)) {
165+
return null;
166+
}
167+
return ErrorUtils.getActionErrorByStatusCode(GCP_GRPC_ERROR_CODE_HTTP_STATUS_CODE_MAP.get(grpcStatusCode));
168+
}
169+
170+
public static ProgramFailureException getProgramFailureExceptionByGrpcStatusCode(int grpcErrorCodeValue,
171+
String grpcErrorReason, String grpcErrorMessage, String supportedDocUrl, Exception se) {
172+
int httpStatusCode = GCPErrorDetailsProviderUtil.GCP_GRPC_ERROR_CODE_HTTP_STATUS_CODE_MAP.
173+
getOrDefault(grpcErrorCodeValue, 500);
174+
ErrorUtils.ActionErrorPair actionErrorPair = GCPErrorDetailsProviderUtil.getActionErrorByGrpcStatusCode(
175+
grpcErrorCodeValue);
176+
String errorReason = grpcErrorReason;
177+
if (actionErrorPair != null) {
178+
errorReason = String.format("%s %s. %s", httpStatusCode, grpcErrorMessage, actionErrorPair.getCorrectiveAction());
179+
}
180+
if (!errorReason.endsWith(".")) {
181+
errorReason = errorReason + ".";
182+
}
183+
errorReason = String.format("%s For more details, see %s.", errorReason, supportedDocUrl);
184+
185+
String errorMessageWithCode = String.format("[ErrorCode='%s'] %s", httpStatusCode, grpcErrorMessage);
186+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason,
187+
String.format("%s: %s", se.getClass().getName(), errorMessageWithCode),
188+
actionErrorPair != null ? actionErrorPair.getErrorType() : ErrorType.UNKNOWN, true, ErrorCodeType.HTTP,
189+
String.valueOf(httpStatusCode), supportedDocUrl, se);
190+
}
133191
}

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import com.google.cloud.storage.StorageException;
3636
import com.google.cloud.storage.StorageOptions;
3737
import com.google.gson.reflect.TypeToken;
38-
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
3938
import io.cdap.plugin.gcp.gcs.GCSPath;
4039
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
4140
import org.apache.hadoop.conf.Configuration;
@@ -90,6 +89,7 @@ public class GCPUtils {
9089
public static final String SPANNER_SUPPORTED_DOC_URL = "https://cloud.google.com/spanner/docs/error-codes";
9190
public static final int BQ_DEFAULT_READ_TIMEOUT_SECONDS = 120;
9291
public static final String DATASTORE_SUPPORTED_DOC_URL = "https://cloud.google.com/datastore/docs/concepts/errors";
92+
public static final String BIG_TABLE_SUPPORTED_DOC_URL = "https://cloud.google.com/bigtable/docs/status-codes";
9393

9494
/**
9595
* Load a service account from the local file system.

src/main/java/io/cdap/plugin/gcp/spanner/common/SpannerErrorDetailsProvider.java

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,47 +16,19 @@
1616

1717
package io.cdap.plugin.gcp.spanner.common;
1818

19-
import com.google.cloud.spanner.ErrorCode;
2019
import com.google.cloud.spanner.SpannerException;
2120
import com.google.common.base.Throwables;
22-
import io.cdap.cdap.api.exception.ErrorCategory;
23-
import io.cdap.cdap.api.exception.ErrorCodeType;
24-
import io.cdap.cdap.api.exception.ErrorType;
25-
import io.cdap.cdap.api.exception.ErrorUtils;
2621
import io.cdap.cdap.api.exception.ProgramFailureException;
2722
import io.cdap.cdap.etl.api.exception.ErrorContext;
2823
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
24+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
2925
import io.cdap.plugin.gcp.common.GCPUtils;
30-
import java.util.HashMap;
3126
import java.util.List;
32-
import java.util.Map;
3327

3428
/**
3529
* A custom ErrorDetailsProvider for Spanner.
3630
*/
3731
public class SpannerErrorDetailsProvider extends GCPErrorDetailsProvider {
38-
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s";
39-
40-
static Map<ErrorCode, Integer> actionErrorMap = new HashMap<>();
41-
42-
static {
43-
actionErrorMap.put(ErrorCode.CANCELLED, 499);
44-
actionErrorMap.put(ErrorCode.UNKNOWN, 500);
45-
actionErrorMap.put(ErrorCode.INVALID_ARGUMENT, 400);
46-
actionErrorMap.put(ErrorCode.DEADLINE_EXCEEDED, 504);
47-
actionErrorMap.put(ErrorCode.NOT_FOUND, 404);
48-
actionErrorMap.put(ErrorCode.ALREADY_EXISTS, 409);
49-
actionErrorMap.put(ErrorCode.PERMISSION_DENIED, 403);
50-
actionErrorMap.put(ErrorCode.UNAUTHENTICATED, 401);
51-
actionErrorMap.put(ErrorCode.RESOURCE_EXHAUSTED, 429);
52-
actionErrorMap.put(ErrorCode.FAILED_PRECONDITION, 400);
53-
actionErrorMap.put(ErrorCode.ABORTED, 409);
54-
actionErrorMap.put(ErrorCode.OUT_OF_RANGE, 400);
55-
actionErrorMap.put(ErrorCode.UNIMPLEMENTED, 501);
56-
actionErrorMap.put(ErrorCode.INTERNAL, 500);
57-
actionErrorMap.put(ErrorCode.UNAVAILABLE, 503);
58-
actionErrorMap.put(ErrorCode.DATA_LOSS, 500);
59-
}
6032

6133
@Override
6234
protected String getExternalDocumentationLink() {
@@ -79,23 +51,8 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
7951
}
8052

8153
private ProgramFailureException getProgramFailureExceptionFromSpannerException(SpannerException se) {
82-
int httpStatusCode = actionErrorMap.get(se.getErrorCode());
83-
ErrorUtils.ActionErrorPair actionErrorPair = null;
84-
String errorReason = se.getReason();
85-
String errorMessage = se.getMessage();
86-
if (actionErrorMap.containsKey(se.getErrorCode())) {
87-
actionErrorPair = ErrorUtils.getActionErrorByStatusCode(httpStatusCode);
88-
errorReason = String.format("%s %s. %s", httpStatusCode, errorMessage, actionErrorPair.getCorrectiveAction());
89-
}
90-
if (!errorReason.endsWith(".")) {
91-
errorReason = errorReason + ".";
92-
}
93-
errorReason = String.format("%s For more details, see %s.", errorReason, GCPUtils.SPANNER_SUPPORTED_DOC_URL);
94-
95-
String errorMessageWithCode = String.format("[ErrorCode='%s'] %s", httpStatusCode, errorMessage);
96-
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
97-
errorReason, String.format("%s: %s", se.getClass().getName(), errorMessageWithCode),
98-
actionErrorPair != null ? actionErrorPair.getErrorType() : ErrorType.UNKNOWN, true, ErrorCodeType.HTTP,
99-
String.valueOf(httpStatusCode), GCPUtils.SPANNER_SUPPORTED_DOC_URL, se);
54+
return GCPErrorDetailsProviderUtil.getProgramFailureExceptionByGrpcStatusCode(
55+
se.getErrorCode().getGrpcStatusCode().value(), se.getReason(), se.getMessage(),
56+
GCPUtils.SPANNER_SUPPORTED_DOC_URL, se);
10057
}
10158
}

0 commit comments

Comments
 (0)