diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java index 3b47291838..766dfb5a7f 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2SocketConnection.java @@ -23,13 +23,17 @@ import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.db2client.DB2ConnectOptions; -import io.vertx.db2client.impl.codec.CommandCodec; +import io.vertx.db2client.DB2Exception; +import io.vertx.db2client.impl.codec.ConnectionState; +import io.vertx.db2client.impl.codec.DB2CommandMessage; import io.vertx.db2client.impl.codec.DB2Codec; import io.vertx.db2client.impl.codec.DB2PreparedStatement; -import io.vertx.db2client.impl.codec.ExtendedBatchQueryCommandCodec; -import io.vertx.db2client.impl.codec.ExtendedQueryCommandCodec; +import io.vertx.db2client.impl.codec.ExtendedBatchQueryDB2CommandMessage; +import io.vertx.db2client.impl.codec.ExtendedQueryDB2CommandMessage; import io.vertx.db2client.impl.command.InitialHandshakeCommand; import io.vertx.db2client.impl.drda.ConnectionMetaData; +import io.vertx.db2client.impl.drda.SQLState; +import io.vertx.db2client.impl.drda.SqlCode; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.codec.CommandMessage; import io.vertx.sqlclient.spi.connection.Connection; @@ -51,6 +55,7 @@ public class DB2SocketConnection extends SocketConnectionBase { private DB2Codec codec; private Handler closeHandler; public final ConnectionMetaData connMetadata = new ConnectionMetaData(); + public ConnectionState status = ConnectionState.CONNECTING; public DB2SocketConnection(NetSocketInternal socket, ClientMetrics clientMetrics, @@ -64,6 +69,19 @@ public DB2SocketConnection(NetSocketInternal socket, this.connectOptions = connectOptions; } + @Override + protected void fail(CommandBase command, Completable handler, Throwable err) { + if (status == ConnectionState.CONNECTING && command instanceof InitialHandshakeCommand) { + // Sometimes DB2 closes the connection when sending an invalid Database name. + // -4499 = A fatal error occurred that resulted in a disconnect from the data + // source. + // 08001 = "The connection was unable to be established" + err = new DB2Exception("The connection was closed by the database server.", SqlCode.CONNECTION_REFUSED, + SQLState.AUTH_DATABASE_CONNECTION_REFUSED); + } + super.fail(command, handler, err); + } + // TODO RETURN FUTURE ??? void sendStartupMessage(String username, String password, @@ -90,15 +108,15 @@ public void init() { @Override protected CommandMessage toMessage(ExtendedQueryCommand command, PreparedStatement preparedStatement) { if (command.isBatch()) { - return new ExtendedBatchQueryCommandCodec<>(command, (DB2PreparedStatement) preparedStatement); + return new ExtendedBatchQueryDB2CommandMessage<>(command, (DB2PreparedStatement) preparedStatement); } else { - return new ExtendedQueryCommandCodec(command, (DB2PreparedStatement) preparedStatement); + return new ExtendedQueryDB2CommandMessage(command, (DB2PreparedStatement) preparedStatement); } } @Override protected CommandMessage toMessage(CommandBase command) { - return CommandCodec.wrap(command); + return DB2CommandMessage.wrap(command); } @Override diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/AuthenticationCommandBaseCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/AuthenticationDB2CommandBaseMessage.java similarity index 80% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/AuthenticationCommandBaseCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/AuthenticationDB2CommandBaseMessage.java index a0debd392b..ae69bf0580 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/AuthenticationCommandBaseCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/AuthenticationDB2CommandBaseMessage.java @@ -17,9 +17,9 @@ import io.vertx.db2client.impl.command.AuthenticationCommandBase; -abstract class AuthenticationCommandBaseCodec> extends CommandCodec { +abstract class AuthenticationDB2CommandBaseMessage> extends DB2CommandMessage { - AuthenticationCommandBaseCodec(C cmd) { + AuthenticationDB2CommandBaseMessage(C cmd) { super(cmd); } diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseConnectionCommandCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseConnectionDB2CommandMessage.java similarity index 89% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseConnectionCommandCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseConnectionDB2CommandMessage.java index 11bef520da..db269a0fea 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseConnectionCommandCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseConnectionDB2CommandMessage.java @@ -20,9 +20,9 @@ import io.vertx.db2client.impl.drda.DRDAQueryResponse; import io.vertx.sqlclient.spi.protocol.CloseConnectionCommand; -class CloseConnectionCommandCodec extends CommandCodec { +class CloseConnectionDB2CommandMessage extends DB2CommandMessage { - CloseConnectionCommandCodec(CloseConnectionCommand cmd) { + CloseConnectionDB2CommandMessage(CloseConnectionCommand cmd) { super(cmd); } diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseCursorCommandCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseCursorDB2CommandMessage.java similarity index 90% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseCursorCommandCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseCursorDB2CommandMessage.java index 184d859362..c9f02f62a7 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseCursorCommandCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseCursorDB2CommandMessage.java @@ -24,11 +24,11 @@ import io.vertx.sqlclient.spi.protocol.CloseCursorCommand; import io.vertx.sqlclient.codec.CommandResponse; -class CloseCursorCommandCodec extends CommandCodec { +class CloseCursorDB2CommandMessage extends DB2CommandMessage { - private static final Logger LOG = LoggerFactory.getLogger(CloseCursorCommandCodec.class); + private static final Logger LOG = LoggerFactory.getLogger(CloseCursorDB2CommandMessage.class); - CloseCursorCommandCodec(CloseCursorCommand cmd) { + CloseCursorDB2CommandMessage(CloseCursorCommand cmd) { super(cmd); } @@ -52,6 +52,6 @@ void encode(DB2Encoder encoder) { void decodePayload(ByteBuf payload, int payloadLength) { DRDAQueryResponse closeCursor = new DRDAQueryResponse(payload, encoder.socketConnection.connMetadata); closeCursor.readCursorClose(); - completionHandler.handle(CommandResponse.success(null)); + fireCommandSuccess(null); } } diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseStatementCommandCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseStatementDB2CommandMessage.java similarity index 83% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseStatementCommandCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseStatementDB2CommandMessage.java index 199dd7bde8..216a32ca9d 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseStatementCommandCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CloseStatementDB2CommandMessage.java @@ -19,17 +19,18 @@ import io.vertx.sqlclient.spi.protocol.CloseStatementCommand; import io.vertx.sqlclient.codec.CommandResponse; -class CloseStatementCommandCodec extends CommandCodec { +class CloseStatementDB2CommandMessage extends DB2CommandMessage { - CloseStatementCommandCodec(CloseStatementCommand cmd) { + CloseStatementDB2CommandMessage(CloseStatementCommand cmd) { super(cmd); } @Override void encode(DB2Encoder encoder) { + super.encode(encoder); DB2PreparedStatement statement = (DB2PreparedStatement) cmd.statement(); statement.close(); - completionHandler.handle(CommandResponse.success(null)); + fireCommandSuccess(null); } @Override diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ConnectionState.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ConnectionState.java new file mode 100644 index 0000000000..e068680d2c --- /dev/null +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ConnectionState.java @@ -0,0 +1,5 @@ +package io.vertx.db2client.impl.codec; + +public enum ConnectionState { + CONNECTING, AUTHENTICATING, CONNECTED, CONNECT_FAILED +} diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Codec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Codec.java index 6891d21143..154f229d72 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Codec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Codec.java @@ -15,10 +15,8 @@ */ package io.vertx.db2client.impl.codec; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.CombinedChannelDuplexHandler; import io.vertx.db2client.impl.DB2SocketConnection; -import io.vertx.sqlclient.ClosedConnectionException; import java.util.ArrayDeque; @@ -27,23 +25,11 @@ public class DB2Codec extends CombinedChannelDuplexHandler> inflight = new ArrayDeque<>(); + private final ArrayDeque> inflight = new ArrayDeque<>(); public DB2Codec(DB2SocketConnection db2SocketConnection) { DB2Encoder encoder = new DB2Encoder(inflight, db2SocketConnection); DB2Decoder decoder = new DB2Decoder(inflight); init(decoder, encoder); } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - clearInflightCommands(ClosedConnectionException.INSTANCE); - super.channelInactive(ctx); - } - - private void clearInflightCommands(Throwable failure) { - for (CommandCodec commandCodec : inflight) { - commandCodec.fail(failure); - } - } } diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CommandCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2CommandMessage.java similarity index 79% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CommandCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2CommandMessage.java index b966a6115d..973b09828b 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/CommandCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2CommandMessage.java @@ -28,34 +28,41 @@ import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand; import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand; -public abstract class CommandCodec> extends CommandMessage { +public abstract class DB2CommandMessage> extends CommandMessage { - Handler> completionHandler; public Throwable failure; public R result; DB2Encoder encoder; - CommandCodec(C cmd) { + DB2CommandMessage(C cmd) { super(cmd); } + void fireCommandFailure(Throwable err) { + encoder.fireCommandFailure(this, err); + } + + void fireCommandSuccess(R result) { + encoder.fireCommandSuccess(this, result); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) - public static CommandCodec wrap(CommandBase cmd) { - CommandCodec codec = null; + public static DB2CommandMessage wrap(CommandBase cmd) { + DB2CommandMessage codec = null; if (cmd instanceof InitialHandshakeCommand) { - codec = new InitialHandshakeCommandCodec((InitialHandshakeCommand) cmd); + codec = new InitialHandshakeDB2CommandMessage((InitialHandshakeCommand) cmd); } else if (cmd instanceof SimpleQueryCommand) { - codec = new SimpleQueryCommandCodec((SimpleQueryCommand) cmd); + codec = new SimpleQueryDB2CommandMessage((SimpleQueryCommand) cmd); } else if (cmd instanceof CloseConnectionCommand) { - codec = new CloseConnectionCommandCodec((CloseConnectionCommand) cmd); + codec = new CloseConnectionDB2CommandMessage((CloseConnectionCommand) cmd); } else if (cmd instanceof PrepareStatementCommand) { - codec = new PrepareStatementCodec((PrepareStatementCommand) cmd); + codec = new PrepareStatementDB2CommandMessage((PrepareStatementCommand) cmd); } else if (cmd instanceof CloseStatementCommand) { - codec = new CloseStatementCommandCodec((CloseStatementCommand) cmd); + codec = new CloseStatementDB2CommandMessage((CloseStatementCommand) cmd); } else if (cmd instanceof CloseCursorCommand) { - codec = new CloseCursorCommandCodec((CloseCursorCommand) cmd); + codec = new CloseCursorDB2CommandMessage((CloseCursorCommand) cmd); } else if (cmd instanceof PingCommand) { - codec = new PingCommandCodec((PingCommand) cmd); + codec = new PingDB2CommandMessage((PingCommand) cmd); // } else if (cmd instanceof InitDbCommand) { // codec = new InitDbCommandCodec((InitDbCommand) cmd); // } else if (cmd instanceof StatisticsCommand) { diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Decoder.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Decoder.java index 39c81a9f16..841d1ea70e 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Decoder.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Decoder.java @@ -31,9 +31,9 @@ class DB2Decoder extends ByteToMessageDecoder { private static final Logger LOG = LoggerFactory.getLogger(DB2Decoder.class); - private final ArrayDeque> inflight; + private final ArrayDeque> inflight; - DB2Decoder(ArrayDeque> inflight) { + DB2Decoder(ArrayDeque> inflight) { this.inflight = inflight; } @@ -71,7 +71,7 @@ private int computeLength(ByteBuf in) { } private void decodePayload(ByteBuf payload, int payloadLength) { - CommandCodec ctx = inflight.peek(); + DB2CommandMessage ctx = inflight.peek(); int startIndex = payload.readerIndex(); try { if (LOG.isDebugEnabled()) @@ -93,7 +93,7 @@ private void decodePayload(ByteBuf payload, int payloadLength) { LOG.error("Command failed with: " + t.getMessage() + "\nCommand=" + ctx, t); } } - ctx.completionHandler.handle(CommandResponse.failure(t)); + ctx.fireCommandFailure(t); } finally { payload.clear(); payload.release(); diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Encoder.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Encoder.java index 0778f926dd..468b999e27 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Encoder.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2Encoder.java @@ -30,12 +30,12 @@ class DB2Encoder extends ChannelOutboundHandlerAdapter { public static final Logger LOG = LoggerFactory.getLogger(DB2Encoder.class); - private final ArrayDeque> inflight; + private final ArrayDeque> inflight; ChannelHandlerContext chctx; final DB2SocketConnection socketConnection; - DB2Encoder(ArrayDeque> inflight, DB2SocketConnection db2SocketConnection) { + DB2Encoder(ArrayDeque> inflight, DB2SocketConnection db2SocketConnection) { this.inflight = inflight; this.socketConnection = db2SocketConnection; } @@ -47,27 +47,41 @@ public void handlerAdded(ChannelHandlerContext ctx) { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof CommandCodec) { - CommandCodec cmd = (CommandCodec) msg; + if (msg instanceof DB2CommandMessage) { + DB2CommandMessage cmd = (DB2CommandMessage) msg; write(cmd); } else { super.write(ctx, msg, promise); } } + void fireCommandFailure(DB2CommandMessage msg, Throwable err) { + DB2CommandMessage c = inflight.poll(); + if (c == msg) { + chctx.fireChannelRead(CommandResponse.failure(err)); + } else { + throw new IllegalStateException(); + } + } + + void fireCommandSuccess(DB2CommandMessage msg, R result) { + DB2CommandMessage c = inflight.poll(); + if (c == msg) { + chctx.fireChannelRead(CommandResponse.success(result)); + } else { + throw new IllegalStateException(); + } + } + @SuppressWarnings({ "unchecked", "rawtypes" }) - void write(CommandCodec msg) { - msg.completionHandler = resp -> { - CommandCodec c = inflight.poll(); - resp.handler = (Completable) c.handler; - chctx.fireChannelRead(resp); - }; - inflight.add(msg); + void write(DB2CommandMessage msg) { try { + inflight.add(msg); msg.encode(this); } catch (Throwable e) { + inflight.pollLast(); LOG.error("FATAL: Unable to encode command: " + msg, e); - msg.completionHandler.handle(CommandResponse.failure(e)); + chctx.fireChannelRead(CommandResponse.failure(e)); } } } diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedBatchQueryCommandCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedBatchQueryDB2CommandMessage.java similarity index 87% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedBatchQueryCommandCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedBatchQueryDB2CommandMessage.java index e396a76523..5e28cdd1b5 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedBatchQueryCommandCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedBatchQueryDB2CommandMessage.java @@ -21,6 +21,7 @@ import java.util.stream.Collectors; import io.netty.buffer.ByteBuf; +import io.vertx.core.VertxException; import io.vertx.core.internal.logging.Logger; import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.db2client.impl.codec.DB2PreparedStatement.QueryInstance; @@ -31,15 +32,15 @@ import io.vertx.sqlclient.codec.CommandResponse; import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand; -public class ExtendedBatchQueryCommandCodec extends ExtendedQueryCommandBaseCodec> { +public class ExtendedBatchQueryDB2CommandMessage extends ExtendedQueryDB2CommandBaseMessage> { - private static final Logger LOG = LoggerFactory.getLogger(ExtendedBatchQueryCommandCodec.class); + private static final Logger LOG = LoggerFactory.getLogger(ExtendedBatchQueryDB2CommandMessage.class); private final List params; private final List queryInstances; private final String baseCursorId; - public ExtendedBatchQueryCommandCodec(ExtendedQueryCommand cmd, DB2PreparedStatement statement) { + public ExtendedBatchQueryDB2CommandMessage(ExtendedQueryCommand cmd, DB2PreparedStatement statement) { super(cmd, statement); params = cmd.paramsList(); queryInstances = new ArrayList<>(params.size()); @@ -49,7 +50,7 @@ public ExtendedBatchQueryCommandCodec(ExtendedQueryCommand cmd, DB2PreparedSt @Override void encode(DB2Encoder encoder) { if (params.isEmpty()) { - completionHandler.handle(CommandResponse.failure("Can not execute batch query with 0 sets of batch parameters.")); + fireCommandFailure(VertxException.noStackTrace("Can not execute batch query with 0 sets of batch parameters.")); return; } @@ -88,7 +89,7 @@ void decodeQuery(ByteBuf payload) { hasMoreResults &= !queryComplete; handleQueryResult(decoder); } - completionHandler.handle(CommandResponse.success(hasMoreResults)); + fireCommandSuccess(hasMoreResults); } void decodeUpdate(ByteBuf payload) { @@ -99,7 +100,7 @@ void decodeUpdate(ByteBuf payload) { if (cmd.autoCommit()) { updateResponse.readLocalCommit(); } - completionHandler.handle(CommandResponse.success(true)); + fireCommandSuccess(true); } @Override diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedQueryCommandBaseCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedQueryDB2CommandBaseMessage.java similarity index 90% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedQueryCommandBaseCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedQueryDB2CommandBaseMessage.java index 8630d3b685..1ae8a6c28d 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedQueryCommandBaseCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/ExtendedQueryDB2CommandBaseMessage.java @@ -19,6 +19,7 @@ import java.util.stream.Collector; import io.netty.buffer.ByteBuf; +import io.vertx.core.VertxException; import io.vertx.core.buffer.Buffer; import io.vertx.core.internal.buffer.BufferInternal; import io.vertx.db2client.impl.codec.DB2PreparedStatement.QueryInstance; @@ -30,12 +31,12 @@ import io.vertx.sqlclient.codec.CommandResponse; import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand; -abstract class ExtendedQueryCommandBaseCodec> - extends QueryCommandBaseCodec { +abstract class ExtendedQueryDB2CommandBaseMessage> + extends QueryDB2CommandBaseMessage { final DB2PreparedStatement statement; - ExtendedQueryCommandBaseCodec(C cmd, DB2PreparedStatement statement) { + ExtendedQueryDB2CommandBaseMessage(C cmd, DB2PreparedStatement statement) { super(cmd); this.statement = statement; this.columnDefinitions = statement.rowDesc.columnDefinitions(); @@ -44,8 +45,8 @@ abstract class ExtendedQueryCommandBaseCodec extends ExtendedQueryCommandBaseCodec> { +public class ExtendedQueryDB2CommandMessage extends ExtendedQueryDB2CommandBaseMessage> { final QueryInstance queryInstance; - public ExtendedQueryCommandCodec(ExtendedQueryCommand cmd, DB2PreparedStatement statement) { + public ExtendedQueryDB2CommandMessage(ExtendedQueryCommand cmd, DB2PreparedStatement statement) { super(cmd, statement); queryInstance = statement.getQueryInstance(cmd.cursorId()); } @@ -49,7 +49,7 @@ void decodeQuery(ByteBuf payload) { RowResultDecoder decoder = decodePreparedQuery(payload, resp, queryInstance); boolean hasMoreResults = !decoder.isQueryComplete(); handleQueryResult(decoder); - completionHandler.handle(CommandResponse.success(hasMoreResults)); + fireCommandSuccess(hasMoreResults); } void decodeUpdate(ByteBuf payload) { @@ -58,7 +58,7 @@ void decodeUpdate(ByteBuf payload) { if (cmd.autoCommit()) { updateResponse.readLocalCommit(); } - completionHandler.handle(CommandResponse.success(true)); + fireCommandSuccess(true); } @Override diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/InitialHandshakeCommandCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/InitialHandshakeDB2CommandMessage.java similarity index 78% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/InitialHandshakeCommandCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/InitialHandshakeDB2CommandMessage.java index 7b7882c4bc..9c4fb6e724 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/InitialHandshakeCommandCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/InitialHandshakeDB2CommandMessage.java @@ -26,7 +26,6 @@ import io.vertx.db2client.impl.drda.SQLState; import io.vertx.db2client.impl.drda.SqlCode; import io.vertx.sqlclient.spi.connection.Connection; -import io.vertx.sqlclient.codec.CommandResponse; /** * InitialHandshakeCommandCodec encodes the packets to get a connection from the database. @@ -35,17 +34,11 @@ * always EBCDIC, but the second two are EBCDIC for Db2 LUW and UTF8 for Db2/z * */ -class InitialHandshakeCommandCodec extends AuthenticationCommandBaseCodec { - - private static enum ConnectionState { - CONNECTING, AUTHENTICATING, CONNECTED, CONNECT_FAILED - } +class InitialHandshakeDB2CommandMessage extends AuthenticationDB2CommandBaseMessage { private static final int TARGET_SECURITY_MEASURE = DRDAConstants.SECMEC_USRIDPWD; - private ConnectionState status = ConnectionState.CONNECTING; - - InitialHandshakeCommandCodec(InitialHandshakeCommand cmd) { + InitialHandshakeDB2CommandMessage(InitialHandshakeCommand cmd) { super(cmd); } @@ -53,16 +46,6 @@ private static enum ConnectionState { void encode(DB2Encoder encoder) { super.encode(encoder); encoder.socketConnection.connMetadata.databaseName = cmd.database(); - encoder.socketConnection.closeHandler(h -> { - if (status == ConnectionState.CONNECTING) { - // Sometimes DB2 closes the connection when sending an invalid Database name. - // -4499 = A fatal error occurred that resulted in a disconnect from the data - // source. - // 08001 = "The connection was unable to be established" - fail(new DB2Exception("The connection was closed by the database server.", SqlCode.CONNECTION_REFUSED, - SQLState.AUTH_DATABASE_CONNECTION_REFUSED)); - } - }); ByteBuf packet = allocateBuffer(); int packetStartIdx = packet.writerIndex(); @@ -90,7 +73,7 @@ void encode(DB2Encoder encoder) { @Override void decodePayload(ByteBuf payload, int payloadLength) { DRDAConnectResponse response = new DRDAConnectResponse(payload, encoder.socketConnection.connMetadata); - switch (status) { + switch (encoder.socketConnection.status) { case CONNECTING: response.readExchangeServerAttributes(); // readAccessSecurity can throw a DB2Exception if there are problems connecting. @@ -100,7 +83,7 @@ void decodePayload(ByteBuf payload, int payloadLength) { try { response.readAccessSecurity(TARGET_SECURITY_MEASURE); } catch (DB2Exception de) { - status = ConnectionState.CONNECT_FAILED; + encoder.socketConnection.status = ConnectionState.CONNECT_FAILED; throw de; } @@ -116,7 +99,7 @@ void decodePayload(ByteBuf payload, int payloadLength) { int lenOfPayload = packet.writerIndex() - packetStartIdx; sendPacket(packet, lenOfPayload); - status = ConnectionState.AUTHENTICATING; + encoder.socketConnection.status = ConnectionState.AUTHENTICATING; break; case AUTHENTICATING: @@ -125,12 +108,12 @@ void decodePayload(ByteBuf payload, int payloadLength) { if (accData.correlationToken != null) { encoder.socketConnection.connMetadata.correlationToken = accData.correlationToken; } - status = ConnectionState.CONNECTED; - completionHandler.handle(CommandResponse.success(cmd.connection())); + encoder.socketConnection.status = ConnectionState.CONNECTED; + fireCommandSuccess(cmd.connection()); break; default: - fail(new DB2Exception("The connection was unable to be established. Invalid connection state.", SqlCode.CONNECTION_REFUSED, + fireCommandFailure(new DB2Exception("The connection was unable to be established. Invalid connection state.", SqlCode.CONNECTION_REFUSED, SQLState.AUTH_DATABASE_CONNECTION_REFUSED)); break; diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PingCommandCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PingDB2CommandMessage.java similarity index 92% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PingCommandCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PingDB2CommandMessage.java index ad9da6cba9..ab012c1367 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PingCommandCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PingDB2CommandMessage.java @@ -24,12 +24,12 @@ import io.vertx.db2client.impl.drda.ConnectionMetaData; import io.vertx.sqlclient.codec.CommandResponse; -class PingCommandCodec extends CommandCodec { +class PingDB2CommandMessage extends DB2CommandMessage { // Use an isolated metadata instance since we will flow a new EXCSAT private final ConnectionMetaData md = new ConnectionMetaData(); - PingCommandCodec(PingCommand cmd) { + PingDB2CommandMessage(PingCommand cmd) { super(cmd); } @@ -43,8 +43,7 @@ void encode(DB2Encoder encoder) { void decodePayload(ByteBuf payload, int payloadLength) { DRDAConnectResponse response = new DRDAConnectResponse(payload, md); response.readExchangeServerAttributes(); - completionHandler.handle(CommandResponse.success(null)); - return; + fireCommandSuccess(null); } private void sendPingRequest() { diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PrepareStatementCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PrepareStatementDB2CommandMessage.java similarity index 91% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PrepareStatementCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PrepareStatementDB2CommandMessage.java index d5282ecca0..6f5f5bebdf 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PrepareStatementCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/PrepareStatementDB2CommandMessage.java @@ -26,9 +26,9 @@ import io.vertx.sqlclient.codec.CommandResponse; import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand; -class PrepareStatementCodec extends CommandCodec { +class PrepareStatementDB2CommandMessage extends DB2CommandMessage { - private static final Logger LOG = LoggerFactory.getLogger(PrepareStatementCodec.class); + private static final Logger LOG = LoggerFactory.getLogger(PrepareStatementDB2CommandMessage.class); private static enum CommandHandlerState { INIT, HANDLING_PARAM_COLUMN_DEFINITION, @@ -42,7 +42,7 @@ private static enum CommandHandlerState { private ColumnMetaData rowDesc; private Section section; - PrepareStatementCodec(PrepareStatementCommand cmd) { + PrepareStatementDB2CommandMessage(PrepareStatementCommand cmd) { super(cmd); } @@ -88,8 +88,8 @@ void decodePayload(ByteBuf payload, int payloadLength) { } private void handleReadyForQuery() { - completionHandler.handle(CommandResponse.success(new DB2PreparedStatement(cmd.sql(), new DB2ParamDesc(paramDesc), - DB2RowDescriptor.create(rowDesc), section))); + fireCommandSuccess(new DB2PreparedStatement(cmd.sql(), new DB2ParamDesc(paramDesc), + DB2RowDescriptor.create(rowDesc), section)); } private void resetIntermediaryResult() { diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/QueryCommandBaseCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/QueryDB2CommandBaseMessage.java similarity index 94% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/QueryCommandBaseCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/QueryDB2CommandBaseMessage.java index a640b175c6..c356e8302d 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/QueryCommandBaseCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/QueryDB2CommandBaseMessage.java @@ -21,12 +21,12 @@ import io.vertx.sqlclient.internal.RowDescriptorBase; import io.vertx.sqlclient.spi.protocol.QueryCommandBase; -abstract class QueryCommandBaseCodec> extends CommandCodec { +abstract class QueryDB2CommandBaseMessage> extends DB2CommandMessage { protected ColumnMetaData columnDefinitions; protected final boolean isQuery; - QueryCommandBaseCodec(C cmd) { + QueryDB2CommandBaseMessage(C cmd) { super(cmd); this.isQuery = DRDAQueryRequest.isQuery(cmd.sql()); } diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/SimpleQueryCommandCodec.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/SimpleQueryDB2CommandMessage.java similarity index 92% rename from vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/SimpleQueryCommandCodec.java rename to vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/SimpleQueryDB2CommandMessage.java index 305ffa20f3..7dee044457 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/SimpleQueryCommandCodec.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/SimpleQueryDB2CommandMessage.java @@ -26,11 +26,11 @@ import io.vertx.sqlclient.codec.CommandResponse; import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand; -class SimpleQueryCommandCodec extends QueryCommandBaseCodec> { +class SimpleQueryDB2CommandMessage extends QueryDB2CommandBaseMessage> { private Section querySection; - SimpleQueryCommandCodec(SimpleQueryCommand cmd) { + SimpleQueryDB2CommandMessage(SimpleQueryCommand cmd) { super(cmd); } @@ -61,7 +61,7 @@ void decodeUpdate(ByteBuf payload) { if (cmd.autoCommit()) { updateResponse.readLocalCommit(); } - completionHandler.handle(CommandResponse.success(true)); + fireCommandSuccess(true); } static T emptyResult(Collector collector) { @@ -87,7 +87,7 @@ void decodeQuery(ByteBuf payload) { } handleQueryResult(decoder); - completionHandler.handle(CommandResponse.success(true)); + fireCommandSuccess(true); } @Override diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLSocketConnection.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLSocketConnection.java index 4294805a61..07ceb66273 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLSocketConnection.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLSocketConnection.java @@ -27,8 +27,8 @@ import io.vertx.core.spi.metrics.ClientMetrics; import io.vertx.mssqlclient.MSSQLConnectOptions; import io.vertx.mssqlclient.MSSQLInfo; -import io.vertx.mssqlclient.impl.codec.ExtendedQueryCommandBaseCodec; -import io.vertx.mssqlclient.impl.codec.MSSQLCommandCodec; +import io.vertx.mssqlclient.impl.codec.ExtendedQueryMSSQLCommandBaseMessage; +import io.vertx.mssqlclient.impl.codec.MSSQLCommandMessage; import io.vertx.mssqlclient.impl.codec.MSSQLPreparedStatement; import io.vertx.mssqlclient.impl.codec.TdsLoginSentCompletionHandler; import io.vertx.mssqlclient.impl.codec.TdsMessageCodec; @@ -156,12 +156,12 @@ public void init() { @Override protected CommandMessage toMessage(ExtendedQueryCommand command, PreparedStatement preparedStatement) { - return ExtendedQueryCommandBaseCodec.create(command, (MSSQLPreparedStatement)preparedStatement); + return ExtendedQueryMSSQLCommandBaseMessage.create(command, (MSSQLPreparedStatement)preparedStatement); } @Override protected CommandMessage toMessage(CommandBase command) { - return MSSQLCommandCodec.wrap(command); + return MSSQLCommandMessage.wrap(command); } @Override diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseConnectionCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseConnectionMSSQLCommandMessage.java similarity index 78% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseConnectionCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseConnectionMSSQLCommandMessage.java index dedad18d8c..b6c9427a9a 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseConnectionCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseConnectionMSSQLCommandMessage.java @@ -13,8 +13,8 @@ import io.vertx.sqlclient.spi.protocol.CloseConnectionCommand; -class CloseConnectionCommandCodec extends MSSQLCommandCodec { - CloseConnectionCommandCodec(CloseConnectionCommand cmd) { +class CloseConnectionMSSQLCommandMessage extends MSSQLCommandMessage { + CloseConnectionMSSQLCommandMessage(CloseConnectionCommand cmd) { super(cmd); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseCursorCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseCursorMSSQLCommandMessage.java similarity index 93% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseCursorCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseCursorMSSQLCommandMessage.java index 4e23a13234..d15c03d202 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseCursorCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseCursorMSSQLCommandMessage.java @@ -18,12 +18,12 @@ import static io.vertx.mssqlclient.impl.codec.DataType.INTN; import static io.vertx.mssqlclient.impl.codec.MessageType.RPC; -public class CloseCursorCommandCodec extends MSSQLCommandCodec { +public class CloseCursorMSSQLCommandMessage extends MSSQLCommandMessage { private CursorData cursorData; private boolean cursorClosed; - public CloseCursorCommandCodec(CloseCursorCommand cmd) { + public CloseCursorMSSQLCommandMessage(CloseCursorCommand cmd) { super(cmd); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseStatementCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseStatementMSSQLCommandMessage.java similarity index 90% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseStatementCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseStatementMSSQLCommandMessage.java index 63a28bdf14..91c718754c 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseStatementCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/CloseStatementMSSQLCommandMessage.java @@ -18,9 +18,9 @@ import static io.vertx.mssqlclient.impl.codec.DataType.INTN; import static io.vertx.mssqlclient.impl.codec.MessageType.RPC; -class CloseStatementCommandCodec extends MSSQLCommandCodec { +class CloseStatementMSSQLCommandMessage extends MSSQLCommandMessage { - CloseStatementCommandCodec(CloseStatementCommand cmd) { + CloseStatementMSSQLCommandMessage(CloseStatementCommand cmd) { super(cmd); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedBatchQueryCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedBatchQueryMSSQLCommandMessage.java similarity index 90% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedBatchQueryCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedBatchQueryMSSQLCommandMessage.java index abeb8b444a..0793948f09 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedBatchQueryCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedBatchQueryMSSQLCommandMessage.java @@ -18,14 +18,14 @@ import java.util.List; -class ExtendedBatchQueryCommandCodec extends ExtendedQueryCommandBaseCodec { +class ExtendedBatchQueryMSSQLCommandMessage extends ExtendedQueryMSSQLCommandBaseMessage { private final List paramsList; private int paramsIdx; private int messageDecoded; - ExtendedBatchQueryCommandCodec(ExtendedQueryCommand cmd, MSSQLPreparedStatement ps) { + ExtendedBatchQueryMSSQLCommandMessage(ExtendedQueryCommand cmd, MSSQLPreparedStatement ps) { super(cmd, ps); paramsList = cmd.paramsList(); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedCursorQueryCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedCursorQueryMSSQLCommandMessage.java similarity index 96% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedCursorQueryCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedCursorQueryMSSQLCommandMessage.java index 236be34d43..197e911d6d 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedCursorQueryCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedCursorQueryMSSQLCommandMessage.java @@ -19,11 +19,11 @@ import static io.vertx.mssqlclient.impl.codec.DataType.NVARCHAR; import static io.vertx.mssqlclient.impl.codec.MessageType.RPC; -class ExtendedCursorQueryCommandCodec extends ExtendedQueryCommandBaseCodec { +class ExtendedCursorQueryMSSQLCommandMessage extends ExtendedQueryMSSQLCommandBaseMessage { private CursorData cursorData; - ExtendedCursorQueryCommandCodec(ExtendedQueryCommand cmd, MSSQLPreparedStatement ps) { + ExtendedCursorQueryMSSQLCommandMessage(ExtendedQueryCommand cmd, MSSQLPreparedStatement ps) { super(cmd, ps); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryCommandBaseCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryMSSQLCommandBaseMessage.java similarity index 88% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryCommandBaseCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryMSSQLCommandBaseMessage.java index dd1d902cb9..4009553950 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryCommandBaseCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryMSSQLCommandBaseMessage.java @@ -19,22 +19,22 @@ import static io.vertx.mssqlclient.impl.codec.DataType.*; import static io.vertx.mssqlclient.impl.codec.MessageType.RPC; -public abstract class ExtendedQueryCommandBaseCodec extends QueryCommandBaseCodec> { +public abstract class ExtendedQueryMSSQLCommandBaseMessage extends QueryMSSQLCommandBaseMessage> { final MSSQLPreparedStatement ps; - public ExtendedQueryCommandBaseCodec(ExtendedQueryCommand cmd, MSSQLPreparedStatement ps) { + public ExtendedQueryMSSQLCommandBaseMessage(ExtendedQueryCommand cmd, MSSQLPreparedStatement ps) { super(cmd); this.ps = ps; } - public static MSSQLCommandCodec create(ExtendedQueryCommand queryCmd, MSSQLPreparedStatement ps) { + public static MSSQLCommandMessage create(ExtendedQueryCommand queryCmd, MSSQLPreparedStatement ps) { if (queryCmd.isBatch()) { - return new ExtendedBatchQueryCommandCodec<>(queryCmd, ps); + return new ExtendedBatchQueryMSSQLCommandMessage<>(queryCmd, ps); } else if (queryCmd.cursorId() != null) { - return new ExtendedCursorQueryCommandCodec<>(queryCmd, ps); + return new ExtendedCursorQueryMSSQLCommandMessage<>(queryCmd, ps); } else { - return new ExtendedQueryCommandCodec<>(queryCmd, ps); + return new ExtendedQueryMSSQLCommandMessage<>(queryCmd, ps); } } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryMSSQLCommandMessage.java similarity index 80% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryMSSQLCommandMessage.java index bb1986f683..664436a183 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/ExtendedQueryMSSQLCommandMessage.java @@ -14,9 +14,9 @@ import io.vertx.sqlclient.internal.TupleBase; import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand; -class ExtendedQueryCommandCodec extends ExtendedQueryCommandBaseCodec { +class ExtendedQueryMSSQLCommandMessage extends ExtendedQueryMSSQLCommandBaseMessage { - ExtendedQueryCommandCodec(ExtendedQueryCommand cmd, MSSQLPreparedStatement ps) { + ExtendedQueryMSSQLCommandMessage(ExtendedQueryCommand cmd, MSSQLPreparedStatement ps) { super(cmd, ps); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/InitCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/InitMSSQLCommandMessage.java similarity index 98% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/InitCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/InitMSSQLCommandMessage.java index 6cdda99e5f..e3bee07a94 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/InitCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/InitMSSQLCommandMessage.java @@ -27,11 +27,11 @@ import static java.nio.charset.StandardCharsets.UTF_16LE; import static java.util.Locale.ENGLISH; -class InitCommandCodec extends MSSQLCommandCodec { +class InitMSSQLCommandMessage extends MSSQLCommandMessage { static final Object LOGIN_SENT = new Object(); - InitCommandCodec(InitCommand cmd) { + InitMSSQLCommandMessage(InitCommand cmd) { super(cmd); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLCommandMessage.java similarity index 91% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLCommandMessage.java index d17c03bc45..5de708baa4 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLCommandMessage.java @@ -32,32 +32,32 @@ import static io.vertx.mssqlclient.impl.utils.ByteBufUtils.readUnsignedByteLengthString; import static io.vertx.mssqlclient.impl.utils.ByteBufUtils.readUnsignedShortLengthString; -public abstract class MSSQLCommandCodec> extends CommandMessage { +public abstract class MSSQLCommandMessage> extends CommandMessage { public TdsMessageCodec tdsMessageCodec; public MSSQLException failure; public R result; - MSSQLCommandCodec(C cmd) { + MSSQLCommandMessage(C cmd) { super(cmd); } - public static MSSQLCommandCodec wrap(CommandBase cmd) { + public static MSSQLCommandMessage wrap(CommandBase cmd) { if (cmd instanceof PreLoginCommand) { - return new PreLoginCommandCodec((PreLoginCommand) cmd); + return new PreLoginMSSQLCommandMessage((PreLoginCommand) cmd); } else if (cmd instanceof InitCommand) { - return new InitCommandCodec((InitCommand) cmd); + return new InitMSSQLCommandMessage((InitCommand) cmd); } else if (cmd instanceof SimpleQueryCommand) { - return new SQLBatchCommandCodec<>((SimpleQueryCommand) cmd); + return new SQLBatchMSSQLCommandMessage<>((SimpleQueryCommand) cmd); } else if (cmd instanceof PrepareStatementCommand) { - return new PrepareStatementCodec((PrepareStatementCommand) cmd); + return new PrepareStatementMSSQLMessage((PrepareStatementCommand) cmd); } else if (cmd instanceof CloseStatementCommand) { - return new CloseStatementCommandCodec((CloseStatementCommand) cmd); + return new CloseStatementMSSQLCommandMessage((CloseStatementCommand) cmd); } else if (cmd == CloseConnectionCommand.INSTANCE) { - return new CloseConnectionCommandCodec((CloseConnectionCommand) cmd); + return new CloseConnectionMSSQLCommandMessage((CloseConnectionCommand) cmd); } else if (cmd instanceof CloseCursorCommand) { - return new CloseCursorCommandCodec((CloseCursorCommand) cmd); + return new CloseCursorMSSQLCommandMessage((CloseCursorCommand) cmd); } else { throw new UnsupportedOperationException(); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PreLoginCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PreLoginMSSQLCommandMessage.java similarity index 95% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PreLoginCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PreLoginMSSQLCommandMessage.java index 00158f50d1..b9541d9d45 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PreLoginCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PreLoginMSSQLCommandMessage.java @@ -21,13 +21,13 @@ import static io.vertx.mssqlclient.impl.codec.EncryptionLevel.ENCRYPT_ON; import static io.vertx.mssqlclient.impl.codec.MessageType.PRE_LOGIN; -class PreLoginCommandCodec extends MSSQLCommandCodec { +class PreLoginMSSQLCommandMessage extends MSSQLCommandMessage { private static final int VERSION = 0x00; private static final int ENCRYPTION = 0x01; private static final int TERMINATOR = 0xFF; - PreLoginCommandCodec(PreLoginCommand cmd) { + PreLoginMSSQLCommandMessage(PreLoginCommand cmd) { super(cmd); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PrepareStatementCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PrepareStatementMSSQLMessage.java similarity index 84% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PrepareStatementCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PrepareStatementMSSQLMessage.java index 73e7365ff0..72d10dcb6e 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PrepareStatementCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/PrepareStatementMSSQLMessage.java @@ -15,8 +15,8 @@ import io.vertx.sqlclient.codec.CommandResponse; import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand; -class PrepareStatementCodec extends MSSQLCommandCodec { - PrepareStatementCodec(PrepareStatementCommand cmd) { +class PrepareStatementMSSQLMessage extends MSSQLCommandMessage { + PrepareStatementMSSQLMessage(PrepareStatementCommand cmd) { super(cmd); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/QueryCommandBaseCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/QueryMSSQLCommandBaseMessage.java similarity index 92% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/QueryCommandBaseCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/QueryMSSQLCommandBaseMessage.java index 50f03bd093..f3eb1b2e4b 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/QueryCommandBaseCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/QueryMSSQLCommandBaseMessage.java @@ -18,12 +18,12 @@ import java.util.stream.Collector; -abstract class QueryCommandBaseCodec> extends MSSQLCommandCodec { +abstract class QueryMSSQLCommandBaseMessage> extends MSSQLCommandMessage { protected int rowCount; protected RowResultDecoder rowResultDecoder; - QueryCommandBaseCodec(C cmd) { + QueryMSSQLCommandBaseMessage(C cmd) { super(cmd); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/SQLBatchCommandCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/SQLBatchMSSQLCommandMessage.java similarity index 87% rename from vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/SQLBatchCommandCodec.java rename to vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/SQLBatchMSSQLCommandMessage.java index fb3aa56d10..ae42d67f3b 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/SQLBatchCommandCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/SQLBatchMSSQLCommandMessage.java @@ -18,8 +18,8 @@ import static io.vertx.mssqlclient.impl.codec.MessageType.SQL_BATCH; -class SQLBatchCommandCodec extends QueryCommandBaseCodec> { - SQLBatchCommandCodec(SimpleQueryCommand cmd) { +class SQLBatchMSSQLCommandMessage extends QueryMSSQLCommandBaseMessage> { + SQLBatchMSSQLCommandMessage(SimpleQueryCommand cmd) { super(cmd); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsLoginSentCompletionHandler.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsLoginSentCompletionHandler.java index 3fa6ad8139..1aafe7df7a 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsLoginSentCompletionHandler.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsLoginSentCompletionHandler.java @@ -21,7 +21,7 @@ /** * A channel handler which listens to the {@code LOGIN_SENT} user event. *

- * This event is triggered by the {@link InitCommandCodec} after the login packet has been sent to the server. + * This event is triggered by the {@link InitMSSQLCommandMessage} after the login packet has been sent to the server. * This handler removes the {@link SslHandler} from the pipeline if the client and the server agreed to encrypt the login packet only. * In this case, further traffic (including the login response) will be unencrypted. */ @@ -44,7 +44,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt == InitCommandCodec.LOGIN_SENT) { + if (evt == InitMSSQLCommandMessage.LOGIN_SENT) { pipeline.remove(this); if (encryptionLevel == ENCRYPT_OFF) { pipeline.remove(sslHandler); diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageCodec.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageCodec.java index 10c217cd49..6be931b368 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageCodec.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageCodec.java @@ -14,7 +14,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.CombinedChannelDuplexHandler; -import io.vertx.core.Completable; import io.vertx.sqlclient.ClosedConnectionException; import java.util.ArrayDeque; @@ -24,7 +23,7 @@ public class TdsMessageCodec extends CombinedChannelDuplexHandler { - private final ArrayDeque> inflight = new ArrayDeque<>(); + private final ArrayDeque> inflight = new ArrayDeque<>(); private final TdsMessageEncoder encoder; private final TdsMessageDecoder decoder; @@ -32,7 +31,6 @@ public class TdsMessageCodec extends CombinedChannelDuplexHandler cursorDataMap; - private Throwable failure; public TdsMessageCodec(int desiredPacketSize) { decoder = new TdsMessageDecoder(this); @@ -47,33 +45,6 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { alloc = chctx.alloc(); } - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - fail(cause); - super.exceptionCaught(ctx, cause); - } - - private void fail(Throwable cause) { - if (failure == null) { - failure = cause; - for (Iterator> it = inflight.iterator(); it.hasNext(); ) { - MSSQLCommandCodec codec = it.next(); - it.remove(); - fail(codec, cause); - } - } - } - - private void fail(MSSQLCommandCodec codec, Throwable cause) { - codec.fail(cause); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - fail(ClosedConnectionException.INSTANCE); - super.channelInactive(ctx); - } - TdsMessageEncoder encoder() { return encoder; } @@ -98,22 +69,16 @@ void setTransactionDescriptor(long transactionDescriptor) { this.transactionDescriptor = transactionDescriptor; } - MSSQLCommandCodec peek() { + MSSQLCommandMessage peek() { return inflight.peek(); } - MSSQLCommandCodec poll() { + MSSQLCommandMessage poll() { return inflight.poll(); } - boolean add(MSSQLCommandCodec codec) { - if (failure == null) { - inflight.add(codec); - return true; - } else { - fail(codec, failure); - return false; - } + void add(MSSQLCommandMessage codec) { + inflight.add(codec); } CursorData getOrCreateCursorData(String cursorId) { diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageDecoder.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageDecoder.java index b34eb02f58..2638b0b9cf 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageDecoder.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageDecoder.java @@ -54,8 +54,7 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } void fireCommandResponse(CommandResponse commandResponse) { - MSSQLCommandCodec c = tdsMessageCodec.poll(); - commandResponse.handler = (Completable) c.handler; + MSSQLCommandMessage c = tdsMessageCodec.poll(); chctx.fireChannelRead(commandResponse); } @@ -68,7 +67,7 @@ private void releaseMessage() { private void decodeMessage() { try { - MSSQLCommandCodec commandCodec = tdsMessageCodec.peek(); + MSSQLCommandMessage commandCodec = tdsMessageCodec.peek(); if (commandCodec == null) { throw new IllegalStateException("No command codec for message of type [" + message.type() + "]"); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageEncoder.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageEncoder.java index 87eb805aa3..f5c23f46f9 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageEncoder.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/TdsMessageEncoder.java @@ -40,8 +40,8 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof MSSQLCommandCodec) { - MSSQLCommandCodec cmd = (MSSQLCommandCodec) msg; + if (msg instanceof MSSQLCommandMessage) { + MSSQLCommandMessage cmd = (MSSQLCommandMessage) msg; cmd.tdsMessageCodec = tdsMessageCodec; write(cmd); } else { @@ -49,10 +49,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } - void write(MSSQLCommandCodec cmd) { - if (tdsMessageCodec.add(cmd)) { - cmd.encode(); - } + void write(MSSQLCommandMessage cmd) { + tdsMessageCodec.add(cmd); + cmd.encode(); } public int packetSize() { diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLSocketConnection.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLSocketConnection.java index 3d96849aed..60c102addc 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLSocketConnection.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLSocketConnection.java @@ -28,9 +28,9 @@ import io.vertx.mysqlclient.MySQLConnectOptions; import io.vertx.mysqlclient.SslMode; import io.vertx.mysqlclient.impl.codec.ClearCachedStatementsEvent; -import io.vertx.mysqlclient.impl.codec.CommandCodec; -import io.vertx.mysqlclient.impl.codec.ExtendedBatchQueryCommandCodec; -import io.vertx.mysqlclient.impl.codec.ExtendedQueryCommandCodec; +import io.vertx.mysqlclient.impl.codec.MySQLCommand; +import io.vertx.mysqlclient.impl.codec.ExtendedBatchQueryMySQLCommand; +import io.vertx.mysqlclient.impl.codec.ExtendedQueryMySQLCommand; import io.vertx.mysqlclient.impl.codec.MySQLCodec; import io.vertx.mysqlclient.impl.codec.MySQLPacketDecoder; import io.vertx.mysqlclient.impl.codec.MySQLPreparedStatement; @@ -105,15 +105,15 @@ public void init() { @Override protected CommandMessage toMessage(ExtendedQueryCommand command, PreparedStatement preparedStatement) { if (command.isBatch()) { - return new ExtendedBatchQueryCommandCodec<>(command, (MySQLPreparedStatement) preparedStatement); + return new ExtendedBatchQueryMySQLCommand<>(command, (MySQLPreparedStatement) preparedStatement); } else { - return new ExtendedQueryCommandCodec<>(command, (MySQLPreparedStatement) preparedStatement); + return new ExtendedQueryMySQLCommand<>(command, (MySQLPreparedStatement) preparedStatement); } } @Override protected CommandMessage toMessage(CommandBase command) { - return CommandCodec.wrap(command); + return MySQLCommand.wrap(command); } @Override diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/AuthenticationCommandBaseCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/AuthenticationMySQLCommandBase.java similarity index 96% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/AuthenticationCommandBaseCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/AuthenticationMySQLCommandBase.java index 2fa894a492..e74983510b 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/AuthenticationCommandBaseCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/AuthenticationMySQLCommandBase.java @@ -22,7 +22,7 @@ import java.util.Arrays; import java.util.Map; -abstract class AuthenticationCommandBaseCodec> extends CommandCodec { +abstract class AuthenticationMySQLCommandBase> extends MySQLCommand { protected static final int NONCE_LENGTH = 20; protected static final int AUTH_SWITCH_REQUEST_STATUS_FLAG = 0xFE; @@ -35,7 +35,7 @@ abstract class AuthenticationCommandBaseCodec { - ChangeUserCommandCodec(ChangeUserCommand cmd) { +class ChangeUserMySQLCommand extends AuthenticationMySQLCommandBase { + ChangeUserMySQLCommand(ChangeUserCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseConnectionCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseConnectionMySQLCommand.java similarity index 90% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseConnectionCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseConnectionMySQLCommand.java index c26b618b05..c2fcc684e0 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseConnectionCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseConnectionMySQLCommand.java @@ -15,10 +15,10 @@ import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.spi.protocol.CloseConnectionCommand; -class CloseConnectionCommandCodec extends CommandCodec { +class CloseConnectionMySQLCommand extends MySQLCommand { private static final int PAYLOAD_LENGTH = 1; - CloseConnectionCommandCodec(CloseConnectionCommand cmd) { + CloseConnectionMySQLCommand(CloseConnectionCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseStatementCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseStatementMySQLCommand.java similarity index 93% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseStatementCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseStatementMySQLCommand.java index 2801b19159..c2c802aff8 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseStatementCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CloseStatementMySQLCommand.java @@ -16,10 +16,10 @@ import io.vertx.sqlclient.spi.protocol.CloseStatementCommand; import io.vertx.sqlclient.codec.CommandResponse; -class CloseStatementCommandCodec extends CommandCodec { +class CloseStatementMySQLCommand extends MySQLCommand { private static final int PAYLOAD_LENGTH = 5; - CloseStatementCommandCodec(CloseStatementCommand cmd) { + CloseStatementMySQLCommand(CloseStatementCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/DebugCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/DebugMySQLCommand.java similarity index 92% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/DebugCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/DebugMySQLCommand.java index 5a3cb22b0b..313db7b8e3 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/DebugCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/DebugMySQLCommand.java @@ -15,10 +15,10 @@ import io.vertx.mysqlclient.impl.command.DebugCommand; import io.vertx.mysqlclient.impl.protocol.CommandType; -class DebugCommandCodec extends CommandCodec { +class DebugMySQLCommand extends MySQLCommand { private static final int PAYLOAD_LENGTH = 1; - DebugCommandCodec(DebugCommand cmd) { + DebugMySQLCommand(DebugCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedBatchQueryCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedBatchQueryMySQLCommand.java similarity index 95% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedBatchQueryCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedBatchQueryMySQLCommand.java index f7a17fb8f2..9635adf037 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedBatchQueryCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedBatchQueryMySQLCommand.java @@ -25,7 +25,7 @@ import static io.vertx.mysqlclient.impl.protocol.Packets.EnumCursorType.*; -public class ExtendedBatchQueryCommandCodec extends ExtendedQueryCommandBaseCodec> { +public class ExtendedBatchQueryMySQLCommand extends ExtendedQueryMySQLCommandBase> { private final List params; private final BitSet bindingFailures; @@ -33,7 +33,7 @@ public class ExtendedBatchQueryCommandCodec extends ExtendedQueryCommandBaseC private int sent; private int received; - public ExtendedBatchQueryCommandCodec(ExtendedQueryCommand cmd, MySQLPreparedStatement statement) { + public ExtendedBatchQueryMySQLCommand(ExtendedQueryCommand cmd, MySQLPreparedStatement statement) { super(cmd, statement); params = cmd.paramsList(); bindingFailures = new BitSet(params.size()); diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommand.java similarity index 96% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommand.java index 619fa52815..093a19233f 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommand.java @@ -26,8 +26,8 @@ import static io.vertx.mysqlclient.impl.protocol.Packets.EnumCursorType.CURSOR_TYPE_NO_CURSOR; import static io.vertx.mysqlclient.impl.protocol.Packets.EnumCursorType.CURSOR_TYPE_READ_ONLY; -public class ExtendedQueryCommandCodec extends ExtendedQueryCommandBaseCodec> { - public ExtendedQueryCommandCodec(ExtendedQueryCommand cmd, MySQLPreparedStatement statement) { +public class ExtendedQueryMySQLCommand extends ExtendedQueryMySQLCommandBase> { + public ExtendedQueryMySQLCommand(ExtendedQueryCommand cmd, MySQLPreparedStatement statement) { super(cmd, statement); if (cmd.fetch() > 0 && statement.isCursorOpen) { // restore the state we need for decoding fetch response based on the prepared statement diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandBaseCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommandBase.java similarity index 95% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandBaseCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommandBase.java index 27677e206b..781b7c4ab2 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandBaseCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommandBase.java @@ -21,11 +21,11 @@ import static io.vertx.mysqlclient.impl.protocol.Packets.*; -abstract class ExtendedQueryCommandBaseCodec> extends QueryCommandBaseCodec { +abstract class ExtendedQueryMySQLCommandBase> extends QueryMySQLCommandBase { protected final MySQLPreparedStatement statement; - ExtendedQueryCommandBaseCodec(C cmd, MySQLPreparedStatement statement) { + ExtendedQueryMySQLCommandBase(C cmd, MySQLPreparedStatement statement) { super(cmd, DataFormat.BINARY); this.statement = statement; } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitDbCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitDbMySQLCommand.java similarity index 93% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitDbCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitDbMySQLCommand.java index a71d5f13e2..6949937e71 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitDbCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitDbMySQLCommand.java @@ -17,8 +17,8 @@ import java.nio.charset.StandardCharsets; -class InitDbCommandCodec extends CommandCodec { - InitDbCommandCodec(InitDbCommand cmd) { +class InitDbMySQLCommand extends MySQLCommand { + InitDbMySQLCommand(InitDbCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitialHandshakeCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitialHandshakeMySQLCommand.java similarity index 98% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitialHandshakeCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitialHandshakeMySQLCommand.java index fd6ad21524..95f94c9515 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitialHandshakeCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitialHandshakeMySQLCommand.java @@ -36,9 +36,9 @@ import static io.vertx.mysqlclient.impl.protocol.CapabilitiesFlag.*; import static io.vertx.mysqlclient.impl.protocol.Packets.*; -class InitialHandshakeCommandCodec extends AuthenticationCommandBaseCodec { +class InitialHandshakeMySQLCommand extends AuthenticationMySQLCommandBase { - private static final Logger LOGGER = LoggerFactory.getLogger(InitialHandshakeCommandCodec.class); + private static final Logger LOGGER = LoggerFactory.getLogger(InitialHandshakeMySQLCommand.class); private static final int AUTH_PLUGIN_DATA_PART1_LENGTH = 8; @@ -48,7 +48,7 @@ class InitialHandshakeCommandCodec extends AuthenticationCommandBaseCodec { - private final ArrayDeque> inflight; - private ChannelHandlerContext chctx; - private Throwable failure; + private final ArrayDeque> inflight; public MySQLCodec(MySQLSocketConnection mySQLSocketConnection) { inflight = new ArrayDeque<>(); @@ -37,59 +35,26 @@ public MySQLCodec(MySQLSocketConnection mySQLSocketConnection) { init(decoder, encoder); } - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - chctx = ctx; - super.handlerAdded(ctx); + public void add(MySQLCommand codec) { + inflight.add(codec); } - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - clearInflightCommands(ClosedConnectionException.INSTANCE); - super.channelInactive(ctx); - } - - public boolean add(CommandCodec codec) { - if (failure == null) { - inflight.add(codec); - return true; - } else { - fail(codec, failure); - return false; - } - } - - public CommandCodec poll() { + public MySQLCommand poll() { return inflight.poll(); } - public CommandCodec peek() { + public MySQLCommand peek() { return inflight.peek(); } - private void clearInflightCommands(Throwable cause) { - for (Iterator> it = inflight.iterator(); it.hasNext(); ) { - CommandCodec codec = it.next(); - it.remove(); - fail(codec, cause); - } - } - - private void fail(CommandCodec codec, Throwable cause) { - if (failure == null) { - failure = cause; - codec.fail(cause); - } - } - /** * check the pending command queue and complete handling the command directly if the command request will not receive a server response */ void checkFireAndForgetCommands() { // check if there is any completed command - CommandCodec commandCodec; - while ((commandCodec = inflight.peek()) != null && commandCodec.expectNoResponsePacket()) { - commandCodec.decodePayload(null, 0); + MySQLCommand commandMsg; + while ((commandMsg = inflight.peek()) != null && commandMsg.expectNoResponsePacket()) { + commandMsg.decodePayload(null, 0); } } } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLCommand.java similarity index 90% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLCommand.java index 0449c71938..f5d7900ea7 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/CommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLCommand.java @@ -44,44 +44,44 @@ import static io.vertx.mysqlclient.impl.protocol.Packets.*; -public abstract class CommandCodec> extends CommandMessage { +public abstract class MySQLCommand> extends CommandMessage { public Throwable failure; public R result; MySQLEncoder encoder; int sequenceId; - CommandCodec(C cmd) { + MySQLCommand(C cmd) { super(cmd); } - public static CommandCodec wrap(CommandBase cmd) { + public static MySQLCommand wrap(CommandBase cmd) { if (cmd instanceof InitialHandshakeCommand) { - return new InitialHandshakeCommandCodec((InitialHandshakeCommand) cmd); + return new InitialHandshakeMySQLCommand((InitialHandshakeCommand) cmd); } else if (cmd instanceof SimpleQueryCommand) { - return new SimpleQueryCommandCodec<>((SimpleQueryCommand) cmd); + return new SimpleQueryMySQLCommand<>((SimpleQueryCommand) cmd); } else if (cmd instanceof CloseConnectionCommand) { - return new CloseConnectionCommandCodec((CloseConnectionCommand) cmd); + return new CloseConnectionMySQLCommand((CloseConnectionCommand) cmd); } else if (cmd instanceof PrepareStatementCommand) { - return new PrepareStatementCodec((PrepareStatementCommand) cmd); + return new PrepareStatementMySQLCommand((PrepareStatementCommand) cmd); } else if (cmd instanceof CloseStatementCommand) { - return new CloseStatementCommandCodec((CloseStatementCommand) cmd); + return new CloseStatementMySQLCommand((CloseStatementCommand) cmd); } else if (cmd instanceof CloseCursorCommand) { - return new ResetStatementCommandCodec((CloseCursorCommand) cmd); + return new ResetStatementMySQLCommand((CloseCursorCommand) cmd); } else if (cmd instanceof PingCommand) { - return new PingCommandCodec((PingCommand) cmd); + return new PingMySQLCommand((PingCommand) cmd); } else if (cmd instanceof InitDbCommand) { - return new InitDbCommandCodec((InitDbCommand) cmd); + return new InitDbMySQLCommand((InitDbCommand) cmd); } else if (cmd instanceof StatisticsCommand) { - return new StatisticsCommandCodec((StatisticsCommand) cmd); + return new StatisticsMySQLCommand((StatisticsCommand) cmd); } else if (cmd instanceof SetOptionCommand) { - return new SetOptionCommandCodec((SetOptionCommand) cmd); + return new SetOptionMySQLCommand((SetOptionCommand) cmd); } else if (cmd instanceof ResetConnectionCommand) { - return new ResetConnectionCommandCodec((ResetConnectionCommand) cmd); + return new ResetConnectionMySQLCommand((ResetConnectionCommand) cmd); } else if (cmd instanceof DebugCommand) { - return new DebugCommandCodec((DebugCommand) cmd); + return new DebugMySQLCommand((DebugCommand) cmd); } else if (cmd instanceof ChangeUserCommand) { - return new ChangeUserCommandCodec((ChangeUserCommand) cmd); + return new ChangeUserMySQLCommand((ChangeUserCommand) cmd); } else { System.out.println("Unsupported command " + cmd); throw new UnsupportedOperationException("Todo"); diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLDecoder.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLDecoder.java index 752d24396a..87afc0d7f1 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLDecoder.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLDecoder.java @@ -61,7 +61,7 @@ private void releaseMessage() { private void decodePackets() { try { codec.checkFireAndForgetCommands(); - CommandCodec ctx = codec.peek(); + MySQLCommand ctx = codec.peek(); if (ctx == null) { throw new IllegalStateException("No command codec for packet"); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLEncoder.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLEncoder.java index 02ca755adb..fc5b323a2a 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLEncoder.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLEncoder.java @@ -14,7 +14,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; -import io.vertx.core.Completable; import io.vertx.mysqlclient.impl.MySQLSocketConnection; import io.vertx.sqlclient.codec.CommandResponse; @@ -41,8 +40,8 @@ public void handlerAdded(ChannelHandlerContext ctx) { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof CommandCodec) { - CommandCodec cmd = (CommandCodec) msg; + if (msg instanceof MySQLCommand) { + MySQLCommand cmd = (MySQLCommand) msg; write(cmd); codec.checkFireAndForgetCommands(); } else { @@ -50,15 +49,13 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } - void write(CommandCodec cmd) { - if (codec.add(cmd)) { - cmd.encode(this); - } + void write(MySQLCommand cmd) { + codec.add(cmd); + cmd.encode(this); } final void fireCommandResponse(CommandResponse commandResponse) { - CommandCodec c = codec.poll(); - commandResponse.handler = (Completable) c.handler; + codec.poll(); chctx.fireChannelRead(commandResponse); } } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PingCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PingMySQLCommand.java similarity index 92% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PingCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PingMySQLCommand.java index ee096ea30e..6f5e53adc8 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PingCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PingMySQLCommand.java @@ -16,10 +16,10 @@ import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.codec.CommandResponse; -class PingCommandCodec extends CommandCodec { +class PingMySQLCommand extends MySQLCommand { private static final int PAYLOAD_LENGTH = 1; - PingCommandCodec(PingCommand cmd) { + PingMySQLCommand(PingCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementMySQLCommand.java similarity index 97% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementMySQLCommand.java index 1303050053..13ebf33356 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementMySQLCommand.java @@ -28,14 +28,14 @@ import static io.vertx.mysqlclient.impl.protocol.Packets.ERROR_PACKET_HEADER; -class PrepareStatementCodec extends CommandCodec { +class PrepareStatementMySQLCommand extends MySQLCommand { private CommandHandlerState commandHandlerState = CommandHandlerState.INIT; private long statementId; private int processingIndex; private ColumnDefinition[] paramDescs; private ColumnDefinition[] columnDescs; - PrepareStatementCodec(PrepareStatementCommand cmd) { + PrepareStatementMySQLCommand(PrepareStatementCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryMySQLCommandBase.java similarity index 97% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryMySQLCommandBase.java index a410528f85..b956fe49f7 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryMySQLCommandBase.java @@ -31,7 +31,7 @@ import static io.vertx.mysqlclient.impl.protocol.Packets.*; -abstract class QueryCommandBaseCodec> extends CommandCodec { +abstract class QueryMySQLCommandBase> extends MySQLCommand { private final DataFormat format; @@ -40,7 +40,7 @@ abstract class QueryCommandBaseCodec> extends C protected RowResultDecoder decoder; private int currentColumn; - QueryCommandBaseCodec(C cmd, DataFormat format) { + QueryMySQLCommandBase(C cmd, DataFormat format) { super(cmd); this.format = format; } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetConnectionCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetConnectionMySQLCommand.java similarity index 91% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetConnectionCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetConnectionMySQLCommand.java index c9bcefe41e..e7138d69f8 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetConnectionCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetConnectionMySQLCommand.java @@ -15,10 +15,10 @@ import io.vertx.mysqlclient.impl.command.ResetConnectionCommand; import io.vertx.mysqlclient.impl.protocol.CommandType; -class ResetConnectionCommandCodec extends CommandCodec { +class ResetConnectionMySQLCommand extends MySQLCommand { private static final int PAYLOAD_LENGTH = 1; - ResetConnectionCommandCodec(ResetConnectionCommand cmd) { + ResetConnectionMySQLCommand(ResetConnectionCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementMySQLCommand.java similarity index 92% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementMySQLCommand.java index 1e3ed6b5ec..998b6a0d99 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementMySQLCommand.java @@ -15,10 +15,10 @@ import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.spi.protocol.CloseCursorCommand; -class ResetStatementCommandCodec extends CommandCodec { +class ResetStatementMySQLCommand extends MySQLCommand { private static final int PAYLOAD_LENGTH = 5; - ResetStatementCommandCodec(CloseCursorCommand cmd) { + ResetStatementMySQLCommand(CloseCursorCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SetOptionCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SetOptionMySQLCommand.java similarity index 92% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SetOptionCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SetOptionMySQLCommand.java index 054eae8ab7..fa8ff9030a 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SetOptionCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SetOptionMySQLCommand.java @@ -15,10 +15,10 @@ import io.vertx.mysqlclient.impl.command.SetOptionCommand; import io.vertx.mysqlclient.impl.protocol.CommandType; -class SetOptionCommandCodec extends CommandCodec { +class SetOptionMySQLCommand extends MySQLCommand { private static final int PAYLOAD_LENGTH = 3; - SetOptionCommandCodec(SetOptionCommand cmd) { + SetOptionMySQLCommand(SetOptionCommand cmd) { super(cmd); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SimpleQueryCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SimpleQueryMySQLCommand.java similarity index 97% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SimpleQueryCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SimpleQueryMySQLCommand.java index b6a1779878..982c68c710 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SimpleQueryCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/SimpleQueryMySQLCommand.java @@ -30,9 +30,9 @@ import static io.vertx.mysqlclient.impl.protocol.Packets.*; -class SimpleQueryCommandCodec extends QueryCommandBaseCodec> { +class SimpleQueryMySQLCommand extends QueryMySQLCommandBase> { - SimpleQueryCommandCodec(SimpleQueryCommand cmd) { + SimpleQueryMySQLCommand(SimpleQueryCommand cmd) { super(cmd, DataFormat.TEXT); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/StatisticsCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/StatisticsMySQLCommand.java similarity index 92% rename from vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/StatisticsCommandCodec.java rename to vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/StatisticsMySQLCommand.java index 64d3cf554c..89f181ec45 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/StatisticsCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/StatisticsMySQLCommand.java @@ -16,10 +16,10 @@ import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.codec.CommandResponse; -class StatisticsCommandCodec extends CommandCodec { +class StatisticsMySQLCommand extends MySQLCommand { private static final int PAYLOAD_LENGTH = 1; - StatisticsCommandCodec(StatisticsCommand cmd) { + StatisticsMySQLCommand(StatisticsCommand cmd) { super(cmd); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java index 263cdb4dc2..0f332a1356 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java @@ -27,10 +27,10 @@ import io.vertx.core.internal.net.NetSocketInternal; import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.PgException; -import io.vertx.pgclient.impl.codec.ExtendedQueryCommandCodec; +import io.vertx.pgclient.impl.codec.ExtendedQueryPgCommandMessage; import io.vertx.pgclient.impl.codec.NoticeResponse; import io.vertx.pgclient.impl.codec.PgCodec; -import io.vertx.pgclient.impl.codec.PgCommandCodec; +import io.vertx.pgclient.impl.codec.PgCommandMessage; import io.vertx.pgclient.impl.codec.TxFailedEvent; import io.vertx.sqlclient.codec.CommandMessage; import io.vertx.sqlclient.codec.SocketConnectionBase; @@ -179,12 +179,12 @@ protected void doSchedule(CommandBase cmd, Completable handler) { @Override protected CommandMessage toMessage(ExtendedQueryCommand command, PreparedStatement preparedStatement) { - return new ExtendedQueryCommandCodec<>((ExtendedQueryCommand) command, preparedStatement); + return new ExtendedQueryPgCommandMessage<>((ExtendedQueryCommand) command, preparedStatement); } @Override protected CommandMessage toMessage(CommandBase command) { - return PgCommandCodec.wrap(command); + return PgCommandMessage.wrap(command); } @Override diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Bind.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/BindMessage.java similarity index 87% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Bind.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/BindMessage.java index 50f2ba6cec..ae46cc5894 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Bind.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/BindMessage.java @@ -20,13 +20,13 @@ /** * @author Emad Alblueshi */ -final class Bind extends OutboundMessage { +final class BindMessage extends OutboundMessage { final byte[] statement; final DataType[] paramTypes; final PgColumnDesc[] resultColumns; - Bind(byte[] statement, DataType[] paramTypes, PgColumnDesc[] resultColumns) { + BindMessage(byte[] statement, DataType[] paramTypes, PgColumnDesc[] resultColumns) { this.statement = statement; this.paramTypes = paramTypes; this.resultColumns = resultColumns; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseConnectionCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseConnectionPgCommandMessage.java similarity index 79% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseConnectionCommandCodec.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseConnectionPgCommandMessage.java index fdec716081..87fd4b292b 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseConnectionCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseConnectionPgCommandMessage.java @@ -18,11 +18,11 @@ import io.vertx.sqlclient.spi.protocol.CloseConnectionCommand; -class CloseConnectionCommandCodec extends PgCommandCodec { +class CloseConnectionPgCommandMessage extends PgCommandMessage { - static final CloseConnectionCommandCodec INSTANCE = new CloseConnectionCommandCodec(); + static final CloseConnectionPgCommandMessage INSTANCE = new CloseConnectionPgCommandMessage(); - private CloseConnectionCommandCodec() { + private CloseConnectionPgCommandMessage() { super(CloseConnectionCommand.INSTANCE); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePortalCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePortalPgCommandMessage.java similarity index 86% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePortalCommandCodec.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePortalPgCommandMessage.java index 6584c89394..007175e1b4 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePortalCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePortalPgCommandMessage.java @@ -18,9 +18,9 @@ import io.vertx.sqlclient.spi.protocol.CloseCursorCommand; -class ClosePortalCommandCodec extends PgCommandCodec { +class ClosePortalPgCommandMessage extends PgCommandMessage { - ClosePortalCommandCodec(CloseCursorCommand cmd) { + ClosePortalPgCommandMessage(CloseCursorCommand cmd) { super(cmd); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseStatementCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseStatementPgCommandMessage.java similarity index 86% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseStatementCommandCodec.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseStatementPgCommandMessage.java index f9aefe8a65..854d999e52 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseStatementCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/CloseStatementPgCommandMessage.java @@ -18,9 +18,9 @@ import io.vertx.sqlclient.spi.protocol.CloseStatementCommand; -class CloseStatementCommandCodec extends PgCommandCodec { +class CloseStatementPgCommandMessage extends PgCommandMessage { - CloseStatementCommandCodec(CloseStatementCommand cmd) { + CloseStatementPgCommandMessage(CloseStatementCommand cmd) { super(cmd); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Describe.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DescribeMessage.java similarity index 89% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Describe.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DescribeMessage.java index e8341dc815..ff29201796 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Describe.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DescribeMessage.java @@ -20,12 +20,12 @@ /** * @author Emad Alblueshi */ -class Describe extends OutboundMessage { +class DescribeMessage extends OutboundMessage { final byte[] statement; final String portal; - Describe(byte[] statement, String portal) { + DescribeMessage(byte[] statement, String portal) { this.statement = statement; this.portal = portal; } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedQueryCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedQueryPgCommandMessage.java similarity index 94% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedQueryCommandCodec.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedQueryPgCommandMessage.java index 2713cb2d9a..5d5673d540 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedQueryCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedQueryPgCommandMessage.java @@ -23,7 +23,7 @@ import io.vertx.sqlclient.codec.CommandResponse; import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand; -public class ExtendedQueryCommandCodec> extends QueryCommandBaseCodec { +public class ExtendedQueryPgCommandMessage> extends QueryBasePgCommandMessage { private PgEncoder encoder; @@ -31,7 +31,7 @@ public class ExtendedQueryCommandCodec> ext private PgPreparedStatement ps; - public ExtendedQueryCommandCodec(C cmd, PreparedStatement ps) { + public ExtendedQueryPgCommandMessage(C cmd, PreparedStatement ps) { super(cmd); this.rowDecoder = new RowResultDecoder<>(cmd.collector(), ((PgPreparedStatement)ps).rowDesc()); this.ps = (PgPreparedStatement) ps; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java similarity index 97% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitCommandCodec.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java index 9bcb563818..4f1c11bce5 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/InitPgCommandMessage.java @@ -29,13 +29,13 @@ import io.vertx.sqlclient.codec.CommandResponse; import io.vertx.sqlclient.spi.protocol.InitCommand; -class InitCommandCodec extends PgCommandCodec { +class InitPgCommandMessage extends PgCommandMessage { private PgEncoder encoder; private String encoding; private ScramSession scramSession; - InitCommandCodec(InitCommand cmd) { + InitPgCommandMessage(InitCommand cmd) { super(cmd); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCodec.java index e33f27fa85..94fb47c6e9 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCodec.java @@ -16,76 +16,33 @@ */ package io.vertx.pgclient.impl.codec; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.CombinedChannelDuplexHandler; -import io.vertx.sqlclient.ClosedConnectionException; import java.util.ArrayDeque; -import java.util.Iterator; public class PgCodec extends CombinedChannelDuplexHandler { - private final ArrayDeque> inflight = new ArrayDeque<>(); + private final ArrayDeque> inflight; private final PgDecoder decoder; private final PgEncoder encoder; - private ChannelHandlerContext chctx; - private Throwable failure; public PgCodec(boolean useLayer7Proxy) { + inflight = new ArrayDeque<>(); decoder = new PgDecoder(this); encoder = new PgEncoder(useLayer7Proxy, this); init(decoder, encoder); } - boolean add(PgCommandCodec codec) { - if (failure == null) { - codec.decoder = decoder; - inflight.add(codec); - return true; - } else { - fail(codec, failure); - return false; - } + void add(PgCommandMessage codec) { + codec.decoder = decoder; + inflight.add(codec); } - PgCommandCodec peek() { + PgCommandMessage peek() { return inflight.peek(); } - PgCommandCodec poll() { + PgCommandMessage poll() { return inflight.poll(); } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - chctx = ctx; - super.handlerAdded(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - fail(cause); - super.exceptionCaught(ctx, cause); - } - - private void fail(Throwable cause) { - if (failure == null) { - failure = cause; - for (Iterator> it = inflight.iterator(); it.hasNext();) { - PgCommandCodec cmdCodec = it.next(); - it.remove(); - fail(cmdCodec, cause); - } - } - } - - private void fail(PgCommandCodec codec, Throwable cause) { - codec.fail(cause); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - fail(ClosedConnectionException.INSTANCE); - super.channelInactive(ctx); - } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCommandMessage.java similarity index 88% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCommandCodec.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCommandMessage.java index 7bb1849bd9..5ba457b4ab 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCommandMessage.java @@ -32,31 +32,31 @@ import java.util.Arrays; -public abstract class PgCommandCodec> extends CommandMessage { +public abstract class PgCommandMessage> extends CommandMessage { - private static final Logger logger = LoggerFactory.getLogger(PgCommandCodec.class); + private static final Logger logger = LoggerFactory.getLogger(PgCommandMessage.class); PgDecoder decoder; PgException failure; R result; - PgCommandCodec(C cmd) { + PgCommandMessage(C cmd) { super(cmd); } - public static PgCommandCodec wrap(CommandBase cmd) { + public static PgCommandMessage wrap(CommandBase cmd) { if (cmd instanceof InitCommand) { - return new InitCommandCodec((InitCommand) cmd); + return new InitPgCommandMessage((InitCommand) cmd); } else if (cmd instanceof SimpleQueryCommand) { - return new SimpleQueryCodec<>((SimpleQueryCommand) cmd); + return new SimpleQueryPgCommandMessage<>((SimpleQueryCommand) cmd); } else if (cmd instanceof PrepareStatementCommand) { - return new PrepareStatementCommandCodec((PrepareStatementCommand) cmd); + return new PrepareStatementPgCommandMessage((PrepareStatementCommand) cmd); } else if (cmd instanceof CloseConnectionCommand) { - return CloseConnectionCommandCodec.INSTANCE; + return CloseConnectionPgCommandMessage.INSTANCE; } else if (cmd instanceof CloseCursorCommand) { - return new ClosePortalCommandCodec((CloseCursorCommand) cmd); + return new ClosePortalPgCommandMessage((CloseCursorCommand) cmd); } else if (cmd instanceof CloseStatementCommand) { - return new CloseStatementCommandCodec((CloseStatementCommand) cmd); + return new CloseStatementPgCommandMessage((CloseStatementCommand) cmd); } throw new AssertionError("Invalid command " + cmd); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java index aca5fcb116..d244d2b178 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java @@ -47,8 +47,7 @@ class PgDecoder extends ChannelInboundHandlerAdapter { } void fireCommandResponse(CommandResponse commandResponse) { - PgCommandCodec c = codec.poll(); - commandResponse.handler = (Completable) c.handler; + PgCommandMessage c = codec.poll(); chctx.fireChannelRead(commandResponse); } @@ -203,8 +202,8 @@ private void decodeCommandComplete(ByteBuf in) { } private void decodeDataRow(ByteBuf in) { - PgCommandCodec cmdCodec = codec.peek(); - QueryCommandBaseCodec cmd = (QueryCommandBaseCodec) cmdCodec; + PgCommandMessage cmdCodec = codec.peek(); + QueryBasePgCommandMessage cmd = (QueryBasePgCommandMessage) cmdCodec; int len = in.readUnsignedShort(); cmd.rowDecoder.handleRow(len, in); } @@ -253,7 +252,7 @@ private void decodeError(ChannelHandlerContext ctx, ByteBuf in) { decodeErrorOrNotice(response, in); switch (response.getCode()) { default: - PgCommandCodec cmd = codec.peek(); + PgCommandMessage cmd = codec.peek(); if (cmd != null) { cmd.handleErrorResponse(response); } @@ -361,7 +360,7 @@ private void decodeErrorOrNotice(Response response, ByteBuf in) { private void decodeAuthentication(ByteBuf in) { int type = in.readInt(); - PgCommandCodec pending = codec.peek(); + PgCommandMessage pending = codec.peek(); switch (type) { case PgProtocolConstants.AUTHENTICATION_TYPE_OK: { pending.handleAuthenticationOk(); diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java index fdd08e9a60..c47b1723ac 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java @@ -96,7 +96,7 @@ private void enqueueMessage(Object msg, Object p1, Object p2, Object p3, int est public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { if (!closeSent) { CloseConnectionCommand cmd = CloseConnectionCommand.INSTANCE; - PgCommandCodec codec = PgCommandCodec.wrap(cmd); + PgCommandMessage codec = PgCommandMessage.wrap(cmd); codec.encode(this); } } @@ -107,10 +107,9 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { capacityEstimate = 0; } - void write(PgCommandCodec cmd) { - if (codec.add(cmd)) { - cmd.encode(this); - } + void write(PgCommandMessage cmd) { + codec.add(cmd); + cmd.encode(this); } @Override @@ -120,8 +119,8 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof PgCommandCodec) { - write((PgCommandCodec) msg); + if (msg instanceof PgCommandMessage) { + write((PgCommandMessage) msg); } else { super.write(ctx, msg, promise); } @@ -160,10 +159,10 @@ private ByteBuf renderPendingMessages() { renderScramInitialMessage((ScramClientInitialMessage) msg, out); } else if (msg.getClass() == ScramClientFinalMessage.class) { renderScramFinalMessage((ScramClientFinalMessage) msg, out); - } else if (msg.getClass() == Query.class) { - renderQueryMessage((Query) msg, out); - } else if (msg.getClass() == Describe.class) { - renderDescribe((Describe) msg, out); + } else if (msg.getClass() == QueryMessage.class) { + renderQueryMessage((QueryMessage) msg, out); + } else if (msg.getClass() == DescribeMessage.class) { + renderDescribe((DescribeMessage) msg, out); } else if (msg.getClass() == ParseMessage.class) { String sql = (String) pendingMessages.get(index++); byte[] statement = (byte[]) pendingMessages.get(index++); @@ -173,10 +172,10 @@ private ByteBuf renderPendingMessages() { String portal = (String) pendingMessages.get(index++); int rowCount = (Integer) pendingMessages.get(index++); renderExecute(portal, rowCount, out); - } else if (msg.getClass() == Bind.class) { + } else if (msg.getClass() == BindMessage.class) { String portal = (String) pendingMessages.get(index++); Tuple paramValues = (Tuple) pendingMessages.get(index++); - renderBind((Bind) msg, portal, paramValues, out); + renderBind((BindMessage) msg, portal, paramValues, out); } else { throw new AssertionError(); } @@ -186,7 +185,7 @@ private ByteBuf renderPendingMessages() { return out; } - private static void renderQueryMessage(Query query, ByteBuf out) { + private static void renderQueryMessage(QueryMessage query, ByteBuf out) { int pos = out.writerIndex(); out.writeByte(QUERY); out.writeInt(0); @@ -194,7 +193,7 @@ private static void renderQueryMessage(Query query, ByteBuf out) { out.setInt(pos + 1, out.writerIndex() - pos - 1); } - private static int estimateQueryMessage(Query query) { + private static int estimateQueryMessage(QueryMessage query) { return 1 + 4 + DataTypeEstimator.estimateCStringUTF8(query.sql); } @@ -327,7 +326,7 @@ private static int estimateExecute(String portal, int rowCount) { return 1 + 4 + (portal != null ? DataTypeEstimator.estimateUTF8(portal) : 0) + 1 + 4; } - private static void renderDescribe(Describe describe, ByteBuf out) { + private static void renderDescribe(DescribeMessage describe, ByteBuf out) { int pos = out.writerIndex(); out.writeByte(DESCRIBE); out.writeInt(0); @@ -344,7 +343,7 @@ private static void renderDescribe(Describe describe, ByteBuf out) { out.setInt(pos + 1, out.writerIndex() - pos- 1); } - private static int estimateDescribe(Describe describe) { + private static int estimateDescribe(DescribeMessage describe) { int length = 1 + 4; if (describe.statement.length > 1) { length += 1 + DataTypeEstimator.estimateByteArray(describe.statement); @@ -382,7 +381,7 @@ private static int estimateParse(String sql, byte[] statement, DataType[] parame 2 + (parameterTypes == null ? 0 : parameterTypes.length * 4); } - private static void renderBind(Bind bind, String portal, Tuple paramValues, ByteBuf out) { + private static void renderBind(BindMessage bind, String portal, Tuple paramValues, ByteBuf out) { int pos = out.writerIndex(); out.writeByte(BIND); out.writeInt(0); @@ -432,7 +431,7 @@ private static void renderBind(Bind bind, String portal, Tuple paramValues, Byte out.setInt(pos + 1, out.writerIndex() - pos - 1); } - private static int estimateBind(Bind bind, String portal, Tuple paramValues) { + private static int estimateBind(BindMessage bind, String portal, Tuple paramValues) { int paramLen = paramValues.size(); @@ -580,7 +579,7 @@ void writeScramClientFinalMessage(ScramClientFinalMessage msg) { * {@link CommandComplete}, {@link RowDescriptorBase}, {@link DataRow}, {@link EmptyQueryResponse}, {@link ErrorResponse}, * {@link ReadyForQuery} and {@link NoticeResponse} */ - void writeQuery(Query query) { + void writeQuery(QueryMessage query) { enqueueMessage(query, estimateQueryMessage(query)); } @@ -593,7 +592,7 @@ void writeQuery(Query query) { * executed or a {@link NoData} message if the statement will not return rows. * {@link ErrorResponse} is issued if there is no such prepared statement. *

- * Note that since {@link Bind} has not yet been issued, the formats to be used for returned columns are not yet known to + * Note that since {@link BindMessage} has not yet been issued, the formats to be used for returned columns are not yet known to * the backend; the format code fields in the {@link RowDescriptorBase} message will be zeroes in this case. *

* The message that using "portal" variant specifies the name of an existing portal. @@ -602,7 +601,7 @@ void writeQuery(Query query) { * or a {@link NoData} message if the portal does not contain a query that will return rows; or {@link ErrorResponse} * if there is no such portal. */ - void writeDescribe(Describe describe) { + void writeDescribe(DescribeMessage describe) { enqueueMessage(describe, estimateDescribe(describe)); } @@ -616,7 +615,7 @@ void writeParse(String sql, byte[] statement, DataType[] parameterTypes) { * The row count of the result is only meaningful for portals containing commands that return row sets; * in other cases the command is always executed to completion, and the row count of the result is ignored. *

- * The possible responses to this message are the same as {@link Query} message, except that + * The possible responses to this message are the same as {@link QueryMessage} message, except that * it doesn't cause {@link ReadyForQuery} or {@link RowDescriptorBase} to be issued. *

* If Execute terminates before completing the execution of a portal, it will send a {@link PortalSuspended} message; @@ -640,7 +639,7 @@ void writeExecute(String portal, int rowCount) { *

* The response is either {@link BindComplete} or {@link ErrorResponse}. */ - void writeBind(Bind bind, String portal, Tuple paramValues) { + void writeBind(BindMessage bind, String portal, Tuple paramValues) { enqueueMessage(bind, portal, paramValues, estimateBind(bind, portal, paramValues)); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgPreparedStatement.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgPreparedStatement.java index 98580843cd..adeb371887 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgPreparedStatement.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgPreparedStatement.java @@ -23,7 +23,7 @@ class PgPreparedStatement implements PreparedStatement { final String sql; - final Bind bind; + final BindMessage bind; final PgParamDesc paramDesc; final PgRowDescriptor rowDesc; final boolean cached; @@ -32,7 +32,7 @@ class PgPreparedStatement implements PreparedStatement { this.paramDesc = paramDesc; this.rowDesc = rowDesc; this.sql = sql; - this.bind = new Bind(statement, paramDesc != null ? paramDesc.paramDataTypes() : null, rowDesc != null ? rowDesc.columns : PgColumnDesc.EMPTY_COLUMNS); + this.bind = new BindMessage(statement, paramDesc != null ? paramDesc.paramDataTypes() : null, rowDesc != null ? rowDesc.columns : PgColumnDesc.EMPTY_COLUMNS); this.cached = cached; } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PrepareStatementCommandCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PrepareStatementPgCommandMessage.java similarity index 91% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PrepareStatementCommandCodec.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PrepareStatementPgCommandMessage.java index afa648d890..0b5a8f504e 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PrepareStatementCommandCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PrepareStatementPgCommandMessage.java @@ -21,7 +21,7 @@ import java.util.List; -class PrepareStatementCommandCodec extends PgCommandCodec { +class PrepareStatementPgCommandMessage extends PgCommandMessage { private static final byte[] EMPTY_STRING = { 0 }; @@ -30,7 +30,7 @@ class PrepareStatementCommandCodec extends PgCommandCodec> parameterTypes = cmd.parameterTypes(); DataType[] parameterTypes2 = parameterTypes != null ? build(parameterTypes) : null; encoder.writeParse(cmd.sql(), statement, parameterTypes2); - encoder.writeDescribe(new Describe(statement, null)); + encoder.writeDescribe(new DescribeMessage(statement, null)); encoder.writeSync(); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/QueryCommandBaseCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/QueryBasePgCommandMessage.java similarity index 91% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/QueryCommandBaseCodec.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/QueryBasePgCommandMessage.java index e75bbbadd6..62a5128f96 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/QueryCommandBaseCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/QueryBasePgCommandMessage.java @@ -22,11 +22,11 @@ import java.util.stream.Collector; -abstract class QueryCommandBaseCodec> extends PgCommandCodec { +abstract class QueryBasePgCommandMessage> extends PgCommandMessage { RowResultDecoder rowDecoder; - QueryCommandBaseCodec(C cmd) { + QueryBasePgCommandMessage(C cmd) { super(cmd); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Query.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/QueryMessage.java similarity index 91% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Query.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/QueryMessage.java index 72710b9ba9..e744b59951 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Query.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/QueryMessage.java @@ -20,11 +20,11 @@ /** * @author Emad Alblueshi */ -class Query extends OutboundMessage { +class QueryMessage extends OutboundMessage { final String sql; - Query(String sql) { + QueryMessage(String sql) { this.sql = sql; } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/SimpleQueryCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/SimpleQueryPgCommandMessage.java similarity index 82% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/SimpleQueryCodec.java rename to vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/SimpleQueryPgCommandMessage.java index 902ca43f8e..26bf3b4483 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/SimpleQueryCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/SimpleQueryPgCommandMessage.java @@ -20,17 +20,17 @@ import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand; -class SimpleQueryCodec extends QueryCommandBaseCodec> { +class SimpleQueryPgCommandMessage extends QueryBasePgCommandMessage> { - private static final Logger LOGGER = LoggerFactory.getLogger(PgCommandCodec.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PgCommandMessage.class); - SimpleQueryCodec(SimpleQueryCommand cmd) { + SimpleQueryPgCommandMessage(SimpleQueryCommand cmd) { super(cmd); } @Override void encode(PgEncoder encoder) { - encoder.writeQuery(new Query(cmd.sql())); + encoder.writeQuery(new QueryMessage(cmd.sql())); } @Override diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/CommandMessage.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/CommandMessage.java index 99ae87b6f5..f19065ed57 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/CommandMessage.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/CommandMessage.java @@ -10,31 +10,16 @@ */ package io.vertx.sqlclient.codec; -import io.vertx.core.AsyncResult; import io.vertx.core.Completable; -import io.vertx.core.Future; import io.vertx.sqlclient.spi.protocol.CommandBase; public class CommandMessage> { - public Completable handler; + Completable handler; // Should not leak outside of this package public final C cmd; public CommandMessage(C cmd) { this.cmd = cmd; } - public final void fail(Throwable err) { - complete(Future.failedFuture(err)); - } - - public final void fail(String failureMsg) { - complete(Future.failedFuture(failureMsg)); - } - - public final void complete(AsyncResult resp) { - if (handler != null) { - handler.complete(resp.result(), resp.cause()); - } - } } diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/CommandResponse.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/CommandResponse.java index 83634185ca..21494344e3 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/CommandResponse.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/CommandResponse.java @@ -11,11 +11,7 @@ package io.vertx.sqlclient.codec; -import io.vertx.core.Completable; -import io.vertx.core.Future; -import io.vertx.core.AsyncResult; import io.vertx.core.VertxException; -import io.vertx.sqlclient.spi.protocol.CommandBase; public class CommandResponse { @@ -24,29 +20,19 @@ public static CommandResponse failure(String msg) { } public static CommandResponse failure(Throwable cause) { - return new CommandResponse<>(Future.failedFuture(cause)); + return new CommandResponse<>(null, cause); } public static CommandResponse success(R result) { - return new CommandResponse<>(Future.succeededFuture(result)); + return new CommandResponse<>(result, null); } // The connection that executed the command - public CommandBase cmd; - public Completable handler; - private final AsyncResult res; + final R result; + final Throwable failure; - public CommandResponse(AsyncResult res) { - this.res = res; - } - - public AsyncResult toAsyncResult() { - return res; - } - - public final void fire() { - if (handler != null) { - handler.complete(res.result(), res.cause()); - } + private CommandResponse(R result, Throwable failure) { + this.result = result; + this.failure = failure; } } diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java index 65ecb151a4..a0979c79ea 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java @@ -29,6 +29,7 @@ import io.vertx.core.tracing.TracingPolicy; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.net.NetSocketInternal; +import io.vertx.sqlclient.ClosedConnectionException; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.codec.impl.PreparedStatementCache; @@ -77,6 +78,8 @@ public enum Status { // Command pipeline state private final ArrayDeque> pending = new ArrayDeque<>(); private final ArrayDeque> handlers = new ArrayDeque<>(); + private final ArrayDeque> inflights = new ArrayDeque<>(); + private boolean executing; private int inflight; private boolean paused; @@ -256,7 +259,7 @@ private void checkPending() { if (closeCmd != null) { inflight++; written++; - ctx.write(closeCmd, ctx.voidPromise()); + fireCommandMessage(ctx, closeCmd); } } CommandMessage prepareCmd = prepareCommand(queryCmd, handler, cache, false); @@ -277,7 +280,7 @@ private void checkPending() { toSend = createMessage(cmd, handler); } written++; - ctx.write(toSend, ctx.voidPromise()); + fireCommandMessage(ctx, toSend); } if (written > 0) { ctx.flush(); @@ -287,6 +290,11 @@ private void checkPending() { } } + private void fireCommandMessage(ChannelHandlerContext chctx, CommandMessage msg) { + inflights.add(msg); + chctx.write(msg, chctx.voidPromise()); + } + private CommandMessage createMessage(ExtendedQueryCommand command, PreparedStatement preparedStatement, Completable handler) { CommandMessage msg = toMessage(command, preparedStatement); msg.handler = (Completable) handler; @@ -316,21 +324,20 @@ private void checkPending() { if (cache) { cacheStatement(ps); } -// queryCmd.ps = ps; String msg = queryCmd.prepare(ps); if (msg != null) { inflight--; handler.fail(VertxException.noStackTrace(msg)); } else { ChannelHandlerContext ctx = socket.channelHandlerContext(); - ctx.write(createMessage(queryCmd, ps, handler), ctx.voidPromise()); + fireCommandMessage(ctx, createMessage(queryCmd, ps, handler)); ctx.flush(); } } else { if (isIndeterminatePreparedStatementError(cause) && !sendParameterTypes) { ChannelHandlerContext ctx = socket.channelHandlerContext(); // We cannot cache this prepared statement because it might be executed with another type - ctx.write(prepareCommand(queryCmd, handler, false, true), ctx.voidPromise()); + fireCommandMessage(ctx, prepareCommand(queryCmd, handler, false, true)); ctx.flush(); } else { inflight--; @@ -345,13 +352,20 @@ protected void handleMessage(Object msg) { if (msg instanceof CommandResponse) { inflight--; CommandResponse resp =(CommandResponse) msg; - resp.fire(); + CommandMessage command = inflights.poll(); + fire(resp, command.handler); } else if (msg instanceof InvalidCachedStatementEvent) { InvalidCachedStatementEvent event = (InvalidCachedStatementEvent) msg; removeCachedStatement(event.sql()); } } + private static void fire(CommandResponse response, Completable handler) { + if (handler != null) { + handler.complete(response.result, response.failure); + } + } + private void handleReadComplete(Void v) { checkPending(); } @@ -420,12 +434,16 @@ protected void handleClose(Throwable t) { if (t != null) { reportException(t); } + CommandMessage msg; + while ((msg = inflights.poll()) != null) { + fail(msg, ClosedConnectionException.INSTANCE); + } Throwable cause = t == null ? VertxException.noStackTrace(PENDING_CMD_CONNECTION_CORRUPT_MSG) : new VertxException(PENDING_CMD_CONNECTION_CORRUPT_MSG, t); CommandBase cmd; while ((cmd = pending.poll()) != null) { - CommandBase c = cmd; - Completable handler = handlers.poll(); - context.runOnContext(v -> handler.fail(cause)); + CommandBase c = cmd; + Completable handler = handlers.poll(); + fail(c, handler, cause); } if (holder != null) { holder.handleClosed(); @@ -433,6 +451,14 @@ protected void handleClose(Throwable t) { } } + private void fail(CommandMessage msg, Throwable err) { + fail(msg.cmd, msg.handler, err); + } + + protected void fail(CommandBase command, Completable handler, Throwable err) { + context.runOnContext(v -> handler.fail(err)); + } + public boolean pipeliningEnabled() { return pipeliningLimit > 1; } diff --git a/vertx-sql-client/src/main/asciidoc/tracing.adoc b/vertx-sql-client/src/main/asciidoc/tracing.adoc index 6fc6b2b4c6..f4421d1283 100644 --- a/vertx-sql-client/src/main/asciidoc/tracing.adoc +++ b/vertx-sql-client/src/main/asciidoc/tracing.adoc @@ -2,7 +2,7 @@ The SQL client can trace query execution when Vert.x has tracing enabled. The client reports the following _client_ spans: -- `Query` operation name +- `QueryMessage` operation name - tags - `db.system`: the database management system product - `db.user`: the database username