Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,8 @@ public Future<Void> beforeRecycle() {
}

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
Promise<R> promise = context.promise();
this.context.emit(v -> doSchedule(cmd, promise));
return promise.future();
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
this.context.emit(v -> doSchedule(cmd, handler));
}

private <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ public void init() {
// TODO RETURN FUTURE ???
Future<Connection> sendStartupMessage(String username, String password, String database, Map<String, String> properties) {
InitCommand cmd = new InitCommand(this, username, password, database, properties);
return schedule(context, cmd);
Promise<Connection> promise = context.promise();
schedule(cmd, promise);
return promise.future();
}

Future<Void> sendCancelRequestMessage(int processId, int secretKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void withPreparedStatement(PrepareOptions options, Tuple args, Handler<AsyncResu
if (future == null) {
// Lazy statement;
PrepareStatementCommand prepare = new PrepareStatementCommand(sql, options, true, args.types());
conn.schedule(context, prepare).onComplete(promise);
conn.schedule(prepare, promise);
future = promise.future();
}
future.onComplete(handler);
Expand Down Expand Up @@ -161,7 +161,7 @@ public Future<Void> close() {
Promise<Void> promise = context.promise();
if (this.promise == null) {
CloseStatementCommand cmd = new CloseStatementCommand(future.result());
conn.schedule(context, cmd).onComplete(promise);
conn.schedule(cmd, promise);
} else {
if (future == null) {
future = this.promise.future();
Expand All @@ -170,7 +170,7 @@ public Future<Void> close() {
future.onComplete(ar -> {
if (ar.succeeded()) {
CloseStatementCommand cmd = new CloseStatementCommand(ar.result());
conn.schedule(context, cmd).onComplete(promise);
conn.schedule(cmd, promise);
} else {
promise.complete();
}
Expand All @@ -191,7 +191,7 @@ void closeCursor(String cursorId, Promise<Void> promise) {
future.onComplete(ar -> {
if (ar.succeeded()) {
CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ar.result());
conn.schedule(context, cmd).onComplete(promise);
conn.schedule(cmd, promise);
} else {
promise.fail(ar.cause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void executeSimpleQuery(CommandScheduler scheduler,
PromiseInternal<L> promise) {
ContextInternal context = promise.context();
QueryResultBuilder handler = createHandler(promise);
scheduler.schedule(context, new SimpleQueryCommand<>(sql, singleton, autoCommit, collector, handler)).onComplete(handler);
scheduler.schedule(new SimpleQueryCommand<>(sql, singleton, autoCommit, collector, handler), handler);
}

QueryResultBuilder<T, R, L> executeExtendedQuery(CommandScheduler scheduler,
Expand Down Expand Up @@ -88,15 +88,15 @@ QueryResultBuilder<T, R, L> executeExtendedQuery(CommandScheduler scheduler,
autoCommit,
collector,
handler);
scheduler.schedule(context, cmd).onComplete(handler);
scheduler.schedule(cmd, handler);
return handler;
}

public void executeExtendedQuery(CommandScheduler scheduler, String sql, PrepareOptions options, boolean autoCommit, Tuple arguments, PromiseInternal<L> promise) {
ContextInternal context = (ContextInternal) promise.context();
QueryResultBuilder handler = this.createHandler(promise);
ExtendedQueryCommand cmd = createExtendedQueryCommand(sql, options, autoCommit, arguments, handler);
scheduler.schedule(context, cmd).onComplete(handler);
scheduler.schedule(cmd, handler);
}

private ExtendedQueryCommand<T> createExtendedQueryCommand(String sql,
Expand Down Expand Up @@ -130,14 +130,14 @@ void executeBatchQuery(CommandScheduler scheduler,
}
}
ExtendedQueryCommand<T> cmd = ExtendedQueryCommand.createBatch(preparedStatement.sql(), options, preparedStatement, batch, autoCommit, collector, handler);
scheduler.schedule(context, cmd).onComplete(handler);
scheduler.schedule(cmd, handler);
}

public void executeBatchQuery(CommandScheduler scheduler, String sql, PrepareOptions options, boolean autoCommit, List<Tuple> batch, PromiseInternal<L> promise) {
ContextInternal context = promise.context();
QueryResultBuilder handler = createHandler(promise);
ExtendedQueryCommand<T> cmd = createBatchQueryCommand(sql, options, autoCommit, batch, handler);
scheduler.schedule(context, cmd).onComplete(handler);
scheduler.schedule(cmd, handler);
}

private ExtendedQueryCommand<T> createBatchQueryCommand(String sql,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.vertx.sqlclient.impl;

import io.vertx.core.Completable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.internal.PromiseInternal;
Expand All @@ -30,7 +31,7 @@
/**
* A query result for building a {@link SqlResult}.
*/
public class QueryResultBuilder<T, R extends SqlResultBase<T>, L extends SqlResult<T>> implements QueryResultHandler<T>, Promise<Boolean> {
public class QueryResultBuilder<T, R extends SqlResultBase<T>, L extends SqlResult<T>> implements QueryResultHandler<T>, Completable<Boolean> {

private final Promise<L> handler;
private final Function<T, R> factory;
Expand Down Expand Up @@ -81,30 +82,19 @@ public <V> void addProperty(PropertyKind<V> property, V value) {
}

@Override
public boolean tryComplete(Boolean result) {
suspended = result;
if (failure != null) {
return tryFail(failure);
public void complete(Boolean aBoolean, Throwable throwable) {
if (throwable == null) {
suspended = aBoolean;
if (failure != null) {
handler.tryFail(failure);
} else {
handler.tryComplete((L) first);
}
} else {
return handler.tryComplete((L) first);
handler.tryFail(throwable);
}
}

@Override
public boolean tryFail(Throwable cause) {
return handler.tryFail(cause);
}

@Override
public boolean tryFail(String message) {
return handler.tryFail(message);
}

@Override
public Future<Boolean> future() {
return handler.future().map(l -> isSuspended());
}

public boolean isSuspended() {
return suspended;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,8 @@ public void close(Holder holder, Promise<Void> promise) {
}

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
Promise<R> promise = context.promise();
this.context.emit(v -> doSchedule(cmd, promise));
return promise.future();
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
this.context.emit(v -> doSchedule(cmd, handler));
}

protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ public void fail() {

private <R> void execute(CommandBase<R> cmd) {
Completable<R> handler = cmd.handler;
connection.schedule(context, cmd).onComplete(handler);
connection.schedule(cmd, handler);
}

private <T> Completable<T> wrap(CommandBase<?> cmd, Promise<T> handler) {
private <T> Completable<T> wrap(CommandBase<?> cmd, Completable<T> handler) {
return (res, err) -> {
synchronized (TransactionImpl.this) {
pendingQueries--;
Expand All @@ -70,7 +70,7 @@ private <T> Completable<T> wrap(CommandBase<?> cmd, Promise<T> handler) {
};
}

public <R> void schedule(CommandBase<R> cmd, Promise<R> handler) {
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
cmd.handler = wrap(cmd, handler);
if (!schedule(cmd)) {
handler.fail("Transaction already completed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import io.netty.channel.EventLoop;
import io.vertx.core.*;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.ClientMetrics;
Expand Down Expand Up @@ -181,28 +182,32 @@ private void dequeueMetric(Object metric) {
}
}

public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
// TODO : try optimize without promise
public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
ContextInternal context = vertx.getOrCreateContext();
Promise<Lease<PooledConnection>> p = context.promise();
Object metric = enqueueMetric();
pool.acquire(context, 0)
.onComplete(p);
return p.future().compose(lease -> {
p.future().compose(lease -> {
dequeueMetric(metric);
PooledConnection pooled = lease.get();
Connection conn = pooled.conn;
Future<R> future;
if (afterAcquire != null) {
future = afterAcquire.apply(conn)
.compose(v -> pooled.schedule(context, cmd))
.compose(v -> Future.<R>future(d -> pooled.schedule(cmd, d)))
.eventually(() -> beforeRecycle.apply(conn));
} else {
future = pooled.schedule(context, cmd);
PromiseInternal<R> pp = context.promise();
pooled.schedule(cmd, pp);
future = pp;
}
return future.andThen(ar -> {
pooled.refresh();
lease.recycle();
});
});
}).onComplete(handler);
}

public void acquire(ContextInternal context, long timeout, Completable<PooledConnection> handler) {
Expand Down Expand Up @@ -368,7 +373,8 @@ public DatabaseMetadata getDatabaseMetaData() {
}

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
ContextInternal context = vertx.getOrCreateContext();
QueryReporter queryReporter;
VertxTracer tracer = vertx.tracer();
ClientMetrics metrics = conn.metrics();
Expand All @@ -378,13 +384,17 @@ public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
} else {
queryReporter = null;
}
Future<R> fut = conn.schedule(context, cmd);
if (queryReporter != null) {
fut = fut.andThen(queryReporter::after);
Completable<R> ori = handler;
handler = (res, err) -> {
queryReporter.after(res, err);
ori.complete(res, err);
};
}
return fut;
conn.schedule(cmd, handler);
}


/**
* Close the underlying connection
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ public void before() {
}
}

public void after(AsyncResult ar) {
public void after(Object res, Throwable err) {
if (tracer != null) {
QueryResultBuilder<?, ?, ?> qbr = (QueryResultBuilder) cmd.resultHandler();
receiveResponse(context, payload, ar.succeeded() ? qbr.first : null, ar.succeeded() ? null : ar.cause());
receiveResponse(context, payload, err == null ? qbr.first : null, err);
}
if (metrics != null) {
if (ar.succeeded()) {
if (err == null) {
metrics.responseBegin(metric, null);
metrics.responseEnd(metric);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.vertx.sqlclient.internal;

import io.vertx.core.Completable;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.sqlclient.PrepareOptions;
Expand Down Expand Up @@ -167,7 +168,7 @@ private void executeBatch(List<Tuple> batch, PromiseInternal<R> promise) {
public void group(Handler<SqlClient> block) {
GroupingClient grouping = new GroupingClient();
block.handle(grouping);
schedule(context(), grouping.composite);
schedule(grouping.composite, (res, err) -> {});
}

private class GroupingClient extends SqlClientBase {
Expand All @@ -194,8 +195,8 @@ protected <T> PromiseInternal<T> promise() {
}

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
return composite.add(context, cmd);
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
composite.add(cmd, handler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ public C prepare(String sql, PrepareOptions options, Handler<AsyncResult<Prepare
}

public Future<PreparedStatement> prepare(String sql, PrepareOptions options) {
return schedule(context, new PrepareStatementCommand(sql, options, true))
Promise<io.vertx.sqlclient.internal.PreparedStatement> promise = context.promise();
schedule(new PrepareStatementCommand(sql, options, true), promise);
return promise.future()
.compose(
cr -> Future.succeededFuture(PreparedStatementImpl.create(conn, context, cr, autoCommit())),
err -> {
Expand Down Expand Up @@ -113,26 +115,24 @@ public void handleClosed() {
}

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
if (tx != null) {
// TODO
Promise<R> promise = context.promise();
tx.schedule(cmd, promise);
return promise.future();
tx.schedule(cmd, handler);
} else {
QueryReporter queryReporter;
VertxTracer tracer = context.owner().tracer();
ClientMetrics metrics = conn.metrics();
if (!(conn instanceof SqlConnectionPool.PooledConnection) && cmd instanceof QueryCommandBase && (tracer != null || metrics != null)) {
queryReporter = new QueryReporter(tracer, metrics, context, (QueryCommandBase<?>) cmd, conn);
queryReporter.before();
return conn
.schedule(context, cmd)
.andThen(ar -> {
queryReporter.after(ar);
conn
.schedule(cmd, (res, err) -> {
queryReporter.after(res, err);
handler.complete(res, err);
});
} else {
return conn.schedule(context, cmd);
conn.schedule(cmd, handler);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@
*/
package io.vertx.sqlclient.internal.command;

import io.vertx.core.Completable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.internal.ContextInternal;

@FunctionalInterface
public interface CommandScheduler {

<R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd);
default <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
Promise<R> promise = context.promise();
schedule(cmd, promise);
return promise.future();
}

<R> void schedule(CommandBase<R> cmd, Completable<R> handler);

}
Loading