Skip to content

Commit 1f4783a

Browse files
authored
added loginTimeout for the datasource connection (#2551)
* added loginTimeout for the datasource connection * spotless * fixed tests * added databaseLoginTimeout for IT
1 parent 920f067 commit 1f4783a

File tree

5 files changed

+327
-1
lines changed

5 files changed

+327
-1
lines changed

v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/io/CdcJdbcIO.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ public abstract static class DataSourceConfiguration implements Serializable {
148148
@Nullable
149149
abstract ValueProvider<Collection<String>> getConnectionInitSqls();
150150

151+
@Nullable
152+
abstract ValueProvider<Integer> getLoginTimeout();
153+
151154
@Nullable
152155
abstract DataSource getDataSource();
153156

@@ -169,6 +172,8 @@ abstract static class Builder {
169172

170173
abstract Builder setConnectionInitSqls(ValueProvider<Collection<String>> connectionInitSqls);
171174

175+
abstract Builder setLoginTimeout(ValueProvider<Integer> loginTimeout);
176+
172177
abstract Builder setDataSource(DataSource dataSource);
173178

174179
abstract DataSourceConfiguration build();
@@ -263,6 +268,27 @@ public DataSourceConfiguration withConnectionInitSqls(
263268
return builder().setConnectionInitSqls(connectionInitSqls).build();
264269
}
265270

271+
/**
272+
* Sets the login timeout for the DataSource.
273+
*
274+
* @param loginTimeout login timeout in seconds
275+
* @return updated DataSourceConfiguration
276+
*/
277+
public DataSourceConfiguration withLoginTimeout(Integer loginTimeout) {
278+
return withLoginTimeout(ValueProvider.StaticValueProvider.of(loginTimeout));
279+
}
280+
281+
/**
282+
* Sets the login timeout for the DataSource.
283+
*
284+
* @param loginTimeout login timeout in seconds as ValueProvider
285+
* @return updated DataSourceConfiguration
286+
*/
287+
public DataSourceConfiguration withLoginTimeout(ValueProvider<Integer> loginTimeout) {
288+
checkArgument(loginTimeout != null, "loginTimeout can not be null");
289+
return builder().setLoginTimeout(loginTimeout).build();
290+
}
291+
266292
void populateDisplayData(DisplayData.Builder builder) {
267293
if (getDataSource() != null) {
268294
builder.addIfNotNull(DisplayData.item("dataSource", getDataSource().getClass().getName()));
@@ -299,6 +325,24 @@ && getConnectionInitSqls().get() != null
299325
if (getMaxIdleConnections() != null && getMaxIdleConnections().get() != null) {
300326
basicDataSource.setMaxIdle(getMaxIdleConnections().get().intValue());
301327
}
328+
if (getLoginTimeout() != null && getLoginTimeout().get() != null) {
329+
// BasicDataSource.setLoginTimeout() is not supported and throws
330+
// UnsupportedOperationException.
331+
// Instead, we append the loginTimeout as a connection property to the existing
332+
// properties.
333+
String existingProperties =
334+
getConnectionProperties() != null && getConnectionProperties().get() != null
335+
? getConnectionProperties().get()
336+
: "";
337+
String loginTimeoutProperty = "loginTimeout=" + getLoginTimeout().get().toString();
338+
String updatedProperties =
339+
existingProperties.isEmpty()
340+
? loginTimeoutProperty
341+
: existingProperties + ";" + loginTimeoutProperty;
342+
basicDataSource.setConnectionProperties(updatedProperties);
343+
} else if (getConnectionProperties() != null && getConnectionProperties().get() != null) {
344+
basicDataSource.setConnectionProperties(getConnectionProperties().get());
345+
}
302346

303347
return basicDataSource;
304348
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright (C) 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.datastream.io;
17+
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertNotNull;
20+
21+
import java.lang.reflect.Field;
22+
import java.sql.SQLException;
23+
import javax.sql.DataSource;
24+
import org.apache.beam.sdk.options.ValueProvider;
25+
import org.apache.commons.dbcp2.BasicDataSource;
26+
import org.junit.Test;
27+
import org.junit.runner.RunWith;
28+
import org.junit.runners.JUnit4;
29+
30+
/** Test cases for the {@link CdcJdbcIO} class. */
31+
@RunWith(JUnit4.class)
32+
public final class CdcJdbcIOTest {
33+
34+
private static final String DRIVER_CLASS_NAME = "com.mysql.cj.jdbc.Driver";
35+
private static final String URL = "jdbc:mysql://localhost:3306/test";
36+
private static final String USERNAME = "testuser";
37+
private static final String PASSWORD = "testpass";
38+
private static final Integer LOGIN_TIMEOUT = 30;
39+
40+
@Test
41+
public void testDataSourceConfiguration_withLoginTimeout_staticValue() {
42+
CdcJdbcIO.DataSourceConfiguration config =
43+
CdcJdbcIO.DataSourceConfiguration.create(DRIVER_CLASS_NAME, URL)
44+
.withUsername(USERNAME)
45+
.withPassword(PASSWORD)
46+
.withLoginTimeout(LOGIN_TIMEOUT);
47+
48+
assertNotNull(config.getLoginTimeout());
49+
assertEquals(LOGIN_TIMEOUT, config.getLoginTimeout().get());
50+
}
51+
52+
@Test
53+
public void testDataSourceConfiguration_withLoginTimeout_valueProvider() {
54+
ValueProvider<Integer> loginTimeoutProvider =
55+
ValueProvider.StaticValueProvider.of(LOGIN_TIMEOUT);
56+
57+
CdcJdbcIO.DataSourceConfiguration config =
58+
CdcJdbcIO.DataSourceConfiguration.create(DRIVER_CLASS_NAME, URL)
59+
.withUsername(USERNAME)
60+
.withPassword(PASSWORD)
61+
.withLoginTimeout(loginTimeoutProvider);
62+
63+
assertNotNull(config.getLoginTimeout());
64+
assertEquals(LOGIN_TIMEOUT, config.getLoginTimeout().get());
65+
}
66+
67+
@Test
68+
public void testDataSourceConfiguration_buildDatasource_withLoginTimeout() throws SQLException {
69+
CdcJdbcIO.DataSourceConfiguration config =
70+
CdcJdbcIO.DataSourceConfiguration.create(DRIVER_CLASS_NAME, URL)
71+
.withUsername(USERNAME)
72+
.withPassword(PASSWORD)
73+
.withLoginTimeout(LOGIN_TIMEOUT);
74+
75+
DataSource dataSource = config.buildDatasource();
76+
77+
assertNotNull(dataSource);
78+
assertEquals(BasicDataSource.class, dataSource.getClass());
79+
80+
BasicDataSource basicDataSource = (BasicDataSource) dataSource;
81+
// BasicDataSource.getLoginTimeout() throws UnsupportedOperationException,
82+
// so we verify that loginTimeout is included in connection properties instead
83+
try {
84+
Field connectionPropertiesField =
85+
BasicDataSource.class.getDeclaredField("connectionProperties");
86+
connectionPropertiesField.setAccessible(true);
87+
java.util.Properties connectionProperties =
88+
(java.util.Properties) connectionPropertiesField.get(basicDataSource);
89+
assertNotNull(connectionProperties);
90+
assertEquals(LOGIN_TIMEOUT.toString(), connectionProperties.getProperty("loginTimeout"));
91+
} catch (Exception e) {
92+
throw new RuntimeException("Failed to access connectionProperties field", e);
93+
}
94+
assertEquals(DRIVER_CLASS_NAME, basicDataSource.getDriverClassName());
95+
assertEquals(URL, basicDataSource.getUrl());
96+
assertEquals(USERNAME, basicDataSource.getUsername());
97+
assertEquals(PASSWORD, basicDataSource.getPassword());
98+
}
99+
100+
@Test
101+
public void testDataSourceConfiguration_buildDatasource_withoutLoginTimeout()
102+
throws SQLException {
103+
CdcJdbcIO.DataSourceConfiguration config =
104+
CdcJdbcIO.DataSourceConfiguration.create(DRIVER_CLASS_NAME, URL)
105+
.withUsername(USERNAME)
106+
.withPassword(PASSWORD);
107+
108+
DataSource dataSource = config.buildDatasource();
109+
110+
assertNotNull(dataSource);
111+
assertEquals(BasicDataSource.class, dataSource.getClass());
112+
113+
BasicDataSource basicDataSource = (BasicDataSource) dataSource;
114+
// When no loginTimeout is set, connection properties should be null or empty
115+
try {
116+
Field connectionPropertiesField =
117+
BasicDataSource.class.getDeclaredField("connectionProperties");
118+
connectionPropertiesField.setAccessible(true);
119+
java.util.Properties connectionProperties =
120+
(java.util.Properties) connectionPropertiesField.get(basicDataSource);
121+
// Connection properties should be null or empty when no loginTimeout and no connection
122+
// properties are set
123+
if (connectionProperties != null) {
124+
assertEquals(0, connectionProperties.size());
125+
}
126+
} catch (Exception e) {
127+
throw new RuntimeException("Failed to access connectionProperties field", e);
128+
}
129+
assertEquals(DRIVER_CLASS_NAME, basicDataSource.getDriverClassName());
130+
assertEquals(URL, basicDataSource.getUrl());
131+
assertEquals(USERNAME, basicDataSource.getUsername());
132+
assertEquals(PASSWORD, basicDataSource.getPassword());
133+
}
134+
135+
@Test(expected = IllegalArgumentException.class)
136+
public void testDataSourceConfiguration_withLoginTimeout_nullValue() {
137+
CdcJdbcIO.DataSourceConfiguration.create(DRIVER_CLASS_NAME, URL)
138+
.withLoginTimeout((ValueProvider<Integer>) null);
139+
}
140+
141+
@Test
142+
public void testDataSourceConfiguration_withLoginTimeout_nullValueProvider() throws SQLException {
143+
ValueProvider<Integer> nullProvider = ValueProvider.StaticValueProvider.of(null);
144+
145+
CdcJdbcIO.DataSourceConfiguration config =
146+
CdcJdbcIO.DataSourceConfiguration.create(DRIVER_CLASS_NAME, URL)
147+
.withLoginTimeout(nullProvider);
148+
149+
DataSource dataSource = config.buildDatasource();
150+
151+
assertNotNull(dataSource);
152+
assertEquals(BasicDataSource.class, dataSource.getClass());
153+
154+
BasicDataSource basicDataSource = (BasicDataSource) dataSource;
155+
// Note: BasicDataSource.getLoginTimeout() throws UnsupportedOperationException
156+
// so we cannot directly verify the login timeout value
157+
}
158+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright (C) 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.datastream.io;
17+
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertNotNull;
20+
21+
import java.lang.reflect.Field;
22+
import java.sql.SQLException;
23+
import javax.sql.DataSource;
24+
import org.apache.commons.dbcp2.BasicDataSource;
25+
import org.junit.Test;
26+
import org.junit.runner.RunWith;
27+
import org.junit.runners.JUnit4;
28+
29+
/**
30+
* Integration test to verify loginTimeout functionality in CdcJdbcIO.DataSourceConfiguration. This
31+
* test validates that the loginTimeout parameter is properly set on the DataSource.
32+
*/
33+
@RunWith(JUnit4.class)
34+
public class LoginTimeoutIntegrationTest {
35+
36+
@Test
37+
public void testLoginTimeoutIsSetOnDataSource() throws SQLException {
38+
// Create a DataSourceConfiguration with loginTimeout
39+
CdcJdbcIO.DataSourceConfiguration config =
40+
CdcJdbcIO.DataSourceConfiguration.create(
41+
"org.h2.Driver", "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1")
42+
.withUsername("sa")
43+
.withPassword("")
44+
.withLoginTimeout(45);
45+
46+
// Build the DataSource
47+
DataSource dataSource = config.buildDatasource();
48+
49+
// Verify that the DataSource is created and is a BasicDataSource
50+
assertNotNull("DataSource should not be null", dataSource);
51+
assertEquals(
52+
"DataSource should be a BasicDataSource", BasicDataSource.class, dataSource.getClass());
53+
54+
// Verify that the loginTimeout is set correctly in connection properties
55+
BasicDataSource basicDataSource = (BasicDataSource) dataSource;
56+
try {
57+
Field connectionPropertiesField =
58+
BasicDataSource.class.getDeclaredField("connectionProperties");
59+
connectionPropertiesField.setAccessible(true);
60+
java.util.Properties connectionProperties =
61+
(java.util.Properties) connectionPropertiesField.get(basicDataSource);
62+
assertNotNull("Connection properties should not be null", connectionProperties);
63+
assertEquals(
64+
"Login timeout should be set to 45 seconds in connection properties",
65+
"45",
66+
connectionProperties.getProperty("loginTimeout"));
67+
} catch (Exception e) {
68+
throw new RuntimeException("Failed to access connectionProperties field", e);
69+
}
70+
}
71+
72+
@Test
73+
public void testDefaultLoginTimeoutWhenNotSet() throws SQLException {
74+
// Create a DataSourceConfiguration without loginTimeout
75+
CdcJdbcIO.DataSourceConfiguration config =
76+
CdcJdbcIO.DataSourceConfiguration.create(
77+
"org.h2.Driver", "jdbc:h2:mem:testdb2;DB_CLOSE_DELAY=-1")
78+
.withUsername("sa")
79+
.withPassword("");
80+
81+
// Build the DataSource
82+
DataSource dataSource = config.buildDatasource();
83+
84+
// Verify that the DataSource is created
85+
assertNotNull("DataSource should not be null", dataSource);
86+
BasicDataSource basicDataSource = (BasicDataSource) dataSource;
87+
88+
// Verify that no loginTimeout is set in connection properties when not specified
89+
try {
90+
Field connectionPropertiesField =
91+
BasicDataSource.class.getDeclaredField("connectionProperties");
92+
connectionPropertiesField.setAccessible(true);
93+
java.util.Properties connectionProperties =
94+
(java.util.Properties) connectionPropertiesField.get(basicDataSource);
95+
// Connection properties should be null or empty when no loginTimeout is set
96+
if (connectionProperties != null) {
97+
assertEquals(
98+
"Connection properties should be empty when no loginTimeout is set",
99+
0,
100+
connectionProperties.size());
101+
}
102+
} catch (Exception e) {
103+
throw new RuntimeException("Failed to access connectionProperties field", e);
104+
}
105+
}
106+
}

v2/datastream-to-sql/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSQL.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,17 @@ public interface Options extends PipelineOptions, StreamingOptions {
260260
int getNumThreads();
261261

262262
void setNumThreads(int value);
263+
264+
@TemplateParameter.Integer(
265+
order = 16,
266+
groupName = "Target",
267+
optional = true,
268+
description = "Database login timeout in seconds.",
269+
helpText =
270+
"The timeout in seconds for database login attempts. This helps prevent connection hangs when multiple workers try to connect simultaneously.")
271+
Integer getDatabaseLoginTimeout();
272+
273+
void setDatabaseLoginTimeout(Integer value);
263274
}
264275

265276
/**
@@ -315,7 +326,8 @@ public static CdcJdbcIO.DataSourceConfiguration getDataSourceConfiguration(Optio
315326
CdcJdbcIO.DataSourceConfiguration.create(jdbcDriverName, jdbcDriverConnectionString)
316327
.withUsername(options.getDatabaseUser())
317328
.withPassword(options.getDatabasePassword())
318-
.withMaxIdleConnections(new Integer(0));
329+
.withMaxIdleConnections(new Integer(0))
330+
.withLoginTimeout(options.getDatabaseLoginTimeout());
319331

320332
return dataSourceConfiguration;
321333
}

v2/datastream-to-sql/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSQLIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ public void testDataStreamOracleToPostgresJson() throws IOException {
143143
simpleOracleToJdbcTest(JDBCType.POSTGRES, Function.identity());
144144
}
145145

146+
@Test
147+
public void testDataStreamWithLoginTimeout() throws IOException {
148+
simpleOracleToJdbcTest(
149+
JDBCType.POSTGRES, config -> config.addParameter("databaseLoginTimeout", "30"));
150+
}
151+
146152
@Test
147153
@Ignore("Consolidate feature matrix for expensive tests")
148154
public void testDataStreamOracleToMySqlJsonGCSNotifications() throws IOException {

0 commit comments

Comments
 (0)