Skip to content

Commit 360616c

Browse files
committed
Error Management Big-Table
1 parent 6db2c69 commit 360616c

File tree

5 files changed

+55
-3
lines changed

5 files changed

+55
-3
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.bigtable.common;
18+
19+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
20+
import io.cdap.plugin.gcp.common.GCPUtils;
21+
22+
/**
23+
* A custom ErrorDetailsProvider for BigTable plugins.
24+
*/
25+
public class BigtableErrorDetailsProvider extends GCPErrorDetailsProvider {
26+
27+
@Override
28+
protected String getExternalDocumentationLink() {
29+
return GCPUtils.BIG_TABLE_SUPPORTED_DOC_URL;
30+
}
31+
}

src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@
3434
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3535
import io.cdap.cdap.etl.api.batch.BatchSink;
3636
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
37+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3738
import io.cdap.plugin.common.ConfigUtil;
3839
import io.cdap.plugin.common.LineageRecorder;
40+
import io.cdap.plugin.gcp.bigtable.common.BigtableErrorDetailsProvider;
3941
import io.cdap.plugin.gcp.bigtable.common.HBaseColumn;
4042
import io.cdap.plugin.gcp.common.SourceOutputFormatProvider;
43+
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
4144
import org.apache.hadoop.conf.Configuration;
4245
import org.apache.hadoop.hbase.HColumnDescriptor;
4346
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -139,6 +142,8 @@ public void prepareRun(BatchSinkContext context) {
139142
// Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists.
140143
// We call emitLineage before since it creates the dataset with schema.
141144
emitLineage(context);
145+
// set error details provider
146+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigtableErrorDetailsProvider.class.getName()));
142147
context.addOutput(Output.of(config.getReferenceName(),
143148
new SourceOutputFormatProvider(BigtableOutputFormat.class, conf)));
144149
}

src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,21 @@
2525
import io.cdap.cdap.api.data.format.StructuredRecord;
2626
import io.cdap.cdap.api.data.schema.Schema;
2727
import io.cdap.cdap.api.dataset.lib.KeyValue;
28+
import io.cdap.cdap.api.exception.ErrorCategory;
29+
import io.cdap.cdap.api.exception.ErrorType;
30+
import io.cdap.cdap.api.exception.ErrorUtils;
2831
import io.cdap.cdap.etl.api.Emitter;
2932
import io.cdap.cdap.etl.api.FailureCollector;
3033
import io.cdap.cdap.etl.api.PipelineConfigurer;
3134
import io.cdap.cdap.etl.api.StageConfigurer;
3235
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3336
import io.cdap.cdap.etl.api.batch.BatchSource;
3437
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
38+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
3539
import io.cdap.plugin.common.ConfigUtil;
3640
import io.cdap.plugin.common.LineageRecorder;
3741
import io.cdap.plugin.common.SourceInputFormatProvider;
42+
import io.cdap.plugin.gcp.bigtable.common.BigtableErrorDetailsProvider;
3843
import io.cdap.plugin.gcp.bigtable.common.HBaseColumn;
3944
import org.apache.commons.lang3.ObjectUtils;
4045
import org.apache.hadoop.conf.Configuration;
@@ -121,6 +126,8 @@ public void prepareRun(BatchSourceContext context) {
121126
// Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists.
122127
// We call emitLineage before since it creates the dataset with schema.
123128
emitLineage(context, configuredSchema);
129+
// set error details provider
130+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(BigtableErrorDetailsProvider.class.getName()));
124131
context.setInput(Input.of(config.referenceName, new SourceInputFormatProvider(BigtableInputFormat.class, conf)));
125132
}
126133

@@ -149,7 +156,9 @@ public void transform(KeyValue<ImmutableBytesWritable, Result> input, Emitter<St
149156
LOG.warn("Failed to process message, skipping it", e);
150157
break;
151158
case FAIL_PIPELINE:
152-
throw new RuntimeException("Failed to process message", e);
159+
String error = String.format("Failed to process message: %s", e.getMessage());
160+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
161+
error, error, ErrorType.UNKNOWN, false, e);
153162
default:
154163
// this should never happen because it is validated at configure and prepare time
155164
throw new IllegalStateException(String.format("Unknown error handling strategy '%s'",

src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.api.client.http.HttpResponseException;
2020
import com.google.common.base.Throwables;
21+
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
2122
import io.cdap.cdap.api.exception.ErrorCategory;
2223
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
2324
import io.cdap.cdap.api.exception.ErrorType;
@@ -59,18 +60,21 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
5960
if (t instanceof IllegalStateException) {
6061
return getProgramFailureException((IllegalStateException) t, errorContext);
6162
}
63+
if (t instanceof UnexpectedFormatException) {
64+
return getProgramFailureException((UnexpectedFormatException) t, errorContext);
65+
}
6266
}
6367
return null;
6468
}
6569

6670
/**
6771
* Get a ProgramFailureException with the given error
68-
* information from {@link IllegalArgumentException}.
72+
* information from {@link IllegalArgumentException}, {@link UnexpectedFormatException}.
6973
*
7074
* @param e The IllegalArgumentException to get the error information from.
7175
* @return A ProgramFailureException with the given error information.
7276
*/
73-
private ProgramFailureException getProgramFailureException(IllegalArgumentException e,
77+
private ProgramFailureException getProgramFailureException(Exception e,
7478
ErrorContext errorContext) {
7579
String errorMessage = e.getMessage();
7680
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
@@ -93,6 +97,8 @@ private ProgramFailureException getProgramFailureException(IllegalStateException
9397
e.getClass().getName(), errorMessage), ErrorType.SYSTEM, false, e);
9498
}
9599

100+
101+
96102
/**
97103
* Get the external documentation link for the client errors if available.
98104
*

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public class GCPUtils {
8181
public static final int MILLISECONDS_MULTIPLIER = 1000;
8282
public static final String GCS_SUPPORTED_DOC_URL = "https://cloud.google.com/storage/docs/json_api/v1/status-codes";
8383
public static final String BQ_SUPPORTED_DOC_URL = "https://cloud.google.com/bigquery/docs/error-messages";
84+
public static final String BIG_TABLE_SUPPORTED_DOC_URL = "https://cloud.google.com/bigtable/docs/status-codes";
8485

8586
/**
8687
* Load a service account from the local file system.

0 commit comments

Comments
 (0)