Skip to content

Commit 1eba47c

Browse files
committed
PLUGIN-1823: Rework
1 parent 852bcba commit 1eba47c

File tree

26 files changed

+206
-240
lines changed

26 files changed

+206
-240
lines changed

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,9 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
108108
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
109109
properties.put(CloudSQLMySQLSink.CloudSQLMySQLSinkConfig.TABLE_NAME, table);
110110
}
111+
112+
@Override
113+
protected CloudSQLMySQLErrorDetailsProvider getErrorDetailsProvider() {
114+
return new CloudSQLMySQLErrorDetailsProvider();
115+
}
111116
}

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ protected String getErrorDetailsProviderClassName() {
109109
return CloudSQLMySQLErrorDetailsProvider.class.getName();
110110
}
111111

112+
@Override
113+
protected CloudSQLMySQLErrorDetailsProvider getErrorDetailsProvider() {
114+
return new CloudSQLMySQLErrorDetailsProvider();
115+
}
116+
112117
@Override
113118
protected String getExternalDocumentationLink() {
114119
return DBUtils.CLOUDSQLMYSQL_SUPPORTED_DOC_URL;

cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ protected String getErrorDetailsProviderClassName() {
138138
return CloudSQLMySQLErrorDetailsProvider.class.getName();
139139
}
140140

141+
@Override
142+
protected CloudSQLMySQLErrorDetailsProvider getErrorDetailsProvider() {
143+
return new CloudSQLMySQLErrorDetailsProvider();
144+
}
145+
141146
/** CloudSQL MySQL source config. */
142147
public static class CloudSQLMySQLSourceConfig extends AbstractDBSpecificSourceConfig {
143148

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,9 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
118118
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
119119
sinkProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
120120
}
121+
122+
@Override
123+
protected CloudSQLPostgreSQLErrorDetailsProvider getErrorDetailsProvider() {
124+
return new CloudSQLPostgreSQLErrorDetailsProvider();
125+
}
121126
}

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,11 @@ protected String getErrorDetailsProviderClassName() {
153153
return CloudSQLPostgreSQLErrorDetailsProvider.class.getName();
154154
}
155155

156+
@Override
157+
protected CloudSQLPostgreSQLErrorDetailsProvider getErrorDetailsProvider() {
158+
return new CloudSQLPostgreSQLErrorDetailsProvider();
159+
}
160+
156161
@Override
157162
protected String getExternalDocumentationLink() {
158163
return DBUtils.CLOUDSQLPOSTGRES_SUPPORTED_DOC_URL;

cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ protected String getErrorDetailsProviderClassName() {
9797
return CloudSQLPostgreSQLErrorDetailsProvider.class.getName();
9898
}
9999

100+
@Override
101+
protected CloudSQLPostgreSQLErrorDetailsProvider getErrorDetailsProvider() {
102+
return new CloudSQLPostgreSQLErrorDetailsProvider();
103+
}
104+
100105
@Override
101106
protected String createConnectionString() {
102107
if (CloudSQLUtil.PRIVATE_INSTANCE.equalsIgnoreCase(

database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.cdap.cdap.etl.api.action.Action;
2424
import io.cdap.cdap.etl.api.action.ActionContext;
2525
import io.cdap.cdap.etl.api.action.SettableArguments;
26+
import io.cdap.plugin.common.db.DBErrorDetailsProvider;
2627
import io.cdap.plugin.db.ConnectionConfig;
2728
import io.cdap.plugin.util.DBUtils;
2829
import io.cdap.plugin.util.DriverCleanup;
@@ -114,12 +115,12 @@ private void processArguments(Class<? extends Driver> driverClass,
114115
private void executeWithRetry(FailureCollector failureCollector, SettableArguments settableArguments,
115116
Properties connectionProperties) throws SQLException {
116117
try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy<Connection>) retryPolicy,
117-
config.getConnectionString(), connectionProperties, getExternalDocumentationLink())) {
118+
config.getConnectionString(), connectionProperties, getErrorDetailsProvider())) {
118119
ResultSet resultSet;
119120
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection,
120-
getExternalDocumentationLink())) {
121+
getErrorDetailsProvider())) {
121122
resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy<ResultSet>) retryPolicy, statement,
122-
config.getQuery(), null);
123+
config.getQuery(), getErrorDetailsProvider());
123124
}
124125
boolean hasRecord = resultSet.next();
125126
if (!hasRecord) {
@@ -163,4 +164,14 @@ private void setArguments(ResultSet resultSet, FailureCollector failureCollector
163164
protected String getExternalDocumentationLink() {
164165
return "https://en.wikipedia.org/wiki/SQLSTATE";
165166
}
167+
168+
/**
169+
* Returns the DBErrorDetailsProvider instance.
170+
* Override this method to provide a custom DBErrorDetailsProvider instance.
171+
*
172+
* @return DBErrorDetailsProvider instance
173+
*/
174+
protected DBErrorDetailsProvider getErrorDetailsProvider() {
175+
return new DBErrorDetailsProvider();
176+
}
166177
}

database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,15 @@
1616

1717
package io.cdap.plugin.db.action;
1818

19-
import dev.failsafe.Failsafe;
2019
import dev.failsafe.RetryPolicy;
20+
import io.cdap.plugin.common.db.DBErrorDetailsProvider;
2121
import io.cdap.plugin.util.DBUtils;
2222
import io.cdap.plugin.util.DriverCleanup;
2323
import io.cdap.plugin.util.RetryPolicyUtil;
2424
import io.cdap.plugin.util.RetryUtils;
2525

2626
import java.sql.Connection;
2727
import java.sql.Driver;
28-
import java.sql.DriverManager;
2928
import java.sql.SQLException;
3029
import java.sql.Statement;
3130
import java.util.List;
@@ -50,6 +49,15 @@ public DBRun(QueryConfig config, Class<? extends Driver> driverClass, Boolean en
5049
config.getMaxRetryCount());
5150
}
5251

52+
/**
53+
* Returns the DBErrorDetailsProvider instance.
54+
*
55+
* @return DBErrorDetailsProvider instance
56+
*/
57+
protected DBErrorDetailsProvider getErrorDetailsProvider() {
58+
return new DBErrorDetailsProvider();
59+
}
60+
5361
/**
5462
* Uses a configured JDBC driver to execute a SQL statement. The configurations of which JDBC driver
5563
* to use and which connection string to use come from the plugin configuration.
@@ -63,14 +71,14 @@ public void run() throws SQLException, InstantiationException, IllegalAccessExce
6371
Properties connectionProperties = new Properties();
6472
connectionProperties.putAll(config.getConnectionArguments());
6573
try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy<Connection>) retryPolicy,
66-
config.getConnectionString(), connectionProperties, null)) {
74+
config.getConnectionString(), connectionProperties, getErrorDetailsProvider())) {
6775
executeInitQueries(connection, config.getInitQueries());
6876
if (!enableAutoCommit) {
6977
connection.setAutoCommit(false);
7078
}
7179
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection,
72-
null)) {
73-
RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, config.query, null);
80+
getErrorDetailsProvider())) {
81+
RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, config.query, getErrorDetailsProvider());
7482
if (!enableAutoCommit) {
7583
connection.commit();
7684
}
@@ -86,8 +94,8 @@ public void run() throws SQLException, InstantiationException, IllegalAccessExce
8694
private void executeInitQueries(Connection connection, List<String> initQueries) throws SQLException {
8795
for (String query : initQueries) {
8896
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection,
89-
null)) {
90-
RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, null);
97+
getErrorDetailsProvider())) {
98+
RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, getErrorDetailsProvider());
9199
}
92100
}
93101
}

database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.cdap.plugin.common.SourceInputFormatProvider;
3030
import io.cdap.plugin.common.db.AbstractDBConnector;
3131
import io.cdap.plugin.common.db.DBConnectorPath;
32+
import io.cdap.plugin.common.db.DBErrorDetailsProvider;
3233
import io.cdap.plugin.common.util.ExceptionUtils;
3334
import io.cdap.plugin.db.CommonSchemaReader;
3435
import io.cdap.plugin.db.ConnectionConfigAccessor;
@@ -122,6 +123,16 @@ public InputFormatProvider getInputFormatProvider(ConnectorContext context, Samp
122123
return new SourceInputFormatProvider(DataDrivenETLDBInputFormat.class, connectionConfigAccessor.getConfiguration());
123124
}
124125

126+
/**
127+
* Returns the DBErrorDetailsProvider instance.
128+
* Override this method to provide a custom DBErrorDetailsProvider instance.
129+
*
130+
* @return DBErrorDetailsProvider instance
131+
*/
132+
protected DBErrorDetailsProvider getErrorDetailsProvider() {
133+
return new DBErrorDetailsProvider();
134+
}
135+
125136
protected Connection getConnection(DBConnectorPath path) {
126137
return getConnection(getConnectionString(path.getDatabase()), config.getConnectionArgumentsProperties());
127138
}
@@ -178,14 +189,16 @@ protected String getStratifiedQuery(String tableName, int limit, String strata,
178189

179190
protected Schema loadTableSchema(Connection connection, String query, @Nullable Integer timeoutSec, String sessionID)
180191
throws SQLException {
181-
Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection, null);
182-
statement.setMaxRows(1);
183-
if (timeoutSec != null) {
184-
statement.setQueryTimeout(timeoutSec);
185-
}
186-
ResultSet resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy<ResultSet>) retryPolicy, statement, query,
187-
null);
188-
return Schema.recordOf("outputSchema", getSchemaReader(sessionID).getSchemaFields(resultSet));
192+
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection,
193+
getErrorDetailsProvider())) {
194+
statement.setMaxRows(1);
195+
if (timeoutSec != null) {
196+
statement.setQueryTimeout(timeoutSec);
197+
}
198+
ResultSet resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy<ResultSet>) retryPolicy, statement, query,
199+
getErrorDetailsProvider());
200+
return Schema.recordOf("outputSchema", getSchemaReader(sessionID).getSchemaFields(resultSet));
201+
}
189202
}
190203

191204
protected void setConnectionProperties(Map<String, String> properties, ConnectorSpecRequest request) {

database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.cdap.plugin.common.ReferenceBatchSink;
4040
import io.cdap.plugin.common.ReferencePluginConfig;
4141
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
42+
import io.cdap.plugin.common.db.DBErrorDetailsProvider;
4243
import io.cdap.plugin.db.ColumnType;
4344
import io.cdap.plugin.db.CommonSchemaReader;
4445
import io.cdap.plugin.db.ConnectionConfig;
@@ -180,6 +181,16 @@ protected String getErrorDetailsProviderClassName() {
180181
return null;
181182
}
182183

184+
/**
185+
* Returns the DBErrorDetailsProvider instance.
186+
* Override this method to provide a custom DBErrorDetailsProvider instance.
187+
*
188+
* @return DBErrorDetailsProvider instance
189+
*/
190+
protected DBErrorDetailsProvider getErrorDetailsProvider() {
191+
return new DBErrorDetailsProvider();
192+
}
193+
183194
/**
184195
* Returns the external documentation link.
185196
* Override this method to provide a custom external documentation link.
@@ -305,13 +316,12 @@ private Schema inferSchema(Class<? extends Driver> driverClass) {
305316
Properties connectionProperties = new Properties();
306317
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
307318
try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy<Connection>) retryPolicy,
308-
dbSinkConfig.getConnectionString(), connectionProperties, getExternalDocumentationLink())) {
319+
dbSinkConfig.getConnectionString(), connectionProperties, getErrorDetailsProvider())) {
309320
executeInitQueries(connection, dbSinkConfig.getInitQueries());
310321
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy,
311-
connection, getExternalDocumentationLink());
322+
connection, getErrorDetailsProvider());
312323
ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy<ResultSet>) retryPolicy, statement,
313-
String.format("SELECT * FROM %s WHERE 1 = 0", fullyQualifiedTableName),
314-
getExternalDocumentationLink())) {
324+
String.format("SELECT * FROM %s WHERE 1 = 0", fullyQualifiedTableName), getErrorDetailsProvider())) {
315325
inferredFields.addAll(getSchemaReader().getSchemaFields(rs));
316326
}
317327
}
@@ -355,15 +365,15 @@ private void setResultSetMetadata() throws SQLException, IllegalAccessException,
355365
Properties connectionProperties = new Properties();
356366
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
357367
try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy<Connection>) retryPolicy,
358-
connectionString, connectionProperties, getExternalDocumentationLink())) {
368+
connectionString, connectionProperties, getErrorDetailsProvider())) {
359369
executeInitQueries(connection, dbSinkConfig.getInitQueries());
360370
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection,
361-
getExternalDocumentationLink());
371+
getErrorDetailsProvider());
362372
// Run a query against the DB table that returns 0 records, but returns valid ResultSetMetadata
363373
// that can be used to construct DBRecord objects to sink to the database table.
364374
ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy<ResultSet>) retryPolicy, statement,
365375
String.format("SELECT %s FROM %s WHERE 1 = 0", dbColumns, fullyQualifiedTableName),
366-
getExternalDocumentationLink())) {
376+
getErrorDetailsProvider())) {
367377
columnTypes.addAll(getMatchedColumnTypeList(rs, columns));
368378
}
369379
}
@@ -426,7 +436,7 @@ private void validateSchema(FailureCollector collector, Class<? extends Driver>
426436
Properties connectionProperties = new Properties();
427437
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
428438
try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy<Connection>) retryPolicy,
429-
connectionString, connectionProperties, getExternalDocumentationLink())) {
439+
connectionString, connectionProperties, getErrorDetailsProvider())) {
430440
executeInitQueries(connection, dbSinkConfig.getInitQueries());
431441
try (ResultSet tables = connection.getMetaData().getTables(null, dbSchemaName, tableName, null)) {
432442
if (!tables.next()) {
@@ -440,9 +450,9 @@ connectionString, connectionProperties, getExternalDocumentationLink())) {
440450
setColumnsInfo(inputSchema.getFields());
441451
try (PreparedStatement pStmt = RetryUtils.prepareStatementWithRetry((RetryPolicy<PreparedStatement>) retryPolicy,
442452
connection, String.format("SELECT %s FROM %s WHERE 1 = 0", dbColumns, fullyQualifiedTableName),
443-
getExternalDocumentationLink());
453+
getErrorDetailsProvider());
444454
ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy<ResultSet>) retryPolicy, pStmt,
445-
getExternalDocumentationLink())) {
455+
getErrorDetailsProvider())) {
446456
getFieldsValidator().validateFields(inputSchema, rs, collector);
447457
}
448458
} catch (SQLException e) {
@@ -475,8 +485,8 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
475485
private void executeInitQueries(Connection connection, List<String> initQueries) throws SQLException {
476486
for (String query : initQueries) {
477487
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection,
478-
getExternalDocumentationLink())) {
479-
RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, getExternalDocumentationLink());
488+
getErrorDetailsProvider())) {
489+
RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, getErrorDetailsProvider());
480490
}
481491
}
482492
}

0 commit comments

Comments
 (0)