diff --git a/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java b/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java index 718a707..7967cfd 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java @@ -230,33 +230,10 @@ public String getConnectionArguments() { } public void validate(FailureCollector collector) { - if (getOauth2Enabled()) { - if (!containsMacro(PROPERTY_CLIENT_ID) - && Strings.isNullOrEmpty(getClientId())) { - collector.addFailure("Client ID is not set.", null) - .withConfigProperty(PROPERTY_CLIENT_ID); - } - if (!containsMacro(PROPERTY_CLIENT_SECRET) - && Strings.isNullOrEmpty(getClientSecret())) { - collector.addFailure("Client Secret is not set.", null) - .withConfigProperty(PROPERTY_CLIENT_SECRET); - } - if (!containsMacro(PROPERTY_REFRESH_TOKEN) - && Strings.isNullOrEmpty(getRefreshToken())) { - collector.addFailure("Refresh Token is not set.", null) - .withConfigProperty(PROPERTY_REFRESH_TOKEN); - } - } else if (getKeyPairEnabled()) { - if (!containsMacro(PROPERTY_USERNAME) - && Strings.isNullOrEmpty(getUsername())) { - collector.addFailure("Username is not set.", null) - .withConfigProperty(PROPERTY_USERNAME); - } - if (!containsMacro(PROPERTY_PRIVATE_KEY) - && Strings.isNullOrEmpty(getPrivateKey())) { - collector.addFailure("Private Key is not set.", null) - .withConfigProperty(PROPERTY_PRIVATE_KEY); - } + if (Boolean.TRUE.equals(getOauth2Enabled())) { + validateWhenOAuth2Enabled(collector); + } else if (Boolean.TRUE.equals(getKeyPairEnabled())) { + validateWhenKeyPairEnabled(collector); } else { if (!containsMacro(PROPERTY_USERNAME) && Strings.isNullOrEmpty(getUsername())) { @@ -272,6 +249,37 @@ public void validate(FailureCollector collector) { validateConnection(collector); } + private void validateWhenKeyPairEnabled(FailureCollector collector) { + if (!containsMacro(PROPERTY_USERNAME) + && Strings.isNullOrEmpty(getUsername())) { + collector.addFailure("Username is not set.", null) + .withConfigProperty(PROPERTY_USERNAME); + } + if (!containsMacro(PROPERTY_PRIVATE_KEY) + && Strings.isNullOrEmpty(getPrivateKey())) { + collector.addFailure("Private Key is not set.", null) + .withConfigProperty(PROPERTY_PRIVATE_KEY); + } + } + + private void validateWhenOAuth2Enabled(FailureCollector collector) { + if (!containsMacro(PROPERTY_CLIENT_ID) + && Strings.isNullOrEmpty(getClientId())) { + collector.addFailure("Client ID is not set.", null) + .withConfigProperty(PROPERTY_CLIENT_ID); + } + if (!containsMacro(PROPERTY_CLIENT_SECRET) + && Strings.isNullOrEmpty(getClientSecret())) { + collector.addFailure("Client Secret is not set.", null) + .withConfigProperty(PROPERTY_CLIENT_SECRET); + } + if (!containsMacro(PROPERTY_REFRESH_TOKEN) + && Strings.isNullOrEmpty(getRefreshToken())) { + collector.addFailure("Refresh Token is not set.", null) + .withConfigProperty(PROPERTY_REFRESH_TOKEN); + } + } + public boolean canConnect() { return (!containsMacro(PROPERTY_DATABASE) && !containsMacro(PROPERTY_SCHEMA_NAME) && !containsMacro(PROPERTY_ACCOUNT_NAME) && !containsMacro(PROPERTY_USERNAME) @@ -299,7 +307,7 @@ protected void validateConnection(FailureCollector collector) { .withConfigProperty(PROPERTY_USERNAME); // TODO: for oauth2 - if (keyPairEnabled) { + if (Boolean.TRUE.equals(keyPairEnabled)) { failure.withConfigProperty(PROPERTY_PRIVATE_KEY); } else { failure.withConfigProperty(PROPERTY_PASSWORD); diff --git 
a/src/main/java/io/cdap/plugin/snowflake/common/OAuthUtil.java b/src/main/java/io/cdap/plugin/snowflake/common/OAuthUtil.java index 3d94a17..f529776 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/OAuthUtil.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/OAuthUtil.java @@ -19,6 +19,9 @@ import com.google.gson.JsonElement; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -26,6 +29,7 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -50,16 +54,23 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient httppost.setHeader("Content-type", "application/x-www-form-urlencoded"); // set grant type and refresh_token. It should be in body not url! - StringEntity entity = new StringEntity(String.format("refresh_token=%s&grant_type=refresh_token", - URLEncoder.encode(config.getRefreshToken(), "UTF-8"))); - httppost.setEntity(entity); + try { + StringEntity entity = new StringEntity(String.format("refresh_token=%s&grant_type=refresh_token", + URLEncoder.encode(config.getRefreshToken(), "UTF-8"))); + httppost.setEntity(entity); + } catch (NullPointerException e) { + String errorMessage = String.format("Failed to encode URL due to missing Refresh Token with message: %s.", + e.getMessage()); + String errorReason = "Error encoding URL due to missing Refresh Token."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, true, e); + } // set 'Authorization' header String stringToEncode = config.getClientId() + ":" + config.getClientSecret(); String encondedAuthorization = new String(Base64.getEncoder().encode(stringToEncode.getBytes())); httppost.setHeader("Authorization", String.format("Basic %s", encondedAuthorization)); - CloseableHttpResponse response = httpclient.execute(httppost); String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); @@ -72,7 +83,13 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient // if exception happened during parsing OR if json does not contain 'access_token' key. if (jsonElement == null) { - throw new RuntimeException(String.format("Unexpected response '%s' from '%s'", responseString, uri.toString())); + String errorReason = String.format("Failed to parse access token from response. 
Request %s returned response " + "code '%s' and reason: %s", uri.toString(), response.getStatusLine().getStatusCode(), + response.getStatusLine().getReasonPhrase()); + String errorMessage = String.format("Failed to parse access token, request %s returned response %s " + + "with code '%s'.", uri, responseString, response.getStatusLine().getStatusCode()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.SYSTEM, true, new JsonSyntaxException(errorReason)); } return jsonElement.getAsString(); diff --git a/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorDetailsProvider.java new file mode 100644 index 0000000..31e1a01 --- /dev/null +++ b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorDetailsProvider.java @@ -0,0 +1,80 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.snowflake.common; + +import com.google.common.base.Throwables; +import io.cdap.cdap.api.data.format.UnexpectedFormatException; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException; +import io.cdap.plugin.snowflake.common.exception.SchemaParseException; + +import java.util.List; + +/** + * Error details provider for the Snowflake plugins. + **/ +public class SnowflakeErrorDetailsProvider implements ErrorDetailsProvider { + + private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. Error message: %s"; + + @Override + public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { + List<Throwable> causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + // if causal chain already has program failure exception, return null to avoid double wrap. 
+ return null; + } + if (t instanceof IllegalArgumentException) { + return getProgramFailureException((IllegalArgumentException) t, errorContext, ErrorType.USER); + } + if (t instanceof IllegalStateException) { + return getProgramFailureException((IllegalStateException) t, errorContext, ErrorType.SYSTEM); + } + if (t instanceof SchemaParseException) { + return getProgramFailureException((SchemaParseException) t, errorContext, ErrorType.USER); + } + if (t instanceof UnexpectedFormatException) { + return getProgramFailureException((UnexpectedFormatException) t, errorContext, ErrorType.SYSTEM); + } + if (t instanceof ConnectionTimeoutException) { + return getProgramFailureException((ConnectionTimeoutException) t, errorContext, ErrorType.SYSTEM); + } + } + return null; + } + + /** + * Get a ProgramFailureException with the given error + * information from {@link Exception}. + * + * @param exception The Exception to get the error information from. + * @return A ProgramFailureException with the given error information. + */ + private ProgramFailureException getProgramFailureException(Exception exception, ErrorContext errorContext, + ErrorType errorType) { + String errorMessage = exception.getMessage(); + return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, + String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(), errorMessage), errorType, false, exception); + } +} diff --git a/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java new file mode 100644 index 0000000..9e6424a --- /dev/null +++ b/src/main/java/io/cdap/plugin/snowflake/common/SnowflakeErrorType.java @@ -0,0 +1,185 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.snowflake.common; + +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil; +import net.snowflake.client.jdbc.ErrorCode; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Error type classification based on the Snowflake error message code + * + **/ +public class SnowflakeErrorType { + + private static final Map<Integer, ErrorType> ERROR_CODE_TO_ERROR_TYPE; + private static final Map<Integer, ErrorCategory> ERROR_CODE_TO_ERROR_CATEGORY; + + //https://github.com/snowflakedb/snowflake-jdbc/blob/master/src/main/java/net/snowflake/client/jdbc/ErrorCode.java + static { + ERROR_CODE_TO_ERROR_TYPE = new HashMap<>(); + ERROR_CODE_TO_ERROR_TYPE.put(200004, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200006, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200007, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200008, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200009, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200010, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200011, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200012, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200014, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200017, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200018, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200019, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200021, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200023, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200024, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200025, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200026, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200028, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200029, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200030, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200031, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200032, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200033, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200034, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200035, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200036, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200037, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200038, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200045, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200046, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200047, ErrorType.USER); + ERROR_CODE_TO_ERROR_TYPE.put(200056, ErrorType.USER); + + ERROR_CODE_TO_ERROR_TYPE.put(200001, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200002, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200003, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200013, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200015, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200016, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200020, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200022, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200039, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200040, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200044, ErrorType.SYSTEM); + ERROR_CODE_TO_ERROR_TYPE.put(200061, ErrorType.SYSTEM); + + + ErrorCategory.ErrorCategoryEnum plugin = ErrorCategory.ErrorCategoryEnum.PLUGIN; + 
ERROR_CODE_TO_ERROR_CATEGORY = new HashMap<>(); + ERROR_CODE_TO_ERROR_CATEGORY.put(200004, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200006, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200007, new ErrorCategory(plugin, "DATA_EXCEPTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200008, new ErrorCategory(plugin, "DATA_EXCEPTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200009, new ErrorCategory(plugin, "DATA_EXCEPTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200010, new ErrorCategory(plugin, "DATA_EXCEPTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200011, new ErrorCategory(plugin, "INVALID_AUTHORIZATION_SPECIFICATION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200012, new ErrorCategory(plugin, "INVALID_AUTHORIZATION_SPECIFICATION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200014, new ErrorCategory(plugin, "PROGRAM_LIMIT_EXCEEDED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200017, new ErrorCategory(plugin, "DATA_EXCEPTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200018, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200019, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200021, new ErrorCategory(plugin, "SQL_STATEMENT_NOT_YET_COMPLETE")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200023, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200024, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200025, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200026, new ErrorCategory(plugin, "INVALID_AUTHORIZATION_SPECIFICATION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200028, new ErrorCategory(plugin, "INVALID_AUTHORIZATION_SPECIFICATION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200029, new ErrorCategory(plugin, "INVALID_AUTHORIZATION_SPECIFICATION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200030, new ErrorCategory(plugin, "DATA_EXCEPTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200031, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200032, new ErrorCategory(plugin, "DATA_EXCEPTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200033, new ErrorCategory(plugin, "INVALID_PARAMETER_VALUE")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200034, new ErrorCategory(plugin, "DATA_EXCEPTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200035, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200036, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200037, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200038, new ErrorCategory(plugin, "FEATURE_NOT_SUPPORTED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200045, new ErrorCategory(plugin, "SYNTAX_ERROR")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200046, new ErrorCategory(plugin, "SYNTAX_ERROR")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200047, new ErrorCategory(plugin, "INVALID_PARAMETER_VALUE")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200056, new ErrorCategory(plugin, "INVALID_PARAMETER_VALUE")); + + //system errors + ERROR_CODE_TO_ERROR_CATEGORY.put(200001, new ErrorCategory(plugin, "INTERNAL_ERROR")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200002, + new ErrorCategory(plugin, "SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200003, new ErrorCategory(plugin, "QUERY_CANCELED")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200013, new ErrorCategory(plugin, "SYSTEM_ERROR")); + 
ERROR_CODE_TO_ERROR_CATEGORY.put(200015, new ErrorCategory(plugin, "IO_ERROR")); //network error + ERROR_CODE_TO_ERROR_CATEGORY.put(200016, new ErrorCategory(plugin, "IO_ERROR")); //io error + ERROR_CODE_TO_ERROR_CATEGORY.put(200020, new ErrorCategory(plugin, "SYSTEM_ERROR")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200022, new ErrorCategory(plugin, "INTERNAL_ERROR")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200039, + new ErrorCategory(plugin, "SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200040, + new ErrorCategory(plugin, "SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200044, new ErrorCategory(plugin, "SYSTEM_ERROR")); + ERROR_CODE_TO_ERROR_CATEGORY.put(200061, new ErrorCategory(plugin, "SYSTEM_ERROR")); + } + + /** + * Method to get the error type based on the error code. + * + * @param errorCode the error code to classify + * @return the corresponding ErrorType (USER, SYSTEM, UNKNOWN) + */ + private static ErrorType getErrorTypeFromErrorCode(int errorCode) { + if (ERROR_CODE_TO_ERROR_TYPE.containsKey(errorCode)) { + return ERROR_CODE_TO_ERROR_TYPE.get(errorCode); + } + return ErrorType.UNKNOWN; + } + + + /** + * Method to get the error category based on the error code. + * + * @param errorCode the error code to classify + * @return the corresponding ErrorCategory + */ + private static ErrorCategory getErrorCategoryFromSqlState(int errorCode) { + if (ERROR_CODE_TO_ERROR_CATEGORY.containsKey(errorCode)) { + return ERROR_CODE_TO_ERROR_CATEGORY.get(errorCode); + } + return new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN); + } + + public static ProgramFailureException fetchProgramFailureException(SQLException e, String errorReason, + String errorMessage) { + Optional<ErrorCode> errorCodes = Arrays.stream(ErrorCode.values()) + .filter(errorCode -> errorCode.getSqlState().equals(e.getSQLState())) + .findFirst(); + ErrorCategory errorCategory = errorCodes.isPresent() ? + getErrorCategoryFromSqlState(errorCodes.get().getMessageCode()) : + new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN); + ErrorType errorType = errorCodes.isPresent() ? 
getErrorTypeFromErrorCode(errorCodes.get().getMessageCode()) : + ErrorType.UNKNOWN; + return ErrorUtils.getProgramFailureException(errorCategory, errorReason, errorMessage, errorType, + true, ErrorCodeType.SQLSTATE, e.getSQLState(), DocumentUrlUtil.getSupportedDocumentUrl(), e); + } +} diff --git a/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java b/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java index d74d85e..911f696 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java @@ -18,10 +18,15 @@ import com.google.common.base.Strings; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.common.KeyValueListParser; import io.cdap.plugin.snowflake.common.BaseSnowflakeConfig; import io.cdap.plugin.snowflake.common.OAuthUtil; +import io.cdap.plugin.snowflake.common.SnowflakeErrorType; import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException; +import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil; import io.cdap.plugin.snowflake.common.util.QueryUtil; import net.snowflake.client.jdbc.SnowflakeBasicDataSource; import org.apache.http.impl.client.HttpClients; @@ -56,14 +61,17 @@ public SnowflakeAccessor(BaseSnowflakeConfig config) { initDataSource(dataSource, config); } - public void runSQL(String query) throws IOException { + public void runSQL(String query) { try (Connection connection = dataSource.getConnection(); PreparedStatement populateStmt = connection.prepareStatement(query);) { populateStmt.execute(); } catch (SQLException e) { - throw new IOException(String.format("Statement '%s' failed due to '%s'", query, e.getMessage()), e); + String errorMessage = String.format("Statement '%s' failed with SQL state %s and error code %s due to '%s'", + query, e.getSQLState(), e.getErrorCode(), e.getMessage()); + String errorReason = String.format("Statement '%s' failed with SQL state %s and error code %s. For more " + + "details see %s.", query, e.getSQLState(), e.getErrorCode(), DocumentUrlUtil.getSupportedDocumentUrl()); + throw SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage); } - } /** @@ -88,7 +96,11 @@ public List describeQuery(String query) throws IOExcep fieldDescriptors.add(new SnowflakeFieldDescriptor(name, type, nullable)); } } catch (SQLException e) { - throw new IOException(e); + String errorMessage = String.format("Failed to execute query to fetch descriptors with SQL State %s and error " + + "code %s with message: %s.", e.getSQLState(), e.getErrorCode(), e.getMessage()); + String errorReason = String.format("Failed to execute query to fetch descriptors with SQL State %s and error " + + "code %s. 
For more details %s", e.getSQLState(), e.getErrorCode(), DocumentUrlUtil.getSupportedDocumentUrl()); + throw SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage); } return fieldDescriptors; } @@ -108,14 +120,14 @@ private void initDataSource(SnowflakeBasicDataSource dataSource, BaseSnowflakeCo dataSource.setRole(role); } - if (config.getOauth2Enabled()) { + if (Boolean.TRUE.equals(config.getOauth2Enabled())) { String accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config); dataSource.setOauthToken(accessToken); // The recommend way to pass token is in the password when you use the driver with connection pool. // This is also a mandatory field, so adding the same. // Refer https://github.com/snowflakedb/snowflake-jdbc/issues/1175 dataSource.setPassword(accessToken); - } else if (config.getKeyPairEnabled()) { + } else if (Boolean.TRUE.equals(config.getKeyPairEnabled())) { dataSource.setUser(config.getUsername()); String privateKeyPath = writeTextToTmpFile(config.getPrivateKey()); @@ -143,6 +155,12 @@ public void checkConnection() { connection.getMetaData(); } catch (SQLException e) { throw new ConnectionTimeoutException("Cannot create Snowflake connection.", e); + } catch (NullPointerException e) { + String errorMessage = String.format("Failed to create Snowflake connection due to missing Username or password " + + "with message: %s.", e.getMessage()); + String errorReason = "Cannot create Snowflake connection. Username or password is missing."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, true, e); } } // SnowflakeBasicDataSource doesn't provide access for additional properties. diff --git a/src/main/java/io/cdap/plugin/snowflake/common/util/DocumentUrlUtil.java b/src/main/java/io/cdap/plugin/snowflake/common/util/DocumentUrlUtil.java new file mode 100644 index 0000000..08af4b9 --- /dev/null +++ b/src/main/java/io/cdap/plugin/snowflake/common/util/DocumentUrlUtil.java @@ -0,0 +1,42 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.snowflake.common.util; + +/** + * Utility helper class for Snowflake documentation url + **/ +public final class DocumentUrlUtil { + // Private constructor to prevent instantiation + private DocumentUrlUtil() { + throw new UnsupportedOperationException("Utility class cannot be instantiated"); + } + + /** + * Supported document URL. + */ + private static final String SUPPORTED_DOCUMENT_URL = + "https://docs.snowflake.com/en/user-guide/client-connectivity-troubleshooting/error-messages"; + + /** + * Retrieves the supported document URL. 
+ * + * @return the supported document URL + */ + public static String getSupportedDocumentUrl() { + return SUPPORTED_DOCUMENT_URL; + } +} diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/CSVBuffer.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/CSVBuffer.java index 436ae74..2431404 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/CSVBuffer.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/CSVBuffer.java @@ -15,6 +15,9 @@ */ package io.cdap.plugin.snowflake.sink.batch; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -32,12 +35,12 @@ public class CSVBuffer implements Closeable { .withAllowMissingColumnNames(false); private CSVPrinter csvPrinter; - private ByteArrayOutputStream csvStream; + private final ByteArrayOutputStream csvStream; private boolean isHeaderPrinted; - private boolean printHeader; + private final boolean printHeader; private int recordsCount = 0; - public CSVBuffer(boolean printHeader) throws IOException { + public CSVBuffer(boolean printHeader) { this.printHeader = printHeader; this.csvStream = new ByteArrayOutputStream(); reset(); @@ -54,12 +57,20 @@ public void write(CSVRecord csvRecord) throws IOException { recordsCount++; } - public void reset() throws IOException { + public void reset() { isHeaderPrinted = !printHeader; recordsCount = 0; csvStream.reset(); // we need to re-create this or else OutputStreamWriter will not able to write after reset. - csvPrinter = new CSVPrinter(new OutputStreamWriter(csvStream, StandardCharsets.UTF_8), csvFormat); + try { + csvPrinter = new CSVPrinter(new OutputStreamWriter(csvStream, StandardCharsets.UTF_8), csvFormat); + } catch (IOException e) { + String errorMessage = String.format("Unable to reset the CSV stream and recreate the CSV printer, " + + "failed with error message: %s", e.getMessage()); + String errorReason = "Unable to reset the CSV stream and recreate the printer."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.UNKNOWN, true, e); + } } public int size() { diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeBatchSink.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeBatchSink.java index 3c7fa86..6774bf5 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeBatchSink.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeBatchSink.java @@ -29,7 +29,9 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.snowflake.common.SnowflakeErrorDetailsProvider; import org.apache.hadoop.io.NullWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,12 +71,14 @@ public void prepareRun(BatchSinkContext context) { config.validate(inputSchema, collector); collector.getOrThrowException(); + // set error details provider + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(SnowflakeErrorDetailsProvider.class.getName())); context.addOutput(Output.of(config.getReferenceName(), new SnowflakeOutputFormatProvider(config))); LineageRecorder lineageRecorder = new LineageRecorder(context, 
config.getReferenceName()); lineageRecorder.createExternalDataset(inputSchema); // Record the field level WriteOperation - if (inputSchema.getFields() != null && !inputSchema.getFields().isEmpty()) { + if (inputSchema != null && inputSchema.getFields() != null && !inputSchema.getFields().isEmpty()) { String operationDescription = String.format("Wrote to Snowflake table '%s'", config.getTableName()); lineageRecorder.recordWrite("Write", operationDescription, inputSchema.getFields().stream() diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeOutputFormat.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeOutputFormat.java index 41d5613..a241693 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeOutputFormat.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeOutputFormat.java @@ -16,7 +16,9 @@ package io.cdap.plugin.snowflake.sink.batch; import com.google.gson.Gson; -import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobContext; @@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.sql.SQLException; import java.util.UUID; /** @@ -43,9 +46,8 @@ public class SnowflakeOutputFormat extends OutputFormat public static final String DESTINATION_STAGE_PATH_PROPERTY = "cdap.dest.stage.path"; @Override - public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) - throws IOException { - return new SnowflakeRecordWriter(taskAttemptContext); + public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) { + return new SnowflakeRecordWriter(taskAttemptContext); } @Override @@ -64,11 +66,11 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) public void setupJob(JobContext jobContext) { Configuration conf = jobContext.getConfiguration(); conf.set(DESTINATION_STAGE_PATH_PROPERTY, DESTINATION_STAGE_PATH); - LOG.info(String.format("Writing data to '%s'", DESTINATION_STAGE_PATH)); + LOG.info("Writing data to '{}'", DESTINATION_STAGE_PATH); } @Override - public void commitJob(JobContext jobContext) throws IOException { + public void commitJob(JobContext jobContext) { Configuration conf = jobContext.getConfiguration(); String configJson = conf.get( SnowflakeOutputFormatProvider.PROPERTY_CONFIG_JSON); diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeRecordWriter.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeRecordWriter.java index 9434481..897cb5b 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeRecordWriter.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeRecordWriter.java @@ -16,6 +16,9 @@ package io.cdap.plugin.snowflake.sink.batch; import com.google.gson.Gson; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; @@ -41,7 +44,7 @@ public class SnowflakeRecordWriter extends RecordWriter private final SnowflakeSinkAccessor snowflakeAccessor; private final String destinationStagePath; - public SnowflakeRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException { + public 
SnowflakeRecordWriter(TaskAttemptContext taskAttemptContext) { Configuration conf = taskAttemptContext.getConfiguration(); destinationStagePath = conf.get(SnowflakeOutputFormat.DESTINATION_STAGE_PATH_PROPERTY); String configJson = conf.get( @@ -55,29 +58,51 @@ public SnowflakeRecordWriter(TaskAttemptContext taskAttemptContext) throws IOExc } @Override - public void write(NullWritable key, CSVRecord csvRecord) throws IOException { + public void write(NullWritable key, CSVRecord csvRecord) { csvBufferSizeCheck.reset(); - csvBufferSizeCheck.write(csvRecord); + try { + csvBufferSizeCheck.write(csvRecord); + } catch (IOException e) { + String errorMessage = String.format("Failed to write CSV record %s in the size check buffer with message: %s", + csvRecord, e.getMessage()); + String errorReason = "Unable to write CSV record in the size check buffer"; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.SYSTEM, true, e); + } if (config.getMaxFileSize() > 0 && csvBuffer.size() + csvBufferSizeCheck.size() > config.getMaxFileSize()) { submitCurrentBatch(); } - csvBuffer.write(csvRecord); + try { + csvBuffer.write(csvRecord); + } catch (IOException e) { + String errorMessage = String.format("Failed to write CSV record %s in the main buffer with message: %s", + csvRecord, e.getMessage()); + String errorReason = String.format("Unable to write CSV record '%s' in the main buffer.", csvRecord); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.SYSTEM, true, e); + } } - private void submitCurrentBatch() throws IOException { + private void submitCurrentBatch() { if (csvBuffer.getRecordsCount() != 0) { try (InputStream csvInputStream = new ByteArrayInputStream(csvBuffer.getByteArray())) { snowflakeAccessor.uploadStream(csvInputStream, destinationStagePath); + } catch (IOException e) { + String errorMessage = String.format("Failed to upload file to the destination stage '%s' with message: %s", + destinationStagePath, e.getMessage()); + String errorReason = String.format("Failed to upload file to the destination stage '%s'.", + destinationStagePath); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.SYSTEM, true, e); } - csvBuffer.reset(); } } @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException { + public void close(TaskAttemptContext taskAttemptContext) { submitCurrentBatch(); } } diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java index 0eba82d..5eae056 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java @@ -16,11 +16,12 @@ package io.cdap.plugin.snowflake.sink.batch; +import io.cdap.plugin.snowflake.common.SnowflakeErrorType; import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor; +import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil; import net.snowflake.client.jdbc.SnowflakeConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.InputStream; import java.sql.Connection; import java.sql.SQLException; @@ -45,7 +46,7 @@ public SnowflakeSinkAccessor(SnowflakeSinkConfig config) { this.config = 
config; } - public void uploadStream(InputStream inputStream, String stageDir) throws IOException { + public void uploadStream(InputStream inputStream, String stageDir) { // file name needs to be unique across all the nodes. String filename = String.format(DEST_FILE_NAME, UUID.randomUUID().toString()); LOG.info("Uploading file '{}' to table stage", filename); @@ -55,17 +56,22 @@ public void uploadStream(InputStream inputStream, String stageDir) throws IOExce null, inputStream, filename, true); } catch (SQLException e) { - throw new IOException(e); + String errorReason = String.format("Unable to compress '%s' and upload data to destination stage '%s'. For " + + "more details, see %s", filename, stageDir, DocumentUrlUtil.getSupportedDocumentUrl()); + String errorMessage = String.format("Failed to compress '%s' and upload data to destination stage '%s' with " + + "errorCode: '%s' and sqlState: '%s' with message: %s.", filename, stageDir, e.getErrorCode(), e.getSQLState(), + e.getMessage()); + throw SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage); } } - public void populateTable(String destinationStagePath) throws IOException { + public void populateTable(String destinationStagePath) { String populateStatement = String.format(POPULATE_TABLE_STAGE, config.getTableName(), destinationStagePath, config.getCopyOptions()); runSQL(populateStatement); } - public void removeDirectory(String path) throws IOException { + public void removeDirectory(String path) { runSQL(String.format("remove %s", path)); } } diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/StructuredRecordToCSVRecordTransformer.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/StructuredRecordToCSVRecordTransformer.java index 49ec955..e6aba5b 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/StructuredRecordToCSVRecordTransformer.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/StructuredRecordToCSVRecordTransformer.java @@ -19,6 +19,9 @@ import com.google.gson.JsonParser; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.format.StructuredRecordStringConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +33,7 @@ import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -39,7 +43,7 @@ public class StructuredRecordToCSVRecordTransformer { private static final Logger LOG = LoggerFactory.getLogger(StructuredRecordToCSVRecordTransformer.class); - public CSVRecord transform(StructuredRecord record) throws IOException { + public CSVRecord transform(StructuredRecord record) { List fieldNames = new ArrayList<>(); List values = new ArrayList<>(); @@ -63,8 +67,7 @@ public CSVRecord transform(StructuredRecord record) throws IOException { * @return string representing the value in format, which can be understood by Snowflake */ @Nullable - public static String convertSchemaFieldToString(Object value, Schema.Field field, StructuredRecord record) - throws IOException { + public static String convertSchemaFieldToString(Object value, Schema.Field field, StructuredRecord record) { // don't convert null to avoid NPE if (value == null) { return null; @@ -100,7 +103,18 @@ public static String convertSchemaFieldToString(Object value, 
Schema.Field field instant = Instant.ofEpochMilli((Long) value); return instant.atZone(ZoneOffset.UTC).toLocalTime().toString(); case DECIMAL: - return record.getDecimal(field.getName()).toString(); + try { + return record.getDecimal(field.getName()).toString(); + } catch (Exception e) { + String errorReason = String.format("Field '%s' is expected to be a decimal, but is a %s.", + field.getName(), value.getClass().getSimpleName()); + String errorMessage = String.format("Field '%s' is expected to be a decimal, but is a %s. " + + "Failed to fetch record with message: %s", field.getName(), value.getClass().getSimpleName(), + e.getMessage()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, true, e); + } + default: throw new IllegalArgumentException( String.format("Field '%s' is of unsupported type '%s'", fieldSchema.getDisplayName(), @@ -111,12 +125,31 @@ public static String convertSchemaFieldToString(Object value, Schema.Field field switch (fieldSchema.getType()) { // convert to json so it can be saved to Snowflake's variant case RECORD: - return StructuredRecordStringConverter.toJsonString((StructuredRecord) value); - // convert to json so it can be saved to Snowflake's variant + try { + return StructuredRecordStringConverter.toJsonString((StructuredRecord) value); + } catch (IOException e) { + String errorMessage = String.format("Failed to encode record to JSON for schema field '%s' of type RECORD " + + "with message: %s", field.getName(), e.getMessage()); + String errorReason = String.format( + "Failed to encode record to JSON for schema field '%s' of type RECORD.", field.getName()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.SYSTEM, true, e); + } + // convert to json so it can be saved to Snowflake's variant case ARRAY: - String stringRecord = StructuredRecordStringConverter.toJsonString(record); - JsonElement jsonObject = new JsonParser().parse(stringRecord); - return jsonObject.getAsJsonObject().get(field.getName()).toString(); + String stringRecord; + try { + stringRecord = StructuredRecordStringConverter.toJsonString(record); + JsonElement jsonObject = new JsonParser().parse(stringRecord); + return jsonObject.getAsJsonObject().get(field.getName()).toString(); + } catch (IOException e) { + String errorMessage = String.format("Failed to encode record to JSON for schema field '%s' of type ARRAY " + + "with message: %s", field.getName(), e.getMessage()); + String errorReason = String.format( + "Failed to encode record to JSON for schema field '%s' of type ARRAY.", field.getName()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.SYSTEM, true, e); + } // convert to hex which can be understood by Snowflake and saved to BINARY type case BYTES: byte[] bytes; diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java index f5fde15..a8b0248 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java @@ -30,7 +30,9 @@ import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; +import 
io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.snowflake.common.SnowflakeErrorDetailsProvider; import io.cdap.plugin.snowflake.common.util.SchemaHelper; import org.apache.hadoop.io.NullWritable; @@ -87,6 +89,8 @@ public void prepareRun(BatchSourceContext context) { .collect(Collectors.toList())); } + + // set error details provider + context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(SnowflakeErrorDetailsProvider.class.getName())); context.setInput(Input.of(config.getReferenceName(), new SnowflakeInputFormatProvider(config, escapeChar))); } diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java index 9cd9ac8..561a10f 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSourceConfig.java @@ -19,7 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.snowflake.common.BaseSnowflakeConfig; + +import java.util.Objects; import javax.annotation.Nullable; /** @@ -83,4 +86,14 @@ public String getReferenceName() { public String getSchema() { return schema; } + + public void validate(FailureCollector collector) { + super.validate(collector); + + if (!containsMacro(PROPERTY_MAX_SPLIT_SIZE) && Objects.nonNull(maxSplitSize) + && maxSplitSize < 0) { + collector.addFailure("Maximum Split Size cannot be a negative number.", null) + .withConfigProperty(PROPERTY_MAX_SPLIT_SIZE); + } + } } diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java index c4c1d1b..64304e3 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java @@ -18,7 +18,6 @@ import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -26,7 +25,6 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import java.io.IOException; import java.util.List; import java.util.stream.Collectors; @@ -39,7 +37,7 @@ public class SnowflakeInputFormat extends InputFormat { private static final Gson GSON = new Gson(); @Override - public List<InputSplit> getSplits(JobContext jobContext) throws IOException { + public List<InputSplit> getSplits(JobContext jobContext) { SnowflakeSourceAccessor snowflakeAccessor = getSnowflakeAccessor(jobContext.getConfiguration()); List<String> stageSplits = snowflakeAccessor.prepareStageSplits(); return stageSplits.stream() diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormatProvider.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormatProvider.java index 97f6996..f4d0a4f 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormatProvider.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormatProvider.java @@ -18,7 +18,6 @@ import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import 
io.cdap.cdap.api.data.batch.InputFormatProvider; import java.util.Map; diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java index d8990c0..335296d 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java @@ -20,6 +20,9 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.format.UnexpectedFormatException; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,12 +54,20 @@ public StructuredRecord transform(Map<String, String> row) { private StructuredRecord getStructuredRecord(Map<String, String> row, Schema schema) { StructuredRecord.Builder builder = StructuredRecord.builder(schema); - row.entrySet().stream() - .filter(entry -> schema.getField(entry.getKey()) != null) // filter absent fields in the schema - .forEach(entry -> builder.set( - entry.getKey(), - convertValue(entry.getKey(), entry.getValue(), schema.getField(entry.getKey()).getSchema()))); - return builder.build(); + try { + row.entrySet().stream() + .filter(entry -> schema.getField(entry.getKey()) != null) // filter absent fields in the schema + .forEach(entry -> builder.set( + entry.getKey(), + convertValue(entry.getKey(), entry.getValue(), schema.getField(entry.getKey()).getSchema()))); + return builder.build(); + } catch (Exception e) { + String errorMessage = String.format("Failed to transform data with the provided schema with message: %s.", + e.getMessage()); + String errorReason = String.format("Failed to transform data with the provided schema: %s.", schema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, true, e); + } } @Nullable @@ -105,11 +116,11 @@ private Object convertValue(String fieldName, String value, Schema fieldSchema) return Double.parseDouble(castValue(value, fieldName, String.class)); case STRING: return value; + default: + throw new UnexpectedFormatException( + String.format("Unsupported schema type: '%s' for field: '%s'. Supported types are 'bytes, boolean, " + + "double, string'.", fieldSchema, fieldName)); } - - throw new UnexpectedFormatException( - String.format("Unsupported schema type: '%s' for field: '%s'. 
Supported types are 'bytes, boolean, " - + "double, string'.", fieldSchema, fieldName)); } private static <T> T castValue(Object value, String fieldName, Class<T> clazz) { diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java index bc8c922..202be53 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java @@ -17,9 +17,10 @@ package io.cdap.plugin.snowflake.source.batch; import au.com.bytecode.opencsv.CSVReader; +import io.cdap.plugin.snowflake.common.SnowflakeErrorType; import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor; +import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil; import io.cdap.plugin.snowflake.common.util.QueryUtil; -import io.cdap.plugin.snowflake.sink.batch.SnowflakeSinkAccessor; import net.snowflake.client.jdbc.SnowflakeConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ * A class which accesses Snowflake API to do actions used by batch source. */ public class SnowflakeSourceAccessor extends SnowflakeAccessor { - private static final Logger LOG = LoggerFactory.getLogger(SnowflakeSinkAccessor.class); + private static final Logger LOG = LoggerFactory.getLogger(SnowflakeSourceAccessor.class); // Directory should be unique, so that parallel pipelines can run correctly, as well as after failure we don't // have old stage files in the dir. private static final String STAGE_PATH = "@~/cdap_stage/result" + UUID.randomUUID() + "/"; @@ -74,7 +75,7 @@ public SnowflakeSourceAccessor(SnowflakeBatchSourceConfig config, String escapeC * @return List of file paths in Snowflake stage. * @throws IOException thrown if there are any issue with the I/O operations. */ - public List<String> prepareStageSplits() throws IOException { + public List<String> prepareStageSplits() { LOG.info("Loading data into stage: '{}'", STAGE_PATH); String copy = String.format(COMAND_COPY_INTO, QueryUtil.removeSemicolon(config.getImportQuery())); if (config.getMaxSplitSize() > 0) { @@ -92,7 +93,12 @@ public List prepareStageSplits() throws IOException { } } } catch (SQLException e) { - throw new IOException(e); + String errorReason = String.format("Failed to load data into stage '%s' with sqlState %s and errorCode %s. " + + "For more details, see %s.", STAGE_PATH, e.getSQLState(), e.getErrorCode(), + DocumentUrlUtil.getSupportedDocumentUrl()); + String errorMessage = String.format("Failed to load data into stage '%s' with sqlState %s and errorCode %s. " + + "Failed to execute query with message: %s.", STAGE_PATH, e.getSQLState(), e.getErrorCode(), e.getMessage()); + throw SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage); } return stageSplits; } @@ -102,7 +108,7 @@ public List prepareStageSplits() throws IOException { * @param stageSplit path to file in Snowflake stage. * @throws IOException thrown if there are any issue with the I/O operations. */ - public void removeStageFile(String stageSplit) throws IOException { + public void removeStageFile(String stageSplit) { runSQL(String.format("remove @~/%s", stageSplit)); } @@ -111,16 +117,20 @@ public void removeStageFile(String stageSplit) throws IOException { * * @param stageSplit path to file in Snowflake stage. * @return CSVReader. - * @throws IOException thrown if there are any issue with the I/O operations. 
*/ - public CSVReader buildCsvReader(String stageSplit) throws IOException { + public CSVReader buildCsvReader(String stageSplit) { try (Connection connection = dataSource.getConnection()) { InputStream downloadStream = connection.unwrap(SnowflakeConnection.class) .downloadStream("@~", stageSplit, true); InputStreamReader inputStreamReader = new InputStreamReader(downloadStream); return new CSVReader(inputStreamReader, ',', '"', escapeChar); } catch (SQLException e) { - throw new IOException(e); + String errorReason = String.format("Failed to execute the query with sqlState: '%s' and errorCode: '%s'. " + + "For more details, see %s.", e.getSQLState(), e.getErrorCode(), DocumentUrlUtil.getSupportedDocumentUrl()); + String errorMessage = String.format("Failed to execute the query with sqlState: '%s' and errorCode: '%s' " + + "with message: %s for stage split '%s'.", e.getSQLState(), e.getErrorCode(), + e.getMessage(), stageSplit); + throw SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage); } } }
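
Every driver failure in this change set now funnels through SnowflakeErrorType.fetchProgramFailureException, which matches the SQLException's SQLSTATE against the snowflake-jdbc ErrorCode enum and looks the resulting message code up in the two static maps. The sketch below is a minimal illustration of that path, not shipped code; it assumes the cdap-api and snowflake-jdbc artifacts are on the classpath, and the SQLSTATE value used is illustrative only (the authoritative SQLSTATE/message-code pairs live in net.snowflake.client.jdbc.ErrorCode).

    // Sketch only: demonstrates the intended USER/SYSTEM/UNKNOWN classification flow.
    import io.cdap.cdap.api.exception.ProgramFailureException;
    import io.cdap.plugin.snowflake.common.SnowflakeErrorType;

    import java.sql.SQLException;

    public class SnowflakeErrorTypeSketch {
      public static void main(String[] args) {
        // Simulate a driver failure carrying a Snowflake SQLSTATE (value is illustrative).
        SQLException cause = new SQLException("JDBC driver encountered communication error.", "08S01");
        ProgramFailureException failure = SnowflakeErrorType.fetchProgramFailureException(
            cause,
            "Unable to reach Snowflake.",
            "Statement failed with SQL state 08S01.");
        // fetchProgramFailureException attaches the SQLSTATE as an ErrorCodeType.SQLSTATE code,
        // the troubleshooting URL from DocumentUrlUtil, and the ErrorType/ErrorCategory resolved
        // from the maps; unmapped states fall back to ErrorType.UNKNOWN and the bare PLUGIN category.
        System.out.println(failure.getMessage());
      }
    }

One design note on the lookup: findFirst() takes the first ErrorCode whose SQLSTATE matches, and several Snowflake message codes share a SQLSTATE, so distinct codes can collapse into one category. If that granularity matters, matching on the vendor code from SQLException#getErrorCode instead might disambiguate.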