Skip to content

Commit 77e903d

Browse files
committed
Error Management Big-Table
1 parent d6dbdb7 commit 77e903d

File tree

5 files changed

+128
-23
lines changed

5 files changed

+128
-23
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.bigtable.common;
18+
19+
import com.google.bigtable.repackaged.io.grpc.Status;
20+
import com.google.bigtable.repackaged.io.grpc.StatusRuntimeException;
21+
import com.google.common.base.Throwables;
22+
import io.cdap.cdap.api.exception.ErrorCategory;
23+
import io.cdap.cdap.api.exception.ErrorCodeType;
24+
import io.cdap.cdap.api.exception.ErrorType;
25+
import io.cdap.cdap.api.exception.ErrorUtils;
26+
import io.cdap.cdap.api.exception.ProgramFailureException;
27+
import io.cdap.cdap.etl.api.exception.ErrorContext;
28+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
29+
import io.cdap.plugin.gcp.common.GCPUtils;
30+
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
35+
/**
36+
* A custom ErrorDetailsProvider for BigTable plugins.
37+
*/
38+
public class BigtableErrorDetailsProvider extends GCPErrorDetailsProvider {
39+
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s";
40+
41+
static Map<Status.Code, Integer> actionErrorMap = new HashMap<>();
42+
43+
static {
44+
actionErrorMap.put(Status.Code.CANCELLED, 499);
45+
actionErrorMap.put(Status.Code.UNKNOWN, 500);
46+
actionErrorMap.put(Status.Code.INVALID_ARGUMENT, 400);
47+
actionErrorMap.put(Status.Code.DEADLINE_EXCEEDED, 504);
48+
actionErrorMap.put(Status.Code.NOT_FOUND, 404);
49+
actionErrorMap.put(Status.Code.ALREADY_EXISTS, 409);
50+
actionErrorMap.put(Status.Code.PERMISSION_DENIED, 403);
51+
actionErrorMap.put(Status.Code.UNAUTHENTICATED, 401);
52+
actionErrorMap.put(Status.Code.RESOURCE_EXHAUSTED, 429);
53+
actionErrorMap.put(Status.Code.FAILED_PRECONDITION, 400);
54+
actionErrorMap.put(Status.Code.ABORTED, 409);
55+
actionErrorMap.put(Status.Code.OUT_OF_RANGE, 400);
56+
actionErrorMap.put(Status.Code.UNIMPLEMENTED, 501);
57+
actionErrorMap.put(Status.Code.INTERNAL, 500);
58+
actionErrorMap.put(Status.Code.UNAVAILABLE, 503);
59+
actionErrorMap.put(Status.Code.DATA_LOSS, 500);
60+
}
61+
62+
@Override
63+
protected String getExternalDocumentationLink() {
64+
return GCPUtils.BIG_TABLE_SUPPORTED_DOC_URL;
65+
}
66+
67+
@Override
68+
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
69+
ProgramFailureException ex = super.getExceptionDetails(e, errorContext);
70+
if (ex != null) {
71+
return ex;
72+
}
73+
List<Throwable> causalChain = Throwables.getCausalChain(e);
74+
for (Throwable t : causalChain) {
75+
if (t instanceof StatusRuntimeException) {
76+
return getProgramFailureExceptionFromBigTableException((StatusRuntimeException) t);
77+
}
78+
}
79+
return null;
80+
}
81+
82+
private ProgramFailureException getProgramFailureExceptionFromBigTableException(StatusRuntimeException se) {
83+
int httpStatusCode = actionErrorMap.get(se.getStatus().getCode());
84+
ErrorUtils.ActionErrorPair actionErrorPair = null;
85+
String errorReason = se.getMessage();
86+
String errorMessage = se.getMessage();
87+
if (actionErrorMap.containsKey(se.getStatus().getCode())) {
88+
actionErrorPair = ErrorUtils.getActionErrorByStatusCode(httpStatusCode);
89+
errorReason = String.format("%s %s. %s", httpStatusCode, errorMessage, actionErrorPair.getCorrectiveAction());
90+
}
91+
if (!errorReason.endsWith(".")) {
92+
errorReason = errorReason + ".";
93+
}
94+
errorReason = String.format("%s For more details, see %s.", errorReason, GCPUtils.BIG_TABLE_SUPPORTED_DOC_URL);
95+
96+
String errorMessageWithCode = String.format("[ErrorCode='%s'] %s", httpStatusCode, errorMessage);
97+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
98+
errorReason, String.format("%s: %s", se.getClass().getName(), errorMessageWithCode),
99+
actionErrorPair != null ? actionErrorPair.getErrorType() : ErrorType.UNKNOWN, true, ErrorCodeType.HTTP,
100+
String.valueOf(httpStatusCode), GCPUtils.SPANNER_SUPPORTED_DOC_URL, se);
101+
}
102+
}

src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3535
import io.cdap.cdap.etl.api.batch.BatchSink;
3636
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
37+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3738
import io.cdap.plugin.common.ConfigUtil;
3839
import io.cdap.plugin.common.LineageRecorder;
40+
import io.cdap.plugin.gcp.bigtable.common.BigtableErrorDetailsProvider;
3941
import io.cdap.plugin.gcp.bigtable.common.HBaseColumn;
4042
import io.cdap.plugin.gcp.common.SourceOutputFormatProvider;
4143
import org.apache.hadoop.conf.Configuration;
@@ -139,6 +141,8 @@ public void prepareRun(BatchSinkContext context) {
139141
// Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists.
140142
// We call emitLineage before since it creates the dataset with schema.
141143
emitLineage(context);
144+
// set error details provider
145+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigtableErrorDetailsProvider.class.getName()));
142146
context.addOutput(Output.of(config.getReferenceName(),
143147
new SourceOutputFormatProvider(BigtableOutputFormat.class, conf)));
144148
}

src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,21 @@
2525
import io.cdap.cdap.api.data.format.StructuredRecord;
2626
import io.cdap.cdap.api.data.schema.Schema;
2727
import io.cdap.cdap.api.dataset.lib.KeyValue;
28+
import io.cdap.cdap.api.exception.ErrorCategory;
29+
import io.cdap.cdap.api.exception.ErrorType;
30+
import io.cdap.cdap.api.exception.ErrorUtils;
2831
import io.cdap.cdap.etl.api.Emitter;
2932
import io.cdap.cdap.etl.api.FailureCollector;
3033
import io.cdap.cdap.etl.api.PipelineConfigurer;
3134
import io.cdap.cdap.etl.api.StageConfigurer;
3235
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3336
import io.cdap.cdap.etl.api.batch.BatchSource;
3437
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
38+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3539
import io.cdap.plugin.common.ConfigUtil;
3640
import io.cdap.plugin.common.LineageRecorder;
3741
import io.cdap.plugin.common.SourceInputFormatProvider;
42+
import io.cdap.plugin.gcp.bigtable.common.BigtableErrorDetailsProvider;
3843
import io.cdap.plugin.gcp.bigtable.common.HBaseColumn;
3944
import org.apache.commons.lang3.ObjectUtils;
4045
import org.apache.hadoop.conf.Configuration;
@@ -121,6 +126,8 @@ public void prepareRun(BatchSourceContext context) {
121126
// Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists.
122127
// We call emitLineage before since it creates the dataset with schema.
123128
emitLineage(context, configuredSchema);
129+
// set error details provider
130+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigtableErrorDetailsProvider.class.getName()));
124131
context.setInput(Input.of(config.referenceName, new SourceInputFormatProvider(BigtableInputFormat.class, conf)));
125132
}
126133

@@ -149,7 +156,9 @@ public void transform(KeyValue<ImmutableBytesWritable, Result> input, Emitter<St
149156
LOG.warn("Failed to process message, skipping it", e);
150157
break;
151158
case FAIL_PIPELINE:
152-
throw new RuntimeException("Failed to process message", e);
159+
String error = String.format("Failed to process message: %s", e.getMessage());
160+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
161+
error, error, ErrorType.USER, false, e);
153162
default:
154163
// this should never happen because it is validated at configure and prepare time
155164
throw new IllegalStateException(String.format("Unknown error handling strategy '%s'",

src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.api.client.http.HttpResponseException;
2020
import com.google.api.gax.rpc.ApiException;
2121
import com.google.common.base.Throwables;
22+
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
2223
import io.cdap.cdap.api.exception.ErrorCategory;
2324
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
2425
import io.cdap.cdap.api.exception.ErrorType;
@@ -63,43 +64,31 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
6364
getExternalDocumentationLink(), errorContext);
6465
}
6566
if (t instanceof IllegalArgumentException) {
66-
return getProgramFailureException((IllegalArgumentException) t, errorContext);
67+
return getProgramFailureException((IllegalArgumentException) t, errorContext, ErrorType.USER);
6768
}
6869
if (t instanceof IllegalStateException) {
69-
return getProgramFailureException((IllegalStateException) t, errorContext);
70+
return getProgramFailureException((IllegalStateException) t, errorContext, ErrorType.SYSTEM);
71+
}
72+
if (t instanceof UnexpectedFormatException) {
73+
return getProgramFailureException((UnexpectedFormatException) t, errorContext, ErrorType.USER);
7074
}
7175
}
7276
return null;
7377
}
7478

7579
/**
7680
* Get a ProgramFailureException with the given error
77-
* information from {@link IllegalArgumentException}.
78-
*
79-
* @param e The IllegalArgumentException to get the error information from.
80-
* @return A ProgramFailureException with the given error information.
81-
*/
82-
private ProgramFailureException getProgramFailureException(IllegalArgumentException e,
83-
ErrorContext errorContext) {
84-
String errorMessage = e.getMessage();
85-
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
86-
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
87-
e.getClass().getName(), errorMessage), ErrorType.USER, false, e);
88-
}
89-
90-
/**
91-
* Get a ProgramFailureException with the given error
92-
* information from {@link IllegalStateException}.
81+
* information from {@link IllegalArgumentException}, {@link UnexpectedFormatException}.
9382
*
94-
* @param e The IllegalStateException to get the error information from.
83+
* @param e The IllegalArgumentException or UnexpectedFormatException to get the error information from.
9584
* @return A ProgramFailureException with the given error information.
9685
*/
97-
private ProgramFailureException getProgramFailureException(IllegalStateException e,
98-
ErrorContext errorContext) {
86+
private ProgramFailureException getProgramFailureException(Exception e,
87+
ErrorContext errorContext, ErrorType errorType) {
9988
String errorMessage = e.getMessage();
10089
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
10190
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
102-
e.getClass().getName(), errorMessage), ErrorType.SYSTEM, false, e);
91+
e.getClass().getName(), errorMessage), errorType, false, e);
10392
}
10493

10594
/**

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public class GCPUtils {
9090
public static final String SPANNER_SUPPORTED_DOC_URL = "https://cloud.google.com/spanner/docs/error-codes";
9191
public static final int BQ_DEFAULT_READ_TIMEOUT_SECONDS = 120;
9292
public static final String DATASTORE_SUPPORTED_DOC_URL = "https://cloud.google.com/datastore/docs/concepts/errors";
93+
public static final String BIG_TABLE_SUPPORTED_DOC_URL = "https://cloud.google.com/bigtable/docs/status-codes";
9394

9495
/**
9596
* Load a service account from the local file system.

0 commit comments

Comments
 (0)