diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommand.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommand.java index 093a19233..b10f22702 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommand.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryMySQLCommand.java @@ -17,6 +17,7 @@ package io.vertx.mysqlclient.impl.codec; import io.netty.buffer.ByteBuf; +import io.vertx.mysqlclient.impl.MySQLRowDescriptor; import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.codec.CommandResponse; @@ -31,7 +32,14 @@ public ExtendedQueryMySQLCommand(ExtendedQueryCommand cmd, MySQLPreparedState super(cmd, statement); if (cmd.fetch() > 0 && statement.isCursorOpen) { // restore the state we need for decoding fetch response based on the prepared statement - columnDefinitions = statement.rowDesc.columnDefinitions(); + columnDefinitions = statement.cursorRowDescriptor.columnDefinitions(); + } + } + + @Override + protected void handleRowDescriptorCreated(MySQLRowDescriptor mySQLRowDesc) { + if (cmd.fetch() > 0) { + statement.cursorRowDescriptor = mySQLRowDesc; } } @@ -42,7 +50,7 @@ void encode(MySQLEncoder encoder) { if (statement.isCursorOpen) { if (decoder == null) { // restore the state we need for decoding if column definitions are not included in the fetch response - decoder = new RowResultDecoder<>(cmd.collector(), statement.rowDesc); + decoder = new RowResultDecoder<>(cmd.collector(), statement.cursorRowDescriptor); } sendStatementFetchCommand(statement.statementId, cmd.fetch()); } else { diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java index f637a6543..765cc1b86 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java @@ -23,9 +23,8 @@ import io.vertx.mysqlclient.impl.datatype.DataType; import io.vertx.mysqlclient.impl.datatype.DataTypeCodec; import io.vertx.sqlclient.Tuple; -import io.vertx.sqlclient.impl.*; +import io.vertx.sqlclient.impl.ErrorMessageFactory; import io.vertx.sqlclient.internal.PreparedStatement; -import io.vertx.sqlclient.internal.RowDescriptorBase; import io.vertx.sqlclient.internal.TupleBase; import java.util.Arrays; @@ -35,18 +34,17 @@ public class MySQLPreparedStatement implements PreparedStatement { final long statementId; final String sql; final MySQLParamDesc paramDesc; - final MySQLRowDescriptor rowDesc; final boolean closeAfterUsage; private boolean sendTypesToServer; private final DataType[] bindingTypes; boolean isCursorOpen; + MySQLRowDescriptor cursorRowDescriptor; - MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, MySQLRowDescriptor rowDesc, boolean closeAfterUsage) { + MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, boolean closeAfterUsage) { this.statementId = statementId; this.paramDesc = paramDesc; - this.rowDesc = rowDesc; this.sql = sql; this.closeAfterUsage = closeAfterUsage; @@ -56,8 +54,8 @@ public class MySQLPreparedStatement implements PreparedStatement { } @Override - public RowDescriptorBase rowDesc() { - return rowDesc; + public MySQLRowDescriptor rowDesc() { + throw new UnsupportedOperationException("The client should use the column definitions provided by execute or fetch response instead of prepare response"); } @Override diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementMySQLCommand.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementMySQLCommand.java index 13ebf3335..07e636f79 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementMySQLCommand.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementMySQLCommand.java @@ -18,12 +18,10 @@ import io.netty.buffer.ByteBuf; import io.vertx.mysqlclient.impl.MySQLParamDesc; -import io.vertx.mysqlclient.impl.MySQLRowDescriptor; -import io.vertx.mysqlclient.impl.datatype.DataFormat; import io.vertx.mysqlclient.impl.protocol.ColumnDefinition; import io.vertx.mysqlclient.impl.protocol.CommandType; -import io.vertx.sqlclient.internal.PreparedStatement; import io.vertx.sqlclient.codec.CommandResponse; +import io.vertx.sqlclient.internal.PreparedStatement; import io.vertx.sqlclient.spi.protocol.PrepareStatementCommand; import static io.vertx.mysqlclient.impl.protocol.Packets.ERROR_PACKET_HEADER; @@ -135,7 +133,6 @@ private void handleReadyForQuery() { cmd.sql(), this.statementId, new MySQLParamDesc(paramDescs), - MySQLRowDescriptor.create(columnDescs, DataFormat.BINARY), !cmd.isManaged()))); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryMySQLCommandBase.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryMySQLCommandBase.java index b956fe49f..0b702553f 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryMySQLCommandBase.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryMySQLCommandBase.java @@ -23,8 +23,8 @@ import io.vertx.mysqlclient.impl.protocol.ColumnDefinition; import io.vertx.mysqlclient.impl.util.BufferUtils; import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.internal.RowDescriptorBase; import io.vertx.sqlclient.codec.CommandResponse; +import io.vertx.sqlclient.internal.RowDescriptorBase; import io.vertx.sqlclient.spi.protocol.QueryCommandBase; import java.util.stream.Collector; @@ -94,9 +94,13 @@ protected void handleResultsetColumnDefinitions(ByteBuf payload) { protected void handleResultsetColumnDefinitionsDecodingCompleted() { commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET; MySQLRowDescriptor mySQLRowDesc = MySQLRowDescriptor.create(columnDefinitions, format); // use the column definitions if provided by execute or fetch response instead of prepare response + handleRowDescriptorCreated(mySQLRowDesc); decoder = new RowResultDecoder<>(cmd.collector(), mySQLRowDesc); } + protected void handleRowDescriptorCreated(MySQLRowDescriptor mySQLRowDesc) { + } + protected void handleRows(ByteBuf payload, int payloadLength) { /* Resultset row can begin with 0xfe byte (when using text protocol with a field length > 0xffffff) diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementMySQLCommand.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementMySQLCommand.java index 998b6a0d9..e5ac8c309 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementMySQLCommand.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementMySQLCommand.java @@ -29,6 +29,7 @@ void encode(MySQLEncoder encoder) { statement.cleanBindings(); statement.isCursorOpen = false; + statement.cursorRowDescriptor = null; sendStatementResetCommand(statement.statementId); } diff --git a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java index 5a34ed8e0..89c1c3113 100644 --- a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java +++ b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java @@ -14,6 +14,7 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.FileSystem; +import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.mysqlclient.MySQLClient; @@ -470,4 +471,21 @@ public void testLocalInfileRequestInPackets(TestContext ctx) { })); })); } + + @Test + public void testColumnDefinitionChangeWithCursor(TestContext ctx) { + Async rows = ctx.async(100); + Async completion = ctx.async(); + MySQLConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> { + conn + .prepare("SELECT row_number() over () as \"Number\", case when 1 then 1 else 0 end as \"Case\" FROM mysql.help_relation limit 0,100") + .onComplete(ctx.asyncAssertSuccess(ps -> { + // Make sure to fetch from the database twice + RowStream stream = ps.createStream(50); + stream.exceptionHandler(ctx::fail); + stream.endHandler(v -> completion.complete()); + stream.handler(row -> rows.countDown()); + })); + })); + } } diff --git a/vertx-mysql-client/src/test/resources/init.sql b/vertx-mysql-client/src/test/resources/init.sql index ef68fdf9e..8e19a1398 100644 --- a/vertx-mysql-client/src/test/resources/init.sql +++ b/vertx-mysql-client/src/test/resources/init.sql @@ -1,3 +1,9 @@ +#allow +reading mysql schema +GRANT +SELECT +ON mysql.* TO 'mysql'; + # testing change schema CREATE DATABASE emptyschema; GRANT ALL ON emptyschema.* TO 'mysql'@'%';