Skip to content

Commit 48c3e60

Browse files
committed
Error Management for Spanner
1 parent 64891be commit 48c3e60

File tree

9 files changed

+146
-13
lines changed

9 files changed

+146
-13
lines changed

src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public class GCPUtils {
8787
public static final String GCS_SUPPORTED_DOC_URL = "https://cloud.google.com/storage/docs/json_api/v1/status-codes";
8888
public static final String BQ_SUPPORTED_DOC_URL = "https://cloud.google.com/bigquery/docs/error-messages";
8989
public static final String PUBSUB_SUPPORTED_DOC_URL = "https://cloud.google.com/pubsub/docs/reference/error-codes";
90+
public static final String SPANNER_SUPPORTED_DOC_URL = "https://cloud.google.com/spanner/docs/error-codes";
9091
public static final int BQ_DEFAULT_READ_TIMEOUT_SECONDS = 120;
9192

9293
/**

src/main/java/io/cdap/plugin/gcp/spanner/common/SpannerErrorDetailsProvider.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.spanner.common;
18+
19+
import com.google.cloud.spanner.ErrorCode;
20+
import com.google.cloud.spanner.SpannerException;
21+
import com.google.common.base.Throwables;
22+
import io.cdap.cdap.api.exception.ErrorCategory;
23+
import io.cdap.cdap.api.exception.ErrorCodeType;
24+
import io.cdap.cdap.api.exception.ErrorType;
25+
import io.cdap.cdap.api.exception.ErrorUtils;
26+
import io.cdap.cdap.api.exception.ProgramFailureException;
27+
import io.cdap.cdap.etl.api.exception.ErrorContext;
28+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
29+
import io.cdap.plugin.gcp.common.GCPUtils;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
/**
35+
* A custom ErrorDetailsProvider for Spanner.
36+
*/
37+
public class SpannerErrorDetailsProvider extends GCPErrorDetailsProvider {
38+
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s";
39+
40+
static Map<ErrorCode, ErrorUtils.ActionErrorPair> actionErrorMap = new HashMap<>();
41+
42+
static {
43+
actionErrorMap.put(ErrorCode.CANCELLED, ErrorUtils.getActionErrorByStatusCode(499));
44+
actionErrorMap.put(ErrorCode.UNKNOWN, ErrorUtils.getActionErrorByStatusCode(500));
45+
actionErrorMap.put(ErrorCode.INVALID_ARGUMENT, ErrorUtils.getActionErrorByStatusCode(400));
46+
actionErrorMap.put(ErrorCode.DEADLINE_EXCEEDED, ErrorUtils.getActionErrorByStatusCode(504));
47+
actionErrorMap.put(ErrorCode.NOT_FOUND, ErrorUtils.getActionErrorByStatusCode(404));
48+
actionErrorMap.put(ErrorCode.ALREADY_EXISTS, ErrorUtils.getActionErrorByStatusCode(409));
49+
actionErrorMap.put(ErrorCode.PERMISSION_DENIED, ErrorUtils.getActionErrorByStatusCode(403));
50+
actionErrorMap.put(ErrorCode.UNAUTHENTICATED, ErrorUtils.getActionErrorByStatusCode(401));
51+
actionErrorMap.put(ErrorCode.RESOURCE_EXHAUSTED, ErrorUtils.getActionErrorByStatusCode(429));
52+
actionErrorMap.put(ErrorCode.FAILED_PRECONDITION, ErrorUtils.getActionErrorByStatusCode(400));
53+
actionErrorMap.put(ErrorCode.ABORTED, ErrorUtils.getActionErrorByStatusCode(409));
54+
actionErrorMap.put(ErrorCode.OUT_OF_RANGE, ErrorUtils.getActionErrorByStatusCode(400));
55+
actionErrorMap.put(ErrorCode.UNIMPLEMENTED, ErrorUtils.getActionErrorByStatusCode(501));
56+
actionErrorMap.put(ErrorCode.INTERNAL, ErrorUtils.getActionErrorByStatusCode(500));
57+
actionErrorMap.put(ErrorCode.UNAVAILABLE, ErrorUtils.getActionErrorByStatusCode(503));
58+
actionErrorMap.put(ErrorCode.DATA_LOSS, ErrorUtils.getActionErrorByStatusCode(500));
59+
}
60+
61+
@Override
62+
protected String getExternalDocumentationLink() {
63+
return GCPUtils.SPANNER_SUPPORTED_DOC_URL;
64+
}
65+
66+
@Override
67+
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
68+
ProgramFailureException ex = super.getExceptionDetails(e, errorContext);
69+
if (ex != null) {
70+
return ex;
71+
}
72+
List<Throwable> causalChain = Throwables.getCausalChain(e);
73+
for (Throwable t : causalChain) {
74+
if (t instanceof SpannerException) {
75+
return getProgramFailureExceptionFromSpannerException((SpannerException) t);
76+
}
77+
}
78+
return null;
79+
}
80+
81+
private ProgramFailureException getProgramFailureExceptionFromSpannerException(SpannerException se) {
82+
String errorCodeName = se.getErrorCode().name();
83+
ErrorUtils.ActionErrorPair actionErrorPair = null;
84+
String errorReason = se.getReason();
85+
String errorMessage = se.getMessage();
86+
if (actionErrorMap.containsKey(se.getErrorCode())) {
87+
actionErrorPair = actionErrorMap.get(se.getErrorCode());
88+
errorReason = String.format("%s %s. %s", errorCodeName, errorMessage, actionErrorPair.getCorrectiveAction());
89+
}
90+
if (!errorReason.endsWith(".")) {
91+
errorReason = errorReason + ".";
92+
}
93+
errorReason = String.format("%s For more details, see %s.", errorReason, GCPUtils.SPANNER_SUPPORTED_DOC_URL);
94+
95+
String errorMessageWithCode = String.format("[ErrorCode='%s'] %s", errorCodeName, errorMessage);
96+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
97+
errorReason, String.format("%s: %s", se.getClass().getName(), errorMessageWithCode),
98+
actionErrorPair != null ? actionErrorPair.getErrorType() : ErrorType.UNKNOWN, true, ErrorCodeType.HTTP,
99+
errorCodeName, GCPUtils.SPANNER_SUPPORTED_DOC_URL, se);
100+
}
101+
}

src/main/java/io/cdap/plugin/gcp/spanner/sink/RecordToMutationTransformer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import io.cdap.cdap.api.data.format.StructuredRecord;
2525
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
2626
import io.cdap.cdap.api.data.schema.Schema;
27+
import io.cdap.cdap.api.exception.ErrorCategory;
28+
import io.cdap.cdap.api.exception.ErrorType;
29+
import io.cdap.cdap.api.exception.ErrorUtils;
2730

2831
import java.lang.reflect.Array;
2932
import java.nio.ByteBuffer;
@@ -224,8 +227,10 @@ static Collection<Object> toCollection(String fieldName, String fieldType, Objec
224227
} else if (value.getClass().isArray()) {
225228
return convertToObjectCollection(value);
226229
} else {
227-
throw new UnexpectedFormatException(
228-
String.format("Field '%s' of type '%s' has unexpected value '%s'", fieldName, fieldType, value));
230+
String errorMessage = String.format
231+
("Field '%s' of type '%s' has unexpected value '%s'.", fieldName, fieldType, value);
232+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
233+
errorMessage, errorMessage, ErrorType.USER, false, null);
229234
}
230235
}
231236

src/main/java/io/cdap/plugin/gcp/spanner/sink/SpannerSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,15 @@
3939
import io.cdap.cdap.etl.api.batch.BatchSink;
4040
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
4141
import io.cdap.cdap.etl.api.connector.Connector;
42+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
4243
import io.cdap.plugin.common.Asset;
4344
import io.cdap.plugin.common.LineageRecorder;
4445
import io.cdap.plugin.common.ReferenceBatchSink;
4546
import io.cdap.plugin.common.ReferenceNames;
4647
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
4748
import io.cdap.plugin.gcp.common.CmekUtils;
4849
import io.cdap.plugin.gcp.spanner.SpannerConstants;
50+
import io.cdap.plugin.gcp.spanner.common.SpannerErrorDetailsProvider;
4951
import io.cdap.plugin.gcp.spanner.common.SpannerUtil;
5052
import io.cdap.plugin.gcp.spanner.connector.SpannerConnector;
5153
import org.apache.hadoop.conf.Configuration;
@@ -133,6 +135,9 @@ public void prepareRun(BatchSinkContext context) throws Exception {
133135
LineageRecorder lineageRecorder = new LineageRecorder(context, asset);
134136
lineageRecorder.createExternalDataset(schema);
135137

138+
// set error details provider
139+
context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(SpannerErrorDetailsProvider.class.getName()));
140+
136141
SpannerOutputFormat.configure(configuration, config, schema);
137142
context.addOutput(Output.of(referenceName,
138143
new SinkOutputFormatProvider(SpannerOutputFormat.class, configuration)));

src/main/java/io/cdap/plugin/gcp/spanner/source/PartitionInputSplit.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package io.cdap.plugin.gcp.spanner.source;
1818

1919
import com.google.cloud.spanner.Partition;
20+
import io.cdap.cdap.api.exception.ErrorCategory;
21+
import io.cdap.cdap.api.exception.ErrorType;
22+
import io.cdap.cdap.api.exception.ErrorUtils;
2023
import org.apache.hadoop.io.Writable;
2124
import org.apache.hadoop.mapreduce.InputSplit;
2225
import org.slf4j.Logger;
@@ -81,7 +84,9 @@ public void readFields(DataInput dataInput) throws IOException {
8184
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
8285
partition = (Partition) objectInputStream.readObject();
8386
} catch (ClassNotFoundException cfe) {
84-
throw new IOException("Exception while trying to deserialize object ", cfe);
87+
String errorMessage = String.format("Exception while trying to deserialize object: %s.", cfe);
88+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
89+
errorMessage, errorMessage, ErrorType.SYSTEM, false, null);
8590
}
8691
}
8792

src/main/java/io/cdap/plugin/gcp/spanner/source/ResultSetToRecordTransformer.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import com.google.cloud.spanner.ResultSet;
2323
import com.google.cloud.spanner.Type;
2424
import io.cdap.cdap.api.data.format.StructuredRecord;
25-
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
2625
import io.cdap.cdap.api.data.schema.Schema;
26+
import io.cdap.cdap.api.exception.ErrorCategory;
27+
import io.cdap.cdap.api.exception.ErrorType;
28+
import io.cdap.cdap.api.exception.ErrorUtils;
2729

2830
import java.time.Instant;
2931
import java.time.LocalDate;
@@ -105,10 +107,10 @@ private void validateDateTime(Schema schema, String fieldName, String value) {
105107
try {
106108
LocalDateTime.parse(value);
107109
} catch (DateTimeParseException exception) {
108-
throw new UnexpectedFormatException(
109-
String
110-
.format("Datetime field '%s' with value '%s' is not in ISO-8601 format.", fieldName, value.toString()),
111-
exception);
110+
String errorMessage = String.format("Datetime field '%s' with value '%s' is not in ISO-8601 format.", fieldName,
111+
value.toString());
112+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
113+
errorMessage, errorMessage, ErrorType.USER, false, exception);
112114
}
113115
}
114116
}
@@ -178,8 +180,10 @@ private Long convertTimestampToLong(String fieldName, Timestamp timestamp, Schem
178180
long micros = TimeUnit.SECONDS.toMicros(instant.getEpochSecond());
179181
return Math.addExact(micros, TimeUnit.NANOSECONDS.toMicros(instant.getNano()));
180182
} catch (ArithmeticException e) {
181-
throw new UnexpectedFormatException(String.format("Field %s was set to a %s that is too large.", fieldName,
182-
logicalType.getToken()));
183+
String errorMessage = String.format("Field %s was set to a %s that is too large.", fieldName,
184+
logicalType.getToken());
185+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
186+
errorMessage, errorMessage, ErrorType.USER, false, e);
183187
}
184188
}
185189
}

src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerInputFormat.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
import com.google.cloud.spanner.BatchTransactionId;
2020
import com.google.cloud.spanner.Partition;
2121
import com.google.cloud.spanner.ResultSet;
22+
import io.cdap.cdap.api.exception.ErrorCategory;
23+
import io.cdap.cdap.api.exception.ErrorType;
24+
import io.cdap.cdap.api.exception.ErrorUtils;
2225
import io.cdap.plugin.gcp.spanner.SpannerConstants;
2326
import org.apache.hadoop.conf.Configuration;
2427
import org.apache.hadoop.io.NullWritable;
@@ -59,6 +62,7 @@ public List<InputSplit> getSplits(JobContext jobContext) throws IOException, Int
5962
}
6063
} catch (Exception e) {
6164
throw new IOException("Exception while trying to initialize spanner and create partition splits", e);
65+
6266
}
6367
LOG.debug("Initialized and configured {} splits", partitionSplits.size());
6468
return partitionSplits;
@@ -74,7 +78,9 @@ private <T> T deserializeObject(Configuration configuration, String property) th
7478
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
7579
return (T) objectInputStream.readObject();
7680
} catch (ClassNotFoundException cfe) {
77-
throw new IOException("Exception while trying to deserialize object ", cfe);
81+
String errorMessage = String.format("Exception while trying to deserialize object: %s.", cfe);
82+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
83+
errorMessage, errorMessage, ErrorType.SYSTEM, false, null);
7884
}
7985
}
8086

src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,15 @@
5050
import io.cdap.cdap.etl.api.batch.BatchSource;
5151
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
5252
import io.cdap.cdap.etl.api.connector.Connector;
53+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
5354
import io.cdap.plugin.common.Asset;
5455
import io.cdap.plugin.common.LineageRecorder;
5556
import io.cdap.plugin.common.ReferenceNames;
5657
import io.cdap.plugin.common.SourceInputFormatProvider;
5758
import io.cdap.plugin.gcp.common.GCPConfig;
5859
import io.cdap.plugin.gcp.common.Schemas;
5960
import io.cdap.plugin.gcp.spanner.SpannerConstants;
61+
import io.cdap.plugin.gcp.spanner.common.SpannerErrorDetailsProvider;
6062
import io.cdap.plugin.gcp.spanner.common.SpannerUtil;
6163
import io.cdap.plugin.gcp.spanner.connector.SpannerConnector;
6264
import org.apache.commons.lang3.StringUtils;
@@ -196,6 +198,10 @@ public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
196198
LineageRecorder lineageRecorder = new LineageRecorder(batchSourceContext, asset);
197199
lineageRecorder.createExternalDataset(configuredSchema);
198200

201+
// set error details provider
202+
batchSourceContext.setErrorDetailsProvider(new ErrorDetailsProviderSpec
203+
(SpannerErrorDetailsProvider.class.getName()));
204+
199205
// set input format and pass configuration
200206
batchSourceContext.setInput(Input.of(referenceName,
201207
new SourceInputFormatProvider(SpannerInputFormat.class, configuration)));

src/test/java/io/cdap/plugin/gcp/spanner/source/ResultSetToRecordTransformerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import com.google.cloud.spanner.Type;
2424
import com.google.common.base.Charsets;
2525
import io.cdap.cdap.api.data.format.StructuredRecord;
26-
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
2726
import io.cdap.cdap.api.data.schema.Schema;
27+
import io.cdap.cdap.api.exception.ProgramFailureException;
2828
import org.junit.Assert;
2929
import org.junit.Test;
3030
import org.mockito.Mockito;
@@ -150,7 +150,7 @@ public void testTransform() {
150150
Assert.assertEquals(formattedDatetime, record.get("datetime_field"));
151151
}
152152

153-
@Test(expected = UnexpectedFormatException.class)
153+
@Test(expected = ProgramFailureException.class)
154154
public void testInvalidDateTime() {
155155
Schema testSchema = Schema
156156
.recordOf("record", Schema.Field.of("datetime_field", Schema.of(Schema.LogicalType.DATETIME)));

0 commit comments

Comments
 (0)