Skip to content

Commit c98a86b

Browse files
Implement Program Failure Exception Handling in GCS plugins to catch known errors
1 parent e9e2391 commit c98a86b

14 files changed

+323
-67
lines changed

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.gcp.common;
1818

19+
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
1920
import com.google.api.gax.retrying.RetrySettings;
2021
import com.google.auth.Credentials;
2122
import com.google.auth.oauth2.ExternalAccountCredentials;
@@ -35,6 +36,10 @@
3536
import com.google.cloud.storage.StorageException;
3637
import com.google.cloud.storage.StorageOptions;
3738
import com.google.gson.reflect.TypeToken;
39+
import io.cdap.cdap.api.exception.ErrorCategory;
40+
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
41+
import io.cdap.cdap.api.exception.ErrorType;
42+
import io.cdap.cdap.api.exception.ProgramFailureException;
3843
import io.cdap.plugin.gcp.gcs.GCSPath;
3944
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
4045
import org.apache.hadoop.conf.Configuration;
@@ -79,7 +84,6 @@ public class GCPUtils {
7984
"https://www.googleapis.com/auth/bigquery");
8085
public static final String FQN_RESERVED_CHARACTERS_PATTERN = ".*[.:` \t\n].*";
8186
public static final int MILLISECONDS_MULTIPLIER = 1000;
82-
8387
/**
8488
* Load a service account from the local file system.
8589
*
@@ -310,4 +314,54 @@ public static String formatAsFQNComponent(String component) {
310314
return component;
311315
}
312316
}
317+
318+
/**
319+
* Get a ProgramFailureException with the given error information.
320+
*
321+
* @param errorReason The reason for the error.
322+
* @param errorMessage The error message.
323+
* @param errorType The type of error.
324+
* @return A ProgramFailureException with the given error information.
325+
*/
326+
public static ProgramFailureException getProgramFailureException(Exception cause,
327+
String errorReason, String errorMessage, ErrorType errorType, boolean dependency) {
328+
ProgramFailureException.Builder builder = new ProgramFailureException.Builder();
329+
330+
if (cause != null) {
331+
builder = builder.withCause(cause);
332+
}
333+
334+
return builder
335+
.withErrorCategory(new ErrorCategory(ErrorCategoryEnum.PLUGIN))
336+
.withErrorReason(errorReason)
337+
.withErrorMessage(errorMessage)
338+
.withErrorType(errorType)
339+
.withDependency(dependency)
340+
.build();
341+
}
342+
343+
/**
344+
* Get a ProgramFailureException with the given error
345+
* information from {@link GoogleJsonResponseException}.
346+
*
347+
* @param e The GoogleJsonResponseException.
348+
* @return A ProgramFailureException with the given error information.
349+
*/
350+
public static ProgramFailureException getProgramFailureException(GoogleJsonResponseException e) {
351+
String errorReason = String.format("Request failed with error code: %s, status message: %s",
352+
e.getStatusCode(),
353+
e.getStatusMessage());
354+
355+
return getProgramFailureException(e, errorReason, e.getMessage(),
356+
getErrorType(e), true);
357+
}
358+
359+
private static ErrorType getErrorType(GoogleJsonResponseException e) {
360+
if ((e.getStatusCode() / 100) == 4) {
361+
return ErrorType.USER;
362+
} else if ((e.getStatusCode() / 100) == 5) {
363+
return ErrorType.SYSTEM;
364+
}
365+
return ErrorType.UNKNOWN;
366+
}
313367
}

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
6565
taskAttemptContext.getConfiguration(), tableName));
6666

6767
//Wrap output committer into the GCS Output Committer.
68-
GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext));
68+
ForwardingOutputCommitter gcsOutputCommitter =
69+
new ForwardingOutputCommitter(
70+
new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext)));
6971

7072
gcsOutputCommitter.setupJob(taskAttemptContext);
7173
gcsOutputCommitter.setupTask(taskAttemptContext);

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted
7171
}
7272

7373
@Override
74-
public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext context) {
75-
return new DelegatingGCSOutputCommitter(context);
74+
public ForwardingOutputCommitter getOutputCommitter(TaskAttemptContext context) {
75+
return new ForwardingOutputCommitter(new DelegatingGCSOutputCommitter(context));
7676
}
77-
7877
}

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputUtils.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package io.cdap.plugin.gcp.gcs.sink;
1818

1919
import io.cdap.cdap.api.data.format.StructuredRecord;
20+
import io.cdap.cdap.api.exception.ErrorType;
21+
import io.cdap.plugin.gcp.common.GCPUtils;
2022
import org.apache.hadoop.conf.Configuration;
2123
import org.apache.hadoop.io.NullWritable;
2224
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -29,14 +31,17 @@
2931
public class DelegatingGCSOutputUtils {
3032

3133
@SuppressWarnings("unchecked")
32-
public static OutputFormat<NullWritable, StructuredRecord> getDelegateFormat(Configuration hConf) throws IOException {
34+
public static OutputFormat<NullWritable, StructuredRecord> getDelegateFormat(
35+
Configuration hConf) {
3336
String delegateClassName = hConf.get(DelegatingGCSOutputFormat.DELEGATE_CLASS);
3437
try {
3538
Class<OutputFormat<NullWritable, StructuredRecord>> delegateClass =
3639
(Class<OutputFormat<NullWritable, StructuredRecord>>) hConf.getClassByName(delegateClassName);
3740
return delegateClass.newInstance();
3841
} catch (Exception e) {
39-
throw new IOException("Unable to instantiate output format for class " + delegateClassName, e);
42+
throw GCPUtils.getProgramFailureException(e,
43+
String.format("Unable to instantiate output format for class: %s", delegateClassName),
44+
e.getMessage(), ErrorType.SYSTEM, false);
4045
}
4146
}
4247

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, Struct
3535
private final TaskAttemptContext context;
3636
private final String partitionField;
3737
private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
38-
private final DelegatingGCSOutputCommitter delegatingGCSOutputCommitter;
38+
private final ForwardingOutputCommitter delegatingGCSOutputCommitter;
3939

4040
DelegatingGCSRecordWriter(TaskAttemptContext context,
4141
String partitionField,
42-
DelegatingGCSOutputCommitter delegatingGCSOutputCommitter) {
42+
ForwardingOutputCommitter delegatingGCSOutputCommitter) {
4343
this.context = context;
4444
this.partitionField = partitionField;
4545
this.delegateMap = new HashMap<>();
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright © 2015-2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.gcs.sink;
18+
19+
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
20+
import io.cdap.cdap.api.exception.ErrorType;
21+
import io.cdap.plugin.gcp.common.GCPUtils;
22+
import org.apache.hadoop.mapreduce.JobContext;
23+
import org.apache.hadoop.mapreduce.OutputCommitter;
24+
import org.apache.hadoop.mapreduce.OutputFormat;
25+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
26+
import java.io.IOException;
27+
28+
/**
29+
* ForwardingOutputCommitter which delegates all operations to another OutputCommitter.
30+
* <p>
31+
* This is used to wrap the OutputCommitter of the delegate format and
32+
* throw {@link io.cdap.cdap.api.exception.ProgramFailureException} from IOException.
33+
*/
34+
public class ForwardingOutputCommitter extends OutputCommitter {
35+
private final OutputCommitter delegate;
36+
37+
public ForwardingOutputCommitter (OutputCommitter delegate) {
38+
this.delegate = delegate;
39+
}
40+
41+
@Override
42+
public void setupJob (JobContext jobContext) throws IOException {
43+
try {
44+
delegate.setupJob(jobContext);
45+
} catch (IOException e) {
46+
getException(e);
47+
}
48+
}
49+
50+
@Override
51+
public void setupTask (TaskAttemptContext taskAttemptContext) throws IOException {
52+
try {
53+
delegate.setupTask(taskAttemptContext);
54+
} catch (IOException e) {
55+
getException(e);
56+
}
57+
}
58+
59+
@Override
60+
public boolean needsTaskCommit (TaskAttemptContext taskAttemptContext) throws IOException {
61+
try {
62+
return delegate.needsTaskCommit(taskAttemptContext);
63+
} catch (IOException e) {
64+
getException(e);
65+
}
66+
return false;
67+
}
68+
69+
@Override
70+
public void commitTask (TaskAttemptContext taskAttemptContext) throws IOException {
71+
try {
72+
delegate.commitTask(taskAttemptContext);
73+
} catch (IOException e) {
74+
getException(e);
75+
}
76+
}
77+
78+
@Override
79+
public void abortTask (TaskAttemptContext taskAttemptContext) throws IOException {
80+
try {
81+
delegate.abortTask(taskAttemptContext);
82+
} catch (IOException e) {
83+
getException(e);
84+
}
85+
}
86+
87+
@SuppressWarnings("rawtypes")
88+
public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
89+
String tableName) throws IOException, InterruptedException {
90+
if (delegate instanceof DelegatingGCSOutputCommitter) {
91+
try {
92+
((DelegatingGCSOutputCommitter) delegate).addGCSOutputCommitterFromOutputFormat(
93+
outputFormat, tableName);
94+
} catch (IOException e) {
95+
getException(e);
96+
}
97+
return;
98+
}
99+
throw GCPUtils.getProgramFailureException(null,
100+
String.format("Operation is not supported in the output committer: %s",
101+
delegate.getClass().getName()), null, ErrorType.SYSTEM, false);
102+
}
103+
104+
private void getException(IOException e) throws IOException {
105+
if (e instanceof GoogleJsonResponseException) {
106+
throw GCPUtils.getProgramFailureException((GoogleJsonResponseException) e);
107+
}
108+
throw e;
109+
}
110+
}

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -121,26 +121,42 @@ public void prepareRun(BatchSinkContext context) throws Exception {
121121
collector.addFailure("Service account type is undefined.",
122122
"Must be `filePath` or `JSON`");
123123
collector.getOrThrowException();
124-
return;
125124
}
126-
Credentials credentials = config.connection.getServiceAccount() == null ?
127-
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);
125+
126+
Credentials credentials = null;
127+
try {
128+
credentials = config.connection.getServiceAccount() == null ?
129+
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
130+
isServiceAccountFilePath);
131+
} catch (Exception e) {
132+
String errorReason = "Unable to load service account credentials.";
133+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
134+
.withStacktrace(e.getStackTrace());
135+
collector.getOrThrowException();
136+
}
137+
138+
String bucketName = config.getBucket(collector);
128139
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
140+
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
141+
String correctiveAction = "Ensure you entered the correct bucket path and "
142+
+ "have permissions for it.";
129143
Bucket bucket;
130-
String location;
144+
String location = null;
131145
try {
132-
bucket = storage.get(config.getBucket());
146+
bucket = storage.get(bucketName);
147+
if (bucket != null) {
148+
location = bucket.getLocation();
149+
} else {
150+
location = config.getLocation();
151+
GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
152+
}
133153
} catch (StorageException e) {
134-
throw new RuntimeException(
135-
String.format("Unable to access or create bucket %s. ", config.getBucket())
136-
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
137-
}
138-
if (bucket != null) {
139-
location = bucket.getLocation();
140-
} else {
141-
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
142-
location = config.getLocation();
154+
String errorReason = String.format(errorReasonFormat, e.getCode());
155+
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
156+
.withStacktrace(e.getStackTrace());
157+
collector.getOrThrowException();
143158
}
159+
144160
this.outputPath = getOutputDir(context);
145161
// create asset for lineage
146162
asset = Asset.builder(config.getReferenceName())
@@ -532,8 +548,15 @@ public void validateContentType(FailureCollector failureCollector) {
532548
}
533549
}
534550

535-
public String getBucket() {
536-
return GCSPath.from(path).getBucket();
551+
public String getBucket(FailureCollector collector) {
552+
try {
553+
return GCSPath.from(path).getBucket();
554+
} catch (IllegalArgumentException e) {
555+
collector.addFailure(e.getMessage(), null)
556+
.withStacktrace(e.getStackTrace());
557+
collector.getOrThrowException();
558+
}
559+
return null;
537560
}
538561

539562
@Override
@@ -718,8 +741,8 @@ public Builder setCustomContentType(@Nullable String customContentType) {
718741
return this;
719742
}
720743

721-
public GCSBatchSink.GCSBatchSinkConfig build() {
722-
return new GCSBatchSink.GCSBatchSinkConfig(
744+
public GCSBatchSinkConfig build() {
745+
return new GCSBatchSinkConfig(
723746
referenceName,
724747
project,
725748
fileSystemProperties,

0 commit comments

Comments
 (0)