Skip to content

Commit d15dd64

Browse files
committed
The internal command codec for PostgreSQL/MSSQL/MySQL should act differently after it has been signaled a failure. When the codec is signaled a failure, it should record it, then any processed command should be failed an not written in the channel pipeline.
The codec code has also been improved with more encapsulation of the infight command queue in the codec class, simplified and made more homogeneous accross implementations.
1 parent fbbb874 commit d15dd64

32 files changed

+267
-164
lines changed

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseCursorCommandCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ void encode() {
3333
if (cursorData != null && cursorData.serverCursorId > 0) {
3434
sendCursorClose();
3535
} else {
36-
completionHandler.handle(CommandResponse.success(null));
36+
tdsMessageCodec.decoder().fireCommandResponse(CommandResponse.success(null));
3737
}
3838
}
3939

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseStatementCommandCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ void encode() {
3030
if (ps.handle > 0) {
3131
sendUnprepareRequest();
3232
} else {
33-
completionHandler.handle(CommandResponse.success(null));
33+
tdsMessageCodec.decoder().fireCommandResponse(CommandResponse.success(null));
3434
}
3535
}
3636

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedBatchQueryCommandCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class ExtendedBatchQueryCommandCodec<T> extends ExtendedQueryCommandBaseCodec<T>
3333
@Override
3434
void encode() {
3535
if (paramsList.isEmpty()) {
36-
completionHandler.handle(CommandResponse.failure("Can not execute batch query with 0 sets of batch parameters."));
36+
tdsMessageCodec.decoder().fireCommandResponse(CommandResponse.failure("Can not execute batch query with 0 sets of batch parameters."));
3737
return;
3838
}
3939
super.encode();

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLCommandCodec.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
package io.vertx.mssqlclient.impl.codec;
1313

1414
import io.netty.buffer.ByteBuf;
15-
import io.vertx.core.Handler;
1615
import io.vertx.mssqlclient.MSSQLException;
1716
import io.vertx.mssqlclient.MSSQLInfo;
1817
import io.vertx.sqlclient.impl.command.CommandBase;
@@ -30,7 +29,6 @@ abstract class MSSQLCommandCodec<R, C extends CommandBase<R>> {
3029
final C cmd;
3130
public MSSQLException failure;
3231
public R result;
33-
Handler<? super CommandResponse<R>> completionHandler;
3432

3533
MSSQLCommandCodec(TdsMessageCodec tdsMessageCodec, C cmd) {
3634
this.tdsMessageCodec = tdsMessageCodec;
@@ -221,6 +219,6 @@ void complete() {
221219
} else {
222220
resp = CommandResponse.success(result);
223221
}
224-
completionHandler.handle(resp);
222+
tdsMessageCodec.decoder().fireCommandResponse(resp);
225223
}
226224
}

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PreLoginCommandCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,6 @@ void decode(ByteBuf payload) {
8989
}
9090
payload.resetReaderIndex();
9191
}
92-
completionHandler.handle(CommandResponse.success(new PreLoginResponse(metadata, encryptionLevel)));
92+
tdsMessageCodec.decoder().fireCommandResponse(CommandResponse.success(new PreLoginResponse(metadata, encryptionLevel)));
9393
}
9494
}

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PrepareStatementCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ class PrepareStatementCodec extends MSSQLCommandCodec<PreparedStatement, Prepare
2424
void encode() {
2525
// we use sp_prepexec instead of sp_prepare + sp_exec
2626
PreparedStatement preparedStatement = new MSSQLPreparedStatement(cmd.sql());
27-
completionHandler.handle(CommandResponse.success(preparedStatement));
27+
tdsMessageCodec.decoder().fireCommandResponse(CommandResponse.success(preparedStatement));
2828
}
2929
}

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageCodec.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,16 @@ public class TdsMessageCodec extends CombinedChannelDuplexHandler<TdsMessageDeco
2727

2828
private final ArrayDeque<MSSQLCommandCodec<?, ?>> inflight = new ArrayDeque<>();
2929
private final TdsMessageEncoder encoder;
30+
private final TdsMessageDecoder decoder;
3031

3132
private ChannelHandlerContext chctx;
3233
private ByteBufAllocator alloc;
3334
private long transactionDescriptor;
3435
private Map<String, CursorData> cursorDataMap;
36+
private Throwable failure;
3537

3638
public TdsMessageCodec(int packetSize) {
37-
TdsMessageDecoder decoder = new TdsMessageDecoder(this);
39+
decoder = new TdsMessageDecoder(this);
3840
encoder = new TdsMessageEncoder(this, packetSize);
3941
init(decoder, encoder);
4042
}
@@ -48,30 +50,41 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
4850

4951
@Override
5052
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
51-
fail(ctx, cause);
53+
fail(cause);
5254
super.exceptionCaught(ctx, cause);
5355
}
5456

55-
private void fail(ChannelHandlerContext ctx, Throwable cause) {
56-
for (Iterator<MSSQLCommandCodec<?, ?>> it = inflight.iterator(); it.hasNext(); ) {
57-
MSSQLCommandCodec<?, ?> codec = it.next();
58-
it.remove();
59-
CommandResponse<Object> failure = CommandResponse.failure(cause);
60-
failure.cmd = (CommandBase) codec.cmd;
61-
ctx.fireChannelRead(failure);
57+
private void fail(Throwable cause) {
58+
if (failure == null) {
59+
failure = cause;
60+
for (Iterator<MSSQLCommandCodec<?, ?>> it = inflight.iterator(); it.hasNext(); ) {
61+
MSSQLCommandCodec<?, ?> codec = it.next();
62+
it.remove();
63+
fail(codec, cause);
64+
}
6265
}
6366
}
6467

68+
private void fail(MSSQLCommandCodec<?, ?> codec, Throwable cause) {
69+
CommandResponse<Object> failure = CommandResponse.failure(cause);
70+
failure.cmd = (CommandBase) codec.cmd;
71+
chctx.fireChannelRead(failure);
72+
}
73+
6574
@Override
6675
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
67-
fail(ctx, ClosedConnectionException.INSTANCE);
76+
fail(ClosedConnectionException.INSTANCE);
6877
super.channelInactive(ctx);
6978
}
7079

7180
TdsMessageEncoder encoder() {
7281
return encoder;
7382
}
7483

84+
TdsMessageDecoder decoder() {
85+
return decoder;
86+
}
87+
7588
ChannelHandlerContext chctx() {
7689
return chctx;
7790
}
@@ -96,8 +109,14 @@ void setTransactionDescriptor(long transactionDescriptor) {
96109
return inflight.poll();
97110
}
98111

99-
void add(MSSQLCommandCodec<?, ?> codec) {
100-
inflight.add(codec);
112+
boolean add(MSSQLCommandCodec<?, ?> codec) {
113+
if (failure == null) {
114+
inflight.add(codec);
115+
return true;
116+
} else {
117+
fail(codec, failure);
118+
return false;
119+
}
101120
}
102121

103122
CursorData getOrCreateCursorData(String cursorId) {

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageDecoder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
import io.netty.buffer.ByteBufAllocator;
1515
import io.netty.channel.ChannelHandlerContext;
1616
import io.netty.channel.ChannelInboundHandlerAdapter;
17+
import io.vertx.sqlclient.impl.command.CommandBase;
18+
import io.vertx.sqlclient.impl.command.CommandResponse;
1719

1820
public class TdsMessageDecoder extends ChannelInboundHandlerAdapter {
1921

2022
private final TdsMessageCodec tdsMessageCodec;
2123

24+
private ChannelHandlerContext chctx;
2225
private ByteBufAllocator alloc;
2326
private TdsMessage message;
2427

@@ -28,6 +31,7 @@ public TdsMessageDecoder(TdsMessageCodec tdsMessageCodec) {
2831

2932
@Override
3033
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
34+
chctx = ctx;
3135
alloc = ctx.alloc();
3236
}
3337

@@ -49,6 +53,12 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
4953
releaseMessage();
5054
}
5155

56+
void fireCommandResponse(CommandResponse<?> commandResponse) {
57+
MSSQLCommandCodec<?, ?> c = tdsMessageCodec.poll();
58+
commandResponse.cmd = (CommandBase) c.cmd;
59+
chctx.fireChannelRead(commandResponse);
60+
}
61+
5262
private void releaseMessage() {
5363
if (message != null) {
5464
message.release();

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageEncoder.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
5252

5353
void write(CommandBase<?> cmd) {
5454
MSSQLCommandCodec<?, ?> codec = wrap(cmd);
55-
codec.completionHandler = resp -> {
56-
MSSQLCommandCodec<?, ?> c = this.tdsMessageCodec.poll();
57-
resp.cmd = (CommandBase) c.cmd;
58-
chctx.fireChannelRead(resp);
59-
};
60-
this.tdsMessageCodec.add(codec);
61-
codec.encode();
55+
if (tdsMessageCodec.add(codec)) {
56+
codec.encode();
57+
}
6258
}
6359

6460
private MSSQLCommandCodec<?, ?> wrap(CommandBase<?> cmd) {

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/AuthenticationCommandBaseCodec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ protected final void handleAuthMoreData(byte[] password, ByteBuf payload) {
7575
} else if (flag == FAST_AUTH_STATUS_FLAG) {
7676
// fast auth success
7777
} else {
78-
encoder.handleCommandResponse(CommandResponse.failure(new UnsupportedOperationException("Unsupported flag for AuthMoreData : " + flag)));
78+
encoder.fireCommandResponse(CommandResponse.failure(new UnsupportedOperationException("Unsupported flag for AuthMoreData : " + flag)));
7979
}
8080
}
8181
}
@@ -86,7 +86,7 @@ protected final void sendEncryptedPasswordWithServerRsaPublicKey(byte[] password
8686
byte[] passwordInput = Arrays.copyOf(password, password.length + 1); // need to append 0x00(NULL) to the password
8787
encryptedPassword = RsaPublicKeyEncryptor.encrypt(passwordInput, authPluginData, serverRsaPublicKeyContent);
8888
} catch (Exception e) {
89-
encoder.handleCommandResponse(CommandResponse.failure(e));
89+
encoder.fireCommandResponse(CommandResponse.failure(e));
9090
return;
9191
}
9292
sendBytesAsPacket(encryptedPassword);

0 commit comments

Comments
 (0)