diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java index 1c02524a9..c525e6dba 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java @@ -36,16 +36,16 @@ public class CursorImpl implements Cursor { private final Connection conn; - private final PreparedStatementImpl ps; + private final PreparedStatementBase ps; private final ContextInternal context; private final boolean autoCommit; private final TupleInternal params; private String id; private boolean closed; - private QueryResultBuilder, RowSetImpl, RowSet> result; + QueryResultBuilder, RowSetImpl, RowSet> result; - CursorImpl(PreparedStatementImpl ps, Connection conn, ContextInternal context, boolean autoCommit, TupleInternal params) { + CursorImpl(PreparedStatementBase ps, Connection conn, ContextInternal context, boolean autoCommit, TupleInternal params) { this.ps = ps; this.conn = conn; this.context = context; @@ -64,22 +64,14 @@ public synchronized boolean hasMore() { @Override public synchronized Future> read(int count) { PromiseInternal> promise = context.promise(); - ps.withPreparedStatement(ps.options(), params, ar -> { - if (ar.succeeded()) { - PreparedStatement preparedStatement = ar.result(); - QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); - if (id == null) { - id = UUID.randomUUID().toString(); - this.result = builder.executeExtendedQuery(conn, preparedStatement, ps.options(), autoCommit, params, count, id, false, promise); - } else if (this.result.isSuspended()) { - this.result = builder.executeExtendedQuery(conn, preparedStatement, ps.options(), autoCommit, params, count, id, true, promise); - } else { - throw new IllegalStateException(); - } - } else { - promise.fail(ar.cause()); - } - }); + boolean suspended; + if (id == null) { + id = UUID.randomUUID().toString(); + suspended = false; + } else { + suspended = true; + } + ps.readCursor(this, id, suspended, params, count, promise); return promise.future(); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementBase.java new file mode 100644 index 000000000..0686a2b6e --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementBase.java @@ -0,0 +1,245 @@ +/* + * Copyright (C) 2017 Julien Viet + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.vertx.sqlclient.impl; + +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.PromiseInternal; +import io.vertx.sqlclient.PrepareOptions; +import io.vertx.sqlclient.PreparedQuery; +import io.vertx.sqlclient.internal.ArrayTuple; +import io.vertx.sqlclient.internal.Connection; +import io.vertx.sqlclient.internal.command.CloseCursorCommand; +import io.vertx.sqlclient.internal.command.CloseStatementCommand; +import io.vertx.sqlclient.Cursor; +import io.vertx.sqlclient.PreparedStatement; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.RowStream; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.Tuple; +import io.vertx.core.*; +import io.vertx.sqlclient.internal.command.PrepareStatementCommand; +import io.vertx.sqlclient.internal.TupleInternal; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collector; + +/** + * @author Julien Viet + */ +public abstract class PreparedStatementBase implements PreparedStatement { + + public static PreparedStatement create(Connection conn, + ContextInternal context, + io.vertx.sqlclient.internal.PreparedStatement preparedStatement, + boolean autoCommit) { + return new PreparedStatementBase(conn, context, autoCommit) { + @Override + protected > void executeBatch(List argsList, QueryExecutor builder, PromiseInternal p) { + builder.executeBatchQuery(conn, null, preparedStatement, autoCommit, argsList, p); + } + @Override + protected > void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor builder, PromiseInternal p) { + builder.executeExtendedQuery(conn, preparedStatement, null, autoCommit, args, fetch, cursorId, suspended, p); + } + @Override + protected void close(Promise promise) { + conn.schedule(new CloseStatementCommand(preparedStatement), promise); + } + @Override + protected void closeCursor(String cursorId, Promise promise) { + CloseCursorCommand cmd = new CloseCursorCommand(cursorId, preparedStatement); + conn.schedule(cmd, promise); + } + @Override + protected void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal> promise) { + QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); + cursor.result = builder.executeExtendedQuery(conn, preparedStatement, null, autoCommit, params, count, id, suspended, promise); + } + }; + } + + public static PreparedStatement create(Connection conn, ContextInternal context, PrepareOptions options, String sql, boolean autoCommit) { + return new PreparedStatementBase(conn, context, autoCommit) { + Future future; + void withPreparedStatement(PrepareOptions options, Tuple args, Handler> handler) { + if (context.inThread()) { + if (future == null) { + Promise promise = context.promise(); + PrepareStatementCommand prepare = new PrepareStatementCommand(sql, options, true, args.types()); + conn.schedule(prepare, promise); + future = promise.future(); + } + future.onComplete(handler); + } else { + context.runOnContext(v -> withPreparedStatement(options, args, handler)); + } + } + @Override + protected > void executeBatch(List argsList, QueryExecutor builder, PromiseInternal p) { + withPreparedStatement(options, argsList.get(0), ar -> { + if (ar.succeeded()) { + builder.executeBatchQuery(conn, options, ar.result(), autoCommit, argsList, p); + } else { + p.fail(ar.cause()); + } + }); + } + @Override + protected > void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor builder, PromiseInternal p) { + withPreparedStatement(options, args, ar -> { + if (ar.succeeded()) { + builder.executeExtendedQuery(conn, ar.result(), options, autoCommit, args, fetch, cursorId, suspended, p); + } else { + p.fail(ar.cause()); + } + }); + } + @Override + protected void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal> promise) { + withPreparedStatement(options, params, ar -> { + if (ar.succeeded()) { + QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); + cursor.result = builder.executeExtendedQuery(conn, ar.result(), options, autoCommit, params, count, id, suspended, promise); + } else { + promise.fail(ar.cause()); + } + }); + } + @Override + protected void close(Promise promise) { + if (future != null) { + future.onComplete(ar -> { + if (ar.succeeded()) { + CloseStatementCommand cmd = new CloseStatementCommand(ar.result()); + conn.schedule(cmd, promise); + } else { + promise.fail(ar.cause()); + } + }); + } + } + @Override + protected void closeCursor(String cursorId, Promise promise) { + if (future != null) { + future.onComplete(ar -> { + if (ar.succeeded()) { + CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ar.result()); + conn.schedule(cmd, promise); + } else { + promise.fail(ar.cause()); + } + }); + } else { + promise.fail("Invalid"); + } + } + }; + } + + private final Connection conn; + private final ContextInternal context; + private final boolean autoCommit; + private final AtomicBoolean closed; + + private PreparedStatementBase(Connection conn, ContextInternal context, boolean autoCommit) { + this.conn = conn; + this.context = context; + this.autoCommit = autoCommit; + this.closed = new AtomicBoolean(); + } + + protected abstract > void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor builder, PromiseInternal p); + protected abstract > void executeBatch(List argsList, QueryExecutor builder, PromiseInternal p); + protected abstract void close(Promise promise); + protected abstract void closeCursor(String cursorId, Promise promise); + protected abstract void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal> promise); + + @Override + public final PreparedQuery> query() { + return new PreparedStatementQuery<>(new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR)); + } + + @Override + public final Cursor cursor(Tuple args) { + return new CursorImpl(this, conn, context, autoCommit, (TupleInternal) args); + } + + @Override + public final Future close() { + if (closed.compareAndSet(false, true)) { + Promise promise = context.promise(); + close(promise); + return promise.future(); + } else { + return context.failedFuture("Already closed"); + } + } + + @Override + public final RowStream createStream(int fetch, Tuple args) { + return new RowStreamImpl(this, context, fetch, args); + } + + private class PreparedStatementQuery> extends QueryBase implements PreparedQuery { + + public PreparedStatementQuery(QueryExecutor builder) { + super(builder); + } + + @Override + protected > QueryBase copy(QueryExecutor builder) { + return new PreparedStatementQuery<>(builder); + } + + @Override + public PreparedQuery> collecting(Collector collector) { + return (PreparedQuery>) super.collecting(collector); + } + + @Override + public PreparedQuery> mapping(Function mapper) { + return (PreparedQuery>) super.mapping(mapper); + } + + @Override + public Future execute() { + return execute(ArrayTuple.EMPTY); + } + + @Override + public Future execute(Tuple args) { + PromiseInternal promise = context.promise(); + PreparedStatementBase.this.execute(args, 0, null, false, builder, promise); + return promise.future(); + } + + @Override + public Future executeBatch(List argsList) { + if (argsList.isEmpty()) { + context.failedFuture("Empty batch"); + } else { + PromiseInternal promise = context.promise(); + PreparedStatementBase.this.executeBatch(argsList, builder, promise); + return promise.future(); + } + } + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java deleted file mode 100644 index e9e2223b0..000000000 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Copyright (C) 2017 Julien Viet - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.vertx.sqlclient.impl; - -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.PromiseInternal; -import io.vertx.sqlclient.PrepareOptions; -import io.vertx.sqlclient.PreparedQuery; -import io.vertx.sqlclient.internal.ArrayTuple; -import io.vertx.sqlclient.internal.Connection; -import io.vertx.sqlclient.internal.command.CloseCursorCommand; -import io.vertx.sqlclient.internal.command.CloseStatementCommand; -import io.vertx.sqlclient.Cursor; -import io.vertx.sqlclient.PreparedStatement; -import io.vertx.sqlclient.SqlResult; -import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.RowStream; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.Tuple; -import io.vertx.core.*; -import io.vertx.sqlclient.internal.command.PrepareStatementCommand; -import io.vertx.sqlclient.internal.TupleInternal; - -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; -import java.util.stream.Collector; - -/** - * @author Julien Viet - */ -public class PreparedStatementImpl implements PreparedStatement { - - public static PreparedStatement create(Connection conn, ContextInternal context, io.vertx.sqlclient.internal.PreparedStatement ps, boolean autoCommit) { - return new PreparedStatementImpl(conn, context, ps, autoCommit); - } - - public static PreparedStatement create(Connection conn, ContextInternal context, PrepareOptions options, String sql, boolean autoCommit) { - return new PreparedStatementImpl(conn, context, sql, options, autoCommit); - } - - private final Connection conn; - private final ContextInternal context; - private final String sql; - private final PrepareOptions options; - private Promise promise; - private Future future; - private final boolean autoCommit; - private final AtomicBoolean closed = new AtomicBoolean(); - - private PreparedStatementImpl(Connection conn, ContextInternal context, io.vertx.sqlclient.internal.PreparedStatement ps, boolean autoCommit) { - this.conn = conn; - this.context = context; - this.sql = null; - this.options = null; - this.promise = null; - this.future = Future.succeededFuture(ps); - this.autoCommit = autoCommit; - } - - private PreparedStatementImpl(Connection conn, - ContextInternal context, - String sql, - PrepareOptions options, - boolean autoCommit) { - this.conn = conn; - this.context = context; - this.sql = sql; - this.options = options; - this.promise = Promise.promise(); - this.autoCommit = autoCommit; - } - - PrepareOptions options() { - return options; - } - - @Override - public PreparedQuery> query() { - QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); - return new PreparedStatementQuery<>(builder); - } - - void withPreparedStatement(PrepareOptions options, Tuple args, Handler> handler) { - if (context == Vertx.currentContext()) { - if (future == null) { - // Lazy statement; - PrepareStatementCommand prepare = new PrepareStatementCommand(sql, options, true, args.types()); - conn.schedule(prepare, promise); - future = promise.future(); - } - future.onComplete(handler); - } else { - context.runOnContext(v -> withPreparedStatement(options, args, handler)); - } - } - - > void execute(Tuple args, - int fetch, - String cursorId, - boolean suspended, - QueryExecutor builder, - PromiseInternal p) { - withPreparedStatement(options, args, ar -> { - if (ar.succeeded()) { - builder.executeExtendedQuery( - conn, - ar.result(), - options, - autoCommit, - args, - fetch, - cursorId, - suspended, - p); - } else { - p.fail(ar.cause()); - } - }); - } - - > void executeBatch(List argsList, - QueryExecutor builder, - PromiseInternal p) { - withPreparedStatement(options, argsList.get(0), ar -> { - if (ar.succeeded()) { - builder.executeBatchQuery(conn, options, ar.result(), autoCommit, argsList, p); - } else { - p.fail(ar.cause()); - } - }); - } - - @Override - public Cursor cursor(Tuple args) { - return cursor((TupleInternal) args); - } - - private Cursor cursor(TupleInternal args) { - return new CursorImpl(this, conn, context, autoCommit, args); - } - - @Override - public Future close() { - if (closed.compareAndSet(false, true)) { - Promise promise = context.promise(); - if (this.promise == null) { - CloseStatementCommand cmd = new CloseStatementCommand(future.result()); - conn.schedule(cmd, promise); - } else { - if (future == null) { - future = this.promise.future(); - this.promise.fail("Closed"); - } - future.onComplete(ar -> { - if (ar.succeeded()) { - CloseStatementCommand cmd = new CloseStatementCommand(ar.result()); - conn.schedule(cmd, promise); - } else { - promise.complete(); - } - }); - } - return promise.future(); - } else { - return context.failedFuture("Already closed"); - } - } - - @Override - public RowStream createStream(int fetch, Tuple args) { - return new RowStreamImpl(this, context, fetch, args); - } - - void closeCursor(String cursorId, Promise promise) { - future.onComplete(ar -> { - if (ar.succeeded()) { - CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ar.result()); - conn.schedule(cmd, promise); - } else { - promise.fail(ar.cause()); - } - }); - } - - private class PreparedStatementQuery> extends QueryBase implements PreparedQuery { - - public PreparedStatementQuery(QueryExecutor builder) { - super(builder); - } - - @Override - protected > QueryBase copy(QueryExecutor builder) { - return new PreparedStatementQuery<>(builder); - } - - @Override - public PreparedQuery> collecting(Collector collector) { - return (PreparedQuery>) super.collecting(collector); - } - - @Override - public PreparedQuery> mapping(Function mapper) { - return (PreparedQuery>) super.mapping(mapper); - } - - @Override - public Future execute() { - return execute(ArrayTuple.EMPTY); - } - - @Override - public Future execute(Tuple args) { - PromiseInternal promise = context.promise(); - execute(args, promise); - return promise.future(); - } - - private void execute(Tuple args, PromiseInternal promise) { - PreparedStatementImpl.this.execute(args, 0, null, false, builder, promise); - } - - @Override - public Future executeBatch(List argsList) { - PromiseInternal promise = context.promise(); - executeBatch(argsList, promise); - return promise.future(); - } - - private void executeBatch(List argsList, PromiseInternal promise) { - if (argsList.isEmpty()) { - promise.fail("Empty batch"); - } else { - PreparedStatementImpl.this.executeBatch(argsList, builder, promise); - } - } - } -} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java index 301999110..1333a186a 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java @@ -32,7 +32,7 @@ public class RowStreamImpl implements RowStreamInternal, Handler>> { - private final PreparedStatementImpl ps; + private final PreparedStatementBase ps; private final ContextInternal context; private final int fetch; private final Tuple params; @@ -46,7 +46,7 @@ public class RowStreamImpl implements RowStreamInternal, Handler result; - RowStreamImpl(PreparedStatementImpl ps, ContextInternal context, int fetch, Tuple params) { + RowStreamImpl(PreparedStatementBase ps, ContextInternal context, int fetch, Tuple params) { this.ps = ps; this.context = context; this.fetch = fetch; diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java index 569b5231e..f8aca2b28 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java @@ -26,7 +26,7 @@ import io.vertx.sqlclient.PreparedStatement; import io.vertx.sqlclient.SqlConnection; import io.vertx.sqlclient.Transaction; -import io.vertx.sqlclient.impl.PreparedStatementImpl; +import io.vertx.sqlclient.impl.PreparedStatementBase; import io.vertx.sqlclient.impl.TransactionImpl; import io.vertx.sqlclient.internal.command.CommandBase; import io.vertx.sqlclient.internal.command.PrepareStatementCommand; @@ -78,10 +78,10 @@ public Future prepare(String sql, PrepareOptions options) { schedule(new PrepareStatementCommand(sql, options, true), promise); return promise.future() .compose( - cr -> Future.succeededFuture(PreparedStatementImpl.create(conn, context, cr, autoCommit())), + cr -> Future.succeededFuture(PreparedStatementBase.create(conn, context, cr, autoCommit())), err -> { if (conn.isIndeterminatePreparedStatementError(err)) { - return Future.succeededFuture(PreparedStatementImpl.create(conn, context, options, sql, autoCommit())); + return Future.succeededFuture(PreparedStatementBase.create(conn, context, options, sql, autoCommit())); } else { return Future.failedFuture(err); }