Skip to content

Commit c40629e

Browse files
committed
Improve the implementation of prepared statement.
Motivation: The implementation of prepared statement tries to accomodate both a lazy mode and a non lazy one, leading to the utilization of a future to unify the obtention of the prepared statement. This is clearly suboptimal for the common case, since lazy prepared statement is only necessary when meeting indeterminate prepared statement (ambiguous) that requires a tuple for an actual preparation. In addition the execution is done using a context comparison which fails when on a duplicated context. Changes: Use the context in thread comparison instead of context equality. Make a prepared statement base which has two implementations lazy/direct which are both optimized for their specific cases.
1 parent 0aaff5f commit c40629e

File tree

5 files changed

+261
-277
lines changed

5 files changed

+261
-277
lines changed

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,16 @@
3636
public class CursorImpl implements Cursor {
3737

3838
private final Connection conn;
39-
private final PreparedStatementImpl ps;
39+
private final PreparedStatementBase ps;
4040
private final ContextInternal context;
4141
private final boolean autoCommit;
4242
private final TupleInternal params;
4343

4444
private String id;
4545
private boolean closed;
46-
private QueryResultBuilder<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> result;
46+
QueryResultBuilder<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> result;
4747

48-
CursorImpl(PreparedStatementImpl ps, Connection conn, ContextInternal context, boolean autoCommit, TupleInternal params) {
48+
CursorImpl(PreparedStatementBase ps, Connection conn, ContextInternal context, boolean autoCommit, TupleInternal params) {
4949
this.ps = ps;
5050
this.conn = conn;
5151
this.context = context;
@@ -64,22 +64,14 @@ public synchronized boolean hasMore() {
6464
@Override
6565
public synchronized Future<RowSet<Row>> read(int count) {
6666
PromiseInternal<RowSet<Row>> promise = context.promise();
67-
ps.withPreparedStatement(ps.options(), params, ar -> {
68-
if (ar.succeeded()) {
69-
PreparedStatement preparedStatement = ar.result();
70-
QueryExecutor<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR);
71-
if (id == null) {
72-
id = UUID.randomUUID().toString();
73-
this.result = builder.executeExtendedQuery(conn, preparedStatement, ps.options(), autoCommit, params, count, id, false, promise);
74-
} else if (this.result.isSuspended()) {
75-
this.result = builder.executeExtendedQuery(conn, preparedStatement, ps.options(), autoCommit, params, count, id, true, promise);
76-
} else {
77-
throw new IllegalStateException();
78-
}
79-
} else {
80-
promise.fail(ar.cause());
81-
}
82-
});
67+
boolean suspended;
68+
if (id == null) {
69+
id = UUID.randomUUID().toString();
70+
suspended = false;
71+
} else {
72+
suspended = true;
73+
}
74+
ps.readCursor(this, id, suspended, params, count, promise);
8375
return promise.future();
8476
}
8577

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright (C) 2017 Julien Viet
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.vertx.sqlclient.impl;
19+
20+
import io.vertx.core.internal.ContextInternal;
21+
import io.vertx.core.internal.PromiseInternal;
22+
import io.vertx.sqlclient.PrepareOptions;
23+
import io.vertx.sqlclient.PreparedQuery;
24+
import io.vertx.sqlclient.internal.ArrayTuple;
25+
import io.vertx.sqlclient.internal.Connection;
26+
import io.vertx.sqlclient.internal.command.CloseCursorCommand;
27+
import io.vertx.sqlclient.internal.command.CloseStatementCommand;
28+
import io.vertx.sqlclient.Cursor;
29+
import io.vertx.sqlclient.PreparedStatement;
30+
import io.vertx.sqlclient.SqlResult;
31+
import io.vertx.sqlclient.RowSet;
32+
import io.vertx.sqlclient.RowStream;
33+
import io.vertx.sqlclient.Row;
34+
import io.vertx.sqlclient.Tuple;
35+
import io.vertx.core.*;
36+
import io.vertx.sqlclient.internal.command.PrepareStatementCommand;
37+
import io.vertx.sqlclient.internal.TupleInternal;
38+
39+
import java.util.List;
40+
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.function.Function;
42+
import java.util.stream.Collector;
43+
44+
/**
45+
* @author <a href="mailto:[email protected]">Julien Viet</a>
46+
*/
47+
public abstract class PreparedStatementBase implements PreparedStatement {
48+
49+
public static PreparedStatement create(Connection conn,
50+
ContextInternal context,
51+
io.vertx.sqlclient.internal.PreparedStatement preparedStatement,
52+
boolean autoCommit) {
53+
return new PreparedStatementBase(conn, context, autoCommit) {
54+
@Override
55+
protected <R, F extends SqlResult<R>> void executeBatch(List<Tuple> argsList, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p) {
56+
builder.executeBatchQuery(conn, null, preparedStatement, autoCommit, argsList, p);
57+
}
58+
@Override
59+
protected <R, F extends SqlResult<R>> void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p) {
60+
builder.executeExtendedQuery(conn, preparedStatement, null, autoCommit, args, fetch, cursorId, suspended, p);
61+
}
62+
@Override
63+
protected void close(Promise<Void> promise) {
64+
conn.schedule(new CloseStatementCommand(preparedStatement), promise);
65+
}
66+
@Override
67+
protected void closeCursor(String cursorId, Promise<Void> promise) {
68+
CloseCursorCommand cmd = new CloseCursorCommand(cursorId, preparedStatement);
69+
conn.schedule(cmd, promise);
70+
}
71+
@Override
72+
protected void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal<RowSet<Row>> promise) {
73+
QueryExecutor<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR);
74+
cursor.result = builder.executeExtendedQuery(conn, preparedStatement, null, autoCommit, params, count, id, suspended, promise);
75+
}
76+
};
77+
}
78+
79+
public static PreparedStatement create(Connection conn, ContextInternal context, PrepareOptions options, String sql, boolean autoCommit) {
80+
return new PreparedStatementBase(conn, context, autoCommit) {
81+
Future<io.vertx.sqlclient.internal.PreparedStatement> future;
82+
void withPreparedStatement(PrepareOptions options, Tuple args, Handler<AsyncResult<io.vertx.sqlclient.internal.PreparedStatement>> handler) {
83+
if (context.inThread()) {
84+
if (future == null) {
85+
Promise<io.vertx.sqlclient.internal.PreparedStatement> promise = context.promise();
86+
PrepareStatementCommand prepare = new PrepareStatementCommand(sql, options, true, args.types());
87+
conn.schedule(prepare, promise);
88+
future = promise.future();
89+
}
90+
future.onComplete(handler);
91+
} else {
92+
context.runOnContext(v -> withPreparedStatement(options, args, handler));
93+
}
94+
}
95+
@Override
96+
protected <R, F extends SqlResult<R>> void executeBatch(List<Tuple> argsList, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p) {
97+
withPreparedStatement(options, argsList.get(0), ar -> {
98+
if (ar.succeeded()) {
99+
builder.executeBatchQuery(conn, options, ar.result(), autoCommit, argsList, p);
100+
} else {
101+
p.fail(ar.cause());
102+
}
103+
});
104+
}
105+
@Override
106+
protected <R, F extends SqlResult<R>> void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p) {
107+
withPreparedStatement(options, args, ar -> {
108+
if (ar.succeeded()) {
109+
builder.executeExtendedQuery(conn, ar.result(), options, autoCommit, args, fetch, cursorId, suspended, p);
110+
} else {
111+
p.fail(ar.cause());
112+
}
113+
});
114+
}
115+
@Override
116+
protected void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal<RowSet<Row>> promise) {
117+
withPreparedStatement(options, params, ar -> {
118+
if (ar.succeeded()) {
119+
QueryExecutor<RowSet<Row>, RowSetImpl<Row>, RowSet<Row>> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR);
120+
cursor.result = builder.executeExtendedQuery(conn, ar.result(), options, autoCommit, params, count, id, suspended, promise);
121+
} else {
122+
promise.fail(ar.cause());
123+
}
124+
});
125+
}
126+
@Override
127+
protected void close(Promise<Void> promise) {
128+
if (future != null) {
129+
future.onComplete(ar -> {
130+
if (ar.succeeded()) {
131+
CloseStatementCommand cmd = new CloseStatementCommand(ar.result());
132+
conn.schedule(cmd, promise);
133+
} else {
134+
promise.fail(ar.cause());
135+
}
136+
});
137+
}
138+
}
139+
@Override
140+
protected void closeCursor(String cursorId, Promise<Void> promise) {
141+
if (future != null) {
142+
future.onComplete(ar -> {
143+
if (ar.succeeded()) {
144+
CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ar.result());
145+
conn.schedule(cmd, promise);
146+
} else {
147+
promise.fail(ar.cause());
148+
}
149+
});
150+
} else {
151+
promise.fail("Invalid");
152+
}
153+
}
154+
};
155+
}
156+
157+
private final Connection conn;
158+
private final ContextInternal context;
159+
private final boolean autoCommit;
160+
private final AtomicBoolean closed;
161+
162+
private PreparedStatementBase(Connection conn, ContextInternal context, boolean autoCommit) {
163+
this.conn = conn;
164+
this.context = context;
165+
this.autoCommit = autoCommit;
166+
this.closed = new AtomicBoolean();
167+
}
168+
169+
protected abstract <R, F extends SqlResult<R>> void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p);
170+
protected abstract <R, F extends SqlResult<R>> void executeBatch(List<Tuple> argsList, QueryExecutor<R, ?, F> builder, PromiseInternal<F> p);
171+
protected abstract void close(Promise<Void> promise);
172+
protected abstract void closeCursor(String cursorId, Promise<Void> promise);
173+
protected abstract void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal<RowSet<Row>> promise);
174+
175+
@Override
176+
public final PreparedQuery<RowSet<Row>> query() {
177+
return new PreparedStatementQuery<>(new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR));
178+
}
179+
180+
@Override
181+
public final Cursor cursor(Tuple args) {
182+
return new CursorImpl(this, conn, context, autoCommit, (TupleInternal) args);
183+
}
184+
185+
@Override
186+
public final Future<Void> close() {
187+
if (closed.compareAndSet(false, true)) {
188+
Promise<Void> promise = context.promise();
189+
close(promise);
190+
return promise.future();
191+
} else {
192+
return context.failedFuture("Already closed");
193+
}
194+
}
195+
196+
@Override
197+
public final RowStream<Row> createStream(int fetch, Tuple args) {
198+
return new RowStreamImpl(this, context, fetch, args);
199+
}
200+
201+
private class PreparedStatementQuery<T, R extends SqlResult<T>> extends QueryBase<T, R> implements PreparedQuery<R> {
202+
203+
public PreparedStatementQuery(QueryExecutor<T, ?, R> builder) {
204+
super(builder);
205+
}
206+
207+
@Override
208+
protected <T2, R2 extends SqlResult<T2>> QueryBase<T2, R2> copy(QueryExecutor<T2, ?, R2> builder) {
209+
return new PreparedStatementQuery<>(builder);
210+
}
211+
212+
@Override
213+
public <U> PreparedQuery<SqlResult<U>> collecting(Collector<Row, ?, U> collector) {
214+
return (PreparedQuery<SqlResult<U>>) super.collecting(collector);
215+
}
216+
217+
@Override
218+
public <U> PreparedQuery<RowSet<U>> mapping(Function<Row, U> mapper) {
219+
return (PreparedQuery<RowSet<U>>) super.mapping(mapper);
220+
}
221+
222+
@Override
223+
public Future<R> execute() {
224+
return execute(ArrayTuple.EMPTY);
225+
}
226+
227+
@Override
228+
public Future<R> execute(Tuple args) {
229+
PromiseInternal<R> promise = context.promise();
230+
PreparedStatementBase.this.execute(args, 0, null, false, builder, promise);
231+
return promise.future();
232+
}
233+
234+
@Override
235+
public Future<R> executeBatch(List<Tuple> argsList) {
236+
if (argsList.isEmpty()) {
237+
return context.failedFuture("Empty batch");
238+
} else {
239+
PromiseInternal<R> promise = context.promise();
240+
PreparedStatementBase.this.executeBatch(argsList, builder, promise);
241+
return promise.future();
242+
}
243+
}
244+
}
245+
}

0 commit comments

Comments
 (0)