-
Notifications
You must be signed in to change notification settings - Fork 34
[PLUGIN-1824] ErrorDetailsProvider - MySql Source/Sink plugin #523
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| /* | ||
| * Copyright © 2024 Cask Data, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
| * use this file except in compliance with the License. You may obtain a copy of | ||
| * the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
| * License for the specific language governing permissions and limitations under | ||
| * the License. | ||
| */ | ||
|
|
||
| package io.cdap.plugin.db; | ||
|
|
||
| import com.google.common.base.Strings; | ||
| import com.google.common.base.Throwables; | ||
| import io.cdap.cdap.api.exception.ErrorCategory; | ||
| import io.cdap.cdap.api.exception.ErrorType; | ||
| import io.cdap.cdap.api.exception.ErrorUtils; | ||
| import io.cdap.cdap.api.exception.ProgramFailureException; | ||
| import io.cdap.cdap.etl.api.exception.ErrorContext; | ||
| import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; | ||
|
|
||
| import java.sql.SQLException; | ||
| import java.util.List; | ||
|
|
||
| /** | ||
| * A custom ErrorDetailsProvider for Database plugins. | ||
| */ | ||
| public class DBErrorDetailsProvider implements ErrorDetailsProvider { | ||
|
|
||
| public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { | ||
| List<Throwable> causalChain = Throwables.getCausalChain(e); | ||
| for (Throwable t : causalChain) { | ||
| if (t instanceof ProgramFailureException) { | ||
| // if causal chain already has program failure exception, return null to avoid double wrap. | ||
| return null; | ||
| } | ||
| if (t instanceof SQLException) { | ||
| return getProgramFailureException((SQLException) t, errorContext); | ||
| } | ||
| if (t instanceof IllegalArgumentException) { | ||
| return getProgramFailureException((IllegalArgumentException) t, errorContext); | ||
| } | ||
| if (t instanceof IllegalStateException) { | ||
| return getProgramFailureException((IllegalStateException) t, errorContext); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Get a ProgramFailureException with the given error | ||
| * information from {@link SQLException}. | ||
| * | ||
| * @param e The SQLException to get the error information from. | ||
| * @return A ProgramFailureException with the given error information. | ||
| */ | ||
| private ProgramFailureException getProgramFailureException(SQLException e, ErrorContext errorContext) { | ||
| String errorMessage = e.getMessage(); | ||
| String sqlState = e.getSQLState(); | ||
| int errorCode = e.getErrorCode(); | ||
| String errorMessageWithDetails = String.format( | ||
| "Error occurred in the phase: '%s'. Error message: '%s'. Error code: '%s'. sqlState: '%s'", | ||
| errorContext.getPhase(), errorMessage, errorCode, sqlState); | ||
| String externalDocumentationLink = getExternalDocumentationLink(); | ||
| if (!Strings.isNullOrEmpty(externalDocumentationLink)) { | ||
| if (!errorMessageWithDetails.endsWith(".")) { | ||
| errorMessageWithDetails = errorMessageWithDetails + "."; | ||
| } | ||
| errorMessageWithDetails = String.format("%s For more details, see %s", errorMessageWithDetails, | ||
| externalDocumentationLink); | ||
| } | ||
| return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), | ||
| errorMessage, errorMessageWithDetails, getErrorTypeFromErrorCode(errorCode), false, e); | ||
| } | ||
|
|
||
| /** | ||
| * Get a ProgramFailureException with the given error | ||
| * information from {@link IllegalArgumentException}. | ||
| * | ||
| * @param e The IllegalArgumentException to get the error information from. | ||
| * @return A ProgramFailureException with the given error information. | ||
| */ | ||
| private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) { | ||
| String errorMessage = e.getMessage(); | ||
| String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; | ||
| return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), | ||
| errorMessage, | ||
| String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e); | ||
| } | ||
|
|
||
| /** | ||
| * Get a ProgramFailureException with the given error | ||
| * information from {@link IllegalStateException}. | ||
| * | ||
| * @param e The IllegalStateException to get the error information from. | ||
| * @return A ProgramFailureException with the given error information. | ||
| */ | ||
| private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) { | ||
| String errorMessage = e.getMessage(); | ||
| String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s"; | ||
| return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), | ||
| errorMessage, | ||
| String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e); | ||
| } | ||
|
|
||
| /** | ||
| * Get the external documentation link for the client errors if available. | ||
| * | ||
| * @return The external documentation link as a {@link String}. | ||
| */ | ||
| protected String getExternalDocumentationLink() { | ||
| return null; | ||
| } | ||
|
|
||
| protected ErrorType getErrorTypeFromErrorCode(int errorCode) { | ||
| return ErrorType.UNKNOWN; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| import io.cdap.cdap.etl.api.StageConfigurer; | ||
| import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; | ||
| import io.cdap.cdap.etl.api.batch.BatchSinkContext; | ||
| import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; | ||
| import io.cdap.cdap.etl.api.validation.InvalidStageException; | ||
| import io.cdap.plugin.common.LineageRecorder; | ||
| import io.cdap.plugin.common.ReferenceBatchSink; | ||
|
|
@@ -42,6 +43,7 @@ | |
| import io.cdap.plugin.db.ConnectionConfig; | ||
| import io.cdap.plugin.db.ConnectionConfigAccessor; | ||
| import io.cdap.plugin.db.DBConfig; | ||
| import io.cdap.plugin.db.DBErrorDetailsProvider; | ||
| import io.cdap.plugin.db.DBRecord; | ||
| import io.cdap.plugin.db.Operation; | ||
| import io.cdap.plugin.db.SchemaReader; | ||
|
|
@@ -163,6 +165,16 @@ public void validateOperations(FailureCollector collector, T dbSinkConfig, @Null | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns the ErrorDetailsProvider class name. | ||
| * Override this method to provide a custom ErrorDetailsProvider class name. | ||
| * | ||
| * @return ErrorDetailsProvider class name | ||
| */ | ||
| protected String getErrorDetailsProviderClassName() { | ||
| return DBErrorDetailsProvider.class.getName(); | ||
| } | ||
|
|
||
| @Override | ||
| public void prepareRun(BatchSinkContext context) { | ||
| String connectionString = dbSinkConfig.getConnectionString(); | ||
|
|
@@ -227,7 +239,8 @@ public void prepareRun(BatchSinkContext context) { | |
| configuration.set(ETLDBOutputFormat.COMMIT_BATCH_SIZE, | ||
| context.getArguments().get(ETLDBOutputFormat.COMMIT_BATCH_SIZE)); | ||
| } | ||
|
|
||
| // set error details provider | ||
| context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName())); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Otherwise error details provider information is not propagated to platform.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Understood, moved ! |
||
| addOutputContext(context); | ||
| } | ||
| protected void addOutputContext(BatchSinkContext context) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,13 +25,17 @@ | |
| import io.cdap.cdap.api.data.format.StructuredRecord; | ||
| import io.cdap.cdap.api.data.schema.Schema; | ||
| import io.cdap.cdap.api.dataset.lib.KeyValue; | ||
| import io.cdap.cdap.api.exception.ErrorCategory; | ||
| import io.cdap.cdap.api.exception.ErrorType; | ||
| import io.cdap.cdap.api.exception.ErrorUtils; | ||
| import io.cdap.cdap.api.plugin.PluginConfig; | ||
| import io.cdap.cdap.etl.api.Emitter; | ||
| import io.cdap.cdap.etl.api.FailureCollector; | ||
| import io.cdap.cdap.etl.api.PipelineConfigurer; | ||
| import io.cdap.cdap.etl.api.StageConfigurer; | ||
| import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; | ||
| import io.cdap.cdap.etl.api.batch.BatchSourceContext; | ||
| import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec; | ||
| import io.cdap.cdap.internal.io.SchemaTypeAdapter; | ||
| import io.cdap.plugin.common.LineageRecorder; | ||
| import io.cdap.plugin.common.ReferenceBatchSource; | ||
|
|
@@ -41,6 +45,7 @@ | |
| import io.cdap.plugin.db.ConnectionConfig; | ||
| import io.cdap.plugin.db.ConnectionConfigAccessor; | ||
| import io.cdap.plugin.db.DBConfig; | ||
| import io.cdap.plugin.db.DBErrorDetailsProvider; | ||
| import io.cdap.plugin.db.DBRecord; | ||
| import io.cdap.plugin.db.SchemaReader; | ||
| import io.cdap.plugin.db.TransactionIsolationLevel; | ||
|
|
@@ -119,8 +124,9 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { | |
| collector.addFailure("Unable to instantiate JDBC driver: " + e.getMessage(), null) | ||
| .withStacktrace(e.getStackTrace()); | ||
| } catch (SQLException e) { | ||
| collector.addFailure("SQL error while getting query schema: " + e.getMessage(), null) | ||
| .withStacktrace(e.getStackTrace()); | ||
| String details = String.format("SQL error while getting query schema: Error: %s, SQLState: %s, ErrorCode: %s", | ||
| e.getMessage(), e.getSQLState(), e.getErrorCode()); | ||
| collector.addFailure(details, null).withStacktrace(e.getStackTrace()); | ||
| } catch (Exception e) { | ||
| collector.addFailure(e.getMessage(), null).withStacktrace(e.getStackTrace()); | ||
| } | ||
|
|
@@ -194,7 +200,11 @@ private Schema loadSchemaFromDB(Class<? extends Driver> driverClass) | |
|
|
||
| } catch (SQLException e) { | ||
| // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath | ||
| throw new SQLException(e.getMessage(), e.getSQLState(), e.getErrorCode()); | ||
| String errorMessageWithDetails = String.format("Error occurred while trying to get schema from database." + | ||
| "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState()); | ||
| throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), | ||
| e.getMessage(), errorMessageWithDetails, ErrorType.USER, false, new SQLException(e.getMessage(), | ||
| e.getSQLState(), e.getErrorCode())); | ||
| } finally { | ||
| driverCleanup.destroy(); | ||
| } | ||
|
|
@@ -212,6 +222,16 @@ protected SchemaReader getSchemaReader() { | |
| return new CommonSchemaReader(); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the ErrorDetailsProvider class name. | ||
| * Override this method to provide a custom ErrorDetailsProvider class name. | ||
| * | ||
| * @return ErrorDetailsProvider class name | ||
| */ | ||
| protected String getErrorDetailsProviderClassName() { | ||
| return DBErrorDetailsProvider.class.getName(); | ||
| } | ||
|
|
||
| private DriverCleanup loadPluginClassAndGetDriver(Class<? extends Driver> driverClass) | ||
| throws IllegalAccessException, InstantiationException, SQLException { | ||
|
|
||
|
|
@@ -268,6 +288,8 @@ public void prepareRun(BatchSourceContext context) throws Exception { | |
| lineageRecorder.recordRead("Read", "Read from database plugin", | ||
| schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList())); | ||
| } | ||
| // set error details provider | ||
| context.setErrorDetailsProvider(new ErrorDetailsProviderSpec(getErrorDetailsProviderClassName())); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. similarly here:
Otherwise error details provider information is not propagated to platform.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved ! |
||
| context.setInput(Input.of(sourceConfig.getReferenceName(), new SourceInputFormatProvider( | ||
| DataDrivenETLDBInputFormat.class, connectionConfigAccessor.getConfiguration()))); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * Copyright © 2024 Cask Data, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
| * use this file except in compliance with the License. You may obtain a copy of | ||
| * the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
| * License for the specific language governing permissions and limitations under | ||
| * the License. | ||
| */ | ||
|
|
||
| package io.cdap.plugin.mysql; | ||
|
|
||
| import io.cdap.cdap.api.exception.ErrorType; | ||
| import io.cdap.plugin.db.DBErrorDetailsProvider; | ||
|
|
||
| /** | ||
| * A custom ErrorDetailsProvider for MySQL plugins. | ||
| */ | ||
| public class MysqlErrorDetailsProvider extends DBErrorDetailsProvider { | ||
|
|
||
| @Override | ||
| protected String getExternalDocumentationLink() { | ||
| return "https://dev.mysql.com/doc/mysql-errors/9.0/en/"; | ||
| } | ||
|
|
||
| @Override | ||
| protected ErrorType getErrorTypeFromErrorCode(int errorCode) { | ||
| // https://dev.mysql.com/doc/refman/9.0/en/error-message-elements.html#error-code-ranges | ||
| if (errorCode >= 1000 && errorCode <= 5999) { | ||
| return ErrorType.USER; | ||
| } else if (errorCode >= 10000 && errorCode <= 51999) { | ||
| // SYSTEM errors: Enterprise and user-defined custom error messages | ||
| return ErrorType.SYSTEM; | ||
| } else { | ||
| // UNKNOWN errors: Anything outside defined range | ||
| return ErrorType.UNKNOWN; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is this method used in this class?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how it is used in
google-cloudplugin:https://github.com/data-integrations/google-cloud/blob/92dc8b77e391935a28e3b3013cb51a11b85efbab/src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java#L86-L93
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh ! This makes sense, I though this will be called by the platform and display on UI.
I have added the doc link in error details now !