Skip to content

Commit 468001b

Browse files
authored
Fix race condition (#34058)
* Update JdbcIO.java * Fix vars * Fix typing problems * Use lineage check so that we maintain safety * Remove dupe assignment * Assign correctly inside lock to avoid race * Add other getConnection * Fix bad merge * Remove unneeded logs * Shrink critical section * Cleanup * Fix bad refactor * typo
1 parent 228f028 commit 468001b

File tree

1 file changed

+18
-3
lines changed
  • sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc

1 file changed

+18
-3
lines changed

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import java.util.Set;
4444
import java.util.concurrent.ConcurrentHashMap;
4545
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.locks.Lock;
47+
import java.util.concurrent.locks.ReentrantLock;
4648
import java.util.function.Predicate;
4749
import java.util.stream.Collectors;
4850
import java.util.stream.IntStream;
@@ -1609,6 +1611,7 @@ private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, Output
16091611
private final int fetchSize;
16101612
private final boolean disableAutoCommit;
16111613

1614+
private Lock connectionLock = new ReentrantLock();
16121615
private @Nullable DataSource dataSource;
16131616
private @Nullable Connection connection;
16141617
private @Nullable KV<@Nullable String, String> reportedLineage;
@@ -1637,8 +1640,13 @@ private Connection getConnection() throws SQLException {
16371640
Connection connection = this.connection;
16381641
if (connection == null) {
16391642
DataSource validSource = checkStateNotNull(this.dataSource);
1640-
connection = checkStateNotNull(validSource).getConnection();
1641-
this.connection = connection;
1643+
connectionLock.lock();
1644+
try {
1645+
connection = validSource.getConnection();
1646+
this.connection = connection;
1647+
} finally {
1648+
connectionLock.unlock();
1649+
}
16421650

16431651
// report Lineage if not haven't done so
16441652
KV<@Nullable String, String> schemaWithTable =
@@ -2663,6 +2671,7 @@ abstract Builder<T, V> setMaxBatchBufferingDuration(
26632671
Metrics.distribution(WriteFn.class, "milliseconds_per_batch");
26642672

26652673
private final WriteFnSpec<T, V> spec;
2674+
private Lock connectionLock = new ReentrantLock();
26662675
private @Nullable DataSource dataSource;
26672676
private @Nullable Connection connection;
26682677
private @Nullable PreparedStatement preparedStatement;
@@ -2700,7 +2709,13 @@ private Connection getConnection() throws SQLException {
27002709
Connection connection = this.connection;
27012710
if (connection == null) {
27022711
DataSource validSource = checkStateNotNull(dataSource);
2703-
connection = validSource.getConnection();
2712+
connectionLock.lock();
2713+
try {
2714+
connection = validSource.getConnection();
2715+
} finally {
2716+
connectionLock.unlock();
2717+
}
2718+
27042719
connection.setAutoCommit(false);
27052720
preparedStatement =
27062721
connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());

0 commit comments

Comments
 (0)