Skip to content

Commit ff0cd0b

Browse files
committed
PLUGIN-1823: Rework
Move retry logic into a separate class: RetryUtils and add exception handling
1 parent ad6b3fe commit ff0cd0b

File tree

9 files changed

+238
-163
lines changed

9 files changed

+238
-163
lines changed

database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ public Integer getMaxRetryCount() {
109109
return maxRetryCount == null ? DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
110110
}
111111

112-
113112
public ConnectionConfig() {
114113
}
115114

database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.cdap.plugin.db.action;
1818

19-
import dev.failsafe.Failsafe;
2019
import dev.failsafe.RetryPolicy;
2120
import io.cdap.cdap.etl.api.FailureCollector;
2221
import io.cdap.cdap.etl.api.PipelineConfigurer;
@@ -28,10 +27,10 @@
2827
import io.cdap.plugin.util.DBUtils;
2928
import io.cdap.plugin.util.DriverCleanup;
3029
import io.cdap.plugin.util.RetryPolicyUtil;
30+
import io.cdap.plugin.util.RetryUtils;
3131

3232
import java.sql.Connection;
3333
import java.sql.Driver;
34-
import java.sql.DriverManager;
3534
import java.sql.ResultSet;
3635
import java.sql.SQLException;
3736
import java.sql.Statement;
@@ -44,7 +43,7 @@ public class AbstractDBArgumentSetter extends Action {
4443

4544
private static final String JDBC_PLUGIN_ID = "driver";
4645
private final ArgumentSetterConfig config;
47-
private final RetryPolicy<Object> retryPolicy;
46+
private final RetryPolicy<?> retryPolicy;
4847

4948
public AbstractDBArgumentSetter(ArgumentSetterConfig config) {
5049
this.config = config;
@@ -114,10 +113,13 @@ private void processArguments(Class<? extends Driver> driverClass,
114113

115114
private void executeWithRetry(FailureCollector failureCollector, SettableArguments settableArguments,
116115
Properties connectionProperties) throws SQLException {
117-
try (Connection connection = createConnectionWithRetry(connectionProperties)) {
116+
try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy<Connection>) retryPolicy,
117+
config.getConnectionString(), connectionProperties, null)) {
118118
ResultSet resultSet;
119-
try (Statement statement = createStatementWithRetry(connection)) {
120-
resultSet = Failsafe.with(retryPolicy).get(() -> statement.executeQuery(config.getQuery()));
119+
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection,
120+
null)) {
121+
resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy<ResultSet>) retryPolicy, statement,
122+
config.getQuery(), null);
121123
}
122124
boolean hasRecord = resultSet.next();
123125
if (!hasRecord) {
@@ -136,15 +138,6 @@ private void executeWithRetry(FailureCollector failureCollector, SettableArgumen
136138
}
137139
}
138140

139-
private Connection createConnectionWithRetry(Properties connectionProperties) {
140-
return Failsafe.with(retryPolicy).<Connection>get(() -> DriverManager
141-
.getConnection(config.getConnectionString(), connectionProperties));
142-
}
143-
144-
private Statement createStatementWithRetry(Connection connection) {
145-
return Failsafe.with(retryPolicy).<Statement>get(() -> connection.createStatement());
146-
}
147-
148141
/**
149142
* Converts column from jdbc results set into pipeline arguments
150143
*

database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.cdap.plugin.util.DBUtils;
2222
import io.cdap.plugin.util.DriverCleanup;
2323
import io.cdap.plugin.util.RetryPolicyUtil;
24+
import io.cdap.plugin.util.RetryUtils;
2425

2526
import java.sql.Connection;
2627
import java.sql.Driver;
@@ -37,7 +38,7 @@ public class DBRun {
3738
private final QueryConfig config;
3839
private final Class<? extends Driver> driverClass;
3940
private boolean enableAutoCommit;
40-
private final RetryPolicy<Object> retryPolicy;
41+
private final RetryPolicy<?> retryPolicy;
4142

4243
public DBRun(QueryConfig config, Class<? extends Driver> driverClass, Boolean enableAutocommit) {
4344
this.config = config;
@@ -61,14 +62,15 @@ public void run() throws SQLException, InstantiationException, IllegalAccessExce
6162

6263
Properties connectionProperties = new Properties();
6364
connectionProperties.putAll(config.getConnectionArguments());
64-
try (Connection connection = createConnectionWithRetry(connectionProperties)) {
65+
try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy<Connection>) retryPolicy,
66+
config.getConnectionString(), connectionProperties, null)) {
6567
executeInitQueries(connection, config.getInitQueries());
6668
if (!enableAutoCommit) {
6769
connection.setAutoCommit(false);
6870
}
69-
try (Statement statement = createStatementWithRetry(connection)) {
70-
Failsafe.with(retryPolicy).run(() -> statement.execute(config.query));
71-
71+
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection,
72+
null)) {
73+
RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, config.query, null);
7274
if (!enableAutoCommit) {
7375
connection.commit();
7476
}
@@ -83,18 +85,10 @@ public void run() throws SQLException, InstantiationException, IllegalAccessExce
8385

8486
private void executeInitQueries(Connection connection, List<String> initQueries) throws SQLException {
8587
for (String query : initQueries) {
86-
try (Statement statement = createStatementWithRetry(connection)) {
87-
Failsafe.with(retryPolicy).run(() -> statement.execute(query));
88+
try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection,
89+
null)) {
90+
RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, null);
8891
}
8992
}
9093
}
91-
92-
private Connection createConnectionWithRetry(Properties connectionProperties) {
93-
return Failsafe.with(retryPolicy).<Connection>get(() -> DriverManager
94-
.getConnection(config.getConnectionString(), connectionProperties));
95-
}
96-
97-
private Statement createStatementWithRetry(Connection connection) {
98-
return Failsafe.with(retryPolicy).<Statement>get(() -> connection.createStatement());
99-
}
10094
}

database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package io.cdap.plugin.db.connector;
1818

1919
import com.google.common.collect.Maps;
20-
import dev.failsafe.Failsafe;
20+
import dev.failsafe.RetryPolicy;
2121
import io.cdap.cdap.api.data.batch.InputFormatProvider;
2222
import io.cdap.cdap.api.data.schema.Schema;
2323
import io.cdap.cdap.etl.api.batch.BatchConnector;
@@ -35,6 +35,7 @@
3535
import io.cdap.plugin.db.SchemaReader;
3636
import io.cdap.plugin.db.source.DataDrivenETLDBInputFormat;
3737
import io.cdap.plugin.util.RetryPolicyUtil;
38+
import io.cdap.plugin.util.RetryUtils;
3839
import org.apache.hadoop.io.LongWritable;
3940
import org.apache.hadoop.mapreduce.MRJobConfig;
4041
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -58,10 +59,13 @@ public abstract class AbstractDBSpecificConnector<T extends DBWritable> extends
5859
implements BatchConnector<LongWritable, T> {
5960

6061
private final AbstractDBConnectorConfig config;
62+
private final RetryPolicy<?> retryPolicy;
6163

6264
protected AbstractDBSpecificConnector(AbstractDBConnectorConfig config) {
6365
super(config);
6466
this.config = config;
67+
this.retryPolicy = RetryPolicyUtil.getRetryPolicy(config.getInitialRetryDuration(), config.getMaxRetryDuration(),
68+
config.getMaxRetryCount());
6569
}
6670

6771
public abstract boolean supportSchema();
@@ -174,14 +178,13 @@ protected String getStratifiedQuery(String tableName, int limit, String strata,
174178

175179
protected Schema loadTableSchema(Connection connection, String query, @Nullable Integer timeoutSec, String sessionID)
176180
throws SQLException {
177-
Statement statement = connection.createStatement();
181+
Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy<Statement>) retryPolicy, connection, null);
178182
statement.setMaxRows(1);
179183
if (timeoutSec != null) {
180184
statement.setQueryTimeout(timeoutSec);
181185
}
182-
ResultSet resultSet = Failsafe.with(RetryPolicyUtil.getRetryPolicy(config.getInitialRetryDuration(),
183-
config.getMaxRetryDuration(), config.getMaxRetryCount()))
184-
.get(() -> statement.executeQuery(query));
186+
ResultSet resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy<ResultSet>) retryPolicy, statement, query,
187+
null);
185188
return Schema.recordOf("outputSchema", getSchemaReader(sessionID).getSchemaFields(resultSet));
186189
}
187190

0 commit comments

Comments
 (0)