|
39 | 39 | import io.cdap.plugin.common.ReferenceBatchSink; |
40 | 40 | import io.cdap.plugin.common.ReferencePluginConfig; |
41 | 41 | import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider; |
42 | | -import io.cdap.plugin.common.util.ExceptionUtils; |
43 | 42 | import io.cdap.plugin.db.ColumnType; |
44 | 43 | import io.cdap.plugin.db.CommonSchemaReader; |
45 | 44 | import io.cdap.plugin.db.ConnectionConfig; |
|
59 | 58 | import org.slf4j.Logger; |
60 | 59 | import org.slf4j.LoggerFactory; |
61 | 60 |
|
62 | | -import java.io.IOException; |
63 | 61 | import java.sql.Connection; |
64 | 62 | import java.sql.Driver; |
65 | 63 | import java.sql.PreparedStatement; |
@@ -129,17 +127,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { |
129 | 127 | Class<? extends Driver> driverClass = DBUtils.getDriverClass( |
130 | 128 | pipelineConfigurer, dbSinkConfig, ConnectionConfig.JDBC_PLUGIN_TYPE); |
131 | 129 | if (driverClass != null && dbSinkConfig.canConnect()) { |
132 | | - try { |
133 | 130 | validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema, |
134 | 131 | dbSinkConfig.getDBSchemaName()); |
135 | | - } catch (SQLException e) { |
136 | | - String details = String.format("SQL error while validating the schema: Error: %s, SQLState: %s, ErrorCode: %s", |
137 | | - e.getMessage(), e.getSQLState(), e.getErrorCode()); |
138 | | - collector.addFailure(details, |
139 | | - "Ensure the table exists and the connection string points to a valid database.") |
140 | | - .withConfigProperty(DBSinkConfig.TABLE_NAME) |
141 | | - .withStacktrace(e.getStackTrace()); |
142 | | - } |
143 | 132 | } |
144 | 133 | } |
145 | 134 | public void validateOperations(FailureCollector collector, T dbSinkConfig, @Nullable Schema inputSchema) { |
@@ -229,9 +218,6 @@ public void prepareRun(BatchSinkContext context) { |
229 | 218 | } else { |
230 | 219 | outputSchema = inferSchema(driverClass); |
231 | 220 | } |
232 | | - } catch (SQLException e) { |
233 | | - throw new RuntimeException(String.format("Unable to validate schema due to: %s.", |
234 | | - ExceptionUtils.getRootCauseMessage(e)), e); |
235 | 221 | } finally { |
236 | 222 | DBUtils.cleanup(driverClass); |
237 | 223 | } |
@@ -423,7 +409,7 @@ static List<ColumnType> getMatchedColumnTypeList(ResultSet resultSet, List<Strin |
423 | 409 | } |
424 | 410 |
|
425 | 411 | private void validateSchema(FailureCollector collector, Class<? extends Driver> jdbcDriverClass, String tableName, |
426 | | - Schema inputSchema, String dbSchemaName) throws SQLException { |
| 412 | + Schema inputSchema, String dbSchemaName) { |
427 | 413 | String connectionString = dbSinkConfig.getConnectionString(); |
428 | 414 | String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName() |
429 | 415 | : dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName(); |
@@ -459,6 +445,13 @@ connectionString, connectionProperties, getExternalDocumentationLink())) { |
459 | 445 | getExternalDocumentationLink())) { |
460 | 446 | getFieldsValidator().validateFields(inputSchema, rs, collector); |
461 | 447 | } |
| 448 | + } catch (SQLException e) { |
| 449 | + LOG.error("Exception while trying to validate schema of database table {} for connection {}.", |
| 450 | + fullyQualifiedTableName, connectionString, e); |
| 451 | + collector.addFailure( |
| 452 | + String.format("Exception while trying to validate schema of database table '%s' for connection '%s' with %s", |
| 453 | + fullyQualifiedTableName, connectionString, e.getMessage()), |
| 454 | + null).withStacktrace(e.getStackTrace()); |
462 | 455 | } |
463 | 456 | } |
464 | 457 |
|
|
0 commit comments