|
2 | 2 |
|
3 | 3 | import com.clickhouse.client.api.Client; |
4 | 4 | import com.clickhouse.client.api.ClientException; |
5 | | -import com.clickhouse.client.api.data_formats.RowBinaryFormatSerializer; |
6 | | -import com.clickhouse.client.api.data_formats.RowBinaryFormatWriter; |
7 | 5 | import com.clickhouse.client.api.query.POJOSetter; |
8 | 6 | import com.clickhouse.data.ClickHouseAggregateFunction; |
9 | 7 | import com.clickhouse.data.ClickHouseColumn; |
|
30 | 28 | import java.net.Inet4Address; |
31 | 29 | import java.net.Inet6Address; |
32 | 30 | import java.sql.Timestamp; |
| 31 | +import java.time.Instant; |
33 | 32 | import java.time.LocalDate; |
34 | 33 | import java.time.LocalDateTime; |
35 | 34 | import java.time.ZoneId; |
36 | 35 | import java.time.ZonedDateTime; |
37 | 36 | import java.util.Arrays; |
| 37 | +import java.util.Collections; |
| 38 | +import java.util.HashMap; |
38 | 39 | import java.util.HashSet; |
39 | 40 | import java.util.List; |
40 | 41 | import java.util.Map; |
41 | 42 | import java.util.Objects; |
42 | 43 | import java.util.Set; |
43 | 44 | import java.util.StringTokenizer; |
| 45 | +import java.util.TimeZone; |
44 | 46 | import java.util.UUID; |
45 | 47 | import java.util.stream.Collectors; |
46 | 48 |
|
@@ -90,14 +92,219 @@ public static void serializeData(OutputStream stream, Object value, ClickHouseCo |
90 | 92 | value = value instanceof ClickHouseGeoMultiPolygonValue ? ((ClickHouseGeoMultiPolygonValue)value).getValue() : value; |
91 | 93 | serializeArrayData(stream, value, GEO_MULTI_POLYGON_ARRAY); |
92 | 94 | break; |
| 95 | + case Dynamic: |
| 96 | + ClickHouseColumn typeColumn = valueToColumnForDynamicType(value); |
| 97 | + writeDynamicTypeTag(stream, typeColumn, value); |
| 98 | + serializeData(stream, value, typeColumn); |
| 99 | + break; |
93 | 100 | default: |
94 | 101 | serializePrimitiveData(stream, value, column); |
95 | 102 | break; |
96 | 103 |
|
97 | 104 | } |
98 | 105 | } |
99 | 106 |
|
100 | | - private static void serializeArrayData(OutputStream stream, Object value, ClickHouseColumn column) throws IOException { |
| 107 | + private static final Map<Class<?>, ClickHouseColumn> PREDEFINED_TYPE_COLUMNS = getPredefinedTypeColumnsMap(); |
| 108 | + |
| 109 | + private static Map<Class<?>, ClickHouseColumn> getPredefinedTypeColumnsMap() { |
| 110 | + HashMap<Class<?>, ClickHouseColumn> map = new HashMap<>(); |
| 111 | + map.put(Void.class, ClickHouseColumn.of("v", "Nothing")); |
| 112 | + map.put(Boolean.class, ClickHouseColumn.of("v", "Bool")); |
| 113 | + map.put(Byte.class, ClickHouseColumn.of("v", "Int8")); |
| 114 | + map.put(Short.class, ClickHouseColumn.of("v", "Int16")); |
| 115 | + map.put(Integer.class, ClickHouseColumn.of("v", "Int32")); |
| 116 | + map.put(Long.class, ClickHouseColumn.of("v", "Int64")); |
| 117 | + map.put(BigInteger.class, ClickHouseColumn.of("v", "Int256")); |
| 118 | + map.put(Float.class, ClickHouseColumn.of("v", "Float32")); |
| 119 | + map.put(Double.class, ClickHouseColumn.of("v", "Float64")); |
| 120 | + map.put(LocalDate.class, ClickHouseColumn.of("v", "Date")); |
| 121 | + map.put(UUID.class, ClickHouseColumn.of("v", "UUID")); |
| 122 | + map.put(Inet4Address.class, ClickHouseColumn.of("v", "IPv4")); |
| 123 | + map.put(Inet6Address.class, ClickHouseColumn.of("v", "IPv6")); |
| 124 | + map.put(String.class, ClickHouseColumn.of("v", "String")); |
| 125 | + map.put(LocalDateTime.class, ClickHouseColumn.of("v", "DateTime")); |
| 126 | + |
| 127 | + map.put(boolean[].class, ClickHouseColumn.of("v", "Array(Bool)")); |
| 128 | + map.put(boolean[][].class, ClickHouseColumn.of("v", "Array(Array(Bool))")); |
| 129 | + map.put(boolean[][][].class, ClickHouseColumn.of("v", "Array(Array(Array(Bool)))")); |
| 130 | + |
| 131 | + map.put(byte[].class, ClickHouseColumn.of("v", "Array(Int8)")); |
| 132 | + map.put(byte[][].class, ClickHouseColumn.of("v", "Array(Array(Int8))")); |
| 133 | + map.put(byte[][][].class, ClickHouseColumn.of("v", "Array(Array(Array(Int8)))")); |
| 134 | + |
| 135 | + map.put(short[].class, ClickHouseColumn.of("v", "Array(Int16)")); |
| 136 | + map.put(short[][].class, ClickHouseColumn.of("v", "Array(Array(Int16))")); |
| 137 | + map.put(short[][][].class, ClickHouseColumn.of("v", "Array(Array(Array(Int16)))")); |
| 138 | + |
| 139 | + map.put(int[].class, ClickHouseColumn.of("v", "Array(Int32)")); |
| 140 | + map.put(int[][].class, ClickHouseColumn.of("v", "Array(Array(Int32))")); |
| 141 | + map.put(int[][][].class, ClickHouseColumn.of("v", "Array(Array(Array(Int32)))")); |
| 142 | + |
| 143 | + map.put(long[].class, ClickHouseColumn.of("v", "Array(Int64)")); |
| 144 | + map.put(long[][].class, ClickHouseColumn.of("v", "Array(Array(Int64))")); |
| 145 | + map.put(long[][][].class, ClickHouseColumn.of("v", "Array(Array(Array(Int64)))")); |
| 146 | + |
| 147 | + map.put(float[].class, ClickHouseColumn.of("v", "Array(Float32)")); |
| 148 | + map.put(float[][].class, ClickHouseColumn.of("v", "Array(Array(Float32))")); |
| 149 | + map.put(float[][][].class, ClickHouseColumn.of("v", "Array(Array(Array(Float32)))")); |
| 150 | + |
| 151 | + map.put(double[].class, ClickHouseColumn.of("v", "Array(Float64)")); |
| 152 | + map.put(double[][].class, ClickHouseColumn.of("v", "Array(Array(Float64))")); |
| 153 | + map.put(double[][][].class, ClickHouseColumn.of("v", "Array(Array(Array(Float64)))")); |
| 154 | + |
| 155 | + return Collections.unmodifiableMap(map); |
| 156 | + } |
| 157 | + |
| 158 | + public static ClickHouseColumn valueToColumnForDynamicType(Object value) { |
| 159 | + ClickHouseColumn column; |
| 160 | + if (value instanceof ZonedDateTime) { |
| 161 | + ZonedDateTime dt = (ZonedDateTime) value; |
| 162 | + column = ClickHouseColumn.of("v", "DateTime(" + dt.getZone().getId() + ")"); |
| 163 | + } else if (value instanceof BigDecimal) { |
| 164 | + BigDecimal d = (BigDecimal) value; |
| 165 | + column = ClickHouseColumn.of("v", "Decimal256(" + d.precision() + ", " + d.scale() + ")"); |
| 166 | + } else if (value instanceof Map<?,?>) { |
| 167 | + Map<?, ?> map = (Map<?, ?>) value; |
| 168 | + // TODO: handle empty map? |
| 169 | + Map.Entry<?, ?> entry = map.entrySet().iterator().next(); |
| 170 | + ClickHouseColumn keyInfo = valueToColumnForDynamicType(entry.getKey()); |
| 171 | + ClickHouseColumn valueInfo = valueToColumnForDynamicType(entry.getValue()); |
| 172 | + column = ClickHouseColumn.of("v", "Map(" + keyInfo.getOriginalTypeName() + ", " + valueInfo.getOriginalTypeName() + ")"); |
| 173 | + } else if (value instanceof List<?>) { |
| 174 | + List<?> list = (List<?>) value; |
| 175 | + StringBuilder type = new StringBuilder("Array()"); |
| 176 | + int insertPos = type.length() - 2; |
| 177 | + while (!list.isEmpty() && list.get(0) instanceof List<?>) { |
| 178 | + type.insert(insertPos, "Array()"); |
| 179 | + insertPos += 6; // add len of 'Array(' string |
| 180 | + list = (List<?>) list.get(0); |
| 181 | + } |
| 182 | + if (list.isEmpty()) { |
| 183 | + type.insert(insertPos, "Nothing"); |
| 184 | + column = ClickHouseColumn.of("v", type.toString()); |
| 185 | + } else { |
| 186 | + ClickHouseColumn arrayBaseColumn = PREDEFINED_TYPE_COLUMNS.get(list.get(0)); |
| 187 | + if (arrayBaseColumn != null) { |
| 188 | + type.insert(insertPos, arrayBaseColumn.getOriginalTypeName()); |
| 189 | + column = ClickHouseColumn.of("v", type.toString()); |
| 190 | + } else { |
| 191 | + column = null; |
| 192 | + } |
| 193 | + } |
| 194 | + } else if (value == null) { |
| 195 | + column = PREDEFINED_TYPE_COLUMNS.get(Void.class); |
| 196 | + } else { |
| 197 | + column = PREDEFINED_TYPE_COLUMNS.get(value.getClass()); |
| 198 | + } |
| 199 | + |
| 200 | + if (column == null) { |
| 201 | + throw new ClientException("Unable to serialize value of " + value.getClass() + " because not supported yet"); |
| 202 | + } |
| 203 | + |
| 204 | + return column; |
| 205 | + } |
| 206 | + |
| 207 | + public static void writeDynamicTypeTag(OutputStream stream, ClickHouseColumn typeColumn, Object value) |
| 208 | + throws IOException { |
| 209 | + |
| 210 | + ClickHouseDataType dt = typeColumn.getDataType(); |
| 211 | + byte binTag = dt.getBinTag(); |
| 212 | + if (binTag == -1) { |
| 213 | + throw new ClientException("Type " + dt.name() +" serialization is not supported for Dynamic column"); |
| 214 | + } |
| 215 | + |
| 216 | + if (typeColumn.isNullable()) { |
| 217 | + stream.write(ClickHouseDataType.NULLABLE_BIN_TAG); |
| 218 | + } |
| 219 | + if (typeColumn.isLowCardinality()) { |
| 220 | + stream.write(ClickHouseDataType.LOW_CARDINALITY_BIN_TAG); |
| 221 | + } |
| 222 | + |
| 223 | + switch (dt) { |
| 224 | + case FixedString: |
| 225 | + stream.write(binTag); |
| 226 | + writeVarInt(stream, typeColumn.getEstimatedLength()); |
| 227 | + break; |
| 228 | + case Enum8: |
| 229 | + stream.write(binTag); |
| 230 | + /// 0x17<var_uint_number_of_elements><var_uint_name_size_1><name_data_1><int8_value_1>...<var_uint_name_size_N><name_data_N><int8_value_N> |
| 231 | + break; |
| 232 | + case Enum16: |
| 233 | + stream.write(binTag); |
| 234 | + //0x18<var_uint_number_of_elements><var_uint_name_size_1><name_data_1><int16_little_endian_value_1>...><var_uint_name_size_N><name_data_N><int16_little_endian_value_N> |
| 235 | + break; |
| 236 | + case Decimal: |
| 237 | + case Decimal32: |
| 238 | + case Decimal64: |
| 239 | + case Decimal128: |
| 240 | + case Decimal256: |
| 241 | + stream.write(binTag); |
| 242 | + //<tag><uint8_precision><uint8_scale> |
| 243 | + break; |
| 244 | + |
| 245 | + case IntervalNanosecond: |
| 246 | + case IntervalMillisecond: |
| 247 | + case IntervalSecond: |
| 248 | + case IntervalMinute: |
| 249 | + case IntervalHour: |
| 250 | + case IntervalDay: |
| 251 | + case IntervalWeek: |
| 252 | + case IntervalMonth: |
| 253 | + case IntervalQuarter: |
| 254 | + case IntervalYear: |
| 255 | + stream.write(binTag); |
| 256 | + stream.write(ClickHouseDataType.IntervalKindBinTag.Day.getTag()); |
| 257 | + break; |
| 258 | + case DateTime32: |
| 259 | + stream.write(binTag); |
| 260 | + BinaryStreamUtils.writeString(stream, typeColumn.getTimeZoneOrDefault(TimeZone.getDefault()).getID()); |
| 261 | + break; |
| 262 | + case DateTime64: |
| 263 | + break; |
| 264 | + case Array: |
| 265 | + stream.write(binTag); |
| 266 | + // elements 0x1E<nested_type_encoding> |
| 267 | + break; |
| 268 | + case Map: |
| 269 | + stream.write(binTag); |
| 270 | + ///0x0F<var_uint_size><key_encoding_1><value_encoding_1>...<key_encoding_N><value_encoding_N> |
| 271 | + break; |
| 272 | + case Tuple: |
| 273 | + // Tuple(T1, ..., TN) |
| 274 | + // 0x1F<var_uint_number_of_elements><nested_type_encoding_1>...<nested_type_encoding_N> |
| 275 | + stream.write(0x1F); |
| 276 | + // or |
| 277 | + // Tuple(name1 T1, ..., nameN TN) |
| 278 | + // 0x20<var_uint_number_of_elements><var_uint_name_size_1><name_data_1><nested_type_encoding_1>...<var_uint_name_size_N><name_data_N><nested_type_encoding_N> |
| 279 | + stream.write(0x20); |
| 280 | + break; |
| 281 | + case Point: |
| 282 | + case Polygon: |
| 283 | + case Ring: |
| 284 | + case MultiPolygon: |
| 285 | + stream.write(ClickHouseDataType.CUSTOM_TYPE_BIN_TAG); |
| 286 | + break; |
| 287 | + case Variant: |
| 288 | + stream.write(binTag); |
| 289 | + break; |
| 290 | + case Dynamic: |
| 291 | + stream.write(binTag); |
| 292 | + break; |
| 293 | + case JSON: |
| 294 | + stream.write(binTag); |
| 295 | + break; |
| 296 | + case SimpleAggregateFunction: |
| 297 | + stream.write(binTag); |
| 298 | + break; |
| 299 | + case AggregateFunction: |
| 300 | + stream.write(binTag); |
| 301 | + break; |
| 302 | + default: |
| 303 | + stream.write(binTag); |
| 304 | + } |
| 305 | + } |
| 306 | + |
| 307 | + public static void serializeArrayData(OutputStream stream, Object value, ClickHouseColumn column) throws IOException { |
101 | 308 |
|
102 | 309 | if (value instanceof List<?>) { |
103 | 310 | //Serialize the array to the stream |
|
0 commit comments