|
1 | | -package com.clickhouse.client.api.internal; |
| 1 | +package com.clickhouse.client.api.data_formats.internal; |
2 | 2 |
|
3 | 3 | import com.clickhouse.client.api.Client; |
4 | 4 | import com.clickhouse.client.api.ClientException; |
5 | | -import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader; |
6 | 5 | import com.clickhouse.client.api.query.POJOSetter; |
7 | 6 | import com.clickhouse.data.ClickHouseAggregateFunction; |
8 | 7 | import com.clickhouse.data.ClickHouseColumn; |
@@ -72,14 +71,14 @@ private static void serializeArrayData(OutputStream stream, Object value, ClickH |
72 | 71 | //Serialize the array to the stream |
73 | 72 | //The array is a list of values |
74 | 73 | List<?> values = (List<?>) value; |
75 | | - BinaryStreamUtils.writeVarInt(stream, values.size()); |
| 74 | + writeVarInt(stream, values.size()); |
76 | 75 | for (Object val : values) { |
77 | 76 | if (column.getArrayBaseColumn().isNullable()) { |
78 | 77 | if (val == null) { |
79 | | - BinaryStreamUtils.writeNull(stream); |
| 78 | + writeNull(stream); |
80 | 79 | continue; |
81 | 80 | } |
82 | | - BinaryStreamUtils.writeNonNull(stream); |
| 81 | + writeNonNull(stream); |
83 | 82 | } |
84 | 83 | serializeData(stream, val, column.getArrayBaseColumn()); |
85 | 84 | } |
@@ -107,7 +106,7 @@ private static void serializeMapData(OutputStream stream, Object value, ClickHou |
107 | 106 | //Serialize the map to the stream |
108 | 107 | //The map is a list of key-value pairs |
109 | 108 | Map<?, ?> map = (Map<?, ?>) value; |
110 | | - BinaryStreamUtils.writeVarInt(stream, map.size()); |
| 109 | + writeVarInt(stream, map.size()); |
111 | 110 | map.forEach((key, val) -> { |
112 | 111 | try { |
113 | 112 | serializePrimitiveData(stream, key, Objects.requireNonNull(column.getKeyInfo())); |
@@ -213,7 +212,13 @@ private static void serializePrimitiveData(OutputStream stream, Object value, Cl |
213 | 212 |
|
214 | 213 | private static void serializeAggregateFunction(OutputStream stream, Object value, ClickHouseColumn column) throws IOException { |
215 | 214 | if (column.getAggregateFunction() == ClickHouseAggregateFunction.groupBitmap) { |
216 | | - BinaryStreamUtils.writeBitmap(stream, (ClickHouseBitmap) value); |
| 215 | + if (value == null) { |
| 216 | + throw new IllegalArgumentException("Cannot serialize null value for aggregate function: " + column.getAggregateFunction()); |
| 217 | + } else if (value instanceof ClickHouseBitmap) { |
| 218 | + stream.write(((ClickHouseBitmap)value).toBytes()); // TODO: review toBytes() implementation - it can be simplified |
| 219 | + } else { |
| 220 | + throw new IllegalArgumentException("Cannot serialize value of type " + value.getClass() + " for aggregate function: " + column.getAggregateFunction()); |
| 221 | + } |
217 | 222 | } else { |
218 | 223 | throw new UnsupportedOperationException("Unsupported aggregate function: " + column.getAggregateFunction()); |
219 | 224 | } |
@@ -527,4 +532,34 @@ public Class<?> defineClass(String name, byte[] code) throws ClassNotFoundExcept |
527 | 532 | return super.defineClass(name, code, 0, code.length); |
528 | 533 | } |
529 | 534 | } |
| 535 | + |
| 536 | + public static void writeVarInt(OutputStream output, long value) throws IOException { |
| 537 | + // reference code https://github.com/ClickHouse/ClickHouse/blob/abe314feecd1647d7c2b952a25da7abf5c19f352/src/IO/VarInt.h#L187 |
| 538 | + for (int i = 0; i < 9; i++) { |
| 539 | + byte b = (byte) (value & 0x7F); |
| 540 | + |
| 541 | + if (value > 0x7F) { |
| 542 | + b |= 0x80; |
| 543 | + } |
| 544 | + |
| 545 | + output.write(b); |
| 546 | + value >>= 7; |
| 547 | + |
| 548 | + if (value == 0) { |
| 549 | + return; |
| 550 | + } |
| 551 | + } |
| 552 | + } |
| 553 | + |
| 554 | + public static void writeNull(OutputStream output) throws IOException { |
| 555 | + writeBoolean(output, true); |
| 556 | + } |
| 557 | + |
| 558 | + public static void writeNonNull(OutputStream output) throws IOException { |
| 559 | + writeBoolean(output, false); |
| 560 | + } |
| 561 | + |
| 562 | + public static void writeBoolean(OutputStream output, boolean value) throws IOException { |
| 563 | + output.write(value ? 1 : 0); |
| 564 | + } |
530 | 565 | } |
0 commit comments