diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java index 2b5bd675b..895d4b7f3 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java @@ -17,6 +17,7 @@ package io.vertx.mysqlclient.impl.codec; import io.netty.buffer.ByteBuf; +import io.vertx.mysqlclient.impl.MySQLRowDesc; import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.internal.command.CommandResponse; @@ -31,7 +32,14 @@ class ExtendedQueryCommandCodec extends ExtendedQueryCommandBaseCodec 0 && statement.isCursorOpen) { // restore the state we need for decoding fetch response based on the prepared statement - columnDefinitions = statement.rowDesc.columnDefinitions(); + columnDefinitions = statement.cursorRowDescriptor.columnDefinitions(); + } + } + + @Override + protected void handleRowDescriptorCreated(MySQLRowDesc mySQLRowDesc) { + if (cmd.fetch() > 0) { + statement.cursorRowDescriptor = mySQLRowDesc; } } @@ -42,7 +50,7 @@ void encode(MySQLEncoder encoder) { if (statement.isCursorOpen) { if (decoder == null) { // restore the state we need for decoding if column definitions are not included in the fetch response - decoder = new RowResultDecoder<>(cmd.collector(), statement.rowDesc); + decoder = new RowResultDecoder<>(cmd.collector(), statement.cursorRowDescriptor); } sendStatementFetchCommand(statement.statementId, cmd.fetch()); } else { diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java index ae5526856..140a0e605 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java @@ -23,7 +23,7 @@ import io.vertx.mysqlclient.impl.datatype.DataType; import io.vertx.mysqlclient.impl.datatype.DataTypeCodec; import io.vertx.sqlclient.Tuple; -import io.vertx.sqlclient.impl.*; +import io.vertx.sqlclient.impl.ErrorMessageFactory; import io.vertx.sqlclient.internal.ParamDesc; import io.vertx.sqlclient.internal.PreparedStatement; import io.vertx.sqlclient.internal.RowDesc; @@ -36,18 +36,17 @@ class MySQLPreparedStatement implements PreparedStatement { final long statementId; final String sql; final MySQLParamDesc paramDesc; - final MySQLRowDesc rowDesc; final boolean closeAfterUsage; private boolean sendTypesToServer; private final DataType[] bindingTypes; boolean isCursorOpen; + MySQLRowDesc cursorRowDescriptor; - MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, MySQLRowDesc rowDesc, boolean closeAfterUsage) { + MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, boolean closeAfterUsage) { this.statementId = statementId; this.paramDesc = paramDesc; - this.rowDesc = rowDesc; this.sql = sql; this.closeAfterUsage = closeAfterUsage; @@ -63,7 +62,7 @@ public ParamDesc paramDesc() { @Override public RowDesc rowDesc() { - return rowDesc; + throw new UnsupportedOperationException("The client should use the column definitions provided by execute or fetch response instead of prepare response"); } @Override diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java index 24bb73ea3..6f98695be 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java @@ -18,8 +18,6 @@ import io.netty.buffer.ByteBuf; import io.vertx.mysqlclient.impl.MySQLParamDesc; -import io.vertx.mysqlclient.impl.MySQLRowDesc; -import io.vertx.mysqlclient.impl.datatype.DataFormat; import io.vertx.mysqlclient.impl.protocol.ColumnDefinition; import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.internal.PreparedStatement; @@ -135,7 +133,6 @@ private void handleReadyForQuery() { cmd.sql(), this.statementId, new MySQLParamDesc(paramDescs), - MySQLRowDesc.create(columnDescs, DataFormat.BINARY), !cmd.isManaged()))); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java index 37c6aa329..cc9bc1f73 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java @@ -94,9 +94,13 @@ protected void handleResultsetColumnDefinitions(ByteBuf payload) { protected void handleResultsetColumnDefinitionsDecodingCompleted() { commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET; MySQLRowDesc mySQLRowDesc = MySQLRowDesc.create(columnDefinitions, format); // use the column definitions if provided by execute or fetch response instead of prepare response + handleRowDescriptorCreated(mySQLRowDesc); decoder = new RowResultDecoder<>(cmd.collector(), mySQLRowDesc); } + protected void handleRowDescriptorCreated(MySQLRowDesc mySQLRowDesc) { + } + protected void handleRows(ByteBuf payload, int payloadLength) { /* Resultset row can begin with 0xfe byte (when using text protocol with a field length > 0xffffff) diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java index 0ddc12aa0..5fbc98a90 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java @@ -29,6 +29,7 @@ void encode(MySQLEncoder encoder) { statement.cleanBindings(); statement.isCursorOpen = false; + statement.cursorRowDescriptor = null; sendStatementResetCommand(statement.statementId); } diff --git a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java index 5a34ed8e0..05ddda2aa 100644 --- a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java +++ b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java @@ -14,6 +14,7 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.FileSystem; +import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.mysqlclient.MySQLClient; @@ -21,7 +22,6 @@ import io.vertx.mysqlclient.MySQLConnection; import io.vertx.sqlclient.*; import org.junit.After; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -32,6 +32,8 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import static org.junit.Assume.assumeFalse; + @RunWith(VertxUnitRunner.class) public class MySQLQueryTest extends MySQLTestBase { @@ -280,7 +282,7 @@ public void testDecodePacketSizeMoreThan16MB(TestContext ctx) { @Test public void testEncodePacketSizeMoreThan16MB(TestContext ctx) { - Assume.assumeFalse(rule.isUsingMySQL5_6()); + assumeFalse(rule.isUsingMySQL5_6()); int dataSize = 20 * 1024 * 1024; // 20MB payload byte[] data = new byte[dataSize]; ThreadLocalRandom.current().nextBytes(data); @@ -470,4 +472,22 @@ public void testLocalInfileRequestInPackets(TestContext ctx) { })); })); } + + @Test + public void testColumnDefinitionChangeWithCursor(TestContext ctx) { + assumeFalse("Query syntax not supported on MySQL 5", rule.isUsingMySQL5()); + Async rows = ctx.async(100); + Async completion = ctx.async(); + MySQLConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> { + conn + .prepare("SELECT row_number() over () as \"Number\", case when 1 then 1 else 0 end as \"Case\" FROM mysql.help_relation limit 0,100") + .onComplete(ctx.asyncAssertSuccess(ps -> { + // Make sure to fetch from the database twice + RowStream stream = ps.createStream(50); + stream.exceptionHandler(ctx::fail); + stream.endHandler(v -> completion.complete()); + stream.handler(row -> rows.countDown()); + })); + })); + } } diff --git a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java index c56b477f1..7f583aa34 100644 --- a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java +++ b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java @@ -143,6 +143,10 @@ public boolean isUsingMariaDB() { return databaseServerInfo.getDatabaseType() == DatabaseType.MariaDB; } + public boolean isUsingMySQL5() { + return databaseServerInfo.databaseType == DatabaseType.MySQL && databaseServerInfo.dockerImageTag.startsWith("5."); + } + public boolean isUsingMySQL5_6() { return databaseServerInfo == DatabaseServerInfo.MySQL_V5_6; } diff --git a/vertx-mysql-client/src/test/resources/init.sql b/vertx-mysql-client/src/test/resources/init.sql index ef68fdf9e..9181a68f8 100644 --- a/vertx-mysql-client/src/test/resources/init.sql +++ b/vertx-mysql-client/src/test/resources/init.sql @@ -1,3 +1,8 @@ +#allow reading mysql schema +GRANT +SELECT +ON mysql.* TO 'mysql'; + # testing change schema CREATE DATABASE emptyschema; GRANT ALL ON emptyschema.* TO 'mysql'@'%';