Skip to content

Commit 555d5e9

Browse files
committed
Let the vertx-pg-client encoder estimate the capacity of buffer it allocates.
Motivation: The pg encoder allocates Netty buffer without an initial capacity, when pipelining is used this buffer can be reallocated multiple times and in agressive scenario can lead to out of memory errors with the adapative allocator. Changes: Cumulate and encode all the outbound messages at once when flush happens instead of cumulating them in the outbound buffer. Capacity is estimated when a message is enqueued. At flush time, a single buffer of the estimated capacity is created and the outbound messages are written to this buffer. This requires to pre-render some messages to estimate their length, e.g. a json object is pre-rendered to a string that can be then estimated. The client has been modified to handle message preparation which takes care of this and lazy duplicate the tuple / list of tuples when this happens.
1 parent e5f39ea commit 555d5e9

File tree

30 files changed

+827
-325
lines changed

30 files changed

+827
-325
lines changed

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2ParamDesc.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.vertx.db2client.impl.codec;
1717

18+
import io.vertx.core.VertxException;
1819
import io.vertx.core.buffer.Buffer;
1920
import io.vertx.db2client.impl.drda.ClientTypes;
2021
import io.vertx.db2client.impl.drda.ColumnMetaData;
@@ -35,19 +36,19 @@ ColumnMetaData paramDefinitions() {
3536
return paramDefinitions;
3637
}
3738

38-
public String prepare(TupleInternal values) {
39+
public TupleInternal prepare(TupleInternal values) {
3940
if (values.size() != paramDefinitions.columns_) {
40-
return ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDefinitions.columns_, values.size());
41+
throw new VertxException(ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDefinitions.columns_, values.size()), true);
4142
}
4243
for (int i = 0; i < paramDefinitions.columns_; i++) {
4344
Object val = values.getValue(i);
4445
int type = paramDefinitions.types_[i];
4546
if (!canConvert(val, type)) {
4647
Class<?> preferredType = ClientTypes.preferredJavaType(type);
47-
return ErrorMessageFactory.buildWhenArgumentsTypeNotMatched(preferredType, i, val);
48+
throw new VertxException(ErrorMessageFactory.buildWhenArgumentsTypeNotMatched(preferredType, i, val), true);
4849
}
4950
}
50-
return null;
51+
return values;
5152
}
5253

5354
private static boolean canConvert(Object val, int type) {

vertx-db2-client/src/main/java/io/vertx/db2client/impl/codec/DB2PreparedStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public String sql() {
7474
}
7575

7676
@Override
77-
public String prepare(TupleInternal values) {
77+
public TupleInternal prepare(TupleInternal values) {
7878
return paramDesc.prepare(values);
7979
}
8080

vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/codec/MSSQLPreparedStatement.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,4 @@ public String sql() {
4040
return sql;
4141
}
4242

43-
@Override
44-
public String prepare(TupleInternal values) {
45-
return null;
46-
}
4743
}

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package io.vertx.mysqlclient.impl.codec;
1919

20+
import io.vertx.core.VertxException;
2021
import io.vertx.mysqlclient.impl.MySQLParamDesc;
2122
import io.vertx.mysqlclient.impl.MySQLRowDesc;
2223
import io.vertx.mysqlclient.impl.datatype.DataType;
@@ -71,13 +72,13 @@ public String sql() {
7172
}
7273

7374
@Override
74-
public String prepare(TupleInternal values) {
75+
public TupleInternal prepare(TupleInternal values) {
7576
int numberOfParameters = values.size();
7677
int paramDescLength = paramDesc.paramDefinitions().length;
7778
if (numberOfParameters != paramDescLength) {
78-
return ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDescLength, numberOfParameters);
79+
throw new VertxException(ErrorMessageFactory.buildWhenArgumentsLengthNotMatched(paramDescLength, numberOfParameters), true);
7980
} else {
80-
return null;
81+
return values;
8182
}
8283
}
8384

vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/commands/OraclePreparedStatement.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,4 @@ public RowDesc rowDesc() {
5050
public String sql() {
5151
return sql;
5252
}
53-
54-
@Override
55-
public String prepare(TupleInternal values) {
56-
return null;
57-
}
58-
5953
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/Bind.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
/**
2121
* @author <a href="mailto:[email protected]">Emad Alblueshi</a>
2222
*/
23-
final class Bind {
23+
final class Bind extends OutboundMessage {
2424

2525
final byte[] statement;
2626
final DataType[] paramTypes;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
class ClosePortalMessage extends OutboundMessage {
4+
5+
static final ClosePortalMessage INSTANCE = new ClosePortalMessage();
6+
7+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.vertx.pgclient.impl.codec;
2+
3+
class ClosePreparedStatementMessage extends OutboundMessage {
4+
5+
static final ClosePreparedStatementMessage INSTANCE = new ClosePreparedStatementMessage();
6+
7+
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataType.java

Lines changed: 93 additions & 85 deletions
Large diffs are not rendered by default.

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/DataTypeCodec.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,14 @@ static void encodeText(DataType id, Object value, ByteBuf buff) {
154154
private static void textEncode(DataType id, Object value, ByteBuf buff) {
155155
switch (id) {
156156
case NUMERIC:
157-
textEncodeNUMERIC((Number) value, buff);
157+
textEncodeNUMERIC((String) value, buff);
158158
break;
159159
case NUMERIC_ARRAY:
160-
textEncodeNUMERIC_ARRAY((Number[]) value, buff);
160+
textEncodeNUMERIC_ARRAY((Object[]) value, buff);
161161
break;
162162
case UNKNOWN:
163163
//default to treating unknown as a string
164-
buff.writeCharSequence(String.valueOf(value), StandardCharsets.UTF_8);
164+
buff.writeCharSequence((CharSequence) value, StandardCharsets.UTF_8);
165165
break;
166166
default:
167167
logger.debug("Data type " + id + " does not support text encoding");
@@ -281,13 +281,13 @@ public static void encodeBinary(DataType id, Object value, ByteBuf buff) {
281281
binaryEncodeArray((UUID[]) value, DataType.UUID, buff);
282282
break;
283283
case JSON:
284-
binaryEncodeJSON((Object) value, buff);
284+
binaryEncodeJSON((CharSequence) value, buff);
285285
break;
286286
case JSON_ARRAY:
287287
binaryEncodeArray((Object[]) value, DataType.JSON, buff);
288288
break;
289289
case JSONB:
290-
binaryEncodeJSONB((Object) value, buff);
290+
binaryEncodeJSONB((CharSequence) value, buff);
291291
break;
292292
case JSONB_ARRAY:
293293
binaryEncodeArray((Object[]) value, DataType.JSONB, buff);
@@ -938,12 +938,11 @@ private static Interval textDecodeINTERVAL(int index, int len, ByteBuf buff) {
938938
return new Interval(years, months, days, hours, minutes, seconds, microseconds);
939939
}
940940

941-
private static void textEncodeNUMERIC(Number value, ByteBuf buff) {
942-
String s = value.toString();
943-
buff.writeCharSequence(s, StandardCharsets.UTF_8);
941+
private static void textEncodeNUMERIC(String value, ByteBuf buff) {
942+
buff.writeCharSequence(value, StandardCharsets.UTF_8);
944943
}
945944

946-
private static void textEncodeNUMERIC_ARRAY(Number[] value, ByteBuf buff) {
945+
private static void textEncodeNUMERIC_ARRAY(Object[] value, ByteBuf buff) {
947946
textEncodeArray(value, DataType.NUMERIC, buff);
948947
}
949948

@@ -1366,14 +1365,8 @@ private static Object binaryDecodeJSON(int index, int len, ByteBuf buff) {
13661365
return textDecodeJSONB(index, len, buff);
13671366
}
13681367

1369-
private static void binaryEncodeJSON(Object value, ByteBuf buff) {
1370-
String s;
1371-
if (value == Tuple.JSON_NULL) {
1372-
s = "null";
1373-
} else {
1374-
s = Json.encode(value);
1375-
}
1376-
buff.writeCharSequence(s, StandardCharsets.UTF_8);
1368+
private static void binaryEncodeJSON(CharSequence value, ByteBuf buff) {
1369+
buff.writeCharSequence(value, StandardCharsets.UTF_8);
13771370
}
13781371

13791372
private static Object textDecodeJSONB(int index, int len, ByteBuf buff) {
@@ -1410,7 +1403,7 @@ private static Object binaryDecodeJSONB(int index, int len, ByteBuf buff) {
14101403
return textDecodeJSONB(index + 1, len - 1, buff);
14111404
}
14121405

1413-
private static void binaryEncodeJSONB(Object value, ByteBuf buff) {
1406+
private static void binaryEncodeJSONB(CharSequence value, ByteBuf buff) {
14141407
buff.writeByte(1); // version
14151408
binaryEncodeJSON(value, buff);
14161409
}

0 commit comments

Comments
 (0)