Skip to content

Commit 6d3dc93

Browse files
authored
Merge pull request #50 from kazabubu/master
Added query timeout support
2 parents 643e641 + de677ed commit 6d3dc93

File tree

8 files changed

+52
-30
lines changed

8 files changed

+52
-30
lines changed

rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Select.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,27 +25,27 @@ private Select() {
2525
private static final Logger log = LoggerFactory.getLogger(Select.class);
2626

2727
static <T> Flowable<T> create(Single<Connection> connections,
28-
Flowable<List<Object>> parameterGroups, String sql, int fetchSize,
29-
Function<? super ResultSet, ? extends T> mapper, boolean eagerDispose) {
28+
Flowable<List<Object>> parameterGroups, String sql, int fetchSize,
29+
Function<? super ResultSet, ? extends T> mapper, boolean eagerDispose, int queryTimeoutSec) {
3030
return connections //
3131
.toFlowable() //
32-
.flatMap(con -> create(con, sql, parameterGroups, fetchSize, mapper, eagerDispose));
32+
.flatMap(con -> create(con, sql, parameterGroups, fetchSize, mapper, eagerDispose, queryTimeoutSec));
3333
}
3434

3535
static <T> Flowable<T> create(Connection con, String sql,
36-
Flowable<List<Object>> parameterGroups, int fetchSize,
37-
Function<? super ResultSet, T> mapper, boolean eagerDispose) {
36+
Flowable<List<Object>> parameterGroups, int fetchSize,
37+
Function<? super ResultSet, T> mapper, boolean eagerDispose, int queryTimeoutSec) {
3838
log.debug("Select.create called with con={}", con);
39-
Callable<NamedPreparedStatement> initialState = () -> Util.prepare(con, fetchSize, sql);
39+
Callable<NamedPreparedStatement> initialState = () -> Util.prepare(con, fetchSize, sql, queryTimeoutSec);
4040
Function<NamedPreparedStatement, Flowable<T>> observableFactory = ps -> parameterGroups
41-
.flatMap(parameters -> create(ps.ps, parameters, mapper, ps.names, sql, fetchSize),
41+
.flatMap(parameters -> create(ps.ps, parameters, mapper, ps.names, sql, fetchSize, queryTimeoutSec),
4242
true, 1);
4343
Consumer<NamedPreparedStatement> disposer = Util::closePreparedStatementAndConnection;
4444
return Flowable.using(initialState, observableFactory, disposer, eagerDispose);
4545
}
4646

4747
private static <T> Flowable<? extends T> create(PreparedStatement ps, List<Object> parameters,
48-
Function<? super ResultSet, T> mapper, List<String> names, String sql, int fetchSize) {
48+
Function<? super ResultSet, T> mapper, List<String> names, String sql, int fetchSize, int queryTimeoutSec) {
4949
log.debug("parameters={}", parameters);
5050
log.debug("names={}", names);
5151

@@ -56,7 +56,7 @@ private static <T> Flowable<? extends T> create(PreparedStatement ps, List<Objec
5656
if (hasCollection) {
5757
// create a new prepared statement with the collection ? substituted with
5858
// ?s to match the size of the collection parameter
59-
ps2 = Util.prepare(ps.getConnection(), fetchSize, sql, params);
59+
ps2 = Util.prepare(ps.getConnection(), fetchSize, sql, params, queryTimeoutSec);
6060
// now wrap the rs to auto close ps2 because it is single use (the next
6161
// collection parameter may have a different ordinality so we need to build
6262
// a new PreparedStatement with a different number of question marks

rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/SelectBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public final class SelectBuilder extends ParametersBuilder<SelectBuilder>
1818
private final Database db;
1919

2020
int fetchSize = 0; // default
21+
int queryTimeoutSec = Util.QUERY_TIMEOUT_NOT_SET; //default
2122
private Flowable<?> dependsOn;
2223

2324
SelectBuilder(String sql, Single<Connection> connection, Database db) {
@@ -43,6 +44,12 @@ public SelectBuilder fetchSize(int size) {
4344
return this;
4445
}
4546

47+
public SelectBuilder queryTimeoutSec(int timeoutSec) {
48+
Preconditions.checkArgument(timeoutSec >= 0);
49+
this.queryTimeoutSec = timeoutSec;
50+
return this;
51+
}
52+
4653
public TransactedSelectBuilder transacted() {
4754
return new TransactedSelectBuilder(this, db);
4855
}
@@ -55,7 +62,7 @@ public TransactedSelectBuilder transactedValuesOnly() {
5562
public <T> Flowable<T> get(@Nonnull ResultSetMapper<? extends T> mapper) {
5663
Preconditions.checkNotNull(mapper, "mapper cannot be null");
5764
Flowable<List<Object>> pg = super.parameterGroupsToFlowable();
58-
Flowable<T> f = Select.<T>create(connection, pg, sql, fetchSize, mapper, true);
65+
Flowable<T> f = Select.<T>create(connection, pg, sql, fetchSize, mapper, true, queryTimeoutSec);
5966
if (dependsOn != null) {
6067
return dependsOn.ignoreElements().andThen(f);
6168
} else {

rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectAutomappedBuilder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ private static <T> Flowable<Tx<T>> createFlowable(SelectAutomappedBuilder<T> sb,
9696
sb.selectBuilder.sql, //
9797
sb.selectBuilder.fetchSize, //
9898
Util.autoMap(sb.cls), //
99-
false) //
99+
false, //
100+
sb.selectBuilder.queryTimeoutSec) //
100101
.materialize() //
101102
.flatMap(n -> Tx.toTx(n, connection.get(), db)) //
102103
.doOnNext(tx -> {
@@ -107,4 +108,4 @@ private static <T> Flowable<Tx<T>> createFlowable(SelectAutomappedBuilder<T> sb,
107108
});
108109
}
109110

110-
}
111+
}

rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedSelectBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ private static <T> Flowable<Tx<T>> createFlowable(SelectBuilder sb,
105105
sb.sql, //
106106
sb.fetchSize, //
107107
mapper, //
108-
false) //
108+
false, //
109+
sb.queryTimeoutSec) //
109110
.materialize() //
110111
.flatMap(n -> Tx.toTx(n, connection.get(), db)) //
111112
.doOnNext(tx -> {

rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/TransactedUpdateBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ private static Flowable<Tx<Integer>> createFlowable(UpdateBuilder ub, Database d
137137
ub.parameterGroupsToFlowable(), //
138138
ub.sql, //
139139
ub.batchSize, //
140-
false) //
140+
false, //
141+
ub.queryTimeoutSec) //
141142
.flatMap(n -> Tx.toTx(n, connection.get(), db)) //
142143
.doOnNext(tx -> {
143144
t[0] = ((TxImpl<Integer>) tx);

rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Update.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,18 @@ private Update() {
2828
}
2929

3030
static Flowable<Notification<Integer>> create(Single<Connection> connection,
31-
Flowable<List<Object>> parameterGroups, String sql, int batchSize,
32-
boolean eagerDispose) {
31+
Flowable<List<Object>> parameterGroups, String sql, int batchSize,
32+
boolean eagerDispose, int queryTimeoutSec) {
3333
return connection //
3434
.toFlowable() //
35-
.flatMap(con -> create(con, sql, parameterGroups, batchSize, eagerDispose), true,
35+
.flatMap(con -> create(con, sql, parameterGroups, batchSize, eagerDispose, queryTimeoutSec), true,
3636
1);
3737
}
3838

3939
private static Flowable<Notification<Integer>> create(Connection con, String sql,
40-
Flowable<List<Object>> parameterGroups, int batchSize, boolean eagerDispose) {
40+
Flowable<List<Object>> parameterGroups, int batchSize, boolean eagerDispose, int queryTimeoutSec) {
4141
log.debug("Update.create {}", sql);
42-
Callable<NamedPreparedStatement> resourceFactory = () -> Util.prepare(con, sql);
42+
Callable<NamedPreparedStatement> resourceFactory = () -> Util.prepare(con, sql, queryTimeoutSec);
4343
final Function<NamedPreparedStatement, Flowable<Notification<Integer>>> flowableFactory;
4444
if (batchSize == 0) {
4545
flowableFactory = ps -> parameterGroups //
@@ -101,7 +101,7 @@ private static Single<Integer> create(NamedPreparedStatement ps, List<Parameter>
101101
if (hasCollection) {
102102
// create a new prepared statement with the collection ? substituted with
103103
// ?s to match the size of the collection parameter
104-
ps2 = Util.prepare(ps.ps.getConnection(), 0, sql, params);
104+
ps2 = Util.prepare(ps.ps.getConnection(), 0, sql, params, ps.ps.getQueryTimeout());
105105
} else {
106106
ps2 = ps.ps;
107107
}

rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/UpdateBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public final class UpdateBuilder extends ParametersBuilder<UpdateBuilder> implem
2020
private final Database db;
2121
Flowable<?> dependsOn;
2222
int batchSize = DEFAULT_BATCH_SIZE;
23+
int queryTimeoutSec = Util.QUERY_TIMEOUT_NOT_SET;
2324

2425
UpdateBuilder(String sql, Single<Connection> connections, Database db) {
2526
super(sql);
@@ -40,6 +41,12 @@ public UpdateBuilder batchSize(int batchSize) {
4041
return this;
4142
}
4243

44+
public UpdateBuilder queryTimeoutSec(int queryTimeoutSec) {
45+
Preconditions.checkArgument(queryTimeoutSec >= 0);
46+
this.queryTimeoutSec = queryTimeoutSec;
47+
return this;
48+
}
49+
4350
/**
4451
* Returns a builder used to specify how to process the generated keys
4552
* {@link ResultSet}. Not all jdbc drivers support this functionality and
@@ -56,7 +63,7 @@ public ReturnGeneratedKeysBuilder returnGeneratedKeys() {
5663

5764
public Flowable<Integer> counts() {
5865
return startWithDependency(
59-
Update.create(connections, super.parameterGroupsToFlowable(), sql, batchSize, true).dematerialize());
66+
Update.create(connections, super.parameterGroupsToFlowable(), sql, batchSize, true, queryTimeoutSec).dematerialize());
6067
}
6168

6269
<T> Flowable<T> startWithDependency(@Nonnull Flowable<T> f) {

rxjava2-jdbc/src/main/java/org/davidmoten/rx/jdbc/Util.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public enum Util {
6464
;
6565

6666
private static final Logger log = LoggerFactory.getLogger(Util.class);
67+
public static final int QUERY_TIMEOUT_NOT_SET = -1;
6768

6869
/**
6970
* Sets parameters for the {@link PreparedStatement}.
@@ -287,39 +288,43 @@ static void closeCallableStatementAndConnection(NamedCallableStatement stmt) {
287288
closePreparedStatementAndConnection(stmt.stmt);
288289
}
289290

290-
static NamedPreparedStatement prepare(Connection con, String sql) throws SQLException {
291-
return prepare(con, 0, sql);
291+
static NamedPreparedStatement prepare(Connection con, String sql, int queryTimeoutSec) throws SQLException {
292+
return prepare(con, 0, sql, queryTimeoutSec);
292293
}
293294

294-
static NamedPreparedStatement prepare(Connection con, int fetchSize, String sql) throws SQLException {
295+
static NamedPreparedStatement prepare(Connection con, int fetchSize, String sql, int queryTimeoutSec) throws SQLException {
295296
// TODO can we parse SqlInfo through because already calculated by
296297
// builder?
297298
SqlInfo info = SqlInfo.parse(sql);
298299
log.debug("preparing statement: {}", sql);
299-
return prepare(con, fetchSize, info);
300+
return prepare(con, fetchSize, info, queryTimeoutSec);
300301
}
301302

302-
static PreparedStatement prepare(Connection connection, int fetchSize, String sql, List<Parameter> parameters)
303+
static PreparedStatement prepare(Connection connection, int fetchSize, String sql, List<Parameter> parameters, int queryTimeoutSec)
303304
throws SQLException {
304305
// should only get here when parameters contains a collection
305306
SqlInfo info = SqlInfo.parse(sql, parameters);
306307
log.debug("preparing statement: {}", info.sql());
307-
return createPreparedStatement(connection, fetchSize, info);
308+
return createPreparedStatement(connection, fetchSize, info, queryTimeoutSec);
308309
}
309310

310-
private static NamedPreparedStatement prepare(Connection con, int fetchSize, SqlInfo info) throws SQLException {
311-
PreparedStatement ps = createPreparedStatement(con, fetchSize, info);
311+
private static NamedPreparedStatement prepare(Connection con, int fetchSize, SqlInfo info, int queryTimeoutSec) throws SQLException {
312+
PreparedStatement ps = createPreparedStatement(con, fetchSize, info, queryTimeoutSec);
312313
return new NamedPreparedStatement(ps, info.names());
313314
}
314315

315-
private static PreparedStatement createPreparedStatement(Connection con, int fetchSize, SqlInfo info)
316+
private static PreparedStatement createPreparedStatement(Connection con, int fetchSize, SqlInfo info, int queryTimeoutSec)
316317
throws SQLException {
317318
PreparedStatement ps = null;
318319
try {
319320
ps = con.prepareStatement(info.sql(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
320321
if (fetchSize > 0) {
321322
ps.setFetchSize(fetchSize);
322323
}
324+
325+
if (queryTimeoutSec != QUERY_TIMEOUT_NOT_SET) {
326+
ps.setQueryTimeout(queryTimeoutSec);
327+
}
323328
} catch (RuntimeException | SQLException e) {
324329
if (ps != null) {
325330
ps.close();

0 commit comments

Comments
 (0)