Skip to content

Commit c684c42

Browse files
committed
Let the socket connection take care of the lifecycle of command message and the associated flow.
Motivation: Currently each codec takes care of failing the inflight command messages when a failure occurs in the pipeline. The socket connection already takes care of translating command responses to completion of the command handler. We should let the socket connection take care of handling the case of failure, this avoids propagating the command response handler to each codec and the codec only cares about writing an inbound command response message.
1 parent 7d64d85 commit c684c42

File tree

26 files changed

+135
-242
lines changed

26 files changed

+135
-242
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@
2323
import io.vertx.core.internal.net.NetSocketInternal;
2424
import io.vertx.core.spi.metrics.ClientMetrics;
2525
import io.vertx.db2client.DB2ConnectOptions;
26+
import io.vertx.db2client.DB2Exception;
27+
import io.vertx.db2client.impl.codec.ConnectionState;
2628
import io.vertx.db2client.impl.codec.DB2CommandMessage;
2729
import io.vertx.db2client.impl.codec.DB2Codec;
2830
import io.vertx.db2client.impl.codec.DB2PreparedStatement;
2931
import io.vertx.db2client.impl.codec.ExtendedBatchQueryDB2CommandMessage;
3032
import io.vertx.db2client.impl.codec.ExtendedQueryDB2CommandMessage;
3133
import io.vertx.db2client.impl.command.InitialHandshakeCommand;
3234
import io.vertx.db2client.impl.drda.ConnectionMetaData;
35+
import io.vertx.db2client.impl.drda.SQLState;
36+
import io.vertx.db2client.impl.drda.SqlCode;
3337
import io.vertx.sqlclient.SqlConnectOptions;
3438
import io.vertx.sqlclient.codec.CommandMessage;
3539
import io.vertx.sqlclient.spi.connection.Connection;
@@ -51,6 +55,7 @@ public class DB2SocketConnection extends SocketConnectionBase {
5155
private DB2Codec codec;
5256
private Handler<Void> closeHandler;
5357
public final ConnectionMetaData connMetadata = new ConnectionMetaData();
58+
public ConnectionState status = ConnectionState.CONNECTING;
5459

5560
public DB2SocketConnection(NetSocketInternal socket,
5661
ClientMetrics clientMetrics,
@@ -64,6 +69,19 @@ public DB2SocketConnection(NetSocketInternal socket,
6469
this.connectOptions = connectOptions;
6570
}
6671

72+
@Override
73+
protected <R> void fail(CommandBase<R> command, Completable<R> handler, Throwable err) {
74+
if (status == ConnectionState.CONNECTING && command instanceof InitialHandshakeCommand) {
75+
// Sometimes DB2 closes the connection when sending an invalid Database name.
76+
// -4499 = A fatal error occurred that resulted in a disconnect from the data
77+
// source.
78+
// 08001 = "The connection was unable to be established"
79+
err = new DB2Exception("The connection was closed by the database server.", SqlCode.CONNECTION_REFUSED,
80+
SQLState.AUTH_DATABASE_CONNECTION_REFUSED);
81+
}
82+
super.fail(command, handler, err);
83+
}
84+
6785
// TODO RETURN FUTURE ???
6886
void sendStartupMessage(String username,
6987
String password,

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseCursorDB2CommandMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,6 @@ void encode(DB2Encoder encoder) {
5252
void decodePayload(ByteBuf payload, int payloadLength) {
5353
DRDAQueryResponse closeCursor = new DRDAQueryResponse(payload, encoder.socketConnection.connMetadata);
5454
closeCursor.readCursorClose();
55-
completionHandler.handle(CommandResponse.success(null));
55+
fireCommandSuccess(null);
5656
}
5757
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseStatementDB2CommandMessage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ class CloseStatementDB2CommandMessage extends DB2CommandMessage<Void, CloseState
2727

2828
@Override
2929
void encode(DB2Encoder encoder) {
30+
super.encode(encoder);
3031
DB2PreparedStatement statement = (DB2PreparedStatement) cmd.statement();
3132
statement.close();
32-
completionHandler.handle(CommandResponse.success(null));
33+
fireCommandSuccess(null);
3334
}
3435

3536
@Override
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.vertx.db2client.impl.codec;
2+
3+
public enum ConnectionState {
4+
CONNECTING, AUTHENTICATING, CONNECTED, CONNECT_FAILED
5+
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Codec.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
*/
1616
package io.vertx.db2client.impl.codec;
1717

18-
import io.netty.channel.ChannelHandlerContext;
1918
import io.netty.channel.CombinedChannelDuplexHandler;
2019
import io.vertx.db2client.impl.DB2SocketConnection;
21-
import io.vertx.sqlclient.ClosedConnectionException;
2220

2321
import java.util.ArrayDeque;
2422

@@ -34,16 +32,4 @@ public DB2Codec(DB2SocketConnection db2SocketConnection) {
3432
DB2Decoder decoder = new DB2Decoder(inflight);
3533
init(decoder, encoder);
3634
}
37-
38-
@Override
39-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
40-
clearInflightCommands(ClosedConnectionException.INSTANCE);
41-
super.channelInactive(ctx);
42-
}
43-
44-
private void clearInflightCommands(Throwable failure) {
45-
for (DB2CommandMessage<?, ?> commandMsg : inflight) {
46-
commandMsg.fail(failure);
47-
}
48-
}
4935
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2CommandMessage.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
public abstract class DB2CommandMessage<R, C extends CommandBase<R>> extends CommandMessage<R, C> {
3232

33-
Handler<? super CommandResponse<R>> completionHandler;
3433
public Throwable failure;
3534
public R result;
3635
DB2Encoder encoder;
@@ -39,6 +38,14 @@ public abstract class DB2CommandMessage<R, C extends CommandBase<R>> extends Com
3938
super(cmd);
4039
}
4140

41+
void fireCommandFailure(Throwable err) {
42+
encoder.fireCommandFailure(this, err);
43+
}
44+
45+
void fireCommandSuccess(R result) {
46+
encoder.fireCommandSuccess(this, result);
47+
}
48+
4249
@SuppressWarnings({ "rawtypes", "unchecked" })
4350
public static DB2CommandMessage<?, ?> wrap(CommandBase<?> cmd) {
4451
DB2CommandMessage<?, ?> codec = null;

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Decoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private void decodePayload(ByteBuf payload, int payloadLength) {
9393
LOG.error("Command failed with: " + t.getMessage() + "\nCommand=" + ctx, t);
9494
}
9595
}
96-
ctx.completionHandler.handle(CommandResponse.failure(t));
96+
ctx.fireCommandFailure(t);
9797
} finally {
9898
payload.clear();
9999
payload.release();

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Encoder.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,33 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
5555
}
5656
}
5757

58+
void fireCommandFailure(DB2CommandMessage msg, Throwable err) {
59+
DB2CommandMessage<?, ?> c = inflight.poll();
60+
if (c == msg) {
61+
chctx.fireChannelRead(CommandResponse.failure(err));
62+
} else {
63+
throw new IllegalStateException();
64+
}
65+
}
66+
67+
<R> void fireCommandSuccess(DB2CommandMessage msg, R result) {
68+
DB2CommandMessage<?, ?> c = inflight.poll();
69+
if (c == msg) {
70+
chctx.fireChannelRead(CommandResponse.success(result));
71+
} else {
72+
throw new IllegalStateException();
73+
}
74+
}
75+
5876
@SuppressWarnings({ "unchecked", "rawtypes" })
5977
void write(DB2CommandMessage<?, ?> msg) {
60-
msg.completionHandler = resp -> {
61-
DB2CommandMessage<?, ?> c = inflight.poll();
62-
resp.handler = (Completable) c.handler;
63-
chctx.fireChannelRead(resp);
64-
};
65-
inflight.add(msg);
6678
try {
79+
inflight.add(msg);
6780
msg.encode(this);
6881
} catch (Throwable e) {
82+
inflight.pollLast();
6983
LOG.error("FATAL: Unable to encode command: " + msg, e);
70-
msg.completionHandler.handle(CommandResponse.failure(e));
84+
chctx.fireChannelRead(CommandResponse.failure(e));
7185
}
7286
}
7387
}

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedBatchQueryDB2CommandMessage.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.stream.Collectors;
2222

2323
import io.netty.buffer.ByteBuf;
24+
import io.vertx.core.VertxException;
2425
import io.vertx.core.internal.logging.Logger;
2526
import io.vertx.core.internal.logging.LoggerFactory;
2627
import io.vertx.db2client.impl.codec.DB2PreparedStatement.QueryInstance;
@@ -49,7 +50,7 @@ public ExtendedBatchQueryDB2CommandMessage(ExtendedQueryCommand<R> cmd, DB2Prepa
4950
@Override
5051
void encode(DB2Encoder encoder) {
5152
if (params.isEmpty()) {
52-
completionHandler.handle(CommandResponse.failure("Can not execute batch query with 0 sets of batch parameters."));
53+
fireCommandFailure(VertxException.noStackTrace("Can not execute batch query with 0 sets of batch parameters."));
5354
return;
5455
}
5556

@@ -88,7 +89,7 @@ void decodeQuery(ByteBuf payload) {
8889
hasMoreResults &= !queryComplete;
8990
handleQueryResult(decoder);
9091
}
91-
completionHandler.handle(CommandResponse.success(hasMoreResults));
92+
fireCommandSuccess(hasMoreResults);
9293
}
9394

9495
void decodeUpdate(ByteBuf payload) {
@@ -99,7 +100,7 @@ void decodeUpdate(ByteBuf payload) {
99100
if (cmd.autoCommit()) {
100101
updateResponse.readLocalCommit();
101102
}
102-
completionHandler.handle(CommandResponse.success(true));
103+
fireCommandSuccess(true);
103104
}
104105

105106
@Override

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedQueryDB2CommandBaseMessage.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.stream.Collector;
2020

2121
import io.netty.buffer.ByteBuf;
22+
import io.vertx.core.VertxException;
2223
import io.vertx.core.buffer.Buffer;
2324
import io.vertx.core.internal.buffer.BufferInternal;
2425
import io.vertx.db2client.impl.codec.DB2PreparedStatement.QueryInstance;
@@ -44,8 +45,8 @@ abstract class ExtendedQueryDB2CommandBaseMessage<R, C extends ExtendedQueryComm
4445
void encodePreparedQuery(DRDAQueryRequest queryRequest, QueryInstance queryInstance, Tuple params) {
4546
int requiredParams = statement.paramDesc.paramDefinitions().columns_;
4647
if (params.size() != requiredParams) {
47-
completionHandler.handle(CommandResponse.failure("Only " + params.size()
48-
+ " prepared statement parameters were provided " + "but " + requiredParams + " parameters are required."));
48+
fireCommandFailure(VertxException.noStackTrace("Only " + params.size()
49+
+ " prepared statement parameters were provided " + "but " + requiredParams + " parameters are required."));
4950
return;
5051
}
5152

0 commit comments

Comments
 (0)