Skip to content

Commit c842cd7

Browse files
Implement Program Failure Exception Handling in GCS plugins to catch known errors
1 parent a1a4049 commit c842cd7

14 files changed

+498
-62
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,8 +1010,8 @@
10101010
<version>1.1.0</version>
10111011
<configuration>
10121012
<cdapArtifacts>
1013-
<parent>system:cdap-data-pipeline[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
1014-
<parent>system:cdap-data-streams[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
1013+
<parent>system:cdap-data-pipeline[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
1014+
<parent>system:cdap-data-streams[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
10151015
</cdapArtifacts>
10161016
</configuration>
10171017
<executions>
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright © 2024 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.common;
18+
19+
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
20+
import com.google.api.client.http.HttpResponseException;
21+
import com.google.common.base.Throwables;
22+
import io.cdap.cdap.api.exception.ErrorCategory;
23+
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
24+
import io.cdap.cdap.api.exception.ErrorUtils;
25+
import io.cdap.cdap.api.exception.ProgramFailureException;
26+
import java.io.IOException;
27+
import java.util.List;
28+
29+
/**
30+
* Utility class to handle exceptions.
31+
*/
32+
public class ExceptionUtils {
33+
34+
/**
35+
* Get a ProgramFailureException with the given error
36+
* information from {@link HttpResponseException}.
37+
*
38+
* @param e The HttpResponseException to get the error information from.
39+
* @return A ProgramFailureException with the given error information.
40+
*/
41+
private static ProgramFailureException getProgramFailureException(HttpResponseException e) {
42+
Integer statusCode = e.getStatusCode();
43+
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
44+
String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(),
45+
pair.getCorrectiveAction());
46+
47+
String errorMessage = e.getMessage();
48+
if (e instanceof GoogleJsonResponseException) {
49+
GoogleJsonResponseException exception = (GoogleJsonResponseException) e;
50+
errorMessage = exception.getDetails() != null ? exception.getDetails().getMessage() :
51+
exception.getMessage();
52+
}
53+
54+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
55+
errorReason, errorMessage, pair.getErrorType(), true, e);
56+
}
57+
58+
/**
59+
* Get a ProgramFailureException with the given error
60+
* information from generic exceptions like {@link IOException}.
61+
*
62+
* @param e The Throwable to get the error information from.
63+
* @return A ProgramFailureException with the given error information, otherwise null.
64+
*/
65+
public static ProgramFailureException getProgramFailureException(Throwable e) {
66+
67+
List<Throwable> causalChain = Throwables.getCausalChain(e);
68+
for (Throwable t : causalChain) {
69+
if (t instanceof ProgramFailureException) {
70+
return (ProgramFailureException) t;
71+
}
72+
if (t instanceof HttpResponseException) {
73+
return getProgramFailureException((HttpResponseException) t);
74+
}
75+
}
76+
77+
//
78+
return null;
79+
}
80+
}

src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import com.google.bigtable.repackaged.com.google.gson.Gson;
2222
import com.google.cloud.hadoop.util.AccessTokenProvider;
2323
import com.google.cloud.hadoop.util.CredentialFactory;
24+
import io.cdap.cdap.api.exception.ErrorCategory;
25+
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
26+
import io.cdap.cdap.api.exception.ErrorType;
27+
import io.cdap.cdap.api.exception.ErrorUtils;
2428
import io.cdap.plugin.gcp.common.GCPUtils;
2529
import org.apache.hadoop.conf.Configuration;
2630

@@ -50,13 +54,20 @@ public AccessToken getAccessToken() {
5054
}
5155
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
5256
} catch (IOException e) {
53-
throw new RuntimeException(e);
57+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
58+
"Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
5459
}
5560
}
5661

5762
@Override
5863
public void refresh() throws IOException {
59-
getCredentials().refresh();
64+
try {
65+
getCredentials().refresh();
66+
} catch (IOException e) {
67+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
68+
"Unable to refresh service account access token.", e.getMessage(),
69+
ErrorType.UNKNOWN, true, e);
70+
}
6071
}
6172

6273
private GoogleCredentials getCredentials() throws IOException {

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.cdap.plugin.gcp.gcs.sink;
1818

19+
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
20+
import io.cdap.plugin.gcp.common.ExceptionUtils;
1921
import org.apache.hadoop.fs.FSDataInputStream;
2022
import org.apache.hadoop.fs.FileStatus;
2123
import org.apache.hadoop.fs.FileSystem;
@@ -41,7 +43,8 @@
4143
* <p>
4244
* Delegated instances are created based on a supplied Output Format and Destination Table Names.
4345
*/
44-
public class DelegatingGCSOutputCommitter extends OutputCommitter {
46+
public class DelegatingGCSOutputCommitter extends OutputCommitter implements
47+
ErrorDetailsProvider<Void> {
4548

4649
private final TaskAttemptContext taskAttemptContext;
4750
private boolean firstTable = true;
@@ -244,4 +247,8 @@ private String getPendingDirPath(JobID jobId) {
244247
return String.format("%s_%s", FileOutputCommitter.PENDING_DIR_NAME, jobId);
245248
}
246249

250+
@Override
251+
public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
252+
return ExceptionUtils.getProgramFailureException(throwable);
253+
}
247254
}

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package io.cdap.plugin.gcp.gcs.sink;
1818

1919
import io.cdap.cdap.api.data.format.StructuredRecord;
20+
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
21+
import io.cdap.plugin.gcp.common.ExceptionUtils;
2022
import org.apache.hadoop.conf.Configuration;
2123
import org.apache.hadoop.io.NullWritable;
2224
import org.apache.hadoop.mapreduce.JobContext;
@@ -32,7 +34,8 @@
3234
/**
3335
* Output Format used to handle Schemaless Records as input.
3436
*/
35-
public class DelegatingGCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
37+
public class DelegatingGCSOutputFormat extends OutputFormat<NullWritable, StructuredRecord> implements
38+
ErrorDetailsProvider<Configuration> {
3639

3740
public static final String PARTITION_FIELD = "delegating_output_format.partition.field";
3841
public static final String DELEGATE_CLASS = "delegating_output_format.delegate";
@@ -75,4 +78,9 @@ public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext contex
7578
return new DelegatingGCSOutputCommitter(context);
7679
}
7780

81+
@Override
82+
public RuntimeException getExceptionDetails(Throwable throwable, Configuration conf) {
83+
return ExceptionUtils.getProgramFailureException(throwable);
84+
}
85+
7886
}

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package io.cdap.plugin.gcp.gcs.sink;
1818

1919
import io.cdap.cdap.api.data.format.StructuredRecord;
20+
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
21+
import io.cdap.plugin.gcp.common.ExceptionUtils;
2022
import org.apache.hadoop.io.NullWritable;
2123
import org.apache.hadoop.mapreduce.OutputFormat;
2224
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -31,7 +33,8 @@
3133
* <p>
3234
* This Record Writer will initialize record writes and Output Committers as needed.
3335
*/
34-
public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
36+
public class DelegatingGCSRecordWriter extends
37+
RecordWriter<NullWritable, StructuredRecord> implements ErrorDetailsProvider<Void> {
3538
private final TaskAttemptContext context;
3639
private final String partitionField;
3740
private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
@@ -78,4 +81,8 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedExc
7881
}
7982
}
8083

84+
@Override
85+
public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
86+
return ExceptionUtils.getProgramFailureException(throwable);
87+
}
8188
}

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -121,26 +121,42 @@ public void prepareRun(BatchSinkContext context) throws Exception {
121121
collector.addFailure("Service account type is undefined.",
122122
"Must be `filePath` or `JSON`");
123123
collector.getOrThrowException();
124-
return;
125124
}
126-
Credentials credentials = config.connection.getServiceAccount() == null ?
127-
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);
125+
126+
Credentials credentials = null;
127+
try {
128+
credentials = config.connection.getServiceAccount() == null ?
129+
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
130+
isServiceAccountFilePath);
131+
} catch (Exception e) {
132+
String errorReason = "Unable to load service account credentials.";
133+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
134+
.withStacktrace(e.getStackTrace());
135+
collector.getOrThrowException();
136+
}
137+
138+
String bucketName = config.getBucket(collector);
128139
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
140+
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
141+
String correctiveAction = "Ensure you entered the correct bucket path and "
142+
+ "have permissions for it.";
129143
Bucket bucket;
130-
String location;
144+
String location = null;
131145
try {
132-
bucket = storage.get(config.getBucket());
146+
bucket = storage.get(bucketName);
147+
if (bucket != null) {
148+
location = bucket.getLocation();
149+
} else {
150+
location = config.getLocation();
151+
GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
152+
}
133153
} catch (StorageException e) {
134-
throw new RuntimeException(
135-
String.format("Unable to access or create bucket %s. ", config.getBucket())
136-
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
137-
}
138-
if (bucket != null) {
139-
location = bucket.getLocation();
140-
} else {
141-
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
142-
location = config.getLocation();
154+
String errorReason = String.format(errorReasonFormat, e.getCode());
155+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
156+
.withStacktrace(e.getStackTrace());
157+
collector.getOrThrowException();
143158
}
159+
144160
this.outputPath = getOutputDir(context);
145161
// create asset for lineage
146162
asset = Asset.builder(config.getReferenceName())
@@ -532,8 +548,15 @@ public void validateContentType(FailureCollector failureCollector) {
532548
}
533549
}
534550

535-
public String getBucket() {
536-
return GCSPath.from(path).getBucket();
551+
public String getBucket(FailureCollector collector) {
552+
try {
553+
return GCSPath.from(path).getBucket();
554+
} catch (IllegalArgumentException e) {
555+
collector.addFailure(e.getMessage(), null)
556+
.withStacktrace(e.getStackTrace());
557+
collector.getOrThrowException();
558+
}
559+
return null;
537560
}
538561

539562
@Override

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -129,33 +129,47 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
129129
config.validate(collector, context.getArguments().asMap());
130130
collector.getOrThrowException();
131131

132-
Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
133-
config.getPath(), new HashMap<>());
134132
Map<String, String> argumentCopy = new HashMap<>(context.getArguments().asMap());
135-
136133
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
137134
collector.getOrThrowException();
135+
138136
Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath();
139137
if (isServiceAccountFilePath == null) {
140-
context.getFailureCollector().addFailure("Service account type is undefined.",
141-
"Must be `filePath` or `JSON`");
142-
context.getFailureCollector().getOrThrowException();
143-
return;
138+
collector.addFailure("Service account type is undefined.",
139+
"Must be `filePath` or `JSON`");
140+
collector.getOrThrowException();
144141
}
145-
Credentials credentials = config.connection.getServiceAccount() == null ?
146-
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);
142+
143+
Credentials credentials = null;
144+
try {
145+
credentials = config.connection.getServiceAccount() == null ?
146+
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
147+
isServiceAccountFilePath);
148+
} catch (Exception e) {
149+
String errorReason = "Unable to load service account credentials.";
150+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
151+
.withStacktrace(e.getStackTrace());
152+
collector.getOrThrowException();
153+
}
154+
155+
String bucketName = config.getBucket(collector);
147156
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
157+
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
158+
String correctiveAction = "Ensure you entered the correct bucket path and "
159+
+ "have permissions for it.";
148160
try {
149-
if (storage.get(config.getBucket()) == null) {
150-
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
161+
if (storage.get(bucketName) == null) {
162+
GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
151163
}
152164
} catch (StorageException e) {
153-
// Add more descriptive error message
154-
throw new RuntimeException(
155-
String.format("Unable to access or create bucket %s. ", config.getBucket())
156-
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
165+
String errorReason = String.format(errorReasonFormat, e.getCode());
166+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
167+
.withStacktrace(e.getStackTrace());
168+
collector.getOrThrowException();
157169
}
158170

171+
Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
172+
config.getPath(), new HashMap<>());
159173
if (config.getAllowFlexibleSchema()) {
160174
//Configure MultiSink with support for flexible schemas.
161175
configureSchemalessMultiSink(context, baseProperties, argumentCopy);

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.google.cloud.storage.Storage;
2222
import com.google.cloud.storage.StorageOptions;
2323
import com.google.common.annotations.VisibleForTesting;
24+
import io.cdap.cdap.api.exception.ErrorDetailsProvider;
25+
import io.cdap.plugin.gcp.common.ExceptionUtils;
2426
import io.cdap.plugin.gcp.common.GCPUtils;
2527
import io.cdap.plugin.gcp.gcs.StorageClient;
2628
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +42,8 @@
4042
/**
4143
* OutputCommitter for GCS
4244
*/
43-
public class GCSOutputCommitter extends OutputCommitter {
45+
public class GCSOutputCommitter extends OutputCommitter implements
46+
ErrorDetailsProvider<Void> {
4447

4548
private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class);
4649
public static final String RECORD_COUNT_FORMAT = "recordcount.%s";
@@ -161,4 +164,9 @@ public boolean isRecoverySupported() {
161164
public void recoverTask(TaskAttemptContext taskContext) throws IOException {
162165
delegate.recoverTask(taskContext);
163166
}
167+
168+
@Override
169+
public RuntimeException getExceptionDetails(Throwable throwable, Void conf) {
170+
return ExceptionUtils.getProgramFailureException(throwable);
171+
}
164172
}

0 commit comments

Comments
 (0)