Skip to content

Commit ebf6c71

Browse files
Make DatabricksPooledConnection thread-safe (#535)
1 parent a61af5e commit ebf6c71

File tree

1 file changed

+98
-75
lines changed

1 file changed

+98
-75
lines changed

src/main/java/com/databricks/jdbc/pooling/DatabricksPooledConnection.java

Lines changed: 98 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import java.lang.reflect.Proxy;
1212
import java.sql.*;
1313
import java.util.Arrays;
14-
import java.util.HashSet;
1514
import java.util.Set;
15+
import java.util.concurrent.CopyOnWriteArraySet;
1616
import javax.annotation.Nullable;
1717
import javax.sql.ConnectionEvent;
1818
import javax.sql.ConnectionEventListener;
@@ -23,13 +23,10 @@ public class DatabricksPooledConnection implements PooledConnection {
2323

2424
private static final JdbcLogger LOGGER =
2525
JdbcLoggerFactory.getLogger(DatabricksPooledConnection.class);
26-
private final Set<ConnectionEventListener> listeners = new HashSet<>();
26+
private final Set<ConnectionEventListener> listeners = new CopyOnWriteArraySet<>();
2727
private Connection physicalConnection;
2828
private ConnectionHandler connectionHandler;
29-
30-
public Connection getPhysicalConnection() {
31-
return this.physicalConnection;
32-
}
29+
private final Object lock = new Object();
3330

3431
/**
3532
* Creates a new PooledConnection representing the specified physical connection.
@@ -40,27 +37,6 @@ public DatabricksPooledConnection(Connection physicalConnection) {
4037
this.physicalConnection = physicalConnection;
4138
}
4239

43-
/** Fires a connection closed event to all listeners. */
44-
void fireConnectionClosed() {
45-
LOGGER.debug("void fireConnectionClosed()");
46-
for (ConnectionEventListener listener : this.listeners) {
47-
listener.connectionClosed(new ConnectionEvent(this));
48-
}
49-
}
50-
51-
/**
52-
* Fires a connection error event to all listeners
53-
*
54-
* @param e the SQLException to consider
55-
*/
56-
private void fireConnectionError(SQLException e) {
57-
LOGGER.debug(
58-
String.format("private void fireConnectionError(SQLException e = {%s})", e.toString()));
59-
for (ConnectionEventListener listener : this.listeners) {
60-
listener.connectionErrorOccurred(new ConnectionEvent(this, e));
61-
}
62-
}
63-
6440
@Override
6541
public void addConnectionEventListener(ConnectionEventListener connectionEventListener) {
6642
listeners.add(connectionEventListener);
@@ -85,16 +61,18 @@ public void addStatementEventListener(StatementEventListener listener) {
8561
@Override
8662
public void close() throws SQLException {
8763
LOGGER.debug("public void close()");
88-
if (connectionHandler != null && !connectionHandler.isClosed()) {
89-
connectionHandler.close();
90-
}
91-
if (physicalConnection == null) {
92-
return;
93-
}
94-
try {
95-
physicalConnection.close();
96-
} finally {
97-
physicalConnection = null;
64+
synchronized (lock) {
65+
if (connectionHandler != null && !connectionHandler.isClosed()) {
66+
connectionHandler.close();
67+
}
68+
if (physicalConnection == null) {
69+
return;
70+
}
71+
try {
72+
physicalConnection.close();
73+
} finally {
74+
physicalConnection = null;
75+
}
9876
}
9977
}
10078

@@ -110,19 +88,46 @@ public void close() throws SQLException {
11088
@Override
11189
public Connection getConnection() throws SQLException {
11290
LOGGER.debug("public PooledConnection getConnection()");
113-
if (physicalConnection == null) {
114-
// Before throwing the exception, notify the listeners
115-
DatabricksSQLException sqlException =
116-
new DatabricksSQLException("This PooledConnection has already been closed.");
117-
fireConnectionError(sqlException);
118-
throw sqlException;
91+
synchronized (lock) {
92+
if (physicalConnection == null) {
93+
// Before throwing the exception, notify the listeners
94+
DatabricksSQLException sqlException =
95+
new DatabricksSQLException("This PooledConnection has already been closed.");
96+
fireConnectionError(sqlException);
97+
throw sqlException;
98+
}
99+
// Only one connection can be open at a time from this PooledConnection
100+
if (connectionHandler != null && !connectionHandler.isClosed()) {
101+
connectionHandler.close();
102+
}
103+
connectionHandler = new ConnectionHandler(physicalConnection);
104+
return connectionHandler.getVirtualConnection();
119105
}
120-
// Only one connection can be open at a time from this PooledConnection
121-
if (connectionHandler != null && !connectionHandler.isClosed()) {
122-
connectionHandler.close();
106+
}
107+
108+
public Connection getPhysicalConnection() {
109+
return this.physicalConnection;
110+
}
111+
112+
/** Fires a connection closed event to all listeners. */
113+
private void fireConnectionClosed() {
114+
LOGGER.debug("void fireConnectionClosed()");
115+
for (ConnectionEventListener listener : this.listeners) {
116+
listener.connectionClosed(new ConnectionEvent(this));
117+
}
118+
}
119+
120+
/**
121+
* Fires a connection error event to all listeners
122+
*
123+
* @param e the SQLException to consider
124+
*/
125+
private void fireConnectionError(SQLException e) {
126+
LOGGER.debug(
127+
String.format("private void fireConnectionError(SQLException e = {%s})", e.toString()));
128+
for (ConnectionEventListener listener : this.listeners) {
129+
listener.connectionErrorOccurred(new ConnectionEvent(this, e));
123130
}
124-
connectionHandler = new ConnectionHandler(physicalConnection);
125-
return connectionHandler.getVirtualConnection();
126131
}
127132

128133
/**
@@ -131,10 +136,12 @@ public Connection getConnection() throws SQLException {
131136
*/
132137
private class ConnectionHandler implements InvocationHandler {
133138
private Connection physicalConnection;
134-
private Connection
135-
virtualConnection; // the Connection the client is currently using, which is not a physical
136139

137-
// connection
140+
/**
141+
* Connection being used by the client. This is a proxy object that wraps the physical
142+
* connection.
143+
*/
144+
private Connection virtualConnection;
138145

139146
ConnectionHandler(Connection physicalConnection) {
140147
this.physicalConnection = physicalConnection;
@@ -153,7 +160,7 @@ public Object invoke(Object proxy, Method method, @Nullable Object[] args) throw
153160
LOGGER.debug(
154161
String.format(
155162
"public Object invoke(Object proxy, Method method = {%s}, Object[] args = {%s})",
156-
method, args));
163+
method, Arrays.toString(args)));
157164
final String methodName = method.getName();
158165
if (method.getDeclaringClass() == Object.class) {
159166
if (methodName.equals("toString")) {
@@ -168,26 +175,32 @@ public Object invoke(Object proxy, Method method, @Nullable Object[] args) throw
168175
try {
169176
return method.invoke(physicalConnection, args);
170177
} catch (InvocationTargetException e) {
171-
// throwing.nullable
178+
// throwing nullable
172179
throw e.getTargetException();
173180
}
174181
}
175182

176183
if (methodName.equals("isClosed")) {
177-
return physicalConnection == null || physicalConnection.isClosed();
184+
synchronized (DatabricksPooledConnection.this.lock) {
185+
return physicalConnection == null || physicalConnection.isClosed();
186+
}
178187
}
179188
// Do not close the physical connection, remove reference and fire close event
180189
if (methodName.equals("close")) {
181-
if (physicalConnection != null) {
182-
physicalConnection = null;
183-
virtualConnection = null;
184-
connectionHandler = null;
185-
fireConnectionClosed();
190+
synchronized (DatabricksPooledConnection.this.lock) {
191+
if (physicalConnection != null) {
192+
physicalConnection = null;
193+
virtualConnection = null;
194+
connectionHandler = null;
195+
fireConnectionClosed();
196+
}
186197
}
187198
return null;
188199
}
189-
if (physicalConnection == null || physicalConnection.isClosed()) {
190-
throw new DatabricksSQLException("Connection has been closed.");
200+
synchronized (DatabricksPooledConnection.this.lock) {
201+
if (physicalConnection == null || physicalConnection.isClosed()) {
202+
throw new DatabricksSQLException("Connection has been closed.");
203+
}
191204
}
192205

193206
// From here on in, we invoke via reflection and catch exceptions
@@ -226,13 +239,17 @@ Connection getVirtualConnection() {
226239

227240
public void close() {
228241
LOGGER.debug("public void close()");
229-
physicalConnection = null;
230-
virtualConnection = null;
231-
// No close event fired here: see JDBC 4.3 Optional Package spec section 11.4
242+
synchronized (DatabricksPooledConnection.this.lock) {
243+
physicalConnection = null;
244+
virtualConnection = null;
245+
// No close event fired here: see JDBC 4.3 Optional Package spec section 11.4
246+
}
232247
}
233248

234249
public boolean isClosed() {
235-
return physicalConnection == null;
250+
synchronized (DatabricksPooledConnection.this.lock) {
251+
return physicalConnection == null;
252+
}
236253
}
237254
}
238255

@@ -271,23 +288,29 @@ public Object invoke(Object proxy, Method method, @Nullable Object[] args) throw
271288
}
272289

273290
if (methodName.equals("isClosed")) {
274-
return physicalStatement == null || physicalStatement.isClosed();
291+
synchronized (this) {
292+
return physicalStatement == null || physicalStatement.isClosed();
293+
}
275294
}
276295
if (methodName.equals("close")) {
277-
if (physicalStatement == null || physicalStatement.isClosed()) {
278-
return null;
296+
synchronized (this) {
297+
if (physicalStatement == null || physicalStatement.isClosed()) {
298+
return null;
299+
}
300+
conHandler = null;
301+
physicalStatement.close();
302+
physicalStatement = null;
279303
}
280-
conHandler = null;
281-
physicalStatement.close();
282-
physicalStatement = null;
283304
return null;
284305
}
285-
if (physicalStatement == null || physicalStatement.isClosed()) {
286-
throw new DatabricksSQLException("Statement has been closed.");
306+
synchronized (this) {
307+
if (physicalStatement == null || physicalStatement.isClosed()) {
308+
throw new DatabricksSQLException("Statement has been closed.");
309+
}
287310
}
288311
if (methodName.equals("getConnection")) {
289-
return conHandler
290-
.getVirtualConnection(); // the virtual connection from the connection handler
312+
// the virtual connection from the connection handler
313+
return conHandler.getVirtualConnection();
291314
}
292315

293316
// Delegate the call to the physical Statement.

0 commit comments

Comments
 (0)