diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java index f845da108..ff16d9a26 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java @@ -167,10 +167,8 @@ public Future beforeRecycle() { } @Override - public Future schedule(ContextInternal context, CommandBase cmd) { - Promise promise = context.promise(); - this.context.emit(v -> doSchedule(cmd, promise)); - return promise.future(); + public void schedule(CommandBase cmd, Completable handler) { + this.context.emit(v -> doSchedule(cmd, handler)); } private void doSchedule(CommandBase cmd, Completable handler) { diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java index ce8b7d234..880bd6553 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java @@ -82,7 +82,9 @@ public void init() { // TODO RETURN FUTURE ??? Future sendStartupMessage(String username, String password, String database, Map properties) { InitCommand cmd = new InitCommand(this, username, password, database, properties); - return schedule(context, cmd); + Promise promise = context.promise(); + schedule(cmd, promise); + return promise.future(); } Future sendCancelRequestMessage(int processId, int secretKey) { diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java index 582fd1c8a..e9e2223b0 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java @@ -101,7 +101,7 @@ void withPreparedStatement(PrepareOptions options, Tuple args, Handler close() { Promise promise = context.promise(); if (this.promise == null) { CloseStatementCommand cmd = new CloseStatementCommand(future.result()); - conn.schedule(context, cmd).onComplete(promise); + conn.schedule(cmd, promise); } else { if (future == null) { future = this.promise.future(); @@ -170,7 +170,7 @@ public Future close() { future.onComplete(ar -> { if (ar.succeeded()) { CloseStatementCommand cmd = new CloseStatementCommand(ar.result()); - conn.schedule(context, cmd).onComplete(promise); + conn.schedule(cmd, promise); } else { promise.complete(); } @@ -191,7 +191,7 @@ void closeCursor(String cursorId, Promise promise) { future.onComplete(ar -> { if (ar.succeeded()) { CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ar.result()); - conn.schedule(context, cmd).onComplete(promise); + conn.schedule(cmd, promise); } else { promise.fail(ar.cause()); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java index fb9a094cb..cc9d6e456 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java @@ -58,7 +58,7 @@ public void executeSimpleQuery(CommandScheduler scheduler, PromiseInternal promise) { ContextInternal context = promise.context(); QueryResultBuilder handler = createHandler(promise); - scheduler.schedule(context, new SimpleQueryCommand<>(sql, singleton, autoCommit, collector, handler)).onComplete(handler); + scheduler.schedule(new SimpleQueryCommand<>(sql, singleton, autoCommit, collector, handler), handler); } QueryResultBuilder executeExtendedQuery(CommandScheduler scheduler, @@ -88,7 +88,7 @@ QueryResultBuilder executeExtendedQuery(CommandScheduler scheduler, autoCommit, collector, handler); - scheduler.schedule(context, cmd).onComplete(handler); + scheduler.schedule(cmd, handler); return handler; } @@ -96,7 +96,7 @@ public void executeExtendedQuery(CommandScheduler scheduler, String sql, Prepare ContextInternal context = (ContextInternal) promise.context(); QueryResultBuilder handler = this.createHandler(promise); ExtendedQueryCommand cmd = createExtendedQueryCommand(sql, options, autoCommit, arguments, handler); - scheduler.schedule(context, cmd).onComplete(handler); + scheduler.schedule(cmd, handler); } private ExtendedQueryCommand createExtendedQueryCommand(String sql, @@ -130,14 +130,14 @@ void executeBatchQuery(CommandScheduler scheduler, } } ExtendedQueryCommand cmd = ExtendedQueryCommand.createBatch(preparedStatement.sql(), options, preparedStatement, batch, autoCommit, collector, handler); - scheduler.schedule(context, cmd).onComplete(handler); + scheduler.schedule(cmd, handler); } public void executeBatchQuery(CommandScheduler scheduler, String sql, PrepareOptions options, boolean autoCommit, List batch, PromiseInternal promise) { ContextInternal context = promise.context(); QueryResultBuilder handler = createHandler(promise); ExtendedQueryCommand cmd = createBatchQueryCommand(sql, options, autoCommit, batch, handler); - scheduler.schedule(context, cmd).onComplete(handler); + scheduler.schedule(cmd, handler); } private ExtendedQueryCommand createBatchQueryCommand(String sql, diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java index 8998ccdb0..04ef6a3fb 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java @@ -17,6 +17,7 @@ package io.vertx.sqlclient.impl; +import io.vertx.core.Completable; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.internal.PromiseInternal; @@ -30,7 +31,7 @@ /** * A query result for building a {@link SqlResult}. */ -public class QueryResultBuilder, L extends SqlResult> implements QueryResultHandler, Promise { +public class QueryResultBuilder, L extends SqlResult> implements QueryResultHandler, Completable { private final Promise handler; private final Function factory; @@ -81,30 +82,19 @@ public void addProperty(PropertyKind property, V value) { } @Override - public boolean tryComplete(Boolean result) { - suspended = result; - if (failure != null) { - return tryFail(failure); + public void complete(Boolean aBoolean, Throwable throwable) { + if (throwable == null) { + suspended = aBoolean; + if (failure != null) { + handler.tryFail(failure); + } else { + handler.tryComplete((L) first); + } } else { - return handler.tryComplete((L) first); + handler.tryFail(throwable); } } - @Override - public boolean tryFail(Throwable cause) { - return handler.tryFail(cause); - } - - @Override - public boolean tryFail(String message) { - return handler.tryFail(message); - } - - @Override - public Future future() { - return handler.future().map(l -> isSuspended()); - } - public boolean isSuspended() { return suspended; } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java index 548a32512..af411b86b 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java @@ -194,10 +194,8 @@ public void close(Holder holder, Promise promise) { } @Override - public Future schedule(ContextInternal context, CommandBase cmd) { - Promise promise = context.promise(); - this.context.emit(v -> doSchedule(cmd, promise)); - return promise.future(); + public void schedule(CommandBase cmd, Completable handler) { + this.context.emit(v -> doSchedule(cmd, handler)); } protected void doSchedule(CommandBase cmd, Completable handler) { diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java index 837dfb616..a52bdda20 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java @@ -57,10 +57,10 @@ public void fail() { private void execute(CommandBase cmd) { Completable handler = cmd.handler; - connection.schedule(context, cmd).onComplete(handler); + connection.schedule(cmd, handler); } - private Completable wrap(CommandBase cmd, Promise handler) { + private Completable wrap(CommandBase cmd, Completable handler) { return (res, err) -> { synchronized (TransactionImpl.this) { pendingQueries--; @@ -70,7 +70,7 @@ private Completable wrap(CommandBase cmd, Promise handler) { }; } - public void schedule(CommandBase cmd, Promise handler) { + public void schedule(CommandBase cmd, Completable handler) { cmd.handler = wrap(cmd, handler); if (!schedule(cmd)) { handler.fail("Transaction already completed"); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index cc66f6daf..bd8665fc5 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -13,6 +13,7 @@ import io.netty.channel.EventLoop; import io.vertx.core.*; +import io.vertx.core.internal.PromiseInternal; import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.ClientMetrics; @@ -181,28 +182,32 @@ private void dequeueMetric(Object metric) { } } - public Future execute(ContextInternal context, CommandBase cmd) { + // TODO : try optimize without promise + public void execute(CommandBase cmd, Completable handler) { + ContextInternal context = vertx.getOrCreateContext(); Promise> p = context.promise(); Object metric = enqueueMetric(); pool.acquire(context, 0) .onComplete(p); - return p.future().compose(lease -> { + p.future().compose(lease -> { dequeueMetric(metric); PooledConnection pooled = lease.get(); Connection conn = pooled.conn; Future future; if (afterAcquire != null) { future = afterAcquire.apply(conn) - .compose(v -> pooled.schedule(context, cmd)) + .compose(v -> Future.future(d -> pooled.schedule(cmd, d))) .eventually(() -> beforeRecycle.apply(conn)); } else { - future = pooled.schedule(context, cmd); + PromiseInternal pp = context.promise(); + pooled.schedule(cmd, pp); + future = pp; } return future.andThen(ar -> { pooled.refresh(); lease.recycle(); }); - }); + }).onComplete(handler); } public void acquire(ContextInternal context, long timeout, Completable handler) { @@ -368,7 +373,8 @@ public DatabaseMetadata getDatabaseMetaData() { } @Override - public Future schedule(ContextInternal context, CommandBase cmd) { + public void schedule(CommandBase cmd, Completable handler) { + ContextInternal context = vertx.getOrCreateContext(); QueryReporter queryReporter; VertxTracer tracer = vertx.tracer(); ClientMetrics metrics = conn.metrics(); @@ -378,13 +384,17 @@ public Future schedule(ContextInternal context, CommandBase cmd) { } else { queryReporter = null; } - Future fut = conn.schedule(context, cmd); if (queryReporter != null) { - fut = fut.andThen(queryReporter::after); + Completable ori = handler; + handler = (res, err) -> { + queryReporter.after(res, err); + ori.complete(res, err); + }; } - return fut; + conn.schedule(cmd, handler); } + /** * Close the underlying connection */ diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryReporter.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryReporter.java index 82acd7e84..88ffe02c8 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryReporter.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryReporter.java @@ -131,13 +131,13 @@ public void before() { } } - public void after(AsyncResult ar) { + public void after(Object res, Throwable err) { if (tracer != null) { QueryResultBuilder qbr = (QueryResultBuilder) cmd.resultHandler(); - receiveResponse(context, payload, ar.succeeded() ? qbr.first : null, ar.succeeded() ? null : ar.cause()); + receiveResponse(context, payload, err == null ? qbr.first : null, err); } if (metrics != null) { - if (ar.succeeded()) { + if (err == null) { metrics.responseBegin(metric, null); metrics.responseEnd(metric); } else { diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlClientBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlClientBase.java index bf65e2109..b6af37871 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlClientBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlClientBase.java @@ -17,6 +17,7 @@ package io.vertx.sqlclient.internal; +import io.vertx.core.Completable; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; import io.vertx.sqlclient.PrepareOptions; @@ -167,7 +168,7 @@ private void executeBatch(List batch, PromiseInternal promise) { public void group(Handler block) { GroupingClient grouping = new GroupingClient(); block.handle(grouping); - schedule(context(), grouping.composite); + schedule(grouping.composite, (res, err) -> {}); } private class GroupingClient extends SqlClientBase { @@ -194,8 +195,8 @@ protected PromiseInternal promise() { } @Override - public Future schedule(ContextInternal context, CommandBase cmd) { - return composite.add(context, cmd); + public void schedule(CommandBase cmd, Completable handler) { + composite.add(cmd, handler); } } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java index 2b37009ca..569b5231e 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java @@ -74,7 +74,9 @@ public C prepare(String sql, PrepareOptions options, Handler prepare(String sql, PrepareOptions options) { - return schedule(context, new PrepareStatementCommand(sql, options, true)) + Promise promise = context.promise(); + schedule(new PrepareStatementCommand(sql, options, true), promise); + return promise.future() .compose( cr -> Future.succeededFuture(PreparedStatementImpl.create(conn, context, cr, autoCommit())), err -> { @@ -113,12 +115,10 @@ public void handleClosed() { } @Override - public Future schedule(ContextInternal context, CommandBase cmd) { + public void schedule(CommandBase cmd, Completable handler) { if (tx != null) { // TODO - Promise promise = context.promise(); - tx.schedule(cmd, promise); - return promise.future(); + tx.schedule(cmd, handler); } else { QueryReporter queryReporter; VertxTracer tracer = context.owner().tracer(); @@ -126,13 +126,13 @@ public Future schedule(ContextInternal context, CommandBase cmd) { if (!(conn instanceof SqlConnectionPool.PooledConnection) && cmd instanceof QueryCommandBase && (tracer != null || metrics != null)) { queryReporter = new QueryReporter(tracer, metrics, context, (QueryCommandBase) cmd, conn); queryReporter.before(); - return conn - .schedule(context, cmd) - .andThen(ar -> { - queryReporter.after(ar); + conn + .schedule(cmd, (res, err) -> { + queryReporter.after(res, err); + handler.complete(res, err); }); } else { - return conn.schedule(context, cmd); + conn.schedule(cmd, handler); } } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandScheduler.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandScheduler.java index 087cbdbcc..2d6e7a1f0 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandScheduler.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CommandScheduler.java @@ -16,12 +16,20 @@ */ package io.vertx.sqlclient.internal.command; +import io.vertx.core.Completable; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.internal.ContextInternal; @FunctionalInterface public interface CommandScheduler { - Future schedule(ContextInternal context, CommandBase cmd); + default Future schedule(ContextInternal context, CommandBase cmd) { + Promise promise = context.promise(); + schedule(cmd, promise); + return promise.future(); + } + + void schedule(CommandBase cmd, Completable handler); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CompositeCommand.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CompositeCommand.java index 87b7a38e2..ed28068a3 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CompositeCommand.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/CompositeCommand.java @@ -1,5 +1,6 @@ package io.vertx.sqlclient.internal.command; +import io.vertx.core.Completable; import io.vertx.core.Future; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; @@ -11,11 +12,9 @@ public class CompositeCommand extends CommandBase { private final List> commands = new ArrayList<>(); - public Future add(ContextInternal context, CommandBase cmd) { - PromiseInternal promise = context.promise(); - cmd.handler = promise; + public void add(CommandBase cmd, Completable handler) { + cmd.handler = handler; commands.add(cmd); - return promise.future(); } public List> commands() { diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java index bf269a080..a8fafba95 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/pool/PoolImpl.java @@ -171,8 +171,8 @@ public Future getConnection() { } @Override - public Future schedule(ContextInternal context, CommandBase cmd) { - return pool.execute(context, cmd); + public void schedule(CommandBase cmd, Completable handler) { + pool.execute(cmd, handler); } private void acquire(ContextInternal context, long timeout, Completable completionHandler) {