Skip to content

Commit 6a42909

Browse files
authored
CommandMessage are named XXXCodec due to the original design
We should name those classes XXXCommandMessage instead to implement a more idiomatic design. This rename the hierarchies for the implemented codecs. In addition we try to avoid command message completion handler leaking into each codec and instead we keep this confined in the database socket connection.
2 parents b9c07b2 + c684c42 commit 6a42909

File tree

80 files changed

+381
-489
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+381
-489
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@
2323
import io.vertx.core.internal.net.NetSocketInternal;
2424
import io.vertx.core.spi.metrics.ClientMetrics;
2525
import io.vertx.db2client.DB2ConnectOptions;
26-
import io.vertx.db2client.impl.codec.CommandCodec;
26+
import io.vertx.db2client.DB2Exception;
27+
import io.vertx.db2client.impl.codec.ConnectionState;
28+
import io.vertx.db2client.impl.codec.DB2CommandMessage;
2729
import io.vertx.db2client.impl.codec.DB2Codec;
2830
import io.vertx.db2client.impl.codec.DB2PreparedStatement;
29-
import io.vertx.db2client.impl.codec.ExtendedBatchQueryCommandCodec;
30-
import io.vertx.db2client.impl.codec.ExtendedQueryCommandCodec;
31+
import io.vertx.db2client.impl.codec.ExtendedBatchQueryDB2CommandMessage;
32+
import io.vertx.db2client.impl.codec.ExtendedQueryDB2CommandMessage;
3133
import io.vertx.db2client.impl.command.InitialHandshakeCommand;
3234
import io.vertx.db2client.impl.drda.ConnectionMetaData;
35+
import io.vertx.db2client.impl.drda.SQLState;
36+
import io.vertx.db2client.impl.drda.SqlCode;
3337
import io.vertx.sqlclient.SqlConnectOptions;
3438
import io.vertx.sqlclient.codec.CommandMessage;
3539
import io.vertx.sqlclient.spi.connection.Connection;
@@ -51,6 +55,7 @@ public class DB2SocketConnection extends SocketConnectionBase {
5155
private DB2Codec codec;
5256
private Handler<Void> closeHandler;
5357
public final ConnectionMetaData connMetadata = new ConnectionMetaData();
58+
public ConnectionState status = ConnectionState.CONNECTING;
5459

5560
public DB2SocketConnection(NetSocketInternal socket,
5661
ClientMetrics clientMetrics,
@@ -64,6 +69,19 @@ public DB2SocketConnection(NetSocketInternal socket,
6469
this.connectOptions = connectOptions;
6570
}
6671

72+
@Override
73+
protected <R> void fail(CommandBase<R> command, Completable<R> handler, Throwable err) {
74+
if (status == ConnectionState.CONNECTING && command instanceof InitialHandshakeCommand) {
75+
// Sometimes DB2 closes the connection when sending an invalid Database name.
76+
// -4499 = A fatal error occurred that resulted in a disconnect from the data
77+
// source.
78+
// 08001 = "The connection was unable to be established"
79+
err = new DB2Exception("The connection was closed by the database server.", SqlCode.CONNECTION_REFUSED,
80+
SQLState.AUTH_DATABASE_CONNECTION_REFUSED);
81+
}
82+
super.fail(command, handler, err);
83+
}
84+
6785
// TODO RETURN FUTURE ???
6886
void sendStartupMessage(String username,
6987
String password,
@@ -90,15 +108,15 @@ public void init() {
90108
@Override
91109
protected CommandMessage<?, ?> toMessage(ExtendedQueryCommand<?> command, PreparedStatement preparedStatement) {
92110
if (command.isBatch()) {
93-
return new ExtendedBatchQueryCommandCodec<>(command, (DB2PreparedStatement) preparedStatement);
111+
return new ExtendedBatchQueryDB2CommandMessage<>(command, (DB2PreparedStatement) preparedStatement);
94112
} else {
95-
return new ExtendedQueryCommandCodec(command, (DB2PreparedStatement) preparedStatement);
113+
return new ExtendedQueryDB2CommandMessage(command, (DB2PreparedStatement) preparedStatement);
96114
}
97115
}
98116

99117
@Override
100118
protected CommandMessage<?, ?> toMessage(CommandBase<?> command) {
101-
return CommandCodec.wrap(command);
119+
return DB2CommandMessage.wrap(command);
102120
}
103121

104122
@Override
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
import io.vertx.db2client.impl.command.AuthenticationCommandBase;
1919

20-
abstract class AuthenticationCommandBaseCodec<R, C extends AuthenticationCommandBase<R>> extends CommandCodec<R, C> {
20+
abstract class AuthenticationDB2CommandBaseMessage<R, C extends AuthenticationCommandBase<R>> extends DB2CommandMessage<R, C> {
2121

22-
AuthenticationCommandBaseCodec(C cmd) {
22+
AuthenticationDB2CommandBaseMessage(C cmd) {
2323
super(cmd);
2424
}
2525

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import io.vertx.db2client.impl.drda.DRDAQueryResponse;
2121
import io.vertx.sqlclient.spi.protocol.CloseConnectionCommand;
2222

23-
class CloseConnectionCommandCodec extends CommandCodec<Void, CloseConnectionCommand> {
23+
class CloseConnectionDB2CommandMessage extends DB2CommandMessage<Void, CloseConnectionCommand> {
2424

25-
CloseConnectionCommandCodec(CloseConnectionCommand cmd) {
25+
CloseConnectionDB2CommandMessage(CloseConnectionCommand cmd) {
2626
super(cmd);
2727
}
2828

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseCursorCommandCodec.java renamed to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseCursorDB2CommandMessage.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import io.vertx.sqlclient.spi.protocol.CloseCursorCommand;
2525
import io.vertx.sqlclient.codec.CommandResponse;
2626

27-
class CloseCursorCommandCodec extends CommandCodec<Void, CloseCursorCommand> {
27+
class CloseCursorDB2CommandMessage extends DB2CommandMessage<Void, CloseCursorCommand> {
2828

29-
private static final Logger LOG = LoggerFactory.getLogger(CloseCursorCommandCodec.class);
29+
private static final Logger LOG = LoggerFactory.getLogger(CloseCursorDB2CommandMessage.class);
3030

31-
CloseCursorCommandCodec(CloseCursorCommand cmd) {
31+
CloseCursorDB2CommandMessage(CloseCursorCommand cmd) {
3232
super(cmd);
3333
}
3434

@@ -52,6 +52,6 @@ void encode(DB2Encoder encoder) {
5252
void decodePayload(ByteBuf payload, int payloadLength) {
5353
DRDAQueryResponse closeCursor = new DRDAQueryResponse(payload, encoder.socketConnection.connMetadata);
5454
closeCursor.readCursorClose();
55-
completionHandler.handle(CommandResponse.success(null));
55+
fireCommandSuccess(null);
5656
}
5757
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseStatementCommandCodec.java renamed to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseStatementDB2CommandMessage.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@
1919
import io.vertx.sqlclient.spi.protocol.CloseStatementCommand;
2020
import io.vertx.sqlclient.codec.CommandResponse;
2121

22-
class CloseStatementCommandCodec extends CommandCodec<Void, CloseStatementCommand> {
22+
class CloseStatementDB2CommandMessage extends DB2CommandMessage<Void, CloseStatementCommand> {
2323

24-
CloseStatementCommandCodec(CloseStatementCommand cmd) {
24+
CloseStatementDB2CommandMessage(CloseStatementCommand cmd) {
2525
super(cmd);
2626
}
2727

2828
@Override
2929
void encode(DB2Encoder encoder) {
30+
super.encode(encoder);
3031
DB2PreparedStatement statement = (DB2PreparedStatement) cmd.statement();
3132
statement.close();
32-
completionHandler.handle(CommandResponse.success(null));
33+
fireCommandSuccess(null);
3334
}
3435

3536
@Override
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.vertx.db2client.impl.codec;
2+
3+
public enum ConnectionState {
4+
CONNECTING, AUTHENTICATING, CONNECTED, CONNECT_FAILED
5+
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Codec.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
*/
1616
package io.vertx.db2client.impl.codec;
1717

18-
import io.netty.channel.ChannelHandlerContext;
1918
import io.netty.channel.CombinedChannelDuplexHandler;
2019
import io.vertx.db2client.impl.DB2SocketConnection;
21-
import io.vertx.sqlclient.ClosedConnectionException;
2220

2321
import java.util.ArrayDeque;
2422

@@ -27,23 +25,11 @@ public class DB2Codec extends CombinedChannelDuplexHandler<DB2Decoder, DB2Encode
2725
// TODO @AGG check what packet length limit actually is for DB2
2826
static final int PACKET_PAYLOAD_LENGTH_LIMIT = 0xFFFFFF;
2927

30-
private final ArrayDeque<CommandCodec<?, ?>> inflight = new ArrayDeque<>();
28+
private final ArrayDeque<DB2CommandMessage<?, ?>> inflight = new ArrayDeque<>();
3129

3230
public DB2Codec(DB2SocketConnection db2SocketConnection) {
3331
DB2Encoder encoder = new DB2Encoder(inflight, db2SocketConnection);
3432
DB2Decoder decoder = new DB2Decoder(inflight);
3533
init(decoder, encoder);
3634
}
37-
38-
@Override
39-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
40-
clearInflightCommands(ClosedConnectionException.INSTANCE);
41-
super.channelInactive(ctx);
42-
}
43-
44-
private void clearInflightCommands(Throwable failure) {
45-
for (CommandCodec<?, ?> commandCodec : inflight) {
46-
commandCodec.fail(failure);
47-
}
48-
}
4935
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CommandCodec.java renamed to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2CommandMessage.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,34 +28,41 @@
2828
import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand;
2929
import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand;
3030

31-
public abstract class CommandCodec<R, C extends CommandBase<R>> extends CommandMessage<R, C> {
31+
public abstract class DB2CommandMessage<R, C extends CommandBase<R>> extends CommandMessage<R, C> {
3232

33-
Handler<? super CommandResponse<R>> completionHandler;
3433
public Throwable failure;
3534
public R result;
3635
DB2Encoder encoder;
3736

38-
CommandCodec(C cmd) {
37+
DB2CommandMessage(C cmd) {
3938
super(cmd);
4039
}
4140

41+
void fireCommandFailure(Throwable err) {
42+
encoder.fireCommandFailure(this, err);
43+
}
44+
45+
void fireCommandSuccess(R result) {
46+
encoder.fireCommandSuccess(this, result);
47+
}
48+
4249
@SuppressWarnings({ "rawtypes", "unchecked" })
43-
public static CommandCodec<?, ?> wrap(CommandBase<?> cmd) {
44-
CommandCodec<?, ?> codec = null;
50+
public static DB2CommandMessage<?, ?> wrap(CommandBase<?> cmd) {
51+
DB2CommandMessage<?, ?> codec = null;
4552
if (cmd instanceof InitialHandshakeCommand) {
46-
codec = new InitialHandshakeCommandCodec((InitialHandshakeCommand) cmd);
53+
codec = new InitialHandshakeDB2CommandMessage((InitialHandshakeCommand) cmd);
4754
} else if (cmd instanceof SimpleQueryCommand) {
48-
codec = new SimpleQueryCommandCodec((SimpleQueryCommand) cmd);
55+
codec = new SimpleQueryDB2CommandMessage((SimpleQueryCommand) cmd);
4956
} else if (cmd instanceof CloseConnectionCommand) {
50-
codec = new CloseConnectionCommandCodec((CloseConnectionCommand) cmd);
57+
codec = new CloseConnectionDB2CommandMessage((CloseConnectionCommand) cmd);
5158
} else if (cmd instanceof PrepareStatementCommand) {
52-
codec = new PrepareStatementCodec((PrepareStatementCommand) cmd);
59+
codec = new PrepareStatementDB2CommandMessage((PrepareStatementCommand) cmd);
5360
} else if (cmd instanceof CloseStatementCommand) {
54-
codec = new CloseStatementCommandCodec((CloseStatementCommand) cmd);
61+
codec = new CloseStatementDB2CommandMessage((CloseStatementCommand) cmd);
5562
} else if (cmd instanceof CloseCursorCommand) {
56-
codec = new CloseCursorCommandCodec((CloseCursorCommand) cmd);
63+
codec = new CloseCursorDB2CommandMessage((CloseCursorCommand) cmd);
5764
} else if (cmd instanceof PingCommand) {
58-
codec = new PingCommandCodec((PingCommand) cmd);
65+
codec = new PingDB2CommandMessage((PingCommand) cmd);
5966
// } else if (cmd instanceof InitDbCommand) {
6067
// codec = new InitDbCommandCodec((InitDbCommand) cmd);
6168
// } else if (cmd instanceof StatisticsCommand) {

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Decoder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ class DB2Decoder extends ByteToMessageDecoder {
3131

3232
private static final Logger LOG = LoggerFactory.getLogger(DB2Decoder.class);
3333

34-
private final ArrayDeque<CommandCodec<?, ?>> inflight;
34+
private final ArrayDeque<DB2CommandMessage<?, ?>> inflight;
3535

36-
DB2Decoder(ArrayDeque<CommandCodec<?, ?>> inflight) {
36+
DB2Decoder(ArrayDeque<DB2CommandMessage<?, ?>> inflight) {
3737
this.inflight = inflight;
3838
}
3939

@@ -71,7 +71,7 @@ private int computeLength(ByteBuf in) {
7171
}
7272

7373
private void decodePayload(ByteBuf payload, int payloadLength) {
74-
CommandCodec<?, ?> ctx = inflight.peek();
74+
DB2CommandMessage<?, ?> ctx = inflight.peek();
7575
int startIndex = payload.readerIndex();
7676
try {
7777
if (LOG.isDebugEnabled())
@@ -93,7 +93,7 @@ private void decodePayload(ByteBuf payload, int payloadLength) {
9393
LOG.error("Command failed with: " + t.getMessage() + "\nCommand=" + ctx, t);
9494
}
9595
}
96-
ctx.completionHandler.handle(CommandResponse.failure(t));
96+
ctx.fireCommandFailure(t);
9797
} finally {
9898
payload.clear();
9999
payload.release();

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Encoder.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ class DB2Encoder extends ChannelOutboundHandlerAdapter {
3030

3131
public static final Logger LOG = LoggerFactory.getLogger(DB2Encoder.class);
3232

33-
private final ArrayDeque<CommandCodec<?, ?>> inflight;
33+
private final ArrayDeque<DB2CommandMessage<?, ?>> inflight;
3434
ChannelHandlerContext chctx;
3535

3636
final DB2SocketConnection socketConnection;
3737

38-
DB2Encoder(ArrayDeque<CommandCodec<?, ?>> inflight, DB2SocketConnection db2SocketConnection) {
38+
DB2Encoder(ArrayDeque<DB2CommandMessage<?, ?>> inflight, DB2SocketConnection db2SocketConnection) {
3939
this.inflight = inflight;
4040
this.socketConnection = db2SocketConnection;
4141
}
@@ -47,27 +47,41 @@ public void handlerAdded(ChannelHandlerContext ctx) {
4747

4848
@Override
4949
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
50-
if (msg instanceof CommandCodec<?, ?>) {
51-
CommandCodec<?, ?> cmd = (CommandCodec<?, ?>) msg;
50+
if (msg instanceof DB2CommandMessage<?, ?>) {
51+
DB2CommandMessage<?, ?> cmd = (DB2CommandMessage<?, ?>) msg;
5252
write(cmd);
5353
} else {
5454
super.write(ctx, msg, promise);
5555
}
5656
}
5757

58+
void fireCommandFailure(DB2CommandMessage msg, Throwable err) {
59+
DB2CommandMessage<?, ?> c = inflight.poll();
60+
if (c == msg) {
61+
chctx.fireChannelRead(CommandResponse.failure(err));
62+
} else {
63+
throw new IllegalStateException();
64+
}
65+
}
66+
67+
<R> void fireCommandSuccess(DB2CommandMessage msg, R result) {
68+
DB2CommandMessage<?, ?> c = inflight.poll();
69+
if (c == msg) {
70+
chctx.fireChannelRead(CommandResponse.success(result));
71+
} else {
72+
throw new IllegalStateException();
73+
}
74+
}
75+
5876
@SuppressWarnings({ "unchecked", "rawtypes" })
59-
void write(CommandCodec<?, ?> msg) {
60-
msg.completionHandler = resp -> {
61-
CommandCodec<?, ?> c = inflight.poll();
62-
resp.handler = (Completable) c.handler;
63-
chctx.fireChannelRead(resp);
64-
};
65-
inflight.add(msg);
77+
void write(DB2CommandMessage<?, ?> msg) {
6678
try {
79+
inflight.add(msg);
6780
msg.encode(this);
6881
} catch (Throwable e) {
82+
inflight.pollLast();
6983
LOG.error("FATAL: Unable to encode command: " + msg, e);
70-
msg.completionHandler.handle(CommandResponse.failure(e));
84+
chctx.fireChannelRead(CommandResponse.failure(e));
7185
}
7286
}
7387
}

0 commit comments

Comments
 (0)