diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2ParamDesc.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2ParamDesc.java index 88cecc1c8a..06269430e9 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2ParamDesc.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2ParamDesc.java @@ -15,6 +15,7 @@ */ package io.vertx.db2client.impl.codec; +import io.vertx.core.VertxException; import io.vertx.core.buffer.Buffer; import io.vertx.db2client.impl.drda.ClientTypes; import io.vertx.db2client.impl.drda.ColumnMetaData; @@ -35,19 +36,19 @@ ColumnMetaData paramDefinitions() { return paramDefinitions; } - public String prepare(TupleInternal values) { + public TupleInternal prepare(TupleInternal values) { if (values.size() != paramDefinitions.columns_) { - return ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDefinitions.columns_, values.size()); + throw new VertxException(ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDefinitions.columns_, values.size()), true); } for (int i = 0; i < paramDefinitions.columns_; i++) { Object val = values.getValue(i); int type = paramDefinitions.types_[i]; if (!canConvert(val, type)) { Class preferredType = ClientTypes.preferredJavaType(type); - return ErrorMessageFactory.buildWhenArgumentsTypeNotMatched(preferredType, i, val); + throw new VertxException(ErrorMessageFactory.buildWhenArgumentsTypeNotMatched(preferredType, i, val), true); } } - return null; + return values; } private static boolean canConvert(Object val, int type) { diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2PreparedStatement.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2PreparedStatement.java index 9eb1185c94..f70ba58e4a 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2PreparedStatement.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2PreparedStatement.java @@ -74,7 +74,7 @@ public String sql() { } @Override - public String prepare(TupleInternal values) { + public TupleInternal prepare(TupleInternal values) { return paramDesc.prepare(values); } diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLPreparedStatement.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLPreparedStatement.java index 957fe4efb5..ff282d3734 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLPreparedStatement.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLPreparedStatement.java @@ -40,8 +40,4 @@ public String sql() { return sql; } - @Override - public String prepare(TupleInternal values) { - return null; - } } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java index 8e5b77193f..ae5526856f 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java @@ -17,6 +17,7 @@ package io.vertx.mysqlclient.impl.codec; +import io.vertx.core.VertxException; import io.vertx.mysqlclient.impl.MySQLParamDesc; import io.vertx.mysqlclient.impl.MySQLRowDesc; import io.vertx.mysqlclient.impl.datatype.DataType; @@ -71,13 +72,13 @@ public String sql() { } @Override - public String prepare(TupleInternal values) { + public TupleInternal prepare(TupleInternal values) { int numberOfParameters = values.size(); int paramDescLength = paramDesc.paramDefinitions().length; if (numberOfParameters != paramDescLength) { - return ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDescLength, numberOfParameters); + throw new VertxException(ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDescLength, numberOfParameters), true); } else { - return null; + return values; } } diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/commands/OraclePreparedStatement.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/commands/OraclePreparedStatement.java index 7adb21dd10..fc01fecea2 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/commands/OraclePreparedStatement.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/commands/OraclePreparedStatement.java @@ -50,10 +50,4 @@ public RowDesc rowDesc() { public String sql() { return sql; } - - @Override - public String prepare(TupleInternal values) { - return null; - } - } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Bind.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Bind.java index 3b5d0b2bdb..50f2ba6cec 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Bind.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Bind.java @@ -20,7 +20,7 @@ /** * @author Emad Alblueshi */ -final class Bind { +final class Bind extends OutboundMessage { final byte[] statement; final DataType[] paramTypes; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePortalMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePortalMessage.java new file mode 100644 index 0000000000..e5ad66068d --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePortalMessage.java @@ -0,0 +1,7 @@ +package io.vertx.pgclient.impl.codec; + +class ClosePortalMessage extends OutboundMessage { + + static final ClosePortalMessage INSTANCE = new ClosePortalMessage(); + +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePreparedStatementMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePreparedStatementMessage.java new file mode 100644 index 0000000000..6bff622777 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ClosePreparedStatementMessage.java @@ -0,0 +1,7 @@ +package io.vertx.pgclient.impl.codec; + +class ClosePreparedStatementMessage extends OutboundMessage { + + static final ClosePreparedStatementMessage INSTANCE = new ClosePreparedStatementMessage(); + +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataType.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataType.java index e65c3baf4b..a637387425 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataType.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataType.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.function.Function; /** * PostgreSQL object @@ -44,86 +45,86 @@ */ public enum DataType { - BOOL(16, true, Boolean.class, JDBCType.BOOLEAN, Tuple::getBoolean), - BOOL_ARRAY(1000, true, Boolean[].class, JDBCType.BOOLEAN, Tuple::getArrayOfBooleans), - INT2(21, true, Short.class, Number.class, JDBCType.SMALLINT, Tuple::getShort), - INT2_ARRAY(1005, true, Short[].class, Number[].class, JDBCType.SMALLINT, Tuple::getArrayOfShorts), - INT4(23, true, Integer.class, Number.class, JDBCType.INTEGER, Tuple::getInteger), - INT4_ARRAY(1007, true, Integer[].class, Number[].class, JDBCType.INTEGER, Tuple::getArrayOfIntegers), - INT8(20, true, Long.class, Number.class, JDBCType.BIGINT, Tuple::getLong), - INT8_ARRAY(1016, true, Long[].class, Number[].class, JDBCType.BIGINT, Tuple::getArrayOfLongs), - FLOAT4(700, true, Float.class, Number.class, JDBCType.REAL, Tuple::getFloat), - FLOAT4_ARRAY(1021, true, Float[].class, Number[].class, JDBCType.REAL, Tuple::getArrayOfFloats), - FLOAT8(701, true, Double.class, Number.class, JDBCType.DOUBLE, Tuple::getDouble), - FLOAT8_ARRAY(1022, true, Double[].class, Number[].class, JDBCType.DOUBLE, Tuple::getArrayOfDoubles), - NUMERIC(1700, false, Numeric.class, Number.class, JDBCType.NUMERIC, Tuple::getNumeric), - NUMERIC_ARRAY(1231, false, Numeric[].class, Number[].class, JDBCType.NUMERIC, Tuple::getArrayOfNumerics), - MONEY(790, true, Money.class, null), - MONEY_ARRAY(791, true, Money[].class, null), - BIT(1560, true, Object.class, JDBCType.BIT), - BIT_ARRAY(1561, true, Object[].class, JDBCType.BIT), - VARBIT(1562, true, Object.class, JDBCType.OTHER), - VARBIT_ARRAY(1563, true, Object[].class, JDBCType.BIT), - CHAR(18, true, String.class, JDBCType.BIT, Tuple::getString), - CHAR_ARRAY(1002, true, String[].class, JDBCType.CHAR, Tuple::getArrayOfStrings), - VARCHAR(1043, true, String.class, JDBCType.VARCHAR, Tuple::getString), - VARCHAR_ARRAY(1015, true, String[].class, JDBCType.VARCHAR, Tuple::getArrayOfStrings), - BPCHAR(1042, true, String.class, JDBCType.VARCHAR, Tuple::getString), - BPCHAR_ARRAY(1014, true, String[].class, JDBCType.VARCHAR, Tuple::getArrayOfStrings), - TEXT(25, true, String.class, JDBCType.LONGVARCHAR, Tuple::getString), - TEXT_ARRAY(1009, true, String[].class, JDBCType.LONGVARCHAR, Tuple::getArrayOfStrings), - NAME(19, true, String.class, JDBCType.VARCHAR, Tuple::getString), - NAME_ARRAY(1003, true, String[].class, JDBCType.VARCHAR, Tuple::getArrayOfStrings), - DATE(1082, true, LocalDate.class, JDBCType.DATE, Tuple::getLocalDate), - DATE_ARRAY(1182, true, LocalDate[].class, JDBCType.DATE, Tuple::getArrayOfLocalDates), - TIME(1083, true, LocalTime.class, JDBCType.TIME, Tuple::getLocalTime), - TIME_ARRAY(1183, true, LocalTime[].class, JDBCType.TIME, Tuple::getArrayOfLocalTimes), - TIMETZ(1266, true, OffsetTime.class, JDBCType.TIME_WITH_TIMEZONE, Tuple::getOffsetTime), - TIMETZ_ARRAY(1270, true, OffsetTime[].class, JDBCType.TIME_WITH_TIMEZONE, Tuple::getArrayOfOffsetTimes), - TIMESTAMP(1114, true, LocalDateTime.class, JDBCType.TIMESTAMP, Tuple::getLocalDateTime), - TIMESTAMP_ARRAY(1115, true, LocalDateTime[].class, JDBCType.TIMESTAMP, Tuple::getArrayOfLocalDateTimes), - TIMESTAMPTZ(1184, true, OffsetDateTime.class, JDBCType.TIMESTAMP_WITH_TIMEZONE, Tuple::getOffsetDateTime), - TIMESTAMPTZ_ARRAY(1185, true, OffsetDateTime[].class, JDBCType.TIMESTAMP_WITH_TIMEZONE, Tuple::getArrayOfOffsetDateTimes), - INTERVAL(1186, true, Interval.class, JDBCType.DATE), - INTERVAL_ARRAY(1187, true, Interval[].class, JDBCType.DATE), - BYTEA(17, true, Buffer.class, JDBCType.BINARY, Tuple::getBuffer), - BYTEA_ARRAY(1001, true, Buffer[].class, JDBCType.BINARY, Tuple::getArrayOfBuffers), - MACADDR(829, true, Object.class, JDBCType.OTHER), - INET(869, true, Inet.class, JDBCType.OTHER), - INET_ARRAY(1041, true, Inet[].class, JDBCType.OTHER), - CIDR(650, true, Cidr.class, JDBCType.OTHER), - MACADDR8(774, true, Object[].class, JDBCType.OTHER), - UUID(2950, true, UUID.class, JDBCType.OTHER, Tuple::getUUID), - UUID_ARRAY(2951, true, UUID[].class, JDBCType.OTHER, Tuple::getArrayOfUUIDs), - JSON(114, true, Object.class, JDBCType.OTHER, Tuple::getJson), - JSON_ARRAY(199, true, Object[].class, JDBCType.OTHER, Tuple::getArrayOfJsons), - JSONB(3802, true, Object.class, JDBCType.OTHER, Tuple::getJson), - JSONB_ARRAY(3807, true, Object[].class, JDBCType.OTHER, Tuple::getArrayOfJsons), - XML(142, true, Object.class, JDBCType.OTHER), - XML_ARRAY(143, true, Object[].class, JDBCType.OTHER), - POINT(600, true, Point.class, JDBCType.OTHER), - POINT_ARRAY(1017, true, Point[].class, JDBCType.OTHER), - LINE(628, true, Line.class, JDBCType.OTHER), - LINE_ARRAY(629, true, Line[].class, JDBCType.OTHER), - LSEG(601, true, LineSegment.class, JDBCType.OTHER), - LSEG_ARRAY(1018, true, LineSegment[].class, JDBCType.OTHER), - BOX(603, true, Box.class, JDBCType.OTHER), - BOX_ARRAY(1020, true, Box[].class, JDBCType.OTHER), - PATH(602, true, Path.class, JDBCType.OTHER), - PATH_ARRAY(1019, true, Path[].class, JDBCType.OTHER), - POLYGON(604, true, Polygon.class, JDBCType.OTHER), - POLYGON_ARRAY(1027, true, Polygon[].class, JDBCType.OTHER), - CIRCLE(718, true, Circle.class, JDBCType.OTHER), - CIRCLE_ARRAY(719, true, Circle[].class, JDBCType.OTHER), - HSTORE(33670, true, Object.class, JDBCType.OTHER), - OID(26, true, Object.class, JDBCType.OTHER), - OID_ARRAY(1028, true, Object[].class, JDBCType.OTHER), - VOID(2278, true, Object.class, JDBCType.OTHER), - UNKNOWN(705, false, String.class, JDBCType.OTHER, ParamExtractor::extractUnknownType), - TS_VECTOR(3614, false, String.class, JDBCType.OTHER), - TS_VECTOR_ARRAY(3643, false, String[].class, JDBCType.OTHER), - TS_QUERY(3615, false, String.class, JDBCType.OTHER), - TS_QUERY_ARRAY(3645, false, String[].class, JDBCType.OTHER); + BOOL(16, true, Boolean.class, JDBCType.BOOLEAN, null, Tuple::getBoolean, DataTypeEstimator.BOOL), + BOOL_ARRAY(1000, true, Boolean[].class, JDBCType.BOOLEAN, null, Tuple::getArrayOfBooleans, DataTypeEstimator.BOOL), + INT2(21, true, Short.class, Number.class, JDBCType.SMALLINT, Tuple::getShort, null, DataTypeEstimator.INT2), + INT2_ARRAY(1005, true, Short[].class, Number[].class, JDBCType.SMALLINT, Tuple::getArrayOfShorts, null, DataTypeEstimator.INT2), + INT4(23, true, Integer.class, Number.class, JDBCType.INTEGER, Tuple::getInteger, null, DataTypeEstimator.INT4), + INT4_ARRAY(1007, true, Integer[].class, Number[].class, JDBCType.INTEGER, Tuple::getArrayOfIntegers, null, DataTypeEstimator.INT4), + INT8(20, true, Long.class, Number.class, JDBCType.BIGINT, Tuple::getLong, null, DataTypeEstimator.INT8), + INT8_ARRAY(1016, true, Long[].class, Number[].class, JDBCType.BIGINT, Tuple::getArrayOfLongs, null, DataTypeEstimator.INT8), + FLOAT4(700, true, Float.class, Number.class, JDBCType.REAL, Tuple::getFloat, null, DataTypeEstimator.FLOAT4), + FLOAT4_ARRAY(1021, true, Float[].class, Number[].class, JDBCType.REAL, Tuple::getArrayOfFloats, null, DataTypeEstimator.FLOAT4), + FLOAT8(701, true, Double.class, Number.class, JDBCType.DOUBLE, Tuple::getDouble, null, DataTypeEstimator.FLOAT8), + FLOAT8_ARRAY(1022, true, Double[].class, Number[].class, JDBCType.DOUBLE, Tuple::getArrayOfDoubles, null, DataTypeEstimator.FLOAT8), + NUMERIC(1700, false, Numeric.class, Number.class, JDBCType.NUMERIC, Tuple::getNumeric, ParamExtractor::prepareNumeric, DataTypeEstimator.NUMERIC), + NUMERIC_ARRAY(1231, false, Numeric[].class, Number[].class, JDBCType.NUMERIC, Tuple::getArrayOfNumerics, ParamExtractor::prepareNumeric, DataTypeEstimator.NUMERIC_ARRAY), + MONEY(790, true, Money.class, null, DataTypeEstimator.MONEY), + MONEY_ARRAY(791, true, Money[].class, null, DataTypeEstimator.MONEY), + BIT(1560, true, Object.class, JDBCType.BIT, DataTypeEstimator.UNSUPPORTED), + BIT_ARRAY(1561, true, Object[].class, JDBCType.BIT, DataTypeEstimator.UNSUPPORTED), + VARBIT(1562, true, Object.class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + VARBIT_ARRAY(1563, true, Object[].class, JDBCType.BIT, DataTypeEstimator.UNSUPPORTED), + CHAR(18, true, String.class, JDBCType.BIT, null, Tuple::getString, DataTypeEstimator.CHAR), + CHAR_ARRAY(1002, true, String[].class, JDBCType.CHAR, null, Tuple::getArrayOfStrings, DataTypeEstimator.CHAR), + VARCHAR(1043, true, String.class, JDBCType.VARCHAR, null, Tuple::getString, DataTypeEstimator.VARCHAR), + VARCHAR_ARRAY(1015, true, String[].class, JDBCType.VARCHAR, null, Tuple::getArrayOfStrings, DataTypeEstimator.VARCHAR), + BPCHAR(1042, true, String.class, JDBCType.VARCHAR, null, Tuple::getString, DataTypeEstimator.BPCHAR), + BPCHAR_ARRAY(1014, true, String[].class, JDBCType.VARCHAR, null, Tuple::getArrayOfStrings, DataTypeEstimator.BPCHAR), + TEXT(25, true, String.class, JDBCType.LONGVARCHAR, null, Tuple::getString, DataTypeEstimator.TEXT), + TEXT_ARRAY(1009, true, String[].class, JDBCType.LONGVARCHAR, null, Tuple::getArrayOfStrings, DataTypeEstimator.TEXT), + NAME(19, true, String.class, JDBCType.VARCHAR, null, Tuple::getString, DataTypeEstimator.NAME), + NAME_ARRAY(1003, true, String[].class, JDBCType.VARCHAR, null, Tuple::getArrayOfStrings, DataTypeEstimator.NAME), + DATE(1082, true, LocalDate.class, JDBCType.DATE, null, Tuple::getLocalDate, DataTypeEstimator.DATE), + DATE_ARRAY(1182, true, LocalDate[].class, JDBCType.DATE, null, Tuple::getArrayOfLocalDates, DataTypeEstimator.DATE), + TIME(1083, true, LocalTime.class, JDBCType.TIME, null, Tuple::getLocalTime, DataTypeEstimator.TIME), + TIME_ARRAY(1183, true, LocalTime[].class, JDBCType.TIME, null, Tuple::getArrayOfLocalTimes, DataTypeEstimator.TIME), + TIMETZ(1266, true, OffsetTime.class, JDBCType.TIME_WITH_TIMEZONE, null, Tuple::getOffsetTime, DataTypeEstimator.TIMETZ), + TIMETZ_ARRAY(1270, true, OffsetTime[].class, JDBCType.TIME_WITH_TIMEZONE, null, Tuple::getArrayOfOffsetTimes, DataTypeEstimator.TIMETZ), + TIMESTAMP(1114, true, LocalDateTime.class, JDBCType.TIMESTAMP, null, Tuple::getLocalDateTime, DataTypeEstimator.TIMESTAMP), + TIMESTAMP_ARRAY(1115, true, LocalDateTime[].class, JDBCType.TIMESTAMP, null, Tuple::getArrayOfLocalDateTimes, DataTypeEstimator.TIMESTAMP), + TIMESTAMPTZ(1184, true, OffsetDateTime.class, JDBCType.TIMESTAMP_WITH_TIMEZONE, null, Tuple::getOffsetDateTime, DataTypeEstimator.TIMESTAMPTZ), + TIMESTAMPTZ_ARRAY(1185, true, OffsetDateTime[].class, JDBCType.TIMESTAMP_WITH_TIMEZONE, null, Tuple::getArrayOfOffsetDateTimes, DataTypeEstimator.TIMESTAMPTZ), + INTERVAL(1186, true, Interval.class, JDBCType.DATE, DataTypeEstimator.INTERVAL), + INTERVAL_ARRAY(1187, true, Interval[].class, JDBCType.DATE, DataTypeEstimator.INTERVAL), + BYTEA(17, true, Buffer.class, JDBCType.BINARY, null, Tuple::getBuffer, DataTypeEstimator.BYTEA), + BYTEA_ARRAY(1001, true, Buffer[].class, JDBCType.BINARY, null, Tuple::getArrayOfBuffers, DataTypeEstimator.BYTEA), + MACADDR(829, true, Object.class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + INET(869, true, Inet.class, JDBCType.OTHER, DataTypeEstimator.INET), + INET_ARRAY(1041, true, Inet[].class, JDBCType.OTHER, DataTypeEstimator.INET), + CIDR(650, true, Cidr.class, JDBCType.OTHER, DataTypeEstimator.CIDR), + MACADDR8(774, true, Object[].class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + UUID(2950, true, UUID.class, JDBCType.OTHER, null, Tuple::getUUID, DataTypeEstimator.UUID), + UUID_ARRAY(2951, true, UUID[].class, JDBCType.OTHER, null, Tuple::getArrayOfUUIDs, DataTypeEstimator.UUID), + JSON(114, true, Object.class, JDBCType.OTHER, ParamExtractor::prepareJson, Tuple::getJson, DataTypeEstimator.JSON), + JSON_ARRAY(199, true, Object[].class, JDBCType.OTHER, ParamExtractor::prepareJson, Tuple::getArrayOfJsons, DataTypeEstimator.JSON), + JSONB(3802, true, Object.class, JDBCType.OTHER, ParamExtractor::prepareJson, Tuple::getJson, DataTypeEstimator.JSONB), + JSONB_ARRAY(3807, true, Object[].class, JDBCType.OTHER, ParamExtractor::prepareJson, Tuple::getArrayOfJsons, DataTypeEstimator.JSONB), + XML(142, true, Object.class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + XML_ARRAY(143, true, Object[].class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + POINT(600, true, Point.class, JDBCType.OTHER, DataTypeEstimator.POINT), + POINT_ARRAY(1017, true, Point[].class, JDBCType.OTHER, DataTypeEstimator.POINT), + LINE(628, true, Line.class, JDBCType.OTHER, DataTypeEstimator.LINE), + LINE_ARRAY(629, true, Line[].class, JDBCType.OTHER, DataTypeEstimator.LINE), + LSEG(601, true, LineSegment.class, JDBCType.OTHER, DataTypeEstimator.LSEG), + LSEG_ARRAY(1018, true, LineSegment[].class, JDBCType.OTHER, DataTypeEstimator.LSEG), + BOX(603, true, Box.class, JDBCType.OTHER, DataTypeEstimator.BOX), + BOX_ARRAY(1020, true, Box[].class, JDBCType.OTHER, DataTypeEstimator.BOX), + PATH(602, true, Path.class, JDBCType.OTHER, DataTypeEstimator.PATH), + PATH_ARRAY(1019, true, Path[].class, JDBCType.OTHER, DataTypeEstimator.PATH), + POLYGON(604, true, Polygon.class, JDBCType.OTHER, DataTypeEstimator.POLYGON), + POLYGON_ARRAY(1027, true, Polygon[].class, JDBCType.OTHER, DataTypeEstimator.POLYGON), + CIRCLE(718, true, Circle.class, JDBCType.OTHER, DataTypeEstimator.CIRCLE), + CIRCLE_ARRAY(719, true, Circle[].class, JDBCType.OTHER, DataTypeEstimator.CIRCLE), + HSTORE(33670, true, Object.class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + OID(26, true, Object.class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + OID_ARRAY(1028, true, Object[].class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + VOID(2278, true, Object.class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + UNKNOWN(705, false, String.class, JDBCType.OTHER, ParamExtractor::prepareUnknown, ParamExtractor::extractUnknownType, DataTypeEstimator.UNKNOWN), + TS_VECTOR(3614, false, String.class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + TS_VECTOR_ARRAY(3643, false, String[].class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + TS_QUERY(3615, false, String.class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED), + TS_QUERY_ARRAY(3645, false, String[].class, JDBCType.OTHER, DataTypeEstimator.UNSUPPORTED); private static final Logger logger = LoggerFactory.getLogger(DataType.class); private static final IntObjectMap oidToDataType = new IntObjectHashMap<>(); @@ -136,16 +137,21 @@ public enum DataType { final Class decodingType; final JDBCType jdbcType; final ParamExtractor paramExtractor; + final Function preEncoder; - DataType(int id, boolean supportsBinary, Class type, JDBCType jdbcType, ParamExtractor paramExtractor) { - this(id, supportsBinary, type, type, jdbcType, paramExtractor); + // > 0 : size + // < 0 : switch + final int lengthEstimator; // + + DataType(int id, boolean supportsBinary, Class type, JDBCType jdbcType, Function preEncoder, ParamExtractor paramExtractor, int lengthEstimator) { + this(id, supportsBinary, type, type, jdbcType, paramExtractor, preEncoder, lengthEstimator); } - DataType(int id, boolean supportsBinary, Class type, JDBCType jdbcType) { - this(id, supportsBinary, type, type, jdbcType, null); + DataType(int id, boolean supportsBinary, Class type, JDBCType jdbcType, int lengthEstimator) { + this(id, supportsBinary, type, type, jdbcType, null, null, lengthEstimator); } - DataType(int id, boolean supportsBinary, Class encodingType, Class decodingType, JDBCType jdbcType, ParamExtractor paramExtractor) { + DataType(int id, boolean supportsBinary, Class encodingType, Class decodingType, JDBCType jdbcType, ParamExtractor paramExtractor, Function preEncoder, int lengthEstimator) { this.id = id; this.supportsBinary = supportsBinary; this.encodingType = Objects.requireNonNull(encodingType); @@ -153,6 +159,8 @@ DataType(int id, boolean supportsBinary, Class encodingType, Class dec this.jdbcType = jdbcType; this.array = decodingType.isArray(); this.paramExtractor = paramExtractor != null ? paramExtractor : new DefaultParamExtractor<>(encodingType); + this.preEncoder = preEncoder; + this.lengthEstimator = lengthEstimator; } static DataType valueOf(int oid) { diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataTypeCodec.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataTypeCodec.java index f3f630657d..1b0a0aecdc 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataTypeCodec.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataTypeCodec.java @@ -154,14 +154,14 @@ static void encodeText(DataType id, Object value, ByteBuf buff) { private static void textEncode(DataType id, Object value, ByteBuf buff) { switch (id) { case NUMERIC: - textEncodeNUMERIC((Number) value, buff); + textEncodeNUMERIC((String) value, buff); break; case NUMERIC_ARRAY: - textEncodeNUMERIC_ARRAY((Number[]) value, buff); + textEncodeNUMERIC_ARRAY((Object[]) value, buff); break; case UNKNOWN: //default to treating unknown as a string - buff.writeCharSequence(String.valueOf(value), StandardCharsets.UTF_8); + buff.writeCharSequence((CharSequence) value, StandardCharsets.UTF_8); break; default: logger.debug("Data type " + id + " does not support text encoding"); @@ -281,13 +281,13 @@ public static void encodeBinary(DataType id, Object value, ByteBuf buff) { binaryEncodeArray((UUID[]) value, DataType.UUID, buff); break; case JSON: - binaryEncodeJSON((Object) value, buff); + binaryEncodeJSON((CharSequence) value, buff); break; case JSON_ARRAY: binaryEncodeArray((Object[]) value, DataType.JSON, buff); break; case JSONB: - binaryEncodeJSONB((Object) value, buff); + binaryEncodeJSONB((CharSequence) value, buff); break; case JSONB_ARRAY: binaryEncodeArray((Object[]) value, DataType.JSONB, buff); @@ -938,12 +938,11 @@ private static Interval textDecodeINTERVAL(int index, int len, ByteBuf buff) { return new Interval(years, months, days, hours, minutes, seconds, microseconds); } - private static void textEncodeNUMERIC(Number value, ByteBuf buff) { - String s = value.toString(); - buff.writeCharSequence(s, StandardCharsets.UTF_8); + private static void textEncodeNUMERIC(String value, ByteBuf buff) { + buff.writeCharSequence(value, StandardCharsets.UTF_8); } - private static void textEncodeNUMERIC_ARRAY(Number[] value, ByteBuf buff) { + private static void textEncodeNUMERIC_ARRAY(Object[] value, ByteBuf buff) { textEncodeArray(value, DataType.NUMERIC, buff); } @@ -1366,14 +1365,8 @@ private static Object binaryDecodeJSON(int index, int len, ByteBuf buff) { return textDecodeJSONB(index, len, buff); } - private static void binaryEncodeJSON(Object value, ByteBuf buff) { - String s; - if (value == Tuple.JSON_NULL) { - s = "null"; - } else { - s = Json.encode(value); - } - buff.writeCharSequence(s, StandardCharsets.UTF_8); + private static void binaryEncodeJSON(CharSequence value, ByteBuf buff) { + buff.writeCharSequence(value, StandardCharsets.UTF_8); } private static Object textDecodeJSONB(int index, int len, ByteBuf buff) { @@ -1410,7 +1403,7 @@ private static Object binaryDecodeJSONB(int index, int len, ByteBuf buff) { return textDecodeJSONB(index + 1, len - 1, buff); } - private static void binaryEncodeJSONB(Object value, ByteBuf buff) { + private static void binaryEncodeJSONB(CharSequence value, ByteBuf buff) { buff.writeByte(1); // version binaryEncodeJSON(value, buff); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataTypeEstimator.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataTypeEstimator.java new file mode 100644 index 0000000000..9ea2ff459a --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataTypeEstimator.java @@ -0,0 +1,168 @@ +package io.vertx.pgclient.impl.codec; + +import io.netty.handler.codec.DecoderException; +import io.netty.util.CharsetUtil; +import io.vertx.core.buffer.Buffer; +import io.vertx.pgclient.data.Cidr; +import io.vertx.pgclient.data.Inet; +import io.vertx.pgclient.data.Path; +import io.vertx.pgclient.data.Polygon; + +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; + +/** + */ +class DataTypeEstimator { + + static final int UNSUPPORTED = 0; + private static final int UTF8 = -1; + + static final int NUMERIC = -2; + static final int NUMERIC_ARRAY = -7; + static final int BUFFER = -3; + + static final int UNKNOWN = -6; + + static final int BOOL = 1; + static final int INT2 = 2; + static final int INT4 = 4; + static final int INT8 = 8; + static final int FLOAT4 = 4; + static final int FLOAT8 = 8; + + static final int CHAR = UTF8; + static final int VARCHAR = UTF8; + static final int BPCHAR = UTF8; + static final int TEXT = UTF8; + static final int NAME = UTF8; + + static final int DATE = 4; + static final int TIME = 8; + static final int TIMETZ = 12; + static final int TIMESTAMP = 8; + static final int TIMESTAMPTZ = 8; + static final int INTERVAL = 16; + + static final int BYTEA = BUFFER; + + static final int INET = -10; + static final int CIDR = -9; + static final int UUID = 16; + + static final int JSON = UTF8; + static final int JSONB = -8; + + static final int MONEY = 8; + + static final int POINT = 16; + static final int LINE = 24; + static final int LSEG = 32; + static final int BOX = 32; + static final int CIRCLE = 24; + static final int POLYGON = -4; + static final int PATH = -5; + + // Eventually make this configurable per options + private static final float AVG_BYTES_PER_CHAR_UTF8 = CharsetUtil.encoder(CharsetUtil.UTF_8).averageBytesPerChar(); + + static int estimateUTF8(String s) { + return (int)(s.length() * AVG_BYTES_PER_CHAR_UTF8); + } + + static int estimateByteArray(byte[] b) { + return b.length; + } + + static int estimateCStringUTF8(String s) { + return estimateUTF8(s) + 1; + } + + private static int estimateUnknown(String value) { + return estimateUTF8(value); + } + + private static int estimateJSONB(String value) { + return 1 + estimateUTF8(value); + } + + private static int estimateNumeric(String value) { + return estimateUTF8(value); + } + + private static int estimateInetOrCidr(Cidr value) { + return estimateInetOrCidr(value.getAddress()); + } + + private static int estimateInetOrCidr(Inet value) { + return estimateInetOrCidr(value.getAddress()); + } + + private static int estimateInetOrCidr(InetAddress address) { + int len; + if (address instanceof Inet6Address) { + Inet6Address inet6Address = (Inet6Address) address; + len = inet6Address.getAddress().length; + } else if (address instanceof Inet4Address) { + Inet4Address inet4Address = (Inet4Address) address; + len = inet4Address.getAddress().length; + } else { + // Invalid + len = 0; + } + return 1 + 1 + 1 + 1 + len; + } + + private static int estimateNumericArray(Object[] value) { + int length = 1; + for (Object elt : value) { + length += elt == null ? 4 : estimateNumeric((String) elt); + } + length += value.length; + return length; + } + + private static int estimateBuffer(Buffer b) { + return b.length(); + } + + private static int estimatePolygon(Polygon p) { + return 4 + p.getPoints().size() * 16; + } + + private static int estimatePath(Path p) { + return 1 + 4 + p.getPoints().size() * 16; + } + + static int estimate(int estimator, Object o) { + if (estimator > 0) { + return estimator; + } else { + switch (estimator) { + case DataTypeEstimator.CIDR: + return estimateInetOrCidr((Cidr) o); + case DataTypeEstimator.INET: + return estimateInetOrCidr((Inet) o); + case DataTypeEstimator.JSONB: + return estimateJSONB((String) o); + case DataTypeEstimator.UNKNOWN: + return estimateUnknown((String) o); + case DataTypeEstimator.NUMERIC: + return estimateNumeric((String) o); + case DataTypeEstimator.NUMERIC_ARRAY: + return estimateNumericArray((Object[]) o); + case DataTypeEstimator.UTF8: + return estimateUTF8((String) o); + case DataTypeEstimator.BUFFER: + return estimateBuffer((Buffer) o); + case DataTypeEstimator.POLYGON: + return estimatePolygon((Polygon) o); + case DataTypeEstimator.PATH: + return estimatePath((Path) o); + default: + throw new UnsupportedOperationException(); + } + } + } +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Describe.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Describe.java index b475694f6c..e8341dc815 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Describe.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Describe.java @@ -20,7 +20,7 @@ /** * @author Emad Alblueshi */ -class Describe { +class Describe extends OutboundMessage { final byte[] statement; final String portal; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExecuteMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExecuteMessage.java new file mode 100644 index 0000000000..40686f4124 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExecuteMessage.java @@ -0,0 +1,7 @@ +package io.vertx.pgclient.impl.codec; + +class ExecuteMessage extends OutboundMessage { + + static final ExecuteMessage INSTANCE = new ExecuteMessage(); + +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/OutboundMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/OutboundMessage.java new file mode 100644 index 0000000000..7fdb6ba1c0 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/OutboundMessage.java @@ -0,0 +1,4 @@ +package io.vertx.pgclient.impl.codec; + +public class OutboundMessage { +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ParamExtractor.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ParamExtractor.java index 7cafd860a6..92f4de9a93 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ParamExtractor.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ParamExtractor.java @@ -10,6 +10,8 @@ */ package io.vertx.pgclient.impl.codec; +import io.vertx.core.json.Json; +import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.internal.TupleInternal; import java.util.Arrays; @@ -29,4 +31,24 @@ static String extractUnknownType(TupleInternal tuple, int pos) { T get(TupleInternal tuple, int idx); + private static String encodeJsonToString(Object value) { + if (value == Tuple.JSON_NULL) { + return "null"; + } else { + return Json.encode(value); + } + } + + static Object prepareUnknown(Object value) { + return String.valueOf(value); + } + + static Object prepareJson(Object value) { + return encodeJsonToString(value); + } + + static Object prepareNumeric(Object value) { + assert value instanceof Number; + return value.toString(); + } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ParseMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ParseMessage.java new file mode 100644 index 0000000000..11d26e500b --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ParseMessage.java @@ -0,0 +1,7 @@ +package io.vertx.pgclient.impl.codec; + +class ParseMessage extends OutboundMessage { + + static final ParseMessage INSTANCE = new ParseMessage(); + +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PasswordMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PasswordMessage.java index b181e88692..3a3d9df6a9 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PasswordMessage.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PasswordMessage.java @@ -22,7 +22,7 @@ /** * @author Emad Alblueshi */ -class PasswordMessage { +class PasswordMessage extends OutboundMessage { final String hash; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java index 3e9ae27160..e4e9723498 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java @@ -30,7 +30,7 @@ import io.vertx.sqlclient.internal.RowDesc; import io.vertx.sqlclient.internal.command.*; -import java.util.Map; +import java.util.*; import static io.vertx.pgclient.impl.util.Util.writeCString; import static java.nio.charset.StandardCharsets.UTF_8; @@ -56,15 +56,42 @@ final class PgEncoder extends ChannelOutboundHandlerAdapter { private final PgCodec codec; final boolean useLayer7Proxy; private ChannelHandlerContext ctx; - private ByteBuf out; private final HexSequence psSeq = new HexSequence(); // used for generating named prepared statement name boolean closeSent; + private ArrayList pendingMessages = new ArrayList<>(); + private int capacityEstimate = 0; + PgEncoder(boolean useLayer7Proxy, PgCodec codec) { this.useLayer7Proxy = useLayer7Proxy; this.codec = codec; } + private void enqueueMessage(Object msg, int estimate) { + pendingMessages.add(msg); + capacityEstimate += estimate; + } + + private void enqueueMessage(Object msg, Object p1, int estimate) { + pendingMessages.add(msg); + pendingMessages.add(p1); + capacityEstimate += estimate; + } + + private void enqueueMessage(Object msg, Object p1, Object p2, int estimate) { + pendingMessages.add(msg); + pendingMessages.add(p1); + pendingMessages.add(p2); + capacityEstimate += estimate; + } + + private void enqueueMessage(Object msg, Object p1, Object p2, Object p3, int estimate) { + pendingMessages.add(msg); + pendingMessages.add(p1); + pendingMessages.add(p2); + pendingMessages.add(p3); + capacityEstimate += estimate; + } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { @@ -77,11 +104,8 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - if (out != null) { - ByteBuf buff = this.out; - this.out = null; - buff.release(); - } + pendingMessages.clear(); + capacityEstimate = 0; } void write(CommandBase cmd) { @@ -130,94 +154,85 @@ public void flush(ChannelHandlerContext ctx) { flush(); } - void close() { - ByteBuf buff; - if (out != null) { - buff = out; - out = null; - } else { - buff = Unpooled.EMPTY_BUFFER; + private ByteBuf renderPendingMessages() { + if (pendingMessages.isEmpty()) { + return Unpooled.EMPTY_BUFFER; } - ctx.writeAndFlush(buff).addListener(v -> { - Channel ch = channelHandlerContext().channel(); - if (ch instanceof SocketChannel) { - SocketChannel channel = (SocketChannel) ch; - channel.shutdownOutput(); - } - }); - } - void flush() { - if (out != null) { - ByteBuf buff = out; - out = null; - ctx.writeAndFlush(buff, ctx.voidPromise()); - } else { - ctx.flush(); + ByteBuf out = ctx.alloc().ioBuffer(capacityEstimate); + int index = 0; + int size = pendingMessages.size(); + while (index < size) { + Object msg = pendingMessages.get(index++); + if (msg.getClass() == SyncMessage.class) { + renderSync(out); + } else if (msg.getClass() == TerminateMessage.class) { + renderTerminate(out); + } else if (msg.getClass() == ClosePortalMessage.class) { + String portal = (String) pendingMessages.get(index++); + renderClosePortal(portal, out); + } else if (msg.getClass() == ClosePreparedStatementMessage.class) { + byte[] statementName = (byte[]) pendingMessages.get(index++); + renderClosePreparedStatement(statementName, out); + } else if (msg.getClass() == StartupMessage.class) { + renderStartupMessage((StartupMessage) msg, out); + } else if (msg.getClass() == PasswordMessage.class) { + renderPasswordMessage((PasswordMessage) msg, out); + } else if (msg.getClass() == ScramClientInitialMessage.class) { + renderScramInitialMessage((ScramClientInitialMessage) msg, out); + } else if (msg.getClass() == ScramClientFinalMessage.class) { + renderScramFinalMessage((ScramClientFinalMessage) msg, out); + } else if (msg.getClass() == Query.class) { + renderQueryMessage((Query) msg, out); + } else if (msg.getClass() == Describe.class) { + renderDescribe((Describe) msg, out); + } else if (msg.getClass() == ParseMessage.class) { + String sql = (String) pendingMessages.get(index++); + byte[] statement = (byte[]) pendingMessages.get(index++); + DataType[] parameterTypes = (DataType[]) pendingMessages.get(index++); + renderParse(sql, statement, parameterTypes, out); + } else if (msg.getClass() == ExecuteMessage.class) { + String portal = (String) pendingMessages.get(index++); + int rowCount = (Integer) pendingMessages.get(index++); + renderExecute(portal, rowCount, out); + } else if (msg.getClass() == Bind.class) { + String portal = (String) pendingMessages.get(index++); + Tuple paramValues = (Tuple) pendingMessages.get(index++); + renderBind((Bind) msg, portal, paramValues, out); + } else { + throw new AssertionError(); + } } + pendingMessages.clear(); + capacityEstimate = 0; + return out; } - /** - * This message immediately closes the connection. On receipt of this message, - * the backend closes the connection and terminates. - */ - void writeTerminate() { - ensureBuffer(); - out.writeByte(TERMINATE); - out.writeInt(4); - } - - /** - *

- * The purpose of this message is to provide a resynchronization point for error recovery. - * When an error is detected while processing any extended-query message, the backend issues {@link ErrorResponse}, - * then reads and discards messages until this message is reached, then issues {@link ReadyForQuery} and returns to normal - * message processing. - *

- * Note that no skipping occurs if an error is detected while processing this message which ensures that there is one - * and only one {@link ReadyForQuery} sent for each of this message. - *

- * Note this message does not cause a transaction block opened with BEGIN to be closed. It is possible to detect this - * situation in {@link ReadyForQuery#txStatus()} that includes {@link TxStatus} information. - */ - void writeSync() { - ensureBuffer(); - out.writeByte(SYNC); - out.writeInt(4); - } - - /** - *

- * The message closes an existing prepared statement or portal and releases resources. - * Note that closing a prepared statement implicitly closes any open portals that were constructed from that statement. - *

- * The response is either {@link CloseComplete} or {@link ErrorResponse} - * - * @param portal - */ - void writeClosePortal(String portal) { - ensureBuffer(); + private static void renderQueryMessage(Query query, ByteBuf out) { int pos = out.writerIndex(); - out.writeByte(CLOSE); + out.writeByte(QUERY); out.writeInt(0); - out.writeByte('P'); // 'S' to close a prepared statement or 'P' to close a portal - Util.writeCStringUTF8(out, portal); + Util.writeCStringUTF8(out, query.sql); out.setInt(pos + 1, out.writerIndex() - pos - 1); } - void writeClosePreparedStatement(byte[] statementName) { - ensureBuffer(); + private static int estimateQueryMessage(Query query) { + return 1 + 4 + DataTypeEstimator.estimateCStringUTF8(query.sql); + } + + private static void renderPasswordMessage(PasswordMessage msg, ByteBuf out) { int pos = out.writerIndex(); - out.writeByte(CLOSE); + out.writeByte(PASSWORD_MESSAGE); out.writeInt(0); - out.writeByte('S'); // 'S' to close a prepared statement or 'P' to close a portal - out.writeBytes(statementName); - out.setInt(pos + 1, out.writerIndex() - pos - 1); + Util.writeCStringUTF8(out, msg.hash); + out.setInt(pos + 1, out.writerIndex() - pos- 1); } - void writeStartupMessage(StartupMessage msg) { - ensureBuffer(); + private static int estimatePasswordMessage(PasswordMessage msg) { + return 1 + 4 + DataTypeEstimator.estimateCStringUTF8(msg.hash); + } + private static void renderStartupMessage(StartupMessage msg, ByteBuf out) { int pos = out.writerIndex(); out.writeInt(0); @@ -238,17 +253,23 @@ void writeStartupMessage(StartupMessage msg) { out.setInt(pos, out.writerIndex() - pos); } - void writePasswordMessage(PasswordMessage msg) { - ensureBuffer(); - int pos = out.writerIndex(); - out.writeByte(PASSWORD_MESSAGE); - out.writeInt(0); - Util.writeCStringUTF8(out, msg.hash); - out.setInt(pos + 1, out.writerIndex() - pos- 1); + private static int estimateStartupMessage(StartupMessage msg) { + int length = 4 + + 2 + + 2 + + (StartupMessage.BUFF_USER_LENGTH + 1) + + DataTypeEstimator.estimateCStringUTF8(msg.username) + + (StartupMessage.BUFF_DATABASE_LENGTH + 1) + + DataTypeEstimator.estimateCStringUTF8(msg.database); + for (Map.Entry property : msg.properties.entrySet()) { + length += DataTypeEstimator.estimateCStringUTF8(property.getKey()); + length += DataTypeEstimator.estimateCStringUTF8(property.getValue()); + } + length++; + return length; } - void writeScramClientInitialMessage(ScramClientInitialMessage msg) { - ensureBuffer(); + private static void renderScramInitialMessage(ScramClientInitialMessage msg, ByteBuf out) { out.writeByte(PASSWORD_MESSAGE); int totalLengthPosition = out.writerIndex(); out.writeInt(0); // message length -> will be set later @@ -263,8 +284,11 @@ void writeScramClientInitialMessage(ScramClientInitialMessage msg) { out.setInt(totalLengthPosition, out.writerIndex() - totalLengthPosition); } - void writeScramClientFinalMessage(ScramClientFinalMessage msg) { - ensureBuffer(); + private static int estimateScramInitialMessage(ScramClientInitialMessage msg) { + return 1 + 4 + DataTypeEstimator.estimateCStringUTF8(msg.mechanism) + 4 + DataTypeEstimator.estimateUTF8(msg.message); + } + + private static void renderScramFinalMessage(ScramClientFinalMessage msg, ByteBuf out) { out.writeByte(PASSWORD_MESSAGE); int totalLengthPosition = out.writerIndex(); out.writeInt(0); // message length -> will be set later @@ -274,43 +298,58 @@ void writeScramClientFinalMessage(ScramClientFinalMessage msg) { out.setInt(totalLengthPosition, out.writerIndex() - totalLengthPosition); } - /** - *

- * This message includes an SQL command (or commands) expressed as a text string. - *

- * The possible response messages from the backend are - * {@link CommandComplete}, {@link RowDesc}, {@link DataRow}, {@link EmptyQueryResponse}, {@link ErrorResponse}, - * {@link ReadyForQuery} and {@link NoticeResponse} - */ - void writeQuery(Query query) { - ensureBuffer(); + private static int estimateScramFinalMessage(ScramClientFinalMessage msg) { + return 1 + 4 + DataTypeEstimator.estimateUTF8(msg.message); + } + + private static void renderClosePreparedStatement(byte[] statementName, ByteBuf out) { int pos = out.writerIndex(); - out.writeByte(QUERY); + out.writeByte(CLOSE); out.writeInt(0); - Util.writeCStringUTF8(out, query.sql); + out.writeByte('S'); // 'S' to close a prepared statement or 'P' to close a portal + out.writeBytes(statementName); out.setInt(pos + 1, out.writerIndex() - pos - 1); } - /** - *

- * The message that using "statement" variant specifies the name of an existing prepared statement. - *

- * The response is a {@link ParamDesc} message describing the parameters needed by the statement, - * followed by a {@link RowDesc} message describing the rows that will be returned when the statement is eventually - * executed or a {@link NoData} message if the statement will not return rows. - * {@link ErrorResponse} is issued if there is no such prepared statement. - *

- * Note that since {@link Bind} has not yet been issued, the formats to be used for returned columns are not yet known to - * the backend; the format code fields in the {@link RowDesc} message will be zeroes in this case. - *

- * The message that using "portal" variant specifies the name of an existing portal. - *

- * The response is a {@link RowDesc} message describing the rows that will be returned by executing the portal; - * or a {@link NoData} message if the portal does not contain a query that will return rows; or {@link ErrorResponse} - * if there is no such portal. - */ - void writeDescribe(Describe describe) { - ensureBuffer(); + private static int estimateClosePreparedStatement(byte[] statementName) { + return 1 + 4 + 1 + DataTypeEstimator.estimateByteArray(statementName); + } + + private static void renderTerminate(ByteBuf out) { + out.writeByte(TERMINATE); + out.writeInt(4); + } + + private static int estimateTerminate() { + return 1 + 4; + } + + private static void renderSync(ByteBuf out) { + out.writeByte(SYNC); + out.writeInt(4); + } + + private static int estimateSync() { + return 1 + 4; + } + + private static void renderExecute(String portal, int rowCount, ByteBuf out) { + int pos = out.writerIndex(); + out.writeByte(EXECUTE); + out.writeInt(0); + if (portal != null) { + out.writeCharSequence(portal, UTF_8); + } + out.writeByte(0); + out.writeInt(rowCount); // Zero denotes "no limit" maybe for ReadStream + out.setInt(pos + 1, out.writerIndex() - pos - 1); + } + + private static int estimateExecute(String portal, int rowCount) { + return 1 + 4 + (portal != null ? DataTypeEstimator.estimateUTF8(portal) : 0) + 1 + 4; + } + + private static void renderDescribe(Describe describe, ByteBuf out) { int pos = out.writerIndex(); out.writeByte(DESCRIBE); out.writeInt(0); @@ -327,8 +366,19 @@ void writeDescribe(Describe describe) { out.setInt(pos + 1, out.writerIndex() - pos- 1); } - void writeParse(String sql, byte[] statement, DataType[] parameterTypes) { - ensureBuffer(); + private static int estimateDescribe(Describe describe) { + int length = 1 + 4; + if (describe.statement.length > 1) { + length += 1 + DataTypeEstimator.estimateByteArray(describe.statement); + } else if (describe.portal != null) { + length += 1 + DataTypeEstimator.estimateCStringUTF8(describe.portal); + } else { + length += 1 + DataTypeEstimator.estimateCStringUTF8(""); + } + return length; + } + + private static void renderParse(String sql, byte[] statement, DataType[] parameterTypes, ByteBuf out) { int pos = out.writerIndex(); out.writeByte(PARSE); out.writeInt(0); @@ -346,47 +396,15 @@ void writeParse(String sql, byte[] statement, DataType[] parameterTypes) { out.setInt(pos + 1, out.writerIndex() - pos - 1); } - /** - * The message specifies the portal and a maximum row count (zero meaning "fetch all rows") of the result. - *

- * The row count of the result is only meaningful for portals containing commands that return row sets; - * in other cases the command is always executed to completion, and the row count of the result is ignored. - *

- * The possible responses to this message are the same as {@link Query} message, except that - * it doesn't cause {@link ReadyForQuery} or {@link RowDesc} to be issued. - *

- * If Execute terminates before completing the execution of a portal, it will send a {@link PortalSuspended} message; - * the appearance of this message tells the frontend that another Execute should be issued against the same portal to - * complete the operation. The {@link CommandComplete} message indicating completion of the source SQL command - * is not sent until the portal's execution is completed. Therefore, This message is always terminated by - * the appearance of exactly one of these messages: {@link CommandComplete}, - * {@link EmptyQueryResponse}, {@link ErrorResponse} or {@link PortalSuspended}. - * - * @author Emad Alblueshi - */ - void writeExecute(String portal, int rowCount) { - ensureBuffer(); - int pos = out.writerIndex(); - out.writeByte(EXECUTE); - out.writeInt(0); - if (portal != null) { - out.writeCharSequence(portal, UTF_8); - } - out.writeByte(0); - out.writeInt(rowCount); // Zero denotes "no limit" maybe for ReadStream - out.setInt(pos + 1, out.writerIndex() - pos - 1); + private static int estimateParse(String sql, byte[] statement, DataType[] parameterTypes) { + return 1 + + 4 + + DataTypeEstimator.estimateByteArray(statement) + + DataTypeEstimator.estimateCStringUTF8(sql) + + 2 + (parameterTypes == null ? 0 : parameterTypes.length * 4); } - /** - *

- * The message gives the name of the prepared statement, the name of portal, - * and the values to use for any parameter values present in the prepared statement. - * The supplied parameter set must match those needed by the prepared statement. - *

- * The response is either {@link BindComplete} or {@link ErrorResponse}. - */ - void writeBind(Bind bind, String portal, Tuple paramValues) { - ensureBuffer(); + private static void renderBind(Bind bind, String portal, Tuple paramValues, ByteBuf out) { int pos = out.writerIndex(); out.writeByte(BIND); out.writeInt(0); @@ -436,12 +454,218 @@ void writeBind(Bind bind, String portal, Tuple paramValues) { out.setInt(pos + 1, out.writerIndex() - pos - 1); } - private void ensureBuffer() { - if (out == null) { - out = ctx.alloc().ioBuffer(); + private static int estimateBind(Bind bind, String portal, Tuple paramValues) { + + int paramLen = paramValues.size(); + + int length = 1 + + 4 + + (portal != null ? DataTypeEstimator.estimateUTF8(portal) : 0) + + 1 + + DataTypeEstimator.estimateByteArray(bind.statement) + + 2 + + paramLen * 2 + + 2; + + for (int c = 0;c < paramLen;c++) { + Object param = paramValues.getValue(c); + if (param == null) { + length += 4; + } else { + DataType dataType = bind.paramTypes[c]; + length += 4; + if (dataType.supportsBinary) { + int estimator = dataType.lengthEstimator; + if (dataType.array) { + length += 4 + 4 + 4 + 4 + 4; + Object[] array = (Object[]) param; + for (Object elt : array) { + length += 4; + if (elt != null) { + length += DataTypeEstimator.estimate(estimator, elt); + } + } + } else { + length += DataTypeEstimator.estimate(estimator, param); + } + } else { + length += DataTypeEstimator.estimate(dataType.lengthEstimator, param); + } + } + } + + // Result columns are all in Binary format + if (bind.resultColumns.length > 0) { + length += 2 + bind.resultColumns.length * 2; + } else { + length += 2 + 2; + } + return length; + } + + private static void renderClosePortal(String portal, ByteBuf out) { + int pos = out.writerIndex(); + out.writeByte(CLOSE); + out.writeInt(0); + out.writeByte('P'); // 'S' to close a prepared statement or 'P' to close a portal + Util.writeCStringUTF8(out, portal); + out.setInt(pos + 1, out.writerIndex() - pos - 1); + } + + private static int estimateClosePortal(String portal) { + return 1 + 4 + 1 + DataTypeEstimator.estimateCStringUTF8(portal); + } + + void close() { + ByteBuf buff = renderPendingMessages(); + ctx.writeAndFlush(buff).addListener(v -> { + Channel ch = channelHandlerContext().channel(); + if (ch instanceof SocketChannel) { + SocketChannel channel = (SocketChannel) ch; + channel.shutdownOutput(); + } + }); + } + + void flush() { + ByteBuf buff = renderPendingMessages(); + if (buff == Unpooled.EMPTY_BUFFER) { + ctx.flush(); + } else { + ctx.writeAndFlush(buff, ctx.voidPromise()); } } + /** + * This message immediately closes the connection. On receipt of this message, + * the backend closes the connection and terminates. + */ + void writeTerminate() { + enqueueMessage(TerminateMessage.INSTANCE, estimateTerminate()); + } + + /** + *

+ * The purpose of this message is to provide a resynchronization point for error recovery. + * When an error is detected while processing any extended-query message, the backend issues {@link ErrorResponse}, + * then reads and discards messages until this message is reached, then issues {@link ReadyForQuery} and returns to normal + * message processing. + *

+ * Note that no skipping occurs if an error is detected while processing this message which ensures that there is one + * and only one {@link ReadyForQuery} sent for each of this message. + *

+ * Note this message does not cause a transaction block opened with BEGIN to be closed. It is possible to detect this + * situation in {@link ReadyForQuery#txStatus()} that includes {@link TxStatus} information. + */ + void writeSync() { + enqueueMessage(SyncMessage.INSTANCE, estimateSync()); + } + + /** + *

+ * The message closes an existing prepared statement or portal and releases resources. + * Note that closing a prepared statement implicitly closes any open portals that were constructed from that statement. + *

+ * The response is either {@link CloseComplete} or {@link ErrorResponse} + * + * @param portal + */ + void writeClosePortal(String portal) { + enqueueMessage(ClosePortalMessage.INSTANCE, portal, estimateClosePortal(portal)); + } + + void writeClosePreparedStatement(byte[] statementName) { + enqueueMessage(ClosePreparedStatementMessage.INSTANCE, statementName, estimateClosePreparedStatement(statementName)); + } + + void writeStartupMessage(StartupMessage msg) { + enqueueMessage(msg, estimateStartupMessage(msg)); + } + + void writePasswordMessage(PasswordMessage msg) { + enqueueMessage(msg, estimatePasswordMessage(msg)); + } + + void writeScramClientInitialMessage(ScramClientInitialMessage msg) { + enqueueMessage(msg, estimateScramInitialMessage(msg)); + } + + void writeScramClientFinalMessage(ScramClientFinalMessage msg) { + enqueueMessage(msg, estimateScramFinalMessage(msg)); + } + + /** + *

+ * This message includes an SQL command (or commands) expressed as a text string. + *

+ * The possible response messages from the backend are + * {@link CommandComplete}, {@link RowDesc}, {@link DataRow}, {@link EmptyQueryResponse}, {@link ErrorResponse}, + * {@link ReadyForQuery} and {@link NoticeResponse} + */ + void writeQuery(Query query) { + enqueueMessage(query, estimateQueryMessage(query)); + } + + /** + *

+ * The message that using "statement" variant specifies the name of an existing prepared statement. + *

+ * The response is a {@link ParamDesc} message describing the parameters needed by the statement, + * followed by a {@link RowDesc} message describing the rows that will be returned when the statement is eventually + * executed or a {@link NoData} message if the statement will not return rows. + * {@link ErrorResponse} is issued if there is no such prepared statement. + *

+ * Note that since {@link Bind} has not yet been issued, the formats to be used for returned columns are not yet known to + * the backend; the format code fields in the {@link RowDesc} message will be zeroes in this case. + *

+ * The message that using "portal" variant specifies the name of an existing portal. + *

+ * The response is a {@link RowDesc} message describing the rows that will be returned by executing the portal; + * or a {@link NoData} message if the portal does not contain a query that will return rows; or {@link ErrorResponse} + * if there is no such portal. + */ + void writeDescribe(Describe describe) { + enqueueMessage(describe, estimateDescribe(describe)); + } + + void writeParse(String sql, byte[] statement, DataType[] parameterTypes) { + enqueueMessage(ParseMessage.INSTANCE, sql, statement, parameterTypes, estimateParse(sql, statement, parameterTypes)); + } + + /** + * The message specifies the portal and a maximum row count (zero meaning "fetch all rows") of the result. + *

+ * The row count of the result is only meaningful for portals containing commands that return row sets; + * in other cases the command is always executed to completion, and the row count of the result is ignored. + *

+ * The possible responses to this message are the same as {@link Query} message, except that + * it doesn't cause {@link ReadyForQuery} or {@link RowDesc} to be issued. + *

+ * If Execute terminates before completing the execution of a portal, it will send a {@link PortalSuspended} message; + * the appearance of this message tells the frontend that another Execute should be issued against the same portal to + * complete the operation. The {@link CommandComplete} message indicating completion of the source SQL command + * is not sent until the portal's execution is completed. Therefore, This message is always terminated by + * the appearance of exactly one of these messages: {@link CommandComplete}, + * {@link EmptyQueryResponse}, {@link ErrorResponse} or {@link PortalSuspended}. + * + * @author Emad Alblueshi + */ + void writeExecute(String portal, int rowCount) { + enqueueMessage(ExecuteMessage.INSTANCE, portal, rowCount, estimateExecute(portal, rowCount)); + } + + /** + *

+ * The message gives the name of the prepared statement, the name of portal, + * and the values to use for any parameter values present in the prepared statement. + * The supplied parameter set must match those needed by the prepared statement. + *

+ * The response is either {@link BindComplete} or {@link ErrorResponse}. + */ + void writeBind(Bind bind, String portal, Tuple paramValues) { + enqueueMessage(bind, portal, paramValues, estimateBind(bind, portal, paramValues)); + } + byte[] nextStatementName() { return psSeq.next(); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgParamDesc.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgParamDesc.java index 5c2d133db1..9f4fd9e98c 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgParamDesc.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgParamDesc.java @@ -16,11 +16,14 @@ */ package io.vertx.pgclient.impl.codec; +import io.vertx.core.VertxException; import io.vertx.sqlclient.impl.ErrorMessageFactory; +import io.vertx.sqlclient.internal.ArrayTuple; import io.vertx.sqlclient.internal.ParamDesc; import io.vertx.sqlclient.internal.TupleInternal; import java.util.Arrays; +import java.util.function.Function; /** * @author Emad Alblueshi @@ -38,15 +41,16 @@ DataType[] paramDataTypes() { return paramDataTypes; } - public String prepare(TupleInternal values) { + public TupleInternal prepare(TupleInternal values) { int numberOfParams = values.size(); if (numberOfParams > 65535) { - return "The number of parameters (" + numberOfParams + ") exceeds the maximum of 65535. Use arrays or split the query."; + throw new VertxException("The number of parameters (" + numberOfParams + ") exceeds the maximum of 65535. Use arrays or split the query.", true); } int paramDescLength = paramDataTypes.length; if (numberOfParams != paramDescLength) { - return ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDescLength, numberOfParams); + throw new VertxException(ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDescLength, numberOfParams), true); } + TupleInternal prepared = values; for (int i = 0; i < paramDescLength; i++) { DataType paramDataType = paramDataTypes[i]; ParamExtractor extractor = paramDataType.paramExtractor; @@ -54,11 +58,29 @@ public String prepare(TupleInternal values) { try { val = extractor.get(values, i); } catch (Exception e) { - return ErrorMessageFactory.buildWhenArgumentsTypeNotMatched(paramDataType.decodingType, i, values.getValue(i)); + throw new VertxException(ErrorMessageFactory.buildWhenArgumentsTypeNotMatched(paramDataType.decodingType, i, values.getValue(i)), true); } - values.setValue(i, val); + if (val != null) { + Function preparator = paramDataType.preEncoder; + if (preparator != null) { + if (prepared == values) { + prepared = new ArrayTuple(prepared); + } + if (paramDataType.array) { + Object[] array = (Object[]) val; + Object[] tmp = new Object[array.length]; + for (int j = 0;j < array.length;j++) { + tmp[j] = preparator.apply(array[j]); + } + val = tmp; + } else { + val = preparator.apply(val); + } + } + } + prepared.setValue(i, val); } - return null; + return prepared; } @Override diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgPreparedStatement.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgPreparedStatement.java index be9c5036eb..1f078cb558 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgPreparedStatement.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgPreparedStatement.java @@ -53,7 +53,7 @@ public String sql() { } @Override - public String prepare(TupleInternal values) { + public TupleInternal prepare(TupleInternal values) { return paramDesc.prepare(values); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Query.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Query.java index f6fe07a152..72710b9ba9 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Query.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Query.java @@ -20,7 +20,7 @@ /** * @author Emad Alblueshi */ -class Query { +class Query extends OutboundMessage { final String sql; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ScramClientFinalMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ScramClientFinalMessage.java index 5861f1bd4f..adbdb5fe3b 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ScramClientFinalMessage.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ScramClientFinalMessage.java @@ -19,7 +19,7 @@ /** */ -class ScramClientFinalMessage { +class ScramClientFinalMessage extends OutboundMessage { final String message; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ScramClientInitialMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ScramClientInitialMessage.java index 270991de88..d35930e10f 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ScramClientInitialMessage.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ScramClientInitialMessage.java @@ -19,7 +19,7 @@ /** */ -public class ScramClientInitialMessage { +public class ScramClientInitialMessage extends OutboundMessage { final String mechanism; final String message; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/StartupMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/StartupMessage.java index a1d6ceff7e..31eb47144e 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/StartupMessage.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/StartupMessage.java @@ -27,11 +27,14 @@ /** * @author Emad Alblueshi */ -class StartupMessage { +class StartupMessage extends OutboundMessage { static final ByteBuf BUFF_USER = Unpooled.copiedBuffer("user", UTF_8).asReadOnly(); static final ByteBuf BUFF_DATABASE = Unpooled.copiedBuffer("database", UTF_8).asReadOnly(); + static final int BUFF_USER_LENGTH = 4; + static final int BUFF_DATABASE_LENGTH = 8; + final String username; final String database; final Map properties; diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/SyncMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/SyncMessage.java new file mode 100644 index 0000000000..a870d5e88d --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/SyncMessage.java @@ -0,0 +1,7 @@ +package io.vertx.pgclient.impl.codec; + +class SyncMessage extends OutboundMessage { + + static final SyncMessage INSTANCE = new SyncMessage(); + +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/TerminateMessage.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/TerminateMessage.java new file mode 100644 index 0000000000..abf0335e3b --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/TerminateMessage.java @@ -0,0 +1,7 @@ +package io.vertx.pgclient.impl.codec; + +class TerminateMessage extends OutboundMessage { + + static final TerminateMessage INSTANCE = new TerminateMessage(); + +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/util/Util.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/util/Util.java index 188d0c0af6..87551f1b06 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/util/Util.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/util/Util.java @@ -18,6 +18,7 @@ package io.vertx.pgclient.impl.util; import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; import io.vertx.core.buffer.Buffer; import io.vertx.sqlclient.Tuple; diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java index cc9d6e456e..b60c1f2970 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/QueryExecutor.java @@ -29,6 +29,7 @@ import io.vertx.sqlclient.internal.PreparedStatement; import io.vertx.sqlclient.internal.TupleInternal; +import java.util.ArrayList; import java.util.List; import java.util.function.Function; import java.util.stream.Collector; @@ -65,23 +66,24 @@ QueryResultBuilder executeExtendedQuery(CommandScheduler scheduler, PreparedStatement preparedStatement, PrepareOptions options, boolean autoCommit, - Tuple arguments, + Tuple values, int fetch, String cursorId, boolean suspended, PromiseInternal promise) { ContextInternal context = promise.context(); QueryResultBuilder handler = createHandler(promise); - String msg = preparedStatement.prepare((TupleInternal) arguments); - if (msg != null) { - handler.fail(msg); + try { + values = preparedStatement.prepare((TupleInternal) values); + } catch (Exception e) { + handler.fail(e); return null; } ExtendedQueryCommand cmd = ExtendedQueryCommand.createQuery( preparedStatement.sql(), options, preparedStatement, - arguments, + values, fetch, cursorId, suspended, @@ -122,12 +124,11 @@ void executeBatchQuery(CommandScheduler scheduler, PromiseInternal promise) { ContextInternal context = promise.context(); QueryResultBuilder handler = createHandler(promise); - for (Tuple args : batch) { - String msg = preparedStatement.prepare((TupleInternal)args); - if (msg != null) { - handler.fail(msg); - return; - } + try { + batch = preparedStatement.prepare((List) batch); + } catch (Exception e) { + handler.fail(e); + return; } ExtendedQueryCommand cmd = ExtendedQueryCommand.createBatch(preparedStatement.sql(), options, preparedStatement, batch, autoCommit, collector, handler); scheduler.schedule(cmd, handler); diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/PreparedStatement.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/PreparedStatement.java index c223728359..55591917f6 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/PreparedStatement.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/PreparedStatement.java @@ -17,6 +17,11 @@ package io.vertx.sqlclient.internal; +import io.vertx.sqlclient.Tuple; + +import java.util.ArrayList; +import java.util.List; + public interface PreparedStatement { ParamDesc paramDesc(); @@ -25,6 +30,23 @@ public interface PreparedStatement { String sql(); - String prepare(TupleInternal values); + default List prepare(List batch) throws Exception { + List actual = batch; + int size = actual.size(); + for (int idx = 0;idx < size;idx++) { + TupleInternal values = batch.get(idx); + TupleInternal prepared = prepare(values); + if (prepared != values) { + if (batch == actual) { + actual = new ArrayList<>(actual); + } + actual.set(idx, prepared); + } + } + return actual; + } + default TupleInternal prepare(TupleInternal values) throws Exception { + return values; + } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/ExtendedQueryCommand.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/ExtendedQueryCommand.java index dd44fb5b02..ba3956f95a 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/ExtendedQueryCommand.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/command/ExtendedQueryCommand.java @@ -72,10 +72,11 @@ public static ExtendedQueryCommand createBatch( protected final PrepareOptions options; public PreparedStatement ps; protected final boolean batch; - protected final Object tuples; + private Object tuples; protected final int fetch; protected final String cursorId; protected final boolean suspended; + private boolean prepared; private ExtendedQueryCommand(String sql, PrepareOptions options, @@ -97,6 +98,7 @@ private ExtendedQueryCommand(String sql, this.fetch = fetch; this.cursorId = cursorId; this.suspended = suspended; + this.prepared = ps != null; } public PrepareOptions options() { @@ -109,17 +111,17 @@ public PrepareOptions options() { * @return {@code null} if the tuple preparation was successfull otherwise the validation error */ public String prepare() { - if (ps != null) { - if (batch) { - for (TupleInternal tuple : (List) tuples) { - String msg = ps.prepare(tuple); - if (msg != null) { - return msg; - } + if (ps != null && !prepared) { + prepared = true; // TODO : fix this + try { + if (batch) { + tuples = ps.prepare((List) tuples); + return null; + } else { + tuples = ps.prepare((TupleInternal) tuples); } - return null; - } else { - return ps.prepare((TupleInternal) tuples); + } catch (Exception e) { + return e.getMessage(); } } return null;