From fbc2cbefd1249d526d393c21c339c659685d841b Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Fri, 1 Dec 2023 11:00:46 +0100 Subject: [PATCH] Add row recycling in collector --- .../io/vertx/db2client/impl/DB2RowImpl.java | 3 +- .../impl/codec/RowResultDecoder.java | 11 +++-- .../vertx/mssqlclient/impl/MSSQLRowImpl.java | 3 +- .../impl/codec/RowResultDecoder.java | 29 +++++++------- .../vertx/mysqlclient/impl/MySQLRowImpl.java | 3 +- .../mysqlclient/impl/codec/MySQLDecoder.java | 2 - .../impl/codec/RowResultDecoder.java | 11 +++-- .../io/vertx/oracleclient/impl/OracleRow.java | 3 +- .../test/tck/OracleCollectorTest.java | 11 +++++ .../java/io/vertx/pgclient/impl/RowImpl.java | 4 +- .../pgclient/impl/codec/RowResultDecoder.java | 11 +++-- .../src/main/java/io/vertx/sqlclient/Row.java | 6 +++ .../java/io/vertx/sqlclient/impl/RowBase.java | 39 ++++++++++++++++++ .../io/vertx/sqlclient/impl/RowDecoder.java | 20 ++++++++-- .../io/vertx/sqlclient/impl/RowInternal.java | 19 +++++++++ .../sqlclient/tck/CollectorTestBase.java | 40 +++++++++++++++++-- 16 files changed, 177 insertions(+), 38 deletions(-) create mode 100644 vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowBase.java create mode 100644 vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowInternal.java diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2RowImpl.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2RowImpl.java index c0956c15f..603c6ebb9 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2RowImpl.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2RowImpl.java @@ -32,9 +32,10 @@ import io.vertx.sqlclient.Row; import io.vertx.sqlclient.data.Numeric; import io.vertx.sqlclient.impl.ArrayTuple; +import io.vertx.sqlclient.impl.RowBase; import io.vertx.sqlclient.impl.RowDesc; -public class DB2RowImpl extends ArrayTuple implements Row { +public class DB2RowImpl extends RowBase { private final RowDesc rowDesc; diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/RowResultDecoder.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/RowResultDecoder.java index 3795460cd..758a4ab3b 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/RowResultDecoder.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/RowResultDecoder.java @@ -27,6 +27,7 @@ import io.vertx.sqlclient.Row; import io.vertx.sqlclient.data.Numeric; import io.vertx.sqlclient.impl.RowDecoder; +import io.vertx.sqlclient.impl.RowInternal; class RowResultDecoder extends RowDecoder { @@ -53,8 +54,12 @@ public boolean next() { } @Override - protected Row decodeRow(int len, ByteBuf in) { - Row row = new DB2RowImpl(rowDesc); + protected RowInternal row() { + return new DB2RowImpl(rowDesc); + } + + @Override + protected boolean decodeRow(int len, ByteBuf in, Row row) { for (int i = 1; i < rowDesc.columnDefinitions().columns_ + 1; i++) { int startingIdx = cursor.dataBuffer_.readerIndex(); Object o = cursor.getObject(i); @@ -73,6 +78,6 @@ protected Row decodeRow(int len, ByteBuf in) { if (LOG.isDebugEnabled()) { LOG.debug("decoded row values: " + row.deepToString()); } - return row; + return true; } } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLRowImpl.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLRowImpl.java index 8288b95e7..f24bae180 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLRowImpl.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLRowImpl.java @@ -14,6 +14,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.impl.ArrayTuple; +import io.vertx.sqlclient.impl.RowBase; import io.vertx.sqlclient.impl.RowDesc; import java.math.BigDecimal; @@ -25,7 +26,7 @@ import java.util.List; import java.util.UUID; -public class MSSQLRowImpl extends ArrayTuple implements Row { +public class MSSQLRowImpl extends RowBase { private final RowDesc rowDesc; public MSSQLRowImpl(RowDesc rowDesc) { diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/RowResultDecoder.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/RowResultDecoder.java index 13af3555a..8b4a2efd8 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/RowResultDecoder.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/RowResultDecoder.java @@ -15,8 +15,8 @@ import io.vertx.mssqlclient.impl.MSSQLRowImpl; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.impl.RowDecoder; +import io.vertx.sqlclient.impl.RowInternal; -import java.util.Objects; import java.util.stream.Collector; public class RowResultDecoder extends RowDecoder { @@ -36,18 +36,20 @@ public MSSQLRowDesc desc() { } @Override - public Row decodeRow(int len, ByteBuf in) { - Row decoded; + protected RowInternal row() { + return new MSSQLRowImpl(desc); + } + + @Override + protected boolean decodeRow(int len, ByteBuf in, Row row) { if (nbc) { - decoded = decodeMssqlNbcRow(in); + return decodeMssqlNbcRow(in, row); } else { - decoded = decodeMssqlRow(in); + return decodeMssqlRow(in, row); } - return decoded; } - private Row decodeMssqlRow(ByteBuf in) { - Row row = new MSSQLRowImpl(desc); + private boolean decodeMssqlRow(ByteBuf in, Row row) { int len = desc.size(); for (int c = 0; c < len; c++) { ColumnData columnData = desc.get(c); @@ -56,18 +58,15 @@ private Row decodeMssqlRow(ByteBuf in) { return ifNotMissing(in, row); } - private Row ifNotMissing(ByteBuf in, Row row) { - Row result; + private boolean ifNotMissing(ByteBuf in, Row row) { if (desc.hasRowStat() && in.readIntLE() == FETCH_MISSING) { - result = null; + return false; } else { - result = row; + return true; } - return result; } - private Row decodeMssqlNbcRow(ByteBuf in) { - Row row = new MSSQLRowImpl(desc); + private boolean decodeMssqlNbcRow(ByteBuf in, Row row) { int len = desc.size(); int nullBitmapByteCount = ((len - 1) >> 3) + 1; int nullBitMapStartIdx = in.readerIndex(); diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLRowImpl.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLRowImpl.java index 23418b566..3373af3f7 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLRowImpl.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLRowImpl.java @@ -20,13 +20,14 @@ import io.vertx.sqlclient.Row; import io.vertx.sqlclient.data.Numeric; import io.vertx.sqlclient.impl.ArrayTuple; +import io.vertx.sqlclient.impl.RowBase; import java.time.*; import java.time.temporal.Temporal; import java.util.List; import java.util.UUID; -public class MySQLRowImpl extends ArrayTuple implements Row { +public class MySQLRowImpl extends RowBase { private final MySQLRowDesc rowDesc; diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLDecoder.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLDecoder.java index 2b0742e03..752d24396 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLDecoder.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLDecoder.java @@ -16,8 +16,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import java.util.ArrayDeque; - import static io.vertx.mysqlclient.impl.protocol.Packets.PACKET_PAYLOAD_LENGTH_LIMIT; class MySQLDecoder extends ChannelInboundHandlerAdapter { diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/RowResultDecoder.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/RowResultDecoder.java index d3ad9a92a..103f0774a 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/RowResultDecoder.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/RowResultDecoder.java @@ -20,6 +20,7 @@ import io.vertx.mysqlclient.impl.protocol.ColumnDefinition; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.impl.RowDecoder; +import io.vertx.sqlclient.impl.RowInternal; import java.util.stream.Collector; @@ -34,8 +35,12 @@ class RowResultDecoder extends RowDecoder { } @Override - protected Row decodeRow(int len, ByteBuf in) { - Row row = new MySQLRowImpl(rowDesc); + protected RowInternal row() { + return new MySQLRowImpl(rowDesc); + } + + @Override + protected boolean decodeRow(int len, ByteBuf in, Row row) { if (rowDesc.dataFormat() == DataFormat.BINARY) { // BINARY row decoding // 0x00 packet header @@ -76,7 +81,7 @@ protected Row decodeRow(int len, ByteBuf in) { row.addValue(decoded); } } - return row; + return true; } } diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleRow.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleRow.java index b6b638c3a..6a4d8202e 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleRow.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleRow.java @@ -16,13 +16,14 @@ import io.vertx.sqlclient.Row; import io.vertx.sqlclient.data.Numeric; import io.vertx.sqlclient.impl.ArrayTuple; +import io.vertx.sqlclient.impl.RowBase; import io.vertx.sqlclient.impl.RowDesc; import java.time.*; import java.util.List; import java.util.UUID; -public class OracleRow extends ArrayTuple implements Row { +public class OracleRow extends RowBase { private final RowDesc desc; diff --git a/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleCollectorTest.java b/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleCollectorTest.java index fedbd915b..b5d3bd662 100644 --- a/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleCollectorTest.java +++ b/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleCollectorTest.java @@ -16,6 +16,7 @@ import io.vertx.oracleclient.test.junit.OracleRule; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.tck.CollectorTestBase; +import org.junit.Assume; import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; @@ -265,4 +266,14 @@ public Set characteristics() { return Collections.emptySet(); } } + + @Override + public void testCollectorRecycle(TestContext ctx) { + Assume.assumeTrue(false); + } + + @Override + public void testCollectorNoRecycle(TestContext ctx) { + Assume.assumeTrue(false); + } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/RowImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/RowImpl.java index e3484168c..4b42617a9 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/RowImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/RowImpl.java @@ -24,14 +24,16 @@ import io.vertx.sqlclient.Row; import io.vertx.sqlclient.data.Numeric; import io.vertx.sqlclient.impl.ArrayTuple; +import io.vertx.sqlclient.impl.RowBase; import io.vertx.sqlclient.impl.RowDesc; +import io.vertx.sqlclient.impl.RowInternal; import java.lang.reflect.Array; import java.time.*; import java.util.List; import java.util.UUID; -public class RowImpl extends ArrayTuple implements Row { +public class RowImpl extends RowBase { private final RowDesc desc; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/RowResultDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/RowResultDecoder.java index 6d2dc08f3..0502870e7 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/RowResultDecoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/RowResultDecoder.java @@ -21,6 +21,7 @@ import io.vertx.pgclient.impl.RowImpl; import io.netty.buffer.ByteBuf; import io.vertx.sqlclient.impl.RowDecoder; +import io.vertx.sqlclient.impl.RowInternal; import java.util.stream.Collector; @@ -34,8 +35,12 @@ class RowResultDecoder extends RowDecoder { } @Override - protected Row decodeRow(int len, ByteBuf in) { - Row row = new RowImpl(desc); + protected RowInternal row() { + return new RowImpl(desc); + } + + @Override + protected boolean decodeRow(int len, ByteBuf in, Row row) { for (int c = 0; c < len; ++c) { int length = in.readInt(); Object decoded = null; @@ -50,6 +55,6 @@ protected Row decodeRow(int len, ByteBuf in) { } row.addValue(decoded); } - return row; + return true; } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/Row.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/Row.java index b8b18f77a..ea554c7c2 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/Row.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/Row.java @@ -750,4 +750,10 @@ default JsonObject toJson() { return json; } + /** + * Signal the row can be recycled, this is only effective when dealing with a row in a collector + * query and the row has already been processed and transformed. + */ + default void release() { + } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowBase.java new file mode 100644 index 000000000..e48f515ce --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowBase.java @@ -0,0 +1,39 @@ +package io.vertx.sqlclient.impl; + +import io.vertx.sqlclient.Tuple; + +import java.util.Collection; + +/** + * Base class for rows. + */ +public abstract class RowBase extends ArrayTuple implements RowInternal { + + private boolean released; + + public RowBase(int len) { + super(len); + } + + public RowBase(Collection c) { + super(c); + } + + public RowBase(Tuple tuple) { + super(tuple); + } + + @Override + public void release() { + released = true; + } + + @Override + public boolean tryRecycle() { + boolean ret = released; + if (ret) { + clear(); + } + return ret; + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowDecoder.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowDecoder.java index 16ab68ba5..8336094e7 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowDecoder.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowDecoder.java @@ -28,6 +28,7 @@ public abstract class RowDecoder { private final Collector collector; private BiConsumer accumulator; + private RowInternal row; private int size; private C container; private Throwable failure; @@ -39,15 +40,23 @@ protected RowDecoder(Collector collector) { reset(); } + protected abstract RowInternal row(); + public int size() { return size; } - protected abstract Row decodeRow(int len, ByteBuf in); + protected abstract boolean decodeRow(int len, ByteBuf in, Row row); public void handleRow(int len, ByteBuf in) { - Row row = decodeRow(len, in); - if (row != null && failure == null) { + RowInternal r = row; + if (r == null) { + r = row(); + } else { + row = null; + } + boolean decoded = decodeRow(len, in, r); + if (decoded && failure == null) { if (accumulator == null) { try { accumulator = collector.accumulator(); @@ -57,11 +66,14 @@ public void handleRow(int len, ByteBuf in) { } } try { - accumulator.accept(container, row); + accumulator.accept(container, r); } catch (Exception e) { failure = e; return; } + if (r.tryRecycle()) { + row = r; + } size++; } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowInternal.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowInternal.java new file mode 100644 index 000000000..a5ec91be0 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowInternal.java @@ -0,0 +1,19 @@ +package io.vertx.sqlclient.impl; + +import io.vertx.sqlclient.Row; + +/** + * Row internal API + */ +public interface RowInternal extends Row { + + /** + * Try to recycle the row, this shall be called by the row decoder to check whether the row + * instance can be reused. + * + * @return whether the row can be reused safely + */ + default boolean tryRecycle() { + return false; + } +} diff --git a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/CollectorTestBase.java b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/CollectorTestBase.java index d1cf6a69e..edbc5fe1a 100644 --- a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/CollectorTestBase.java +++ b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/CollectorTestBase.java @@ -14,6 +14,7 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; import io.vertx.ext.unit.TestContext; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.SqlConnection; @@ -21,9 +22,8 @@ import org.junit.Before; import org.junit.Test; -import java.util.Collections; -import java.util.Map; -import java.util.Set; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; @@ -260,4 +260,38 @@ public Set characteristics() { return Collections.emptySet(); } } + + @Test + public void testCollectorRecycle(TestContext ctx) { + testCollectorRecycle(ctx, true); + } + + @Test + public void testCollectorNoRecycle(TestContext ctx) { + testCollectorRecycle(ctx, false); + } + + private void testCollectorRecycle(TestContext ctx, boolean release) { + + Set hashCodes = ConcurrentHashMap.newKeySet(); + Collector recyclingCollector = Collector.of(JsonArray::new, (array, row) -> { + hashCodes.add(System.identityHashCode(row)); + array.add(row.getString("test_varchar")); + if (release) { + row.release(); + } + }, (a, b) -> null, Function.identity()); + + connector.connect(ctx.asyncAssertSuccess(conn -> { + conn.query("SELECT * FROM collector_test") + .collecting(recyclingCollector) + .execute() + .onComplete(ctx.asyncAssertSuccess(result -> { + ctx.assertEquals(release ? 1 : result.size(), hashCodes.size()); + ctx.assertEquals(new JsonArray().add("HELLO,WORLD").add("hello,world"), result.value()); + conn.close(); + })); + })); + + } }