Skip to content

Commit 6015db5

Browse files
Implement Program Failure Exception Handling in GCS plugins to catch known errors
1 parent e9e2391 commit 6015db5

20 files changed

+603
-84
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,8 +1010,8 @@
10101010
<version>1.1.0</version>
10111011
<configuration>
10121012
<cdapArtifacts>
1013-
<parent>system:cdap-data-pipeline[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
1014-
<parent>system:cdap-data-streams[6.9.1-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
1013+
<parent>system:cdap-data-pipeline[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
1014+
<parent>system:cdap-data-streams[6.11.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
10151015
</cdapArtifacts>
10161016
</configuration>
10171017
<executions>
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright © 2024 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.common;
18+
19+
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
20+
import com.google.api.client.http.HttpResponseException;
21+
import io.cdap.cdap.api.exception.ErrorCategory;
22+
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
23+
import io.cdap.cdap.api.exception.ErrorUtils;
24+
import io.cdap.cdap.api.exception.ProgramFailureException;
25+
import java.io.IOException;
26+
27+
/**
28+
* Utility class to handle exceptions.
29+
*/
30+
public class ExceptionUtils {
31+
32+
/** Functional interfaces for lambda-friendly method invocations */
33+
@FunctionalInterface
34+
public interface IOOperation {
35+
void execute() throws IOException;
36+
}
37+
38+
/**
39+
* Functional interfaces for lambda-friendly method invocations.
40+
*
41+
* @param <T> the return type of the function
42+
*/
43+
@FunctionalInterface
44+
public interface IOFunction<T> {
45+
T execute() throws IOException;
46+
}
47+
48+
/** Functional interfaces for lambda-friendly method invocations */
49+
@FunctionalInterface
50+
public interface IOInterruptibleOperation {
51+
void execute() throws IOException, InterruptedException;
52+
}
53+
54+
/**
55+
* Functional interfaces for lambda-friendly method invocations.
56+
*
57+
* @param <T> the return type of the function
58+
*/
59+
@FunctionalInterface
60+
public interface IOInterruptibleFunction<T> {
61+
62+
T execute () throws IOException, InterruptedException;
63+
}
64+
65+
// Generic helper method to handle IOException propagation
66+
public static void invokeWithProgramFailureHandling(IOOperation operation) throws IOException {
67+
try {
68+
operation.execute();
69+
} catch (IOException e) {
70+
ProgramFailureException exception = getProgramFailureException(e);
71+
if (exception != null) {
72+
throw exception;
73+
}
74+
throw e;
75+
}
76+
}
77+
78+
// Helper method for returning values (for methods like {@link OutputCommitter#needsTaskCommit})
79+
public static <T> T invokeWithProgramFailureHandling(IOFunction<T> function) throws IOException {
80+
try {
81+
return function.execute();
82+
} catch (IOException e) {
83+
ProgramFailureException exception = getProgramFailureException(e);
84+
if (exception != null) {
85+
throw exception;
86+
}
87+
throw e;
88+
}
89+
}
90+
91+
// Helper method for handling both IOException and InterruptedException
92+
public static void invokeWithProgramFailureAndInterruptionHandling(
93+
IOInterruptibleOperation operation) throws IOException, InterruptedException {
94+
try {
95+
operation.execute();
96+
} catch (IOException e) {
97+
ProgramFailureException exception = getProgramFailureException(e);
98+
if (exception != null) {
99+
throw exception;
100+
}
101+
throw e;
102+
}
103+
}
104+
105+
// Helper method for handling both IOException and InterruptedException
106+
public static <T> T invokeWithProgramFailureAndInterruptionHandling(
107+
IOInterruptibleFunction<T> function) throws IOException, InterruptedException {
108+
try {
109+
return function.execute();
110+
} catch (IOException e) {
111+
ProgramFailureException exception = getProgramFailureException(e);
112+
if (exception != null) {
113+
throw exception;
114+
}
115+
throw e;
116+
}
117+
}
118+
119+
/**
120+
* Get a ProgramFailureException with the given error
121+
* information from {@link HttpResponseException}.
122+
*
123+
* @param e The HttpResponseException to get the error information from.
124+
* @return A ProgramFailureException with the given error information.
125+
*/
126+
private static ProgramFailureException getProgramFailureException(HttpResponseException e) {
127+
Integer statusCode = e.getStatusCode();
128+
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
129+
String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(),
130+
pair.getCorrectiveAction());
131+
132+
String errorMessage = e.getMessage();
133+
if (e instanceof GoogleJsonResponseException) {
134+
GoogleJsonResponseException exception = (GoogleJsonResponseException) e;
135+
errorMessage = exception.getDetails() != null ? exception.getDetails().getMessage() :
136+
exception.getMessage();
137+
}
138+
139+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
140+
errorReason, errorMessage, pair.getErrorType(), true, e);
141+
}
142+
143+
/**
144+
* Get a ProgramFailureException with the given error
145+
* information from {@link IOException}.
146+
*
147+
* @param e The IOException to get the error information from.
148+
* @return A ProgramFailureException with the given error information, otherwise null.
149+
*/
150+
private static ProgramFailureException getProgramFailureException(IOException e) {
151+
Throwable target = e instanceof HttpResponseException ? e : e.getCause();
152+
if (target instanceof HttpResponseException) {
153+
return getProgramFailureException((HttpResponseException) target);
154+
}
155+
return null;
156+
}
157+
}

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.gcp.common;
1818

19+
import com.google.api.client.http.HttpResponseException;
1920
import com.google.api.gax.retrying.RetrySettings;
2021
import com.google.auth.Credentials;
2122
import com.google.auth.oauth2.ExternalAccountCredentials;
@@ -35,6 +36,10 @@
3536
import com.google.cloud.storage.StorageException;
3637
import com.google.cloud.storage.StorageOptions;
3738
import com.google.gson.reflect.TypeToken;
39+
import io.cdap.cdap.api.exception.ErrorCategory;
40+
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
41+
import io.cdap.cdap.api.exception.ErrorUtils;
42+
import io.cdap.cdap.api.exception.ProgramFailureException;
3843
import io.cdap.plugin.gcp.gcs.GCSPath;
3944
import io.cdap.plugin.gcp.gcs.ServiceAccountAccessTokenProvider;
4045
import org.apache.hadoop.conf.Configuration;
@@ -79,7 +84,7 @@ public class GCPUtils {
7984
"https://www.googleapis.com/auth/bigquery");
8085
public static final String FQN_RESERVED_CHARACTERS_PATTERN = ".*[.:` \t\n].*";
8186
public static final int MILLISECONDS_MULTIPLIER = 1000;
82-
87+
public static final String WRAPPED_OUTPUTFORMAT_CLASSNAME = "wrapped.outputformat.classname";
8388
/**
8489
* Load a service account from the local file system.
8590
*

src/main/java/io/cdap/plugin/gcp/gcs/ServiceAccountAccessTokenProvider.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import com.google.bigtable.repackaged.com.google.gson.Gson;
2222
import com.google.cloud.hadoop.util.AccessTokenProvider;
2323
import com.google.cloud.hadoop.util.CredentialFactory;
24+
import io.cdap.cdap.api.exception.ErrorCategory;
25+
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
26+
import io.cdap.cdap.api.exception.ErrorType;
27+
import io.cdap.cdap.api.exception.ErrorUtils;
2428
import io.cdap.plugin.gcp.common.GCPUtils;
2529
import org.apache.hadoop.conf.Configuration;
2630

@@ -50,13 +54,20 @@ public AccessToken getAccessToken() {
5054
}
5155
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
5256
} catch (IOException e) {
53-
throw new RuntimeException(e);
57+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
58+
"Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
5459
}
5560
}
5661

5762
@Override
5863
public void refresh() throws IOException {
59-
getCredentials().refresh();
64+
try {
65+
getCredentials().refresh();
66+
} catch (IOException e) {
67+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
68+
"Unable to refresh service account access token.", e.getMessage(),
69+
ErrorType.UNKNOWN, true, e);
70+
}
6071
}
6172

6273
private GoogleCredentials getCredentials() throws IOException {

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputCommitter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
6565
taskAttemptContext.getConfiguration(), tableName));
6666

6767
//Wrap output committer into the GCS Output Committer.
68-
GCSOutputCommitter gcsOutputCommitter = new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext));
68+
ForwardingOutputCommitter gcsOutputCommitter =
69+
new ForwardingOutputCommitter(
70+
new GCSOutputCommitter(outputFormat.getOutputCommitter(taskAttemptContext)));
6971

7072
gcsOutputCommitter.setupJob(taskAttemptContext);
7173
gcsOutputCommitter.setupTask(taskAttemptContext);

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputFormat.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
package io.cdap.plugin.gcp.gcs.sink;
1818

1919
import io.cdap.cdap.api.data.format.StructuredRecord;
20+
import io.cdap.cdap.api.exception.ErrorCategory;
21+
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
22+
import io.cdap.cdap.api.exception.ErrorType;
23+
import io.cdap.cdap.api.exception.ErrorUtils;
24+
import io.cdap.plugin.gcp.common.GCPUtils;
2025
import org.apache.hadoop.conf.Configuration;
2126
import org.apache.hadoop.io.NullWritable;
2227
import org.apache.hadoop.mapreduce.JobContext;
@@ -46,11 +51,13 @@ public DelegatingGCSOutputFormat() {
4651
* Get required configuration properties for this Output Format
4752
*/
4853
public static Map<String, String> configure(String delegateClassName,
54+
String wrappedClassName,
4955
String filterField,
5056
String outputBaseDir,
5157
String outputSuffix) {
5258
Map<String, String> config = new HashMap<>();
5359
config.put(DELEGATE_CLASS, delegateClassName);
60+
config.put(GCPUtils.WRAPPED_OUTPUTFORMAT_CLASSNAME, wrappedClassName);
5461
config.put(PARTITION_FIELD, filterField);
5562
config.put(OUTPUT_PATH_BASE_DIR, outputBaseDir);
5663
config.put(OUTPUT_PATH_SUFFIX, outputSuffix);
@@ -62,7 +69,8 @@ public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptC
6269
Configuration hConf = context.getConfiguration();
6370
String partitionField = hConf.get(PARTITION_FIELD);
6471

65-
return new DelegatingGCSRecordWriter(context, partitionField, getOutputCommitter(context));
72+
return new ForwardingRecordWriter(new DelegatingGCSRecordWriter(context, partitionField,
73+
getOutputCommitter(context), this));
6674
}
6775

6876
@Override
@@ -71,8 +79,7 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted
7179
}
7280

7381
@Override
74-
public DelegatingGCSOutputCommitter getOutputCommitter(TaskAttemptContext context) {
75-
return new DelegatingGCSOutputCommitter(context);
82+
public ForwardingOutputCommitter getOutputCommitter(TaskAttemptContext context) {
83+
return new ForwardingOutputCommitter(new DelegatingGCSOutputCommitter(context));
7684
}
77-
7885
}

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSOutputUtils.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
package io.cdap.plugin.gcp.gcs.sink;
1818

1919
import io.cdap.cdap.api.data.format.StructuredRecord;
20+
import io.cdap.cdap.api.exception.ErrorCategory;
21+
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
22+
import io.cdap.cdap.api.exception.ErrorType;
23+
import io.cdap.cdap.api.exception.ErrorUtils;
2024
import org.apache.hadoop.conf.Configuration;
2125
import org.apache.hadoop.io.NullWritable;
2226
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -36,7 +40,9 @@ public static OutputFormat<NullWritable, StructuredRecord> getDelegateFormat(Con
3640
(Class<OutputFormat<NullWritable, StructuredRecord>>) hConf.getClassByName(delegateClassName);
3741
return delegateClass.newInstance();
3842
} catch (Exception e) {
39-
throw new IOException("Unable to instantiate output format for class " + delegateClassName, e);
43+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
44+
String.format("Unable to instantiate output format for class '%s'.", delegateClassName),
45+
e.getMessage(), ErrorType.SYSTEM, false, e);
4046
}
4147
}
4248

src/main/java/io/cdap/plugin/gcp/gcs/sink/DelegatingGCSRecordWriter.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,16 @@ public class DelegatingGCSRecordWriter extends RecordWriter<NullWritable, Struct
3535
private final TaskAttemptContext context;
3636
private final String partitionField;
3737
private final Map<String, RecordWriter<NullWritable, StructuredRecord>> delegateMap;
38-
private final DelegatingGCSOutputCommitter delegatingGCSOutputCommitter;
38+
private final ForwardingOutputCommitter delegatingGCSOutputCommitter;
39+
private final DelegatingGCSOutputFormat outputFormat;
3940

40-
DelegatingGCSRecordWriter(TaskAttemptContext context,
41-
String partitionField,
42-
DelegatingGCSOutputCommitter delegatingGCSOutputCommitter) {
41+
DelegatingGCSRecordWriter(TaskAttemptContext context, String partitionField,
42+
ForwardingOutputCommitter delegatingGCSOutputCommitter, DelegatingGCSOutputFormat outputFormat) {
4343
this.context = context;
4444
this.partitionField = partitionField;
4545
this.delegateMap = new HashMap<>();
4646
this.delegatingGCSOutputCommitter = delegatingGCSOutputCommitter;
47+
this.outputFormat = outputFormat;
4748
}
4849

4950
@Override
@@ -55,6 +56,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException,
5556
if (delegateMap.containsKey(tableName)) {
5657
delegate = delegateMap.get(tableName);
5758
} else {
59+
5860
//Get output format from configuration.
5961
OutputFormat<NullWritable, StructuredRecord> format =
6062
DelegatingGCSOutputUtils.getDelegateFormat(context.getConfiguration());
@@ -63,7 +65,7 @@ public void write(NullWritable key, StructuredRecord record) throws IOException,
6365
delegatingGCSOutputCommitter.addGCSOutputCommitterFromOutputFormat(format, tableName);
6466

6567
//Add record writer to delegate map.
66-
delegate = format.getRecordWriter(context);
68+
delegate = new ForwardingRecordWriter(format.getRecordWriter(context));
6769
delegateMap.put(tableName, delegate);
6870
}
6971

0 commit comments

Comments
 (0)