Skip to content

Commit c7457f5

Browse files
committed
Error Management for Google Publisher
1 parent 6db2c69 commit c7457f5

File tree

5 files changed

+96
-9
lines changed

5 files changed

+96
-9
lines changed

src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProviderUtil.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
2121
import com.google.api.client.http.HttpResponseException;
22+
import com.google.api.gax.rpc.ApiException;
2223
import com.google.common.base.Strings;
2324
import com.google.common.base.Throwables;
2425
import io.cdap.cdap.api.exception.ErrorCategory;
@@ -69,6 +70,33 @@ public static ProgramFailureException getProgramFailureException(HttpResponseExc
6970
ErrorCodeType.HTTP, statusCode.toString(), externalDocumentationLink, e);
7071
}
7172

73+
/**
74+
* Get a ProgramFailureException with the given error
75+
* information from {@link ApiException}.
76+
*
77+
* @param e The ApiException to get the error information from.
78+
* @return A ProgramFailureException with the given error information.
79+
*/
80+
public static ProgramFailureException getProgramFailureException(ApiException e, String externalDocUrl,
81+
@Nullable ErrorContext errorContext) {
82+
List<Throwable> causalChain = Throwables.getCausalChain(e);
83+
for (Throwable t : causalChain) {
84+
if (t instanceof HttpResponseException) {
85+
return getProgramFailureException((HttpResponseException) t, externalDocUrl, errorContext);
86+
}
87+
}
88+
Integer statusCode = e.getStatusCode().getCode().getHttpStatusCode();
89+
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
90+
String errorReason = String.format("%s %s. %s. For more details, see %s", statusCode, e.getMessage(),
91+
pair.getCorrectiveAction(), externalDocUrl);
92+
String errorMessage = e.getMessage();
93+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason,
94+
errorContext != null ?
95+
String.format(GCPErrorDetailsProvider.ERROR_MESSAGE_FORMAT, errorContext.getPhase(), e.getClass().getName(),
96+
errorMessage) : String.format("%s: %s", e.getClass().getName(), errorMessage), pair.getErrorType(), true,
97+
ErrorCodeType.HTTP, statusCode.toString(), externalDocUrl, e);
98+
}
99+
72100
public static ProgramFailureException getHttpResponseExceptionDetailsFromChain(Throwable e, String errorReason,
73101
ErrorType errorType,
74102
boolean dependency,
@@ -79,11 +107,17 @@ public static ProgramFailureException getHttpResponseExceptionDetailsFromChain(T
79107
// Avoid double wrap
80108
return (ProgramFailureException) t;
81109
}
110+
}
111+
for (int i = causalChain.size() - 1; i >= 0; i--) {
112+
Throwable t = causalChain.get(i);
82113
if (t instanceof HttpResponseException) {
83114
return getProgramFailureException((HttpResponseException) t, externalDocUrl, null);
84115
}
116+
if (t instanceof ApiException) {
117+
return getProgramFailureException((ApiException) t, externalDocUrl, null);
118+
}
85119
}
86-
// If no HttpResponseException found in the causal chain, return generic program failure exception
120+
// If no HttpResponseException or ApiException found in the causal chain, return generic program failure exception
87121
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason,
88122
String.format("%s %s: %s", errorReason, e.getClass().getName(), e.getMessage()), errorType, dependency, e);
89123
}

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public class GCPUtils {
8181
public static final int MILLISECONDS_MULTIPLIER = 1000;
8282
public static final String GCS_SUPPORTED_DOC_URL = "https://cloud.google.com/storage/docs/json_api/v1/status-codes";
8383
public static final String BQ_SUPPORTED_DOC_URL = "https://cloud.google.com/bigquery/docs/error-messages";
84+
public static final String PUBSUB_SUPPORTED_DOC_URL = "https://cloud.google.com/pubsub/docs/reference/error-codes";
8485

8586
/**
8687
* Load a service account from the local file system.

src/main/java/io/cdap/plugin/gcp/publisher/GooglePublisher.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,16 @@
3333
import io.cdap.cdap.api.data.format.StructuredRecord;
3434
import io.cdap.cdap.api.data.schema.Schema;
3535
import io.cdap.cdap.api.dataset.lib.KeyValue;
36+
import io.cdap.cdap.api.exception.ErrorCategory;
37+
import io.cdap.cdap.api.exception.ErrorCodeType;
38+
import io.cdap.cdap.api.exception.ErrorType;
39+
import io.cdap.cdap.api.exception.ErrorUtils;
3640
import io.cdap.cdap.etl.api.Emitter;
3741
import io.cdap.cdap.etl.api.FailureCollector;
3842
import io.cdap.cdap.etl.api.PipelineConfigurer;
3943
import io.cdap.cdap.etl.api.batch.BatchSink;
4044
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
45+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
4146
import io.cdap.cdap.format.StructuredRecordStringConverter;
4247
import io.cdap.plugin.common.LineageRecorder;
4348
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
@@ -116,11 +121,16 @@ public void prepareRun(BatchSinkContext context) throws IOException {
116121
} catch (AlreadyExistsException e1) {
117122
// can happen if there is a race condition. Ignore this error since all that matters is the topic exists
118123
} catch (ApiException e1) {
119-
throw new IOException(
120-
String.format("Could not auto-create topic '%s' in project '%s'. "
121-
+ "Please ensure it is created before running the pipeline, "
122-
+ "or ensure that the service account has permission to create the topic.",
123-
config.topic, projectId), e);
124+
int statusCode = e1.getStatusCode().getCode().getHttpStatusCode();
125+
String error = String.format(
126+
"Could not auto-create topic '%s' in project '%s'. "
127+
+ "Please ensure it is created before running the pipeline, "
128+
+ "or ensure that the service account has permission to create the topic."
129+
+ "For more details, see %s",
130+
config.topic, projectId, GCPUtils.PUBSUB_SUPPORTED_DOC_URL);
131+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
132+
error, error, ErrorType.UNKNOWN, true, ErrorCodeType.HTTP, String.valueOf(statusCode),
133+
GCPUtils.PUBSUB_SUPPORTED_DOC_URL, e1);
124134
}
125135
}
126136
}
@@ -130,6 +140,9 @@ public void prepareRun(BatchSinkContext context) throws IOException {
130140
LineageRecorder lineageRecorder = new LineageRecorder(context, config.getReferenceName());
131141
lineageRecorder.createExternalDataset(inputSchema);
132142

143+
// set error details provider
144+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(GooglePublisherErrorDetailsProvider.class.getName()));
145+
133146
Configuration configuration = new Configuration();
134147
PubSubOutputFormat.configure(configuration, config);
135148
context.addOutput(Output.of(config.getReferenceName(),
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.publisher;
18+
19+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
20+
import io.cdap.plugin.gcp.common.GCPUtils;
21+
22+
/**
23+
* A custom ErrorDetailsProvider for Google Publisher.
24+
*/
25+
public class GooglePublisherErrorDetailsProvider extends GCPErrorDetailsProvider {
26+
27+
@Override
28+
protected String getExternalDocumentationLink() {
29+
return GCPUtils.PUBSUB_SUPPORTED_DOC_URL;
30+
}
31+
}

src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,14 @@
2626
import com.google.pubsub.v1.ProjectTopicName;
2727
import com.google.pubsub.v1.PubsubMessage;
2828
import io.cdap.cdap.api.data.format.StructuredRecord;
29+
import io.cdap.cdap.api.exception.ErrorCategory;
30+
import io.cdap.cdap.api.exception.ErrorType;
31+
import io.cdap.cdap.api.exception.ErrorUtils;
2932
import io.cdap.cdap.format.StructuredRecordStringConverter;
3033
import io.cdap.cdap.format.io.StructuredRecordDatumWriter;
3134
import io.cdap.plugin.format.avro.StructuredToAvroTransformer;
3235
import io.cdap.plugin.gcp.bigtable.sink.BigtableSinkConfig;
36+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
3337
import io.cdap.plugin.gcp.common.GCPUtils;
3438
import io.cdap.plugin.gcp.gcs.actions.GCSBucketCreate;
3539
import org.apache.avro.generic.GenericDatumWriter;
@@ -248,9 +252,11 @@ private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOExceptio
248252
return message;
249253
}
250254

251-
private void handleErrorIfAny() throws IOException {
255+
private void handleErrorIfAny() {
252256
if (failures.get() > errorThreshold) {
253-
throw new IOException(String.format("Failed to publish %s records", failures.get()), error.get());
257+
String errorMessage = String.format("Failed to publish %s records: %s.", failures.get(), error.get());
258+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
259+
errorMessage, errorMessage, ErrorType.UNKNOWN, true, null);
254260
}
255261
}
256262

@@ -264,7 +270,9 @@ public void close(TaskAttemptContext taskAttemptContext) throws IOException {
264270
handleErrorIfAny();
265271
}
266272
} catch (ExecutionException | InterruptedException e) {
267-
throw new IOException("Error publishing records to PubSub", e);
273+
String errorReason = String.format("Error publishing records to PubSub: %s.", e.getMessage());
274+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
275+
true, GCPUtils.PUBSUB_SUPPORTED_DOC_URL);
268276
} finally {
269277
try {
270278
publisher.shutdown();

0 commit comments

Comments
 (0)