1717package io .cdap .plugin .db .action ;
1818
1919import dev .failsafe .Failsafe ;
20+ import dev .failsafe .RetryPolicy ;
2021import io .cdap .cdap .etl .api .FailureCollector ;
2122import io .cdap .cdap .etl .api .PipelineConfigurer ;
2223import io .cdap .cdap .etl .api .StageConfigurer ;
@@ -43,9 +44,12 @@ public class AbstractDBArgumentSetter extends Action {
4344
4445 private static final String JDBC_PLUGIN_ID = "driver" ;
4546 private final ArgumentSetterConfig config ;
47+ private final RetryPolicy <Object > retryPolicy ;
4648
4749 public AbstractDBArgumentSetter (ArgumentSetterConfig config ) {
4850 this .config = config ;
51+ this .retryPolicy = RetryPolicyUtil .getRetryPolicy (config .getInitialRetryDuration (), config .getMaxRetryDuration (),
52+ config .getMaxRetryCount ());
4953 }
5054
5155 @ Override
@@ -94,10 +98,11 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer)
9498 */
9599 private void processArguments (Class <? extends Driver > driverClass ,
96100 FailureCollector failureCollector , SettableArguments settableArguments )
97- throws SQLException , IllegalAccessException , InstantiationException {
101+ throws SQLException , IllegalAccessException , InstantiationException {
98102 DriverCleanup driverCleanup ;
103+
99104 driverCleanup = DBUtils .ensureJDBCDriverIsAvailable (driverClass , config .getConnectionString (),
100- config .getJdbcPluginName ());
105+ config .getJdbcPluginName ());
101106 Properties connectionProperties = new Properties ();
102107 connectionProperties .putAll (config .getConnectionArguments ());
103108 try {
@@ -108,46 +113,51 @@ private void processArguments(Class<? extends Driver> driverClass,
108113 }
109114
110115 private void executeWithRetry (FailureCollector failureCollector , SettableArguments settableArguments ,
111- Properties connectionProperties ) {
112- Failsafe .with (RetryPolicyUtil .createConnectionRetryPolicy (config .getInitialRetryDuration (),
113- config .getMaxRetryDuration (), config .getMaxRetryCount ())).run (() -> {
114- try (Connection connection = DriverManager
115- .getConnection (config .getConnectionString (), connectionProperties )) {
116- ResultSet resultSet ;
117- try (Statement statement = connection .createStatement ()) {
118- resultSet = statement .executeQuery (config .getQuery ());
119- }
120- boolean hasRecord = resultSet .next ();
121- if (!hasRecord ) {
122- failureCollector .addFailure ("No record found." ,
123- "The argument selection conditions must match only one record." );
124- return ;
125- }
126- if (settableArguments != null ) {
127- setArguments (resultSet , settableArguments );
128- }
129- if (resultSet .next ()) {
130- failureCollector
131- .addFailure ("More than one records found." ,
132- "The argument selection conditions must match only one record." );
133- }
116+ Properties connectionProperties ) throws SQLException {
117+ try (Connection connection = createConnectionWithRetry (connectionProperties )) {
118+ ResultSet resultSet ;
119+ try (Statement statement = createStatementWithRetry (connection )) {
120+ resultSet = Failsafe .with (retryPolicy ).get (() -> statement .executeQuery (config .getQuery ()));
121+ }
122+ boolean hasRecord = resultSet .next ();
123+ if (!hasRecord ) {
124+ failureCollector .addFailure ("No record found." ,
125+ "The argument selection conditions must match only one record." );
126+ return ;
127+ }
128+ if (settableArguments != null ) {
129+ setArguments (resultSet , failureCollector , settableArguments );
134130 }
135- });
131+ if (resultSet .next ()) {
132+ failureCollector
133+ .addFailure ("More than one records found." ,
134+ "The argument selection conditions must match only one record." );
135+ }
136+ }
137+ }
138+
139+ private Connection createConnectionWithRetry (Properties connectionProperties ) {
140+ return Failsafe .with (retryPolicy ).<Connection >get (() -> DriverManager
141+ .getConnection (config .getConnectionString (), connectionProperties ));
142+ }
143+
144+ private Statement createStatementWithRetry (Connection connection ) {
145+ return Failsafe .with (retryPolicy ).<Statement >get (() -> connection .createStatement ());
136146 }
137147
138148 /**
139149 * Converts column from jdbc results set into pipeline arguments
140150 *
141151 * @param resultSet - result set from db {@link ResultSet}
152+ * @param failureCollector - context failure collector @{link FailureCollector}
142153 * @param arguments - context argument setter {@link SettableArguments}
154+ * @throws SQLException - raises {@link SQLException} when configuration is not valid
143155 */
144- private void setArguments (ResultSet resultSet , SettableArguments arguments ) {
145- Failsafe .with (RetryPolicyUtil .createConnectionRetryPolicy (config .getInitialRetryDuration (),
146- config .getMaxRetryDuration (), config .getMaxRetryCount ())).run (() -> {
147- String [] columns = config .getArgumentsColumns ().split ("," );
148- for (String column : columns ) {
149- arguments .set (column , resultSet .getString (column ));
150- }
151- });
156+ private void setArguments (ResultSet resultSet , FailureCollector failureCollector ,
157+ SettableArguments arguments ) throws SQLException {
158+ String [] columns = config .getArgumentsColumns ().split ("," );
159+ for (String column : columns ) {
160+ arguments .set (column , resultSet .getString (column ));
161+ }
152162 }
153163}
0 commit comments