Skip to content

Commit 51e1efb

Browse files
authored
Merge pull request #183 from youzh00/feature/issue-139-reentrantlocks
Replace synchronized blocks with ReentrantLock in JDBC driver and server classes (Issue #139)
2 parents 920e762 + 0ba9eb2 commit 51e1efb

File tree

9 files changed

+223
-118
lines changed

9 files changed

+223
-118
lines changed

ojp-jdbc-driver/src/main/java/org/openjproxy/grpc/client/MultinodeUrlParser.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ public class MultinodeUrlParser {
2929
// Cache of statement services keyed by server configuration
3030
private static final Map<String, StatementService> statementServiceCache = new ConcurrentHashMap<>();
3131

32-
3332
/**
3433
* Helper class to return the service, connection URL, server endpoints, and datasource names.
3534
*/
@@ -57,7 +56,8 @@ public static class ServiceAndUrl {
5756
* @param url the JDBC URL (should be already cleaned of datasource names)
5857
* @param dataSourceNames optional list of datasource names corresponding to each endpoint
5958
*/
60-
public synchronized static ServiceAndUrl getOrCreateStatementService(String url, List<String> dataSourceNames) {
59+
// No synchronization needed: computeIfAbsent on ConcurrentHashMap provides the required atomicity.
60+
public static ServiceAndUrl getOrCreateStatementService(String url, List<String> dataSourceNames) {
6161
try {
6262
// Try to parse as multinode URL
6363
List<ServerEndpoint> endpoints = MultinodeUrlParser.parseServerEndpoints(url, dataSourceNames);
@@ -121,7 +121,7 @@ public synchronized static ServiceAndUrl getOrCreateStatementService(String url,
121121
* Gets or creates a StatementService implementation based on the URL.
122122
* Backward compatibility method without datasource names.
123123
*/
124-
public synchronized static ServiceAndUrl getOrCreateStatementService(String url) {
124+
public static ServiceAndUrl getOrCreateStatementService(String url) {
125125
return getOrCreateStatementService(url, null);
126126
}
127127

ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/ResultSet.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.Map;
4040
import java.util.TreeMap;
4141
import java.util.concurrent.atomic.AtomicInteger;
42-
4342
import static org.openjproxy.grpc.client.GrpcExceptionHandler.handle;
4443

4544
@Slf4j
@@ -723,7 +722,7 @@ public boolean isAfterLast() throws SQLException {
723722
}
724723

725724
@Override
726-
public synchronized boolean isFirst() throws SQLException {
725+
public boolean isFirst() throws SQLException {
727726
log.debug("isFirst called");
728727
if (this.inProxyMode) {
729728
return super.isFirst();

ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/xa/OjpXAConnection.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Properties;
25+
import java.util.concurrent.locks.ReentrantLock;
2526

2627
/**
2728
* Implementation of XAConnection that connects to the OJP server for XA operations.
@@ -47,6 +48,7 @@ public class OjpXAConnection implements XAConnection, ServerHealthListener {
4748
private List<String> serverEndpoints;
4849
private final List<ConnectionEventListener> listeners = new ArrayList<>();
4950
private String boundServerAddress; // Phase 2: Track which server this connection is bound to
51+
private final ReentrantLock sessionLock = new ReentrantLock();
5052

5153
public OjpXAConnection(StatementService statementService, String url, String user, String password, Properties properties, List<String> serverEndpoints) {
5254
log.debug("Creating OjpXAConnection for URL: {}", url);
@@ -73,12 +75,15 @@ public OjpXAConnection(StatementService statementService, String url, String use
7375
* Lazily create the server-side session when first needed.
7476
* This avoids creating sessions that may never be used.
7577
*/
76-
private synchronized SessionInfo getOrCreateSession() throws SQLException {
78+
private SessionInfo getOrCreateSession() throws SQLException {
7779
if (sessionInfo != null) {
7880
return sessionInfo;
7981
}
80-
82+
sessionLock.lock();
8183
try {
84+
if (sessionInfo != null) {
85+
return sessionInfo;
86+
}
8287
// Connect to server with XA flag enabled
8388
ConnectionDetails.Builder connBuilder = ConnectionDetails.newBuilder()
8489
.setUrl(url)
@@ -115,6 +120,8 @@ private synchronized SessionInfo getOrCreateSession() throws SQLException {
115120
} catch (Exception e) {
116121
log.error("Failed to create XA connection session", e);
117122
throw new SQLException("Failed to create XA connection session", e);
123+
}finally {
124+
sessionLock.unlock();
118125
}
119126
}
120127

@@ -125,17 +132,22 @@ private synchronized SessionInfo getOrCreateSession() throws SQLException {
125132
* @return The new SessionInfo
126133
* @throws SQLException if session recreation fails
127134
*/
128-
synchronized SessionInfo recreateSession() throws SQLException {
129-
log.info("Recreating XA session (previous session: {})",
130-
sessionInfo != null ? sessionInfo.getSessionUUID() : "none");
131-
132-
// Clear existing session
133-
sessionInfo = null;
134-
boundServerAddress = null;
135-
xaResource = null; // Force recreation of XAResource with new session
136-
137-
// Create new session (will use round-robin to select a different server)
138-
return getOrCreateSession();
135+
SessionInfo recreateSession() throws SQLException {
136+
sessionLock.lock();
137+
try {
138+
log.info("Recreating XA session (previous session: {})",
139+
sessionInfo != null ? sessionInfo.getSessionUUID() : "none");
140+
141+
// Clear existing session
142+
sessionInfo = null;
143+
boundServerAddress = null;
144+
xaResource = null; // Force recreation of XAResource with new session
145+
146+
// Create new session (will use round-robin to select a different server)
147+
return getOrCreateSession();
148+
} finally {
149+
sessionLock.unlock();
150+
}
139151
}
140152

141153
@Override

ojp-jdbc-driver/src/main/java/org/openjproxy/jdbc/xa/OjpXADataSource.java

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.sql.SQLFeatureNotSupportedException;
1818
import java.util.List;
1919
import java.util.Properties;
20+
import java.util.concurrent.locks.ReentrantLock;
2021
import java.util.logging.Logger;
2122

2223
/**
@@ -57,6 +58,8 @@ public class OjpXADataSource implements XADataSource {
5758
private String dataSourceName;
5859
private List<String> serverEndpoints;
5960

61+
private static final ReentrantLock initLock = new ReentrantLock();
62+
6063
public OjpXADataSource() {
6164
log.debug("Creating OjpXADataSource");
6265
}
@@ -73,46 +76,56 @@ public OjpXADataSource(String url, String user, String password) {
7376
* This is done lazily when the first XA connection is requested.
7477
* The GRPC channel is opened once and reused by all XA connections from this datasource.
7578
*/
76-
private synchronized void initializeIfNeeded() throws SQLException {
79+
private void initializeIfNeeded() throws SQLException {
80+
// Fast path - no lock needed if already initialized
7781
if (statementService != null) {
7882
return; // Already initialized
7983
}
80-
81-
if (url == null || url.isEmpty()) {
82-
throw new SQLException("URL is not set");
83-
}
84-
85-
// Parse URL to extract datasource name and clean URL
86-
UrlParser.UrlParseResult urlParseResult = UrlParser.parseUrlWithDataSource(url);
87-
this.cleanUrl = urlParseResult.cleanUrl;
88-
this.dataSourceName = urlParseResult.dataSourceName;
89-
90-
log.debug("Parsed URL - clean: {}, dataSource: {}", cleanUrl, dataSourceName);
91-
92-
// Load ojp.properties file and extract datasource-specific configuration
93-
Properties ojpProperties = DatasourcePropertiesLoader.loadOjpPropertiesForDataSource(dataSourceName);
94-
if (ojpProperties != null && !ojpProperties.isEmpty()) {
95-
// Merge ojp.properties with any manually set properties
96-
for (String key : ojpProperties.stringPropertyNames()) {
97-
if (!properties.containsKey(key)) {
98-
properties.setProperty(key, ojpProperties.getProperty(key));
84+
85+
initLock.lock();
86+
try {
87+
// Double-check inside lock
88+
if (statementService != null) {
89+
return;
90+
}
91+
92+
if (url == null || url.isEmpty()) {
93+
throw new SQLException("URL is not set");
94+
}
95+
// Parse URL to extract datasource name and clean URL
96+
UrlParser.UrlParseResult urlParseResult = UrlParser.parseUrlWithDataSource(url);
97+
this.cleanUrl = urlParseResult.cleanUrl;
98+
this.dataSourceName = urlParseResult.dataSourceName;
99+
100+
log.debug("Parsed URL - clean: {}, dataSource: {}", cleanUrl, dataSourceName);
101+
102+
// Load ojp.properties file and extract datasource-specific configuration
103+
Properties ojpProperties = DatasourcePropertiesLoader.loadOjpPropertiesForDataSource(dataSourceName);
104+
if (ojpProperties != null && !ojpProperties.isEmpty()) {
105+
// Merge ojp.properties with any manually set properties
106+
for (String key : ojpProperties.stringPropertyNames()) {
107+
if (!properties.containsKey(key)) {
108+
properties.setProperty(key, ojpProperties.getProperty(key));
109+
}
99110
}
111+
log.debug("Loaded ojp.properties with {} properties for dataSource: {}", ojpProperties.size(), dataSourceName);
100112
}
101-
log.debug("Loaded ojp.properties with {} properties for dataSource: {}", ojpProperties.size(), dataSourceName);
113+
114+
// Initialize StatementService - this will open the GRPC channel on first use
115+
log.debug("Initializing StatementServiceGrpcClient for XA datasource: {}", dataSourceName);
116+
117+
// Detect multinode vs single-node configuration and get the URL to use for connection
118+
MultinodeUrlParser.ServiceAndUrl serviceAndUrl = MultinodeUrlParser.getOrCreateStatementService(cleanUrl);
119+
statementService = serviceAndUrl.getService();
120+
this.serverEndpoints = serviceAndUrl.getServerEndpoints();
121+
122+
// The GRPC channel will be opened lazily on the first connect() call
123+
// Since this StatementService instance is shared by all XA connections from this datasource,
124+
// the channel is opened once and reused
125+
log.info("StatementService initialized for datasource: {}. GRPC channel will open on first use.", dataSourceName);
126+
} finally {
127+
initLock.unlock();
102128
}
103-
104-
// Initialize StatementService - this will open the GRPC channel on first use
105-
log.debug("Initializing StatementServiceGrpcClient for XA datasource: {}", dataSourceName);
106-
107-
// Detect multinode vs single-node configuration and get the URL to use for connection
108-
MultinodeUrlParser.ServiceAndUrl serviceAndUrl = MultinodeUrlParser.getOrCreateStatementService(cleanUrl);
109-
statementService = serviceAndUrl.getService();
110-
this.serverEndpoints = serviceAndUrl.getServerEndpoints();
111-
112-
// The GRPC channel will be opened lazily on the first connect() call
113-
// Since this StatementService instance is shared by all XA connections from this datasource,
114-
// the channel is opened once and reused
115-
log.info("StatementService initialized for datasource: {}. GRPC channel will open on first use.", dataSourceName);
116129
}
117130

118131

ojp-jdbc-driver/src/test/java/openjproxy/jdbc/testutil/SQLServerTestContainer.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import org.testcontainers.containers.MSSQLServerContainer;
44

5+
import java.util.concurrent.locks.ReentrantLock;
6+
57
/**
68
* Singleton SQL Server test container for all SQL Server integration tests.
79
* This ensures that all tests share the same SQL Server instance to improve test performance
@@ -15,35 +17,46 @@ public class SQLServerTestContainer {
1517
private static MSSQLServerContainer<?> container;
1618
private static boolean isStarted = false;
1719
private static boolean shutdownHookRegistered = false;
18-
20+
21+
private static ReentrantLock initLock = new ReentrantLock();
1922
/**
2023
* Gets or creates the shared SQL Server test container instance.
2124
* The container is automatically started on first access.
2225
*
2326
* @return the shared MSSQLServerContainer instance
2427
*/
25-
public static synchronized MSSQLServerContainer<?> getInstance() {
26-
if (container == null) {
27-
container = new MSSQLServerContainer<>(MSSQL_IMAGE)
28-
.acceptLicense();
28+
public static MSSQLServerContainer<?> getInstance() {
29+
// Fast-path: if container already created and running, return it without locking
30+
MSSQLServerContainer<?> local = container;
31+
if (local != null && local.isRunning()) {
32+
return local;
2933
}
30-
31-
if (!isStarted) {
32-
container.start();
33-
isStarted = true;
34-
35-
// Add shutdown hook to stop container when JVM exits
36-
if (!shutdownHookRegistered) {
37-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
38-
if (container != null && container.isRunning()) {
39-
container.stop();
40-
}
41-
}));
42-
shutdownHookRegistered = true;
34+
35+
initLock.lock();
36+
try {
37+
if (container == null) {
38+
container = new MSSQLServerContainer<>(MSSQL_IMAGE).acceptLicense();
4339
}
40+
41+
if (!isStarted) {
42+
container.start();
43+
isStarted = true;
44+
45+
// Add shutdown hook to stop container when JVM exits
46+
if (!shutdownHookRegistered) {
47+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
48+
if (container != null && container.isRunning()) {
49+
container.stop();
50+
}
51+
}));
52+
shutdownHookRegistered = true;
53+
}
54+
}
55+
56+
return container;
57+
}finally {
58+
initLock.unlock();
4459
}
45-
46-
return container;
4760
}
4861

4962
/**

ojp-jdbc-driver/src/test/java/openjproxy/jdbc/testutil/TestDBUtils.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.sql.ResultSetMetaData;
1212
import java.sql.SQLException;
1313
import java.sql.Statement;
14+
import java.util.concurrent.atomic.AtomicInteger;
1415

1516
import static org.junit.jupiter.api.Assertions.assertNotNull;
1617
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -165,13 +166,13 @@ public void close() {
165166
* Simple Xid implementation for XA transactions in tests.
166167
*/
167168
private static class SimpleXid implements Xid {
168-
private static int counter = 0;
169+
// AtomicInteger replaces synchronized(SimpleXid.class) for thread-safe ID generation
170+
// and avoids virtual-thread pinning
171+
private static AtomicInteger counter = new AtomicInteger(0);;
169172
private final int id;
170173

171174
public SimpleXid() {
172-
synchronized (SimpleXid.class) {
173-
this.id = counter++;
174-
}
175+
this.id = counter.getAndIncrement();
175176
}
176177

177178
@Override

ojp-jdbc-driver/src/test/java/org/openjproxy/grpc/client/MultinodeXAIntegrationTest.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.ExecutorService;
3030
import java.util.concurrent.Executors;
3131
import java.util.concurrent.atomic.AtomicInteger;
32+
import java.util.concurrent.locks.ReentrantLock;
3233

3334
import static org.junit.jupiter.api.Assumptions.assumeFalse;
3435

@@ -47,6 +48,7 @@ public class MultinodeXAIntegrationTest {
4748
private static AtomicInteger totalFailedQueries = new AtomicInteger(0);
4849
private static AtomicInteger nonConnectivityFailedQueries = new AtomicInteger(0);
4950
private static ExecutorService queryExecutor = new RoundRobinExecutorService(100);
51+
private static final ReentrantLock multinodeXaLock = new ReentrantLock();
5052

5153

5254
private static AtomikosDataSourceBean xaDataSource;
@@ -252,36 +254,37 @@ protected static Connection getConnection(String driverClass, String url, String
252254
// Load driver if needed
253255
Class.forName(driverClass);
254256
Connection conn;
255-
synchronized (MultinodeIntegrationTest.class) {
257+
multinodeXaLock.lock();
258+
try {
256259
if (xaDataSource == null) {
257260
OjpXADataSource xaDataSourceImpl = new OjpXADataSource();
258261
//Kept for reference testing with Postgres native XA
259262
//PGXADataSource xaDataSourceImpl = new PGXADataSource();
260263
xaDataSourceImpl.setUrl(url);
261264
xaDataSourceImpl.setUser(user);
262265
xaDataSourceImpl.setPassword(password);
263-
264266
xaDataSource = new AtomikosDataSourceBean();
265267
xaDataSource.setUniqueResourceName("ATOMIKOS_OJP_XA_DS");
266268
xaDataSource.setXaDataSource(xaDataSourceImpl);
267269
xaDataSource.setMinPoolSize(ATOMIKOS_MIN_POOL_SIZE);
268270
xaDataSource.setMaxPoolSize(ATOMIKOS_MAX_POOL_SIZE);
269271
log.info("✓ Atomikos XA DataSource initialized");
270272
}
271-
272273
// Block below is to increase the chances getting different connections to better test multinode balancing
273274
// as per Atomikos tends to reuse the very same connection whenever possible.
274-
int num = (int)(Math.random() * 10) + 1;
275+
int num = (int) (Math.random() * 10) + 1;
275276
List<Connection> connectionList = new ArrayList<>();
276277
for (int i = 0; i < num; i++) {
277278
connectionList.add(xaDataSource.getConnection());
278279
}
279-
conn = connectionList.remove(connectionList.size()-1);
280-
for(Connection c : connectionList){
280+
conn = connectionList.remove(connectionList.size() - 1);
281+
for (Connection c : connectionList) {
281282
c.close();
282283
}
284+
return conn;
285+
} finally {
286+
multinodeXaLock.unlock();
283287
}
284-
return conn;
285288
}
286289

287290
private static void runExactQuerySequence(int threadNum, String driverClass, String url, String user, String password) {

0 commit comments

Comments
 (0)