Skip to content

Commit 91235f9

Browse files
committed
Remove usage of Handler/AsyncResult idiom.
1 parent 96eb1c0 commit 91235f9

File tree

12 files changed

+56
-70
lines changed

12 files changed

+56
-70
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.netty.channel.ChannelPipeline;
1919
import io.vertx.core.AsyncResult;
20+
import io.vertx.core.Completable;
2021
import io.vertx.core.Handler;
2122
import io.vertx.core.Promise;
2223
import io.vertx.core.internal.ContextInternal;
@@ -79,7 +80,7 @@ public void init() {
7980
}
8081

8182
@Override
82-
protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) {
83+
protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
8384
if (cmd instanceof TxCommand) {
8485
TxCommand<R> txCmd = (TxCommand<R>) cmd;
8586
if (txCmd.kind == TxCommand.Kind.BEGIN) {
@@ -90,7 +91,7 @@ protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handle
9091
} else {
9192
SimpleQueryCommand<Void> cmd2 = new SimpleQueryCommand<>(txCmd.kind.sql, false, false,
9293
QueryCommandBase.NULL_COLLECTOR, QueryResultHandler.NOOP_HANDLER);
93-
super.doSchedule(cmd2, ar -> handler.handle(ar.map(txCmd.result)));
94+
super.doSchedule(cmd2, (res, err) -> handler.complete(txCmd.result, err));
9495

9596
}
9697
} else {

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLSocketConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.netty.channel.ChannelPromise;
1616
import io.netty.handler.ssl.SslHandler;
1717
import io.vertx.core.AsyncResult;
18+
import io.vertx.core.Completable;
1819
import io.vertx.core.Future;
1920
import io.vertx.core.Handler;
2021
import io.vertx.core.internal.ContextInternal;
@@ -147,7 +148,7 @@ public void init() {
147148
}
148149

149150
@Override
150-
protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) {
151+
protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
151152
if (cmd instanceof TxCommand) {
152153
TxCommand<R> tx = (TxCommand<R>) cmd;
153154
String sql = tx.kind == BEGIN ? "BEGIN TRANSACTION":tx.kind.sql;
@@ -157,7 +158,7 @@ protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handle
157158
false,
158159
QueryCommandBase.NULL_COLLECTOR,
159160
QueryResultHandler.NOOP_HANDLER);
160-
super.doSchedule(cmd2, ar -> handler.handle(ar.map(tx.result)));
161+
super.doSchedule(cmd2, (res, err) -> handler.complete(tx.result, err));
161162
} else {
162163
super.doSchedule(cmd, handler);
163164
}

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLSocketConnection.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818
package io.vertx.mysqlclient.impl;
1919

2020
import io.netty.channel.ChannelPipeline;
21-
import io.vertx.core.AsyncResult;
22-
import io.vertx.core.Future;
23-
import io.vertx.core.Handler;
24-
import io.vertx.core.Promise;
21+
import io.vertx.core.*;
2522
import io.vertx.core.buffer.Buffer;
2623
import io.vertx.core.internal.ContextInternal;
2724
import io.vertx.core.net.ClientSSLOptions;
@@ -100,7 +97,7 @@ public void init() {
10097
}
10198

10299
@Override
103-
protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) {
100+
protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
104101
if (cmd instanceof TxCommand) {
105102
TxCommand<R> tx = (TxCommand<R>) cmd;
106103
SimpleQueryCommand<Void> cmd2 = new SimpleQueryCommand<>(
@@ -109,7 +106,7 @@ protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handle
109106
false,
110107
QueryCommandBase.NULL_COLLECTOR,
111108
QueryResultHandler.NOOP_HANDLER);
112-
super.doSchedule(cmd2, ar -> handler.handle(ar.map(tx.result)));
109+
super.doSchedule(cmd2, (res, err) -> handler.complete(tx.result, err));
113110
} else {
114111
super.doSchedule(cmd, handler);
115112
}

vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
173173
return promise.future();
174174
}
175175

176-
private <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) {
176+
private <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
177177
cmd.handler = handler;
178178
if (closePromise == null) {
179179
pending.add(cmd);

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919

2020
import io.netty.channel.ChannelPipeline;
2121
import io.netty.handler.codec.DecoderException;
22-
import io.vertx.core.AsyncResult;
23-
import io.vertx.core.Future;
24-
import io.vertx.core.Handler;
25-
import io.vertx.core.Promise;
22+
import io.vertx.core.*;
2623
import io.vertx.core.buffer.Buffer;
2724
import io.vertx.core.net.ClientSSLOptions;
2825
import io.vertx.core.spi.metrics.ClientMetrics;
@@ -158,7 +155,7 @@ void upgradeToSSLConnection(ClientSSLOptions sslOptions, Handler<AsyncResult<Voi
158155
}
159156

160157
@Override
161-
protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) {
158+
protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
162159
if (cmd instanceof TxCommand) {
163160
TxCommand<R> tx = (TxCommand<R>) cmd;
164161
SimpleQueryCommand<Void> cmd2 = new SimpleQueryCommand<>(
@@ -167,7 +164,7 @@ protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handle
167164
false,
168165
QueryCommandBase.NULL_COLLECTOR,
169166
QueryResultHandler.NOOP_HANDLER);
170-
super.doSchedule(cmd2, ar -> handler.handle(ar.map(tx.result)));
167+
super.doSchedule(cmd2, (res, err) -> handler.complete(tx.result, err));
171168
} else {
172169
super.doSchedule(cmd, handler);
173170
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ private void checkReconnect(int count) {
106106
if (!closed) {
107107
Long val = reconnectPolicy.apply(count);
108108
if (val >= 0) {
109-
tryConnect(val, ar -> {
110-
if (ar.failed()) {
109+
tryConnect(val, (res, err) -> {
110+
if (err != null) {
111111
checkReconnect(count + 1);
112112
}
113113
});
@@ -150,7 +150,7 @@ public Future<Void> connect() {
150150
return promise.future();
151151
}
152152

153-
private void tryConnect(long delayMillis, Handler<AsyncResult<Void>> handler) {
153+
private void tryConnect(long delayMillis, Completable<Void> handler) {
154154
if (!connecting) {
155155
connecting = true;
156156
if (delayMillis > 0) {
@@ -161,11 +161,11 @@ private void tryConnect(long delayMillis, Handler<AsyncResult<Void>> handler) {
161161
}
162162
}
163163

164-
private void doConnect(Handler<AsyncResult<Void>> completionHandler) {
164+
private void doConnect(Completable<Void> completionHandler) {
165165
PgConnection.connect(vertx, options).onComplete(ar -> handleConnectResult(completionHandler, ar));
166166
}
167167

168-
private synchronized void handleConnectResult(Handler<AsyncResult<Void>> completionHandler, AsyncResult<PgConnection> ar1) {
168+
private synchronized void handleConnectResult(Completable<Void> completionHandler, AsyncResult<PgConnection> ar1) {
169169
connecting = false;
170170
if (ar1.succeeded()) {
171171
conn = ar1.result();
@@ -192,12 +192,12 @@ private synchronized void handleConnectResult(Handler<AsyncResult<Void>> complet
192192
} else {
193193
handlers.forEach(vertx::runOnContext);
194194
}
195-
completionHandler.handle(ar2.mapEmpty());
195+
completionHandler.complete(null, ar2.cause());
196196
});
197197
return;
198198
}
199199
}
200-
completionHandler.handle(ar1.mapEmpty());
200+
completionHandler.complete(null, ar1.cause());
201201
}
202202

203203
private class ChannelList {

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,7 @@
2121
import io.netty.channel.ChannelFutureListener;
2222
import io.netty.channel.ChannelHandlerContext;
2323
import io.netty.handler.codec.DecoderException;
24-
import io.vertx.core.AsyncResult;
25-
import io.vertx.core.Context;
26-
import io.vertx.core.Future;
27-
import io.vertx.core.Handler;
28-
import io.vertx.core.Promise;
29-
import io.vertx.core.Vertx;
30-
import io.vertx.core.VertxException;
24+
import io.vertx.core.*;
3125
import io.vertx.core.internal.logging.Logger;
3226
import io.vertx.core.internal.logging.LoggerFactory;
3327
import io.vertx.core.net.SocketAddress;
@@ -206,7 +200,7 @@ public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
206200
return promise.future();
207201
}
208202

209-
protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handler) {
203+
protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
210204
if (handler == null) {
211205
throw new IllegalArgumentException();
212206
}
@@ -220,7 +214,7 @@ protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handle
220214
CompositeCommand composite = (CompositeCommand) cmd;
221215
List<CommandBase<?>> commands = composite.commands();
222216
pending.addAll(commands);
223-
composite.handler.handle(Future.succeededFuture());
217+
composite.handler.succeed();
224218
} else {
225219
pending.add(cmd);
226220
}
@@ -285,10 +279,9 @@ private void checkPending() {
285279

286280
private PrepareStatementCommand prepareCommand(ExtendedQueryCommand<?> queryCmd, boolean cache, boolean sendParameterTypes) {
287281
PrepareStatementCommand prepareCmd = new PrepareStatementCommand(queryCmd.sql(), null, cache, sendParameterTypes ? queryCmd.parameterTypes() : null);
288-
prepareCmd.handler = ar -> {
282+
prepareCmd.handler = (ps, cause) -> {
289283
paused = false;
290-
if (ar.succeeded()) {
291-
PreparedStatement ps = ar.result();
284+
if (cause == null) {
292285
if (cache) {
293286
cacheStatement(ps);
294287
}
@@ -303,7 +296,6 @@ private PrepareStatementCommand prepareCommand(ExtendedQueryCommand<?> queryCmd,
303296
ctx.flush();
304297
}
305298
} else {
306-
Throwable cause = ar.cause();
307299
if (isIndeterminatePreparedStatementError(cause) && !sendParameterTypes) {
308300
ChannelHandlerContext ctx = socket.channelHandlerContext();
309301
// We cannot cache this prepared statement because it might be executed with another type
@@ -340,9 +332,9 @@ private CloseStatementCommand evictStatementIfNecessary() {
340332
if (psCache != null && psCache.isFull()) {
341333
PreparedStatement evicted = psCache.evict();
342334
CloseStatementCommand closeCmd = new CloseStatementCommand(evicted);
343-
closeCmd.handler = ar -> {
344-
if (ar.failed()) {
345-
logger.error("Error when closing cached prepared statement", ar.cause());
335+
closeCmd.handler = (res, err) -> {
336+
if (err != null) {
337+
logger.error("Error when closing cached prepared statement", err);
346338
}
347339
};
348340
return closeCmd;

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
*/
1717
package io.vertx.sqlclient.impl;
1818

19-
import io.vertx.core.AsyncResult;
20-
import io.vertx.core.Future;
21-
import io.vertx.core.Handler;
22-
import io.vertx.core.Promise;
19+
import io.vertx.core.*;
2320
import io.vertx.core.internal.ContextInternal;
2421
import io.vertx.core.internal.PromiseInternal;
2522
import io.vertx.sqlclient.Transaction;
@@ -59,18 +56,17 @@ public void fail() {
5956
}
6057

6158
private <R> void execute(CommandBase<R> cmd) {
62-
Handler<AsyncResult<R>> handler = cmd.handler;
59+
Completable<R> handler = cmd.handler;
6360
connection.schedule(context, cmd).onComplete(handler);
6461
}
6562

66-
private <T> Handler<AsyncResult<T>> wrap(CommandBase<?> cmd, Promise<T> handler) {
67-
return ar -> {
68-
CommandBase<?> abc = cmd;
63+
private <T> Completable<T> wrap(CommandBase<?> cmd, Promise<T> handler) {
64+
return (res, err) -> {
6965
synchronized (TransactionImpl.this) {
7066
pendingQueries--;
7167
}
7268
checkEnd();
73-
handler.handle(ar);
69+
handler.complete(res, err);
7470
};
7571
}
7672

@@ -150,11 +146,11 @@ public void rollback(Handler<AsyncResult<Void>> handler) {
150146

151147
private TxCommand<Void> txCommand(TxCommand.Kind kind) {
152148
TxCommand<Void> cmd = new TxCommand<>(kind, null);
153-
cmd.handler = ar -> {
154-
if (ar.succeeded()) {
149+
cmd.handler = (res, err) -> {
150+
if (err == null) {
155151
completion.complete(kind);
156152
} else {
157-
completion.fail(ar.cause());
153+
completion.fail(err);
158154
}
159155
};
160156
return cmd;

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public int size() {
150150

151151
public void evict() {
152152
long now = System.currentTimeMillis();
153-
pool.evict(conn -> conn.shouldEvict(now), ar -> {
153+
pool.evict(conn -> conn.shouldEvict(now)).onComplete(ar -> {
154154
if (ar.succeeded()) {
155155
List<PooledConnection> res = ar.result();
156156
for (PooledConnection conn : res) {
@@ -184,7 +184,8 @@ private void dequeueMetric(Object metric) {
184184
public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
185185
Promise<Lease<PooledConnection>> p = context.promise();
186186
Object metric = enqueueMetric();
187-
pool.acquire(context, 0, p);
187+
pool.acquire(context, 0)
188+
.onComplete(p);
188189
return p.future().compose(lease -> {
189190
dequeueMetric(metric);
190191
PooledConnection pooled = lease.get();
@@ -204,8 +205,8 @@ public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
204205
});
205206
}
206207

207-
public void acquire(ContextInternal context, long timeout, Handler<AsyncResult<PooledConnection>> handler) {
208-
class PoolRequest implements PoolWaiter.Listener<PooledConnection>, Handler<AsyncResult<Lease<PooledConnection>>> {
208+
public void acquire(ContextInternal context, long timeout, Completable<PooledConnection> handler) {
209+
class PoolRequest implements PoolWaiter.Listener<PooledConnection>, Completable<Lease<PooledConnection>> {
209210

210211
private final Object metric;
211212
private long timerID = -1L;
@@ -215,44 +216,43 @@ class PoolRequest implements PoolWaiter.Listener<PooledConnection>, Handler<Asyn
215216
}
216217

217218
@Override
218-
public void handle(AsyncResult<Lease<PooledConnection>> ar) {
219+
public void complete(Lease<PooledConnection> lease, Throwable failure) {
219220
if (timerID != -1L) {
220221
vertx.cancelTimer(timerID);
221222
}
222-
if (ar.succeeded()) {
223-
Lease<PooledConnection> lease = ar.result();
223+
if (failure == null) {
224224
if (afterAcquire != null) {
225225
afterAcquire.apply(lease.get().conn).onComplete(ar2 -> {
226226
if (ar2.succeeded()) {
227227
handle(lease);
228228
} else {
229229
// Should we do some cleanup ?
230-
handler.handle(Future.failedFuture(ar.cause()));
230+
handler.fail(failure);
231231
}
232232
});
233233
} else {
234234
handle(lease);
235235
}
236236
} else {
237-
handler.handle(Future.failedFuture(ar.cause()));
237+
handler.fail(failure);
238238
}
239239
}
240240

241241
private void handle(Lease<PooledConnection> lease) {
242242
dequeueMetric(metric);
243243
PooledConnection pooled = lease.get();
244244
pooled.lease = lease;
245-
handler.handle(Future.succeededFuture(pooled));
245+
handler.succeed(pooled);
246246
}
247247

248248
@Override
249249
public void onEnqueue(PoolWaiter<PooledConnection> waiter) {
250250
if (timeout > 0L && timerID == -1L) {
251251
timerID = context.setTimer(timeout, id -> {
252-
pool.cancel(waiter, ar -> {
252+
pool.cancel(waiter).onComplete(ar -> {
253253
if (ar.succeeded()) {
254254
if (ar.result()) {
255-
handler.handle(Future.failedFuture("Timeout"));
255+
handler.fail("Timeout");
256256
}
257257
} else {
258258
// ????
@@ -269,12 +269,13 @@ public void onConnect(PoolWaiter<PooledConnection> waiter) {
269269
}
270270
Object metric = enqueueMetric();
271271
PoolRequest request = new PoolRequest(metric);
272-
pool.acquire(context, request, 0, request);
272+
pool.acquire(context, request, 0)
273+
.onComplete(request);
273274
}
274275

275276
public Future<Void> close() {
276277
Promise<Void> promise = vertx.promise();
277-
pool.close(ar1 -> {
278+
pool.close().onComplete(ar1 -> {
278279
if (ar1.succeeded()) {
279280
List<Future<Void>> results = ar1
280281
.result()

0 commit comments

Comments
 (0)