Skip to content

Commit b899655

Browse files
committed
implement blocking data source wrapper to prevent that long-running tests exceed the maximum limit of database connections
1 parent f4aa389 commit b899655

File tree

3 files changed

+696
-304
lines changed

3 files changed

+696
-304
lines changed
Lines changed: 371 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,371 @@
1+
package io.zonky.test.db.flyway;
2+
3+
import javax.sql.DataSource;
4+
import java.io.PrintWriter;
5+
import java.sql.Array;
6+
import java.sql.Blob;
7+
import java.sql.CallableStatement;
8+
import java.sql.Clob;
9+
import java.sql.Connection;
10+
import java.sql.DatabaseMetaData;
11+
import java.sql.NClob;
12+
import java.sql.PreparedStatement;
13+
import java.sql.SQLClientInfoException;
14+
import java.sql.SQLException;
15+
import java.sql.SQLFeatureNotSupportedException;
16+
import java.sql.SQLWarning;
17+
import java.sql.SQLXML;
18+
import java.sql.Savepoint;
19+
import java.sql.Statement;
20+
import java.sql.Struct;
21+
import java.util.Map;
22+
import java.util.Properties;
23+
import java.util.concurrent.Executor;
24+
import java.util.concurrent.Semaphore;
25+
import java.util.logging.Logger;
26+
27+
/**
28+
* Blocking data source wrapper that should avoid to exhaustion of database connections.
29+
* It is a better choice than using a connection pool because database connections can be released as soon as possible.
30+
*/
31+
public class BlockingDataSourceWrapper implements DataSource {
32+
33+
private final DataSource delegate;
34+
private final Semaphore semaphore;
35+
36+
public BlockingDataSourceWrapper(DataSource delegate, Semaphore semaphore) {
37+
this.delegate = delegate;
38+
this.semaphore = semaphore;
39+
}
40+
41+
@Override
42+
public Connection getConnection() throws SQLException {
43+
semaphore.acquireUninterruptibly();
44+
Connection connection = delegate.getConnection();
45+
return new BlockingConnectionWrapper(connection, semaphore);
46+
}
47+
48+
@Override
49+
public Connection getConnection(String username, String password) throws SQLException {
50+
semaphore.acquireUninterruptibly();
51+
Connection connection = delegate.getConnection(username, password);
52+
return new BlockingConnectionWrapper(connection, semaphore);
53+
}
54+
55+
@Override
56+
public PrintWriter getLogWriter() throws SQLException {
57+
return delegate.getLogWriter();
58+
}
59+
60+
@Override
61+
public void setLogWriter(PrintWriter out) throws SQLException {
62+
delegate.setLogWriter(out);
63+
}
64+
65+
@Override
66+
public void setLoginTimeout(int seconds) throws SQLException {
67+
delegate.setLoginTimeout(seconds);
68+
}
69+
70+
@Override
71+
public int getLoginTimeout() throws SQLException {
72+
return delegate.getLoginTimeout();
73+
}
74+
75+
@Override
76+
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
77+
return delegate.getParentLogger();
78+
}
79+
80+
@Override
81+
public <T> T unwrap(Class<T> iface) throws SQLException {
82+
return delegate.unwrap(iface);
83+
}
84+
85+
@Override
86+
public boolean isWrapperFor(Class<?> iface) throws SQLException {
87+
return delegate.isWrapperFor(iface);
88+
}
89+
90+
protected static class BlockingConnectionWrapper implements Connection {
91+
92+
private final Connection delegate;
93+
private final Semaphore semaphore;
94+
95+
private BlockingConnectionWrapper(Connection delegate, Semaphore semaphore) {
96+
this.delegate = delegate;
97+
this.semaphore = semaphore;
98+
}
99+
100+
@Override
101+
public Statement createStatement() throws SQLException {
102+
return delegate.createStatement();
103+
}
104+
105+
@Override
106+
public PreparedStatement prepareStatement(String sql) throws SQLException {
107+
return delegate.prepareStatement(sql);
108+
}
109+
110+
@Override
111+
public CallableStatement prepareCall(String sql) throws SQLException {
112+
return delegate.prepareCall(sql);
113+
}
114+
115+
@Override
116+
public String nativeSQL(String sql) throws SQLException {
117+
return delegate.nativeSQL(sql);
118+
}
119+
120+
@Override
121+
public void setAutoCommit(boolean autoCommit) throws SQLException {
122+
delegate.setAutoCommit(autoCommit);
123+
}
124+
125+
@Override
126+
public boolean getAutoCommit() throws SQLException {
127+
return delegate.getAutoCommit();
128+
}
129+
130+
@Override
131+
public void commit() throws SQLException {
132+
delegate.commit();
133+
}
134+
135+
@Override
136+
public void rollback() throws SQLException {
137+
delegate.rollback();
138+
}
139+
140+
@Override
141+
public void close() throws SQLException {
142+
delegate.close();
143+
semaphore.release();
144+
}
145+
146+
@Override
147+
public boolean isClosed() throws SQLException {
148+
return delegate.isClosed();
149+
}
150+
151+
@Override
152+
public DatabaseMetaData getMetaData() throws SQLException {
153+
return delegate.getMetaData();
154+
}
155+
156+
@Override
157+
public void setReadOnly(boolean readOnly) throws SQLException {
158+
delegate.setReadOnly(readOnly);
159+
}
160+
161+
@Override
162+
public boolean isReadOnly() throws SQLException {
163+
return delegate.isReadOnly();
164+
}
165+
166+
@Override
167+
public void setCatalog(String catalog) throws SQLException {
168+
delegate.setCatalog(catalog);
169+
}
170+
171+
@Override
172+
public String getCatalog() throws SQLException {
173+
return delegate.getCatalog();
174+
}
175+
176+
@Override
177+
public void setTransactionIsolation(int level) throws SQLException {
178+
delegate.setTransactionIsolation(level);
179+
}
180+
181+
@Override
182+
public int getTransactionIsolation() throws SQLException {
183+
return delegate.getTransactionIsolation();
184+
}
185+
186+
@Override
187+
public SQLWarning getWarnings() throws SQLException {
188+
return delegate.getWarnings();
189+
}
190+
191+
@Override
192+
public void clearWarnings() throws SQLException {
193+
delegate.clearWarnings();
194+
}
195+
196+
@Override
197+
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
198+
return delegate.createStatement(resultSetType, resultSetConcurrency);
199+
}
200+
201+
@Override
202+
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
203+
return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency);
204+
}
205+
206+
@Override
207+
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
208+
return delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
209+
}
210+
211+
@Override
212+
public Map<String, Class<?>> getTypeMap() throws SQLException {
213+
return delegate.getTypeMap();
214+
}
215+
216+
@Override
217+
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
218+
delegate.setTypeMap(map);
219+
}
220+
221+
@Override
222+
public void setHoldability(int holdability) throws SQLException {
223+
delegate.setHoldability(holdability);
224+
}
225+
226+
@Override
227+
public int getHoldability() throws SQLException {
228+
return delegate.getHoldability();
229+
}
230+
231+
@Override
232+
public Savepoint setSavepoint() throws SQLException {
233+
return delegate.setSavepoint();
234+
}
235+
236+
@Override
237+
public Savepoint setSavepoint(String name) throws SQLException {
238+
return delegate.setSavepoint(name);
239+
}
240+
241+
@Override
242+
public void rollback(Savepoint savepoint) throws SQLException {
243+
delegate.rollback(savepoint);
244+
}
245+
246+
@Override
247+
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
248+
delegate.releaseSavepoint(savepoint);
249+
}
250+
251+
@Override
252+
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
253+
return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
254+
}
255+
256+
@Override
257+
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
258+
return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
259+
}
260+
261+
@Override
262+
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
263+
return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
264+
}
265+
266+
@Override
267+
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
268+
return delegate.prepareStatement(sql, autoGeneratedKeys);
269+
}
270+
271+
@Override
272+
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
273+
return delegate.prepareStatement(sql, columnIndexes);
274+
}
275+
276+
@Override
277+
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
278+
return delegate.prepareStatement(sql, columnNames);
279+
}
280+
281+
@Override
282+
public Clob createClob() throws SQLException {
283+
return delegate.createClob();
284+
}
285+
286+
@Override
287+
public Blob createBlob() throws SQLException {
288+
return delegate.createBlob();
289+
}
290+
291+
@Override
292+
public NClob createNClob() throws SQLException {
293+
return delegate.createNClob();
294+
}
295+
296+
@Override
297+
public SQLXML createSQLXML() throws SQLException {
298+
return delegate.createSQLXML();
299+
}
300+
301+
@Override
302+
public boolean isValid(int timeout) throws SQLException {
303+
return delegate.isValid(timeout);
304+
}
305+
306+
@Override
307+
public void setClientInfo(String name, String value) throws SQLClientInfoException {
308+
delegate.setClientInfo(name, value);
309+
}
310+
311+
@Override
312+
public void setClientInfo(Properties properties) throws SQLClientInfoException {
313+
delegate.setClientInfo(properties);
314+
}
315+
316+
@Override
317+
public String getClientInfo(String name) throws SQLException {
318+
return delegate.getClientInfo(name);
319+
}
320+
321+
@Override
322+
public Properties getClientInfo() throws SQLException {
323+
return delegate.getClientInfo();
324+
}
325+
326+
@Override
327+
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
328+
return delegate.createArrayOf(typeName, elements);
329+
}
330+
331+
@Override
332+
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
333+
return delegate.createStruct(typeName, attributes);
334+
}
335+
336+
@Override
337+
public void setSchema(String schema) throws SQLException {
338+
delegate.setSchema(schema);
339+
}
340+
341+
@Override
342+
public String getSchema() throws SQLException {
343+
return delegate.getSchema();
344+
}
345+
346+
@Override
347+
public void abort(Executor executor) throws SQLException {
348+
delegate.abort(executor);
349+
}
350+
351+
@Override
352+
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
353+
delegate.setNetworkTimeout(executor, milliseconds);
354+
}
355+
356+
@Override
357+
public int getNetworkTimeout() throws SQLException {
358+
return delegate.getNetworkTimeout();
359+
}
360+
361+
@Override
362+
public <T> T unwrap(Class<T> iface) throws SQLException {
363+
return delegate.unwrap(iface);
364+
}
365+
366+
@Override
367+
public boolean isWrapperFor(Class<?> iface) throws SQLException {
368+
return delegate.isWrapperFor(iface);
369+
}
370+
}
371+
}

0 commit comments

Comments
 (0)