From 57789a0df5d4eacf6d79df832873a84187b2acb4 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Tue, 4 Nov 2025 11:44:39 +0100 Subject: [PATCH] MySQL cursor fetch uses wrong row descriptor after initial fetch (#1562) See #1525 MySQL sends column definitions in response to a prepare command. But when executing the statement or fetching the first page of a cursor, it sends column definitions again. The row decoder should not use the definitions given in response to prepare command because they may not be accurate. The previous cursor implementation used them when fetching the second page. Signed-off-by: Thomas Segismont --- .../impl/codec/ExtendedQueryCommandCodec.java | 12 ++++++++-- .../impl/codec/MySQLPreparedStatement.java | 9 ++++--- .../impl/codec/PrepareStatementCodec.java | 3 --- .../impl/codec/QueryCommandBaseCodec.java | 4 ++++ .../codec/ResetStatementCommandCodec.java | 1 + .../tests/mysqlclient/MySQLQueryTest.java | 24 +++++++++++++++++-- .../tests/mysqlclient/junit/MySQLRule.java | 4 ++++ .../src/test/resources/init.sql | 5 ++++ 8 files changed, 50 insertions(+), 12 deletions(-) diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java index 2b5bd675b7..895d4b7f37 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java @@ -17,6 +17,7 @@ package io.vertx.mysqlclient.impl.codec; import io.netty.buffer.ByteBuf; +import io.vertx.mysqlclient.impl.MySQLRowDesc; import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.internal.command.CommandResponse; @@ -31,7 +32,14 @@ class ExtendedQueryCommandCodec extends ExtendedQueryCommandBaseCodec 0 && statement.isCursorOpen) { // restore the state we need for decoding fetch response based on the prepared statement - columnDefinitions = statement.rowDesc.columnDefinitions(); + columnDefinitions = statement.cursorRowDescriptor.columnDefinitions(); + } + } + + @Override + protected void handleRowDescriptorCreated(MySQLRowDesc mySQLRowDesc) { + if (cmd.fetch() > 0) { + statement.cursorRowDescriptor = mySQLRowDesc; } } @@ -42,7 +50,7 @@ void encode(MySQLEncoder encoder) { if (statement.isCursorOpen) { if (decoder == null) { // restore the state we need for decoding if column definitions are not included in the fetch response - decoder = new RowResultDecoder<>(cmd.collector(), statement.rowDesc); + decoder = new RowResultDecoder<>(cmd.collector(), statement.cursorRowDescriptor); } sendStatementFetchCommand(statement.statementId, cmd.fetch()); } else { diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java index ae5526856f..140a0e605d 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java @@ -23,7 +23,7 @@ import io.vertx.mysqlclient.impl.datatype.DataType; import io.vertx.mysqlclient.impl.datatype.DataTypeCodec; import io.vertx.sqlclient.Tuple; -import io.vertx.sqlclient.impl.*; +import io.vertx.sqlclient.impl.ErrorMessageFactory; import io.vertx.sqlclient.internal.ParamDesc; import io.vertx.sqlclient.internal.PreparedStatement; import io.vertx.sqlclient.internal.RowDesc; @@ -36,18 +36,17 @@ class MySQLPreparedStatement implements PreparedStatement { final long statementId; final String sql; final MySQLParamDesc paramDesc; - final MySQLRowDesc rowDesc; final boolean closeAfterUsage; private boolean sendTypesToServer; private final DataType[] bindingTypes; boolean isCursorOpen; + MySQLRowDesc cursorRowDescriptor; - MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, MySQLRowDesc rowDesc, boolean closeAfterUsage) { + MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, boolean closeAfterUsage) { this.statementId = statementId; this.paramDesc = paramDesc; - this.rowDesc = rowDesc; this.sql = sql; this.closeAfterUsage = closeAfterUsage; @@ -63,7 +62,7 @@ public ParamDesc paramDesc() { @Override public RowDesc rowDesc() { - return rowDesc; + throw new UnsupportedOperationException("The client should use the column definitions provided by execute or fetch response instead of prepare response"); } @Override diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java index 24bb73ea36..6f98695be9 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java @@ -18,8 +18,6 @@ import io.netty.buffer.ByteBuf; import io.vertx.mysqlclient.impl.MySQLParamDesc; -import io.vertx.mysqlclient.impl.MySQLRowDesc; -import io.vertx.mysqlclient.impl.datatype.DataFormat; import io.vertx.mysqlclient.impl.protocol.ColumnDefinition; import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.internal.PreparedStatement; @@ -135,7 +133,6 @@ private void handleReadyForQuery() { cmd.sql(), this.statementId, new MySQLParamDesc(paramDescs), - MySQLRowDesc.create(columnDescs, DataFormat.BINARY), !cmd.isManaged()))); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java index 37c6aa3296..cc9bc1f730 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java @@ -94,9 +94,13 @@ protected void handleResultsetColumnDefinitions(ByteBuf payload) { protected void handleResultsetColumnDefinitionsDecodingCompleted() { commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET; MySQLRowDesc mySQLRowDesc = MySQLRowDesc.create(columnDefinitions, format); // use the column definitions if provided by execute or fetch response instead of prepare response + handleRowDescriptorCreated(mySQLRowDesc); decoder = new RowResultDecoder<>(cmd.collector(), mySQLRowDesc); } + protected void handleRowDescriptorCreated(MySQLRowDesc mySQLRowDesc) { + } + protected void handleRows(ByteBuf payload, int payloadLength) { /* Resultset row can begin with 0xfe byte (when using text protocol with a field length > 0xffffff) diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java index 0ddc12aa08..5fbc98a90d 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java @@ -29,6 +29,7 @@ void encode(MySQLEncoder encoder) { statement.cleanBindings(); statement.isCursorOpen = false; + statement.cursorRowDescriptor = null; sendStatementResetCommand(statement.statementId); } diff --git a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java index 5a34ed8e07..05ddda2aa1 100644 --- a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java +++ b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java @@ -14,6 +14,7 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.FileSystem; +import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.mysqlclient.MySQLClient; @@ -21,7 +22,6 @@ import io.vertx.mysqlclient.MySQLConnection; import io.vertx.sqlclient.*; import org.junit.After; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -32,6 +32,8 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import static org.junit.Assume.assumeFalse; + @RunWith(VertxUnitRunner.class) public class MySQLQueryTest extends MySQLTestBase { @@ -280,7 +282,7 @@ public void testDecodePacketSizeMoreThan16MB(TestContext ctx) { @Test public void testEncodePacketSizeMoreThan16MB(TestContext ctx) { - Assume.assumeFalse(rule.isUsingMySQL5_6()); + assumeFalse(rule.isUsingMySQL5_6()); int dataSize = 20 * 1024 * 1024; // 20MB payload byte[] data = new byte[dataSize]; ThreadLocalRandom.current().nextBytes(data); @@ -470,4 +472,22 @@ public void testLocalInfileRequestInPackets(TestContext ctx) { })); })); } + + @Test + public void testColumnDefinitionChangeWithCursor(TestContext ctx) { + assumeFalse("Query syntax not supported on MySQL 5", rule.isUsingMySQL5()); + Async rows = ctx.async(100); + Async completion = ctx.async(); + MySQLConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> { + conn + .prepare("SELECT row_number() over () as \"Number\", case when 1 then 1 else 0 end as \"Case\" FROM mysql.help_relation limit 0,100") + .onComplete(ctx.asyncAssertSuccess(ps -> { + // Make sure to fetch from the database twice + RowStream stream = ps.createStream(50); + stream.exceptionHandler(ctx::fail); + stream.endHandler(v -> completion.complete()); + stream.handler(row -> rows.countDown()); + })); + })); + } } diff --git a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java index c56b477f15..7f583aa349 100644 --- a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java +++ b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java @@ -143,6 +143,10 @@ public boolean isUsingMariaDB() { return databaseServerInfo.getDatabaseType() == DatabaseType.MariaDB; } + public boolean isUsingMySQL5() { + return databaseServerInfo.databaseType == DatabaseType.MySQL && databaseServerInfo.dockerImageTag.startsWith("5."); + } + public boolean isUsingMySQL5_6() { return databaseServerInfo == DatabaseServerInfo.MySQL_V5_6; } diff --git a/vertx-mysql-client/src/test/resources/init.sql b/vertx-mysql-client/src/test/resources/init.sql index ef68fdf9e6..9181a68f8c 100644 --- a/vertx-mysql-client/src/test/resources/init.sql +++ b/vertx-mysql-client/src/test/resources/init.sql @@ -1,3 +1,8 @@ +#allow reading mysql schema +GRANT +SELECT +ON mysql.* TO 'mysql'; + # testing change schema CREATE DATABASE emptyschema; GRANT ALL ON emptyschema.* TO 'mysql'@'%';