Skip to content

Commit 125df2f

Browse files
authored
MySQL cursor fetch uses wrong row descriptor after initial fetch (#1562) (#1563)
See #1525 MySQL sends column definitions in response to a prepare command. But when executing the statement or fetching the first page of a cursor, it sends column definitions again. The row decoder should not use the definitions given in response to prepare command because they may not be accurate. The previous cursor implementation used them when fetching the second page. Signed-off-by: Thomas Segismont <[email protected]>
1 parent 532a4a2 commit 125df2f

File tree

8 files changed

+50
-12
lines changed

8 files changed

+50
-12
lines changed

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.vertx.mysqlclient.impl.codec;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import io.vertx.mysqlclient.impl.MySQLRowDesc;
2021
import io.vertx.mysqlclient.impl.protocol.CommandType;
2122
import io.vertx.sqlclient.Tuple;
2223
import io.vertx.sqlclient.internal.command.CommandResponse;
@@ -31,7 +32,14 @@ class ExtendedQueryCommandCodec<R> extends ExtendedQueryCommandBaseCodec<R, Exte
3132
super(cmd);
3233
if (cmd.fetch() > 0 && statement.isCursorOpen) {
3334
// restore the state we need for decoding fetch response based on the prepared statement
34-
columnDefinitions = statement.rowDesc.columnDefinitions();
35+
columnDefinitions = statement.cursorRowDescriptor.columnDefinitions();
36+
}
37+
}
38+
39+
@Override
40+
protected void handleRowDescriptorCreated(MySQLRowDesc mySQLRowDesc) {
41+
if (cmd.fetch() > 0) {
42+
statement.cursorRowDescriptor = mySQLRowDesc;
3543
}
3644
}
3745

@@ -42,7 +50,7 @@ void encode(MySQLEncoder encoder) {
4250
if (statement.isCursorOpen) {
4351
if (decoder == null) {
4452
// restore the state we need for decoding if column definitions are not included in the fetch response
45-
decoder = new RowResultDecoder<>(cmd.collector(), statement.rowDesc);
53+
decoder = new RowResultDecoder<>(cmd.collector(), statement.cursorRowDescriptor);
4654
}
4755
sendStatementFetchCommand(statement.statementId, cmd.fetch());
4856
} else {

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.vertx.mysqlclient.impl.datatype.DataType;
2424
import io.vertx.mysqlclient.impl.datatype.DataTypeCodec;
2525
import io.vertx.sqlclient.Tuple;
26-
import io.vertx.sqlclient.impl.*;
26+
import io.vertx.sqlclient.impl.ErrorMessageFactory;
2727
import io.vertx.sqlclient.internal.ParamDesc;
2828
import io.vertx.sqlclient.internal.PreparedStatement;
2929
import io.vertx.sqlclient.internal.RowDesc;
@@ -36,18 +36,17 @@ class MySQLPreparedStatement implements PreparedStatement {
3636
final long statementId;
3737
final String sql;
3838
final MySQLParamDesc paramDesc;
39-
final MySQLRowDesc rowDesc;
4039
final boolean closeAfterUsage;
4140

4241
private boolean sendTypesToServer;
4342
private final DataType[] bindingTypes;
4443

4544
boolean isCursorOpen;
45+
MySQLRowDesc cursorRowDescriptor;
4646

47-
MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, MySQLRowDesc rowDesc, boolean closeAfterUsage) {
47+
MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, boolean closeAfterUsage) {
4848
this.statementId = statementId;
4949
this.paramDesc = paramDesc;
50-
this.rowDesc = rowDesc;
5150
this.sql = sql;
5251
this.closeAfterUsage = closeAfterUsage;
5352

@@ -63,7 +62,7 @@ public ParamDesc paramDesc() {
6362

6463
@Override
6564
public RowDesc rowDesc() {
66-
return rowDesc;
65+
throw new UnsupportedOperationException("The client should use the column definitions provided by execute or fetch response instead of prepare response");
6766
}
6867

6968
@Override

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import io.netty.buffer.ByteBuf;
2020
import io.vertx.mysqlclient.impl.MySQLParamDesc;
21-
import io.vertx.mysqlclient.impl.MySQLRowDesc;
22-
import io.vertx.mysqlclient.impl.datatype.DataFormat;
2321
import io.vertx.mysqlclient.impl.protocol.ColumnDefinition;
2422
import io.vertx.mysqlclient.impl.protocol.CommandType;
2523
import io.vertx.sqlclient.internal.PreparedStatement;
@@ -135,7 +133,6 @@ private void handleReadyForQuery() {
135133
cmd.sql(),
136134
this.statementId,
137135
new MySQLParamDesc(paramDescs),
138-
MySQLRowDesc.create(columnDescs, DataFormat.BINARY),
139136
!cmd.isManaged())));
140137
}
141138

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,13 @@ protected void handleResultsetColumnDefinitions(ByteBuf payload) {
9494
protected void handleResultsetColumnDefinitionsDecodingCompleted() {
9595
commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET;
9696
MySQLRowDesc mySQLRowDesc = MySQLRowDesc.create(columnDefinitions, format); // use the column definitions if provided by execute or fetch response instead of prepare response
97+
handleRowDescriptorCreated(mySQLRowDesc);
9798
decoder = new RowResultDecoder<>(cmd.collector(), mySQLRowDesc);
9899
}
99100

101+
protected void handleRowDescriptorCreated(MySQLRowDesc mySQLRowDesc) {
102+
}
103+
100104
protected void handleRows(ByteBuf payload, int payloadLength) {
101105
/*
102106
Resultset row can begin with 0xfe byte (when using text protocol with a field length > 0xffffff)

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ void encode(MySQLEncoder encoder) {
2929
statement.cleanBindings();
3030

3131
statement.isCursorOpen = false;
32+
statement.cursorRowDescriptor = null;
3233
sendStatementResetCommand(statement.statementId);
3334
}
3435

vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
import io.vertx.core.Vertx;
1515
import io.vertx.core.buffer.Buffer;
1616
import io.vertx.core.file.FileSystem;
17+
import io.vertx.ext.unit.Async;
1718
import io.vertx.ext.unit.TestContext;
1819
import io.vertx.ext.unit.junit.VertxUnitRunner;
1920
import io.vertx.mysqlclient.MySQLClient;
2021
import io.vertx.mysqlclient.MySQLConnectOptions;
2122
import io.vertx.mysqlclient.MySQLConnection;
2223
import io.vertx.sqlclient.*;
2324
import org.junit.After;
24-
import org.junit.Assume;
2525
import org.junit.Before;
2626
import org.junit.Test;
2727
import org.junit.runner.RunWith;
@@ -32,6 +32,8 @@
3232
import java.util.List;
3333
import java.util.concurrent.ThreadLocalRandom;
3434

35+
import static org.junit.Assume.assumeFalse;
36+
3537
@RunWith(VertxUnitRunner.class)
3638
public class MySQLQueryTest extends MySQLTestBase {
3739

@@ -280,7 +282,7 @@ public void testDecodePacketSizeMoreThan16MB(TestContext ctx) {
280282

281283
@Test
282284
public void testEncodePacketSizeMoreThan16MB(TestContext ctx) {
283-
Assume.assumeFalse(rule.isUsingMySQL5_6());
285+
assumeFalse(rule.isUsingMySQL5_6());
284286
int dataSize = 20 * 1024 * 1024; // 20MB payload
285287
byte[] data = new byte[dataSize];
286288
ThreadLocalRandom.current().nextBytes(data);
@@ -470,4 +472,22 @@ public void testLocalInfileRequestInPackets(TestContext ctx) {
470472
}));
471473
}));
472474
}
475+
476+
@Test
477+
public void testColumnDefinitionChangeWithCursor(TestContext ctx) {
478+
assumeFalse("Query syntax not supported on MySQL 5", rule.isUsingMySQL5());
479+
Async rows = ctx.async(100);
480+
Async completion = ctx.async();
481+
MySQLConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> {
482+
conn
483+
.prepare("SELECT row_number() over () as \"Number\", case when 1 then 1 else 0 end as \"Case\" FROM mysql.help_relation limit 0,100")
484+
.onComplete(ctx.asyncAssertSuccess(ps -> {
485+
// Make sure to fetch from the database twice
486+
RowStream<Row> stream = ps.createStream(50);
487+
stream.exceptionHandler(ctx::fail);
488+
stream.endHandler(v -> completion.complete());
489+
stream.handler(row -> rows.countDown());
490+
}));
491+
}));
492+
}
473493
}

vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ public boolean isUsingMariaDB() {
143143
return databaseServerInfo.getDatabaseType() == DatabaseType.MariaDB;
144144
}
145145

146+
public boolean isUsingMySQL5() {
147+
return databaseServerInfo.databaseType == DatabaseType.MySQL && databaseServerInfo.dockerImageTag.startsWith("5.");
148+
}
149+
146150
public boolean isUsingMySQL5_6() {
147151
return databaseServerInfo == DatabaseServerInfo.MySQL_V5_6;
148152
}

vertx-mysql-client/src/test/resources/init.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
#allow reading mysql schema
2+
GRANT
3+
SELECT
4+
ON mysql.* TO 'mysql';
5+
16
# testing change schema
27
CREATE DATABASE emptyschema;
38
GRANT ALL ON emptyschema.* TO 'mysql'@'%';

0 commit comments

Comments
 (0)