Skip to content

Commit 5549e13

Browse files
committed
Remove more usage of completion handler
1 parent 81c6f50 commit 5549e13

File tree

5 files changed

+39
-40
lines changed

5 files changed

+39
-40
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@
2323
import io.vertx.core.internal.net.NetSocketInternal;
2424
import io.vertx.core.spi.metrics.ClientMetrics;
2525
import io.vertx.db2client.DB2ConnectOptions;
26+
import io.vertx.db2client.DB2Exception;
27+
import io.vertx.db2client.impl.codec.ConnectionState;
2628
import io.vertx.db2client.impl.codec.DB2CommandMessage;
2729
import io.vertx.db2client.impl.codec.DB2Codec;
2830
import io.vertx.db2client.impl.codec.DB2PreparedStatement;
2931
import io.vertx.db2client.impl.codec.ExtendedBatchQueryDB2CommandMessage;
3032
import io.vertx.db2client.impl.codec.ExtendedQueryDB2CommandMessage;
3133
import io.vertx.db2client.impl.command.InitialHandshakeCommand;
3234
import io.vertx.db2client.impl.drda.ConnectionMetaData;
35+
import io.vertx.db2client.impl.drda.SQLState;
36+
import io.vertx.db2client.impl.drda.SqlCode;
3337
import io.vertx.sqlclient.SqlConnectOptions;
3438
import io.vertx.sqlclient.codec.CommandMessage;
3539
import io.vertx.sqlclient.spi.connection.Connection;
@@ -51,6 +55,7 @@ public class DB2SocketConnection extends SocketConnectionBase {
5155
private DB2Codec codec;
5256
private Handler<Void> closeHandler;
5357
public final ConnectionMetaData connMetadata = new ConnectionMetaData();
58+
public ConnectionState status = ConnectionState.CONNECTING;
5459

5560
public DB2SocketConnection(NetSocketInternal socket,
5661
ClientMetrics clientMetrics,
@@ -64,6 +69,19 @@ public DB2SocketConnection(NetSocketInternal socket,
6469
this.connectOptions = connectOptions;
6570
}
6671

72+
@Override
73+
protected <R> void fail(CommandBase<R> command, Completable<R> handler, Throwable err) {
74+
if (status == ConnectionState.CONNECTING && command instanceof InitialHandshakeCommand) {
75+
// Sometimes DB2 closes the connection when sending an invalid Database name.
76+
// -4499 = A fatal error occurred that resulted in a disconnect from the data
77+
// source.
78+
// 08001 = "The connection was unable to be established"
79+
err = new DB2Exception("The connection was closed by the database server.", SqlCode.CONNECTION_REFUSED,
80+
SQLState.AUTH_DATABASE_CONNECTION_REFUSED);
81+
}
82+
super.fail(command, handler, err);
83+
}
84+
6785
// TODO RETURN FUTURE ???
6886
void sendStartupMessage(String username,
6987
String password,
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.vertx.db2client.impl.codec;
2+
3+
public enum ConnectionState {
4+
CONNECTING, AUTHENTICATING, CONNECTED, CONNECT_FAILED
5+
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Codec.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
*/
1616
package io.vertx.db2client.impl.codec;
1717

18-
import io.netty.channel.ChannelHandlerContext;
1918
import io.netty.channel.CombinedChannelDuplexHandler;
2019
import io.vertx.db2client.impl.DB2SocketConnection;
21-
import io.vertx.sqlclient.ClosedConnectionException;
2220

2321
import java.util.ArrayDeque;
2422

@@ -34,16 +32,4 @@ public DB2Codec(DB2SocketConnection db2SocketConnection) {
3432
DB2Decoder decoder = new DB2Decoder(inflight);
3533
init(decoder, encoder);
3634
}
37-
38-
@Override
39-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
40-
clearInflightCommands(ClosedConnectionException.INSTANCE);
41-
super.channelInactive(ctx);
42-
}
43-
44-
private void clearInflightCommands(Throwable failure) {
45-
for (DB2CommandMessage<?, ?> commandMsg : inflight) {
46-
commandMsg.fail(failure);
47-
}
48-
}
4935
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/InitialHandshakeDB2CommandMessage.java

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import io.vertx.db2client.impl.drda.SQLState;
2727
import io.vertx.db2client.impl.drda.SqlCode;
2828
import io.vertx.sqlclient.spi.connection.Connection;
29-
import io.vertx.sqlclient.codec.CommandResponse;
3029

3130
/**
3231
* InitialHandshakeCommandCodec encodes the packets to get a connection from the database.
@@ -37,14 +36,8 @@
3736
*/
3837
class InitialHandshakeDB2CommandMessage extends AuthenticationDB2CommandBaseMessage<Connection, InitialHandshakeCommand> {
3938

40-
private static enum ConnectionState {
41-
CONNECTING, AUTHENTICATING, CONNECTED, CONNECT_FAILED
42-
}
43-
4439
private static final int TARGET_SECURITY_MEASURE = DRDAConstants.SECMEC_USRIDPWD;
4540

46-
private ConnectionState status = ConnectionState.CONNECTING;
47-
4841
InitialHandshakeDB2CommandMessage(InitialHandshakeCommand cmd) {
4942
super(cmd);
5043
}
@@ -53,16 +46,6 @@ private static enum ConnectionState {
5346
void encode(DB2Encoder encoder) {
5447
super.encode(encoder);
5548
encoder.socketConnection.connMetadata.databaseName = cmd.database();
56-
encoder.socketConnection.closeHandler(h -> {
57-
if (status == ConnectionState.CONNECTING) {
58-
// Sometimes DB2 closes the connection when sending an invalid Database name.
59-
// -4499 = A fatal error occurred that resulted in a disconnect from the data
60-
// source.
61-
// 08001 = "The connection was unable to be established"
62-
fail(new DB2Exception("The connection was closed by the database server.", SqlCode.CONNECTION_REFUSED,
63-
SQLState.AUTH_DATABASE_CONNECTION_REFUSED));
64-
}
65-
});
6649

6750
ByteBuf packet = allocateBuffer();
6851
int packetStartIdx = packet.writerIndex();
@@ -90,7 +73,7 @@ void encode(DB2Encoder encoder) {
9073
@Override
9174
void decodePayload(ByteBuf payload, int payloadLength) {
9275
DRDAConnectResponse response = new DRDAConnectResponse(payload, encoder.socketConnection.connMetadata);
93-
switch (status) {
76+
switch (encoder.socketConnection.status) {
9477
case CONNECTING:
9578
response.readExchangeServerAttributes();
9679
// readAccessSecurity can throw a DB2Exception if there are problems connecting.
@@ -100,7 +83,7 @@ void decodePayload(ByteBuf payload, int payloadLength) {
10083
try {
10184
response.readAccessSecurity(TARGET_SECURITY_MEASURE);
10285
} catch (DB2Exception de) {
103-
status = ConnectionState.CONNECT_FAILED;
86+
encoder.socketConnection.status = ConnectionState.CONNECT_FAILED;
10487
throw de;
10588
}
10689

@@ -116,7 +99,7 @@ void decodePayload(ByteBuf payload, int payloadLength) {
11699

117100
int lenOfPayload = packet.writerIndex() - packetStartIdx;
118101
sendPacket(packet, lenOfPayload);
119-
status = ConnectionState.AUTHENTICATING;
102+
encoder.socketConnection.status = ConnectionState.AUTHENTICATING;
120103
break;
121104

122105
case AUTHENTICATING:
@@ -125,7 +108,7 @@ void decodePayload(ByteBuf payload, int payloadLength) {
125108
if (accData.correlationToken != null) {
126109
encoder.socketConnection.connMetadata.correlationToken = accData.correlationToken;
127110
}
128-
status = ConnectionState.CONNECTED;
111+
encoder.socketConnection.status = ConnectionState.CONNECTED;
129112
fireCommandSuccess(cmd.connection());
130113
break;
131114

vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -436,22 +436,29 @@ protected void handleClose(Throwable t) {
436436
}
437437
CommandMessage<?, ?> msg;
438438
while ((msg = bilto.poll()) != null) {
439-
Completable<?> handler = msg.handler;
440-
context.runOnContext(v -> handler.fail(ClosedConnectionException.INSTANCE));
439+
fail(msg, ClosedConnectionException.INSTANCE);
441440
}
442441
Throwable cause = t == null ? VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG) : new VertxException(PENDING_CMD_CONNECTION_CORRUPT_MSG, t);
443442
CommandBase<?> cmd;
444443
while ((cmd = pending.poll()) != null) {
445-
CommandBase<?> c = cmd;
446-
Completable<?> handler = handlers.poll();
447-
context.runOnContext(v -> handler.fail(cause));
444+
CommandBase c = cmd;
445+
Completable handler = handlers.poll();
446+
fail(c, handler, cause);
448447
}
449448
if (holder != null) {
450449
holder.handleClosed();
451450
}
452451
}
453452
}
454453

454+
private <R> void fail(CommandMessage<R, ?> msg, Throwable err) {
455+
fail(msg.cmd, msg.handler, err);
456+
}
457+
458+
protected <R> void fail(CommandBase<R> command, Completable<R> handler, Throwable err) {
459+
context.runOnContext(v -> handler.fail(err));
460+
}
461+
455462
public boolean pipeliningEnabled() {
456463
return pipeliningLimit > 1;
457464
}

0 commit comments

Comments
 (0)