@@ -76,7 +76,7 @@ public class JDBCExtract extends ExtractToOne {
7676
7777 private JdbcIO .DataSourceConfiguration datasourceConfig ;
7878 private long numBatches ;
79- private ComboPooledDataSource ds ;
79+ private ComboPooledDataSource initializationDS ;
8080 private String [] orderByCols ;
8181 private String viewName ;
8282 private String orderedQuery ;
@@ -107,7 +107,14 @@ public class JDBCExtract extends ExtractToOne {
107107 */
108108 public void init () throws ComponentInitializationException {
109109 try {
110- this .ds = new ComboPooledDataSource ();
110+ this .initializationDS = new ComboPooledDataSource ();
111+ initializationDS .setAcquireRetryAttempts (1 );
112+ initializationDS .setDriverClass (driver );
113+ initializationDS .setJdbcUrl (url );
114+ initializationDS .setUser (user );
115+ initializationDS .setPassword (password );
116+ initializationDS .setMaxIdleTime (this .idleTimeout );
117+ ComboPooledDataSource ds = new ComboPooledDataSource ();
111118 ds .setDriverClass (driver );
112119 ds .setJdbcUrl (url );
113120 ds .setUser (user );
@@ -124,7 +131,7 @@ public void init() throws ComponentInitializationException {
124131 // Find appropriate columns to order by so that pagination results are consistent
125132 this .orderByCols = findPaginationOrderingColumns (this .query );
126133 // Get record count so that we know how many batches are going to be needed
127- try (Connection conn = ds .getConnection ()) {
134+ try (Connection conn = initializationDS .getConnection ()) {
128135 ResultSet rs = conn .createStatement ().executeQuery (countQuery );
129136 rs .next ();
130137 int resultCount = rs .getInt (1 );
@@ -165,7 +172,7 @@ public List<String> getOutputTags() {
165172 @ Override
166173 public Schema calculateOutputSchema () {
167174 Schema schema ;
168- try (Connection conn = ds .getConnection ();
175+ try (Connection conn = initializationDS .getConnection ();
169176 PreparedStatement ps = conn .prepareStatement (this .query , ResultSet .TYPE_FORWARD_ONLY , ResultSet .CONCUR_READ_ONLY )) {
170177 schema = SchemaUtilProxy .toBeamSchema (driver , ps .getMetaData ());
171178 } catch (SQLException throwables ) {
@@ -198,7 +205,7 @@ public PCollection<Row> begin(PBegin input) {
198205 }
199206
200207 private String [] findPaginationOrderingColumns (String query ) throws ComponentInitializationException {
201- try (Connection conn = this .ds .getConnection ()) {
208+ try (Connection conn = this .initializationDS .getConnection ()) {
202209 ResultSetMetaData queryMeta = conn .prepareStatement (query ).getMetaData ();
203210 Map <String , Integer > colNameToIndex = new HashMap <>();
204211 for (int i = 0 ; i < queryMeta .getColumnCount (); i ++) {
0 commit comments