Skip to content

Commit 5a383f4

Browse files
committed
Simplify and optimize the command response future/promise flow.
Motivation: The command response flow uses a futures that piggies back to the query result builder and then completes the overall result promise. Only the terminal node needs to be a promise/future as we are always on the same connection context. The internal command response flow can uses instead a Completable flow that is much more straightforward for debugging and saves un-necessary method calls. Changes: Use a Completable response flow internally instead of the Promise based flow.
1 parent be56b06 commit 5a383f4

File tree

14 files changed

+80
-74
lines changed

14 files changed

+80
-74
lines changed

vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,8 @@ public Future<Void> beforeRecycle() {
167167
}
168168

169169
@Override
170-
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
171-
Promise<R> promise = context.promise();
172-
this.context.emit(v -> doSchedule(cmd, promise));
173-
return promise.future();
170+
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
171+
this.context.emit(v -> doSchedule(cmd, handler));
174172
}
175173

176174
private <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ public void init() {
8282
// TODO RETURN FUTURE ???
8383
Future<Connection> sendStartupMessage(String username, String password, String database, Map<String, String> properties) {
8484
InitCommand cmd = new InitCommand(this, username, password, database, properties);
85-
return schedule(context, cmd);
85+
Promise<Connection> promise = context.promise();
86+
schedule(cmd, promise);
87+
return promise.future();
8688
}
8789

8890
Future<Void> sendCancelRequestMessage(int processId, int secretKey) {

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ void withPreparedStatement(PrepareOptions options, Tuple args, Handler<AsyncResu
101101
if (future == null) {
102102
// Lazy statement;
103103
PrepareStatementCommand prepare = new PrepareStatementCommand(sql, options, true, args.types());
104-
conn.schedule(context, prepare).onComplete(promise);
104+
conn.schedule(prepare, promise);
105105
future = promise.future();
106106
}
107107
future.onComplete(handler);
@@ -161,7 +161,7 @@ public Future<Void> close() {
161161
Promise<Void> promise = context.promise();
162162
if (this.promise == null) {
163163
CloseStatementCommand cmd = new CloseStatementCommand(future.result());
164-
conn.schedule(context, cmd).onComplete(promise);
164+
conn.schedule(cmd, promise);
165165
} else {
166166
if (future == null) {
167167
future = this.promise.future();
@@ -170,7 +170,7 @@ public Future<Void> close() {
170170
future.onComplete(ar -> {
171171
if (ar.succeeded()) {
172172
CloseStatementCommand cmd = new CloseStatementCommand(ar.result());
173-
conn.schedule(context, cmd).onComplete(promise);
173+
conn.schedule(cmd, promise);
174174
} else {
175175
promise.complete();
176176
}
@@ -191,7 +191,7 @@ void closeCursor(String cursorId, Promise<Void> promise) {
191191
future.onComplete(ar -> {
192192
if (ar.succeeded()) {
193193
CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ar.result());
194-
conn.schedule(context, cmd).onComplete(promise);
194+
conn.schedule(cmd, promise);
195195
} else {
196196
promise.fail(ar.cause());
197197
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void executeSimpleQuery(CommandScheduler scheduler,
5858
PromiseInternal<L> promise) {
5959
ContextInternal context = promise.context();
6060
QueryResultBuilder handler = createHandler(promise);
61-
scheduler.schedule(context, new SimpleQueryCommand<>(sql, singleton, autoCommit, collector, handler)).onComplete(handler);
61+
scheduler.schedule(new SimpleQueryCommand<>(sql, singleton, autoCommit, collector, handler), handler);
6262
}
6363

6464
QueryResultBuilder<T, R, L> executeExtendedQuery(CommandScheduler scheduler,
@@ -88,15 +88,15 @@ QueryResultBuilder<T, R, L> executeExtendedQuery(CommandScheduler scheduler,
8888
autoCommit,
8989
collector,
9090
handler);
91-
scheduler.schedule(context, cmd).onComplete(handler);
91+
scheduler.schedule(cmd, handler);
9292
return handler;
9393
}
9494

9595
public void executeExtendedQuery(CommandScheduler scheduler, String sql, PrepareOptions options, boolean autoCommit, Tuple arguments, PromiseInternal<L> promise) {
9696
ContextInternal context = (ContextInternal) promise.context();
9797
QueryResultBuilder handler = this.createHandler(promise);
9898
ExtendedQueryCommand cmd = createExtendedQueryCommand(sql, options, autoCommit, arguments, handler);
99-
scheduler.schedule(context, cmd).onComplete(handler);
99+
scheduler.schedule(cmd, handler);
100100
}
101101

102102
private ExtendedQueryCommand<T> createExtendedQueryCommand(String sql,
@@ -130,14 +130,14 @@ void executeBatchQuery(CommandScheduler scheduler,
130130
}
131131
}
132132
ExtendedQueryCommand<T> cmd = ExtendedQueryCommand.createBatch(preparedStatement.sql(), options, preparedStatement, batch, autoCommit, collector, handler);
133-
scheduler.schedule(context, cmd).onComplete(handler);
133+
scheduler.schedule(cmd, handler);
134134
}
135135

136136
public void executeBatchQuery(CommandScheduler scheduler, String sql, PrepareOptions options, boolean autoCommit, List<Tuple> batch, PromiseInternal<L> promise) {
137137
ContextInternal context = promise.context();
138138
QueryResultBuilder handler = createHandler(promise);
139139
ExtendedQueryCommand<T> cmd = createBatchQueryCommand(sql, options, autoCommit, batch, handler);
140-
scheduler.schedule(context, cmd).onComplete(handler);
140+
scheduler.schedule(cmd, handler);
141141
}
142142

143143
private ExtendedQueryCommand<T> createBatchQueryCommand(String sql,

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryResultBuilder.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package io.vertx.sqlclient.impl;
1919

20+
import io.vertx.core.Completable;
2021
import io.vertx.core.Future;
2122
import io.vertx.core.Promise;
2223
import io.vertx.core.internal.PromiseInternal;
@@ -30,7 +31,7 @@
3031
/**
3132
* A query result for building a {@link SqlResult}.
3233
*/
33-
public class QueryResultBuilder<T, R extends SqlResultBase<T>, L extends SqlResult<T>> implements QueryResultHandler<T>, Promise<Boolean> {
34+
public class QueryResultBuilder<T, R extends SqlResultBase<T>, L extends SqlResult<T>> implements QueryResultHandler<T>, Completable<Boolean> {
3435

3536
private final Promise<L> handler;
3637
private final Function<T, R> factory;
@@ -81,30 +82,19 @@ public <V> void addProperty(PropertyKind<V> property, V value) {
8182
}
8283

8384
@Override
84-
public boolean tryComplete(Boolean result) {
85-
suspended = result;
86-
if (failure != null) {
87-
return tryFail(failure);
85+
public void complete(Boolean aBoolean, Throwable throwable) {
86+
if (throwable == null) {
87+
suspended = aBoolean;
88+
if (failure != null) {
89+
handler.tryFail(failure);
90+
} else {
91+
handler.tryComplete((L) first);
92+
}
8893
} else {
89-
return handler.tryComplete((L) first);
94+
handler.tryFail(throwable);
9095
}
9196
}
9297

93-
@Override
94-
public boolean tryFail(Throwable cause) {
95-
return handler.tryFail(cause);
96-
}
97-
98-
@Override
99-
public boolean tryFail(String message) {
100-
return handler.tryFail(message);
101-
}
102-
103-
@Override
104-
public Future<Boolean> future() {
105-
return handler.future().map(l -> isSuspended());
106-
}
107-
10898
public boolean isSuspended() {
10999
return suspended;
110100
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,8 @@ public void close(Holder holder, Promise<Void> promise) {
194194
}
195195

196196
@Override
197-
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
198-
Promise<R> promise = context.promise();
199-
this.context.emit(v -> doSchedule(cmd, promise));
200-
return promise.future();
197+
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
198+
this.context.emit(v -> doSchedule(cmd, handler));
201199
}
202200

203201
protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ public void fail() {
5757

5858
private <R> void execute(CommandBase<R> cmd) {
5959
Completable<R> handler = cmd.handler;
60-
connection.schedule(context, cmd).onComplete(handler);
60+
connection.schedule(cmd, handler);
6161
}
6262

63-
private <T> Completable<T> wrap(CommandBase<?> cmd, Promise<T> handler) {
63+
private <T> Completable<T> wrap(CommandBase<?> cmd, Completable<T> handler) {
6464
return (res, err) -> {
6565
synchronized (TransactionImpl.this) {
6666
pendingQueries--;
@@ -70,7 +70,7 @@ private <T> Completable<T> wrap(CommandBase<?> cmd, Promise<T> handler) {
7070
};
7171
}
7272

73-
public <R> void schedule(CommandBase<R> cmd, Promise<R> handler) {
73+
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
7474
cmd.handler = wrap(cmd, handler);
7575
if (!schedule(cmd)) {
7676
handler.fail("Transaction already completed");

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import io.netty.channel.EventLoop;
1515
import io.vertx.core.*;
16+
import io.vertx.core.internal.PromiseInternal;
1617
import io.vertx.core.internal.net.NetSocketInternal;
1718
import io.vertx.core.net.SocketAddress;
1819
import io.vertx.core.spi.metrics.ClientMetrics;
@@ -181,28 +182,32 @@ private void dequeueMetric(Object metric) {
181182
}
182183
}
183184

184-
public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
185+
// TODO : try optimize without promise
186+
public <R> void execute(CommandBase<R> cmd, Completable<R> handler) {
187+
ContextInternal context = vertx.getOrCreateContext();
185188
Promise<Lease<PooledConnection>> p = context.promise();
186189
Object metric = enqueueMetric();
187190
pool.acquire(context, 0)
188191
.onComplete(p);
189-
return p.future().compose(lease -> {
192+
p.future().compose(lease -> {
190193
dequeueMetric(metric);
191194
PooledConnection pooled = lease.get();
192195
Connection conn = pooled.conn;
193196
Future<R> future;
194197
if (afterAcquire != null) {
195198
future = afterAcquire.apply(conn)
196-
.compose(v -> pooled.schedule(context, cmd))
199+
.compose(v -> Future.<R>future(d -> pooled.schedule(cmd, d)))
197200
.eventually(() -> beforeRecycle.apply(conn));
198201
} else {
199-
future = pooled.schedule(context, cmd);
202+
PromiseInternal<R> pp = context.promise();
203+
pooled.schedule(cmd, pp);
204+
future = pp;
200205
}
201206
return future.andThen(ar -> {
202207
pooled.refresh();
203208
lease.recycle();
204209
});
205-
});
210+
}).onComplete(handler);
206211
}
207212

208213
public void acquire(ContextInternal context, long timeout, Completable<PooledConnection> handler) {
@@ -368,7 +373,8 @@ public DatabaseMetadata getDatabaseMetaData() {
368373
}
369374

370375
@Override
371-
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
376+
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
377+
ContextInternal context = vertx.getOrCreateContext();
372378
QueryReporter queryReporter;
373379
VertxTracer tracer = vertx.tracer();
374380
ClientMetrics metrics = conn.metrics();
@@ -378,13 +384,17 @@ public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
378384
} else {
379385
queryReporter = null;
380386
}
381-
Future<R> fut = conn.schedule(context, cmd);
382387
if (queryReporter != null) {
383-
fut = fut.andThen(queryReporter::after);
388+
Completable<R> ori = handler;
389+
handler = (res, err) -> {
390+
queryReporter.after(res, err);
391+
ori.complete(res, err);
392+
};
384393
}
385-
return fut;
394+
conn.schedule(cmd, handler);
386395
}
387396

397+
388398
/**
389399
* Close the underlying connection
390400
*/

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/tracing/QueryReporter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,13 @@ public void before() {
131131
}
132132
}
133133

134-
public void after(AsyncResult ar) {
134+
public void after(Object res, Throwable err) {
135135
if (tracer != null) {
136136
QueryResultBuilder<?, ?, ?> qbr = (QueryResultBuilder) cmd.resultHandler();
137-
receiveResponse(context, payload, ar.succeeded() ? qbr.first : null, ar.succeeded() ? null : ar.cause());
137+
receiveResponse(context, payload, err == null ? qbr.first : null, err);
138138
}
139139
if (metrics != null) {
140-
if (ar.succeeded()) {
140+
if (err == null) {
141141
metrics.responseBegin(metric, null);
142142
metrics.responseEnd(metric);
143143
} else {

vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlClientBase.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package io.vertx.sqlclient.internal;
1919

20+
import io.vertx.core.Completable;
2021
import io.vertx.core.internal.ContextInternal;
2122
import io.vertx.core.internal.PromiseInternal;
2223
import io.vertx.sqlclient.PrepareOptions;
@@ -167,7 +168,7 @@ private void executeBatch(List<Tuple> batch, PromiseInternal<R> promise) {
167168
public void group(Handler<SqlClient> block) {
168169
GroupingClient grouping = new GroupingClient();
169170
block.handle(grouping);
170-
schedule(context(), grouping.composite);
171+
schedule(grouping.composite, (res, err) -> {});
171172
}
172173

173174
private class GroupingClient extends SqlClientBase {
@@ -194,8 +195,8 @@ protected <T> PromiseInternal<T> promise() {
194195
}
195196

196197
@Override
197-
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
198-
return composite.add(context, cmd);
198+
public <R> void schedule(CommandBase<R> cmd, Completable<R> handler) {
199+
composite.add(cmd, handler);
199200
}
200201
}
201202
}

0 commit comments

Comments
 (0)