Skip to content

Commit bb78664

Browse files
committed
Improve the implementation of prepared statement.
Motivation: The implementation of prepared statement tries to accomodate both a lazy mode and a non lazy one, leading to the utilization of a future to unify the obtention of the prepared statement. This is clearly suboptimal for the common case, since lazy prepared statement is only necessary when meeting indeterminate prepared statement (ambiguous) that requires a tuple for an actual preparation. In addition the execution is done using a context comparison which fails when on a duplicated context. Changes: Use the context in thread comparison instead of context equality. Make a prepared statement base which has two implementations lazy/direct which are both optimized for their specific cases.
1 parent 0aaff5f commit bb78664

File tree

5 files changed

+286
-277
lines changed

5 files changed

+286
-277
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,16 @@
3636
public class CursorImpl implements Cursor {
3737

3838
private final Connection conn;
39-
private final PreparedStatementImpl ps;
39+
private final PreparedStatementBase ps;
4040
private final ContextInternal context;
4141
private final boolean autoCommit;
4242
private final TupleInternal params;
4343

4444
private String id;
4545
private boolean closed;
46-
private QueryResultBuilder<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> result;
46+
QueryResultBuilder<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> result;
4747

48-
CursorImpl(PreparedStatementImpl ps, Connection conn, ContextInternal context, boolean autoCommit, TupleInternal params) {
48+
CursorImpl(PreparedStatementBase ps, Connection conn, ContextInternal context, boolean autoCommit, TupleInternal params) {
4949
this.ps = ps;
5050
this.conn = conn;
5151
this.context = context;
@@ -64,22 +64,14 @@ public synchronized boolean hasMore() {
6464
@Override
6565
public synchronized Future<RowSet<Row>> read(int count) {
6666
PromiseInternal<RowSet<Row>> promise = context.promise();
67-
ps.withPreparedStatement(ps.options(), params, ar -> {
68-
if (ar.succeeded()) {
69-
PreparedStatement preparedStatement = ar.result();
70-
QueryExecutor<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR);
71-
if (id == null) {
72-
id = UUID.randomUUID().toString();
73-
this.result = builder.executeExtendedQuery(conn, preparedStatement, ps.options(), autoCommit, params, count, id, false, promise);
74-
} else if (this.result.isSuspended()) {
75-
this.result = builder.executeExtendedQuery(conn, preparedStatement, ps.options(), autoCommit, params, count, id, true, promise);
76-
} else {
77-
throw new IllegalStateException();
78-
}
79-
} else {
80-
promise.fail(ar.cause());
81-
}
82-
});
67+
boolean suspended;
68+
if (id == null) {
69+
id = UUID.randomUUID().toString();
70+
suspended = false;
71+
} else {
72+
suspended = true;
73+
}
74+
ps.readCursor(this, id, suspended, params, count, promise);
8375
return promise.future();
8476
}
8577

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
/*
2+
* Copyright (C) 2017 Julien Viet
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.vertx.sqlclient.impl;
19+
20+
import io.vertx.core.internal.ContextInternal;
21+
import io.vertx.core.internal.PromiseInternal;
22+
import io.vertx.sqlclient.PrepareOptions;
23+
import io.vertx.sqlclient.PreparedQuery;
24+
import io.vertx.sqlclient.internal.ArrayTuple;
25+
import io.vertx.sqlclient.internal.Connection;
26+
import io.vertx.sqlclient.internal.command.CloseCursorCommand;
27+
import io.vertx.sqlclient.internal.command.CloseStatementCommand;
28+
import io.vertx.sqlclient.Cursor;
29+
import io.vertx.sqlclient.PreparedStatement;
30+
import io.vertx.sqlclient.SqlResult;
31+
import io.vertx.sqlclient.RowSet;
32+
import io.vertx.sqlclient.RowStream;
33+
import io.vertx.sqlclient.Row;
34+
import io.vertx.sqlclient.Tuple;
35+
import io.vertx.core.*;
36+
import io.vertx.sqlclient.internal.command.PrepareStatementCommand;
37+
import io.vertx.sqlclient.internal.TupleInternal;
38+
39+
import java.util.List;
40+
import java.util.UUID;
41+
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.function.Function;
43+
import java.util.stream.Collector;
44+
45+
/**
46+
* @author <a href="mailto:[email protected]">Julien Viet</a>
47+
*/
48+
public abstract class PreparedStatementBase implements PreparedStatement {
49+
50+
public static PreparedStatement create(Connection conn, ContextInternal context, io.vertx.sqlclient.internal.PreparedStatement ps, boolean autoCommit) {
51+
return new PreparedStatementBase(conn, context, ps, autoCommit) {
52+
@Override
53+
protected <R, F extends SqlResult<R>> void executeBatch(List<Tuple> argsList, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p) {
54+
builder.executeBatchQuery(conn, options, ps, autoCommit, argsList, p);
55+
}
56+
@Override
57+
protected <R, F extends SqlResult<R>> void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p) {
58+
builder.executeExtendedQuery(conn, ps, options, autoCommit, args, fetch, cursorId, suspended, p);
59+
}
60+
@Override
61+
protected void doClose(Promise<Void> promise) {
62+
CloseStatementCommand cmd = new CloseStatementCommand(ps);
63+
conn.schedule(cmd, promise);
64+
}
65+
@Override
66+
protected void closeCursor(String cursorId, Promise<Void> promise) {
67+
CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ps);
68+
conn.schedule(cmd, promise);
69+
}
70+
@Override
71+
protected void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal<RowSet<Row>> promise) {
72+
QueryExecutor<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR);
73+
cursor.result = builder.executeExtendedQuery(conn, ps, options, autoCommit, params, count, id, suspended, promise);
74+
}
75+
};
76+
}
77+
78+
public static PreparedStatement create(Connection conn, ContextInternal context, PrepareOptions options, String sql, boolean autoCommit) {
79+
return new PreparedStatementBase(conn, context, sql, options, autoCommit) {
80+
Future<io.vertx.sqlclient.internal.PreparedStatement> future;
81+
void withPreparedStatement(PrepareOptions options, Tuple args, Handler<AsyncResult<io.vertx.sqlclient.internal.PreparedStatement>> handler) {
82+
if (context.inThread()) {
83+
if (future == null) {
84+
Promise<io.vertx.sqlclient.internal.PreparedStatement> promise = context.promise();
85+
PrepareStatementCommand prepare = new PrepareStatementCommand(sql, options, true, args.types());
86+
conn.schedule(prepare, promise);
87+
future = promise.future();
88+
}
89+
future.onComplete(handler);
90+
} else {
91+
context.runOnContext(v -> withPreparedStatement(options, args, handler));
92+
}
93+
}
94+
@Override
95+
protected <R, F extends SqlResult<R>> void executeBatch(List<Tuple> argsList, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p) {
96+
withPreparedStatement(options, argsList.get(0), ar -> {
97+
if (ar.succeeded()) {
98+
builder.executeBatchQuery(conn, options, ar.result(), autoCommit, argsList, p);
99+
} else {
100+
p.fail(ar.cause());
101+
}
102+
});
103+
}
104+
@Override
105+
protected <R, F extends SqlResult<R>> void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p) {
106+
withPreparedStatement(options, args, ar -> {
107+
if (ar.succeeded()) {
108+
builder.executeExtendedQuery(conn, ar.result(), options, autoCommit, args, fetch, cursorId, suspended, p);
109+
} else {
110+
p.fail(ar.cause());
111+
}
112+
});
113+
}
114+
@Override
115+
protected void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal<RowSet<Row>> promise) {
116+
withPreparedStatement(options, params, ar -> {
117+
if (ar.succeeded()) {
118+
QueryExecutor<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR);
119+
cursor.result = builder.executeExtendedQuery(conn, ar.result(), options, autoCommit, params, count, id, suspended, promise);
120+
} else {
121+
promise.fail(ar.cause());
122+
}
123+
});
124+
}
125+
@Override
126+
protected void doClose(Promise<Void> promise) {
127+
if (future != null) {
128+
future.onComplete(ar -> {
129+
if (ar.succeeded()) {
130+
CloseStatementCommand cmd = new CloseStatementCommand(ar.result());
131+
conn.schedule(cmd, promise);
132+
} else {
133+
promise.fail(ar.cause());
134+
}
135+
});
136+
}
137+
}
138+
@Override
139+
protected void closeCursor(String cursorId, Promise<Void> promise) {
140+
if (future != null) {
141+
future.onComplete(ar -> {
142+
if (ar.succeeded()) {
143+
CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ar.result());
144+
conn.schedule(cmd, promise);
145+
} else {
146+
promise.fail(ar.cause());
147+
}
148+
});
149+
} else {
150+
promise.fail("Invalid");
151+
}
152+
}
153+
};
154+
}
155+
156+
private final Connection conn;
157+
private final ContextInternal context;
158+
protected final PrepareOptions options;
159+
private final boolean autoCommit;
160+
private final AtomicBoolean closed = new AtomicBoolean();
161+
162+
private PreparedStatementBase(Connection conn, ContextInternal context, io.vertx.sqlclient.internal.PreparedStatement ps, boolean autoCommit) {
163+
this.conn = conn;
164+
this.context = context;
165+
this.options = null;
166+
this.autoCommit = autoCommit;
167+
}
168+
169+
private PreparedStatementBase(Connection conn,
170+
ContextInternal context,
171+
String sql,
172+
PrepareOptions options,
173+
boolean autoCommit) {
174+
this.conn = conn;
175+
this.context = context;
176+
this.options = options;
177+
this.autoCommit = autoCommit;
178+
}
179+
180+
@Override
181+
public PreparedQuery<RowSet<Row>> query() {
182+
QueryExecutor<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR);
183+
return new PreparedStatementQuery<>(builder);
184+
}
185+
186+
187+
protected abstract <R, F extends SqlResult<R>> void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p);
188+
protected abstract <R, F extends SqlResult<R>> void executeBatch(List<Tuple> argsList, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p);
189+
protected abstract void doClose(Promise<Void> promise);
190+
protected abstract void closeCursor(String cursorId, Promise<Void> promise);
191+
protected abstract void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal<RowSet<Row>> promise);
192+
193+
@Override
194+
public Cursor cursor(Tuple args) {
195+
return cursor((TupleInternal) args);
196+
}
197+
198+
private Cursor cursor(TupleInternal args) {
199+
return new CursorImpl(this, conn, context, autoCommit, args);
200+
}
201+
202+
@Override
203+
public Future<Void> close() {
204+
if (closed.compareAndSet(false, true)) {
205+
Promise<Void> promise = context.promise();
206+
doClose(promise);
207+
return promise.future();
208+
} else {
209+
return context.failedFuture("Already closed");
210+
}
211+
}
212+
213+
@Override
214+
public RowStream<Row> createStream(int fetch, Tuple args) {
215+
return new RowStreamImpl(this, context, fetch, args);
216+
}
217+
218+
private class PreparedStatementQuery<T, R extends SqlResult<T>> extends QueryBase<T, R> implements PreparedQuery<R> {
219+
220+
public PreparedStatementQuery(QueryExecutor<T, ?, R> builder) {
221+
super(builder);
222+
}
223+
224+
@Override
225+
protected <T2, R2 extends SqlResult<T2>> QueryBase<T2, R2> copy(QueryExecutor<T2, ?, R2> builder) {
226+
return new PreparedStatementQuery<>(builder);
227+
}
228+
229+
@Override
230+
public <U> PreparedQuery<SqlResult<U>> collecting(Collector<Row, ?, U> collector) {
231+
return (PreparedQuery<SqlResult<U>>) super.collecting(collector);
232+
}
233+
234+
@Override
235+
public <U> PreparedQuery<RowSet<U>> mapping(Function<Row, U> mapper) {
236+
return (PreparedQuery<RowSet<U>>) super.mapping(mapper);
237+
}
238+
239+
@Override
240+
public Future<R> execute() {
241+
return execute(ArrayTuple.EMPTY);
242+
}
243+
244+
@Override
245+
public Future<R> execute(Tuple args) {
246+
PromiseInternal<R> promise = context.promise();
247+
execute(args, promise);
248+
return promise.future();
249+
}
250+
251+
private void execute(Tuple args, PromiseInternal<R> promise) {
252+
PreparedStatementBase.this.execute(args, 0, null, false, builder, promise);
253+
}
254+
255+
@Override
256+
public Future<R> executeBatch(List<Tuple> argsList) {
257+
PromiseInternal<R> promise = context.promise();
258+
executeBatch(argsList, promise);
259+
return promise.future();
260+
}
261+
262+
private void executeBatch(List<Tuple> argsList, PromiseInternal<R> promise) {
263+
if (argsList.isEmpty()) {
264+
promise.fail("Empty batch");
265+
} else {
266+
PreparedStatementBase.this.executeBatch(argsList, builder, promise);
267+
}
268+
}
269+
}
270+
}

0 commit comments

Comments
 (0)