diff --git a/CHANGELOG.md b/CHANGELOG.md index 0072b54..e811c2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,2 +1,4 @@ +## 0.1.1 +* Add support in FixedString, Date/Date32/DateTime/DateTime64, Uint8/16/32/64/128/256, Decimal, UUID ## 0.1.0 * ClickHouse Sink supports Apache Flink 1.17+ \ No newline at end of file diff --git a/README.md b/README.md index 3e88d89..c170df8 100644 --- a/README.md +++ b/README.md @@ -132,45 +132,47 @@ Planned for a future release — a complete end-to-end example will be added onc ## Supported ClickHouse Types -| Java Type | ClickHouse Type | Supported | Serialize Method | -|-----------------|-----------------|-----------|-------------------------| -| byte/Byte | Int8 | ✅ | Serialize.writeInt8 | -| short/Short | Int16 | ✅ | Serialize.writeInt16 | -| int/Integer | Int32 | ✅ | Serialize.writeInt32 | -| long/Long | Int64 | ✅ | Serialize.writeInt64 | -| BigInteger | Int128 | ✅ | Serialize.writeInt124 | -| BigInteger | Int256 | ✅ | Serialize.writeInt256 | -| byte/Byte | UInt8 | ❌ | N/A | -| short/Short | UInt16 | ❌ | N/A | -| int/Integer | UInt32 | ❌ | N/A | -| long/Long | UInt64 | ❌ | N/A | -| BigInteger | UInt128 | ❌ | N/A | -| BigInteger | UInt256 | ❌ | N/A | -| BigDecimal | Decimal | ❌ | N/A | -| BigDecimal | Decimal32 | ❌ | N/A | -| BigDecimal | Decimal64 | ❌ | N/A | -| BigDecimal | Decimal128 | ❌ | N/A | -| BigDecimal | Decimal256 | ❌ | N/A | -| float/Float | Float | ✅ | Serialize.writeFloat32 | -| double/Double | Double | ✅ | Serialize.writeFloat64 | -| boolean/Boolean | Boolean | ✅ | Serialize.writeBoolean | -| String | String | ✅ | Serialize.writeString | -| String | FixedString | ❌ | N/A | -| LocalDate | Date | ❌ | N/A | -| LocalDate | Date32 | ❌ | N/A | -| LocalDateTime | DateTime | ❌ | N/A | -| LocalDateTime | DateTime64 | ❌ | N/A | -| int/Integer | Time | ❌ | N/A | -| long/Long | Time64 | ❌ | N/A | -| byte/Byte | Enum8 | ✅ | Serialize.writeInt8 | -| int/Integer | Enum16 | ✅ | Serialize.writeInt16 | -| String | JSON | ❌ | N/A | -| Array | Array | ❌ | N/A | -| Map | Map | ❌ | N/A | -| Tuple | Map | ❌ | N/A | -| Object | Variant | ❌ | N/A | - -* For date operation need to provide ZoneId. +| Java Type | ClickHouse Type | Supported | Serialize Method | +|-----------------|-----------------|-----------|----------------------------| +| byte/Byte | Int8 | ✅ | Serialize.writeInt8 | +| short/Short | Int16 | ✅ | Serialize.writeInt16 | +| int/Integer | Int32 | ✅ | Serialize.writeInt32 | +| long/Long | Int64 | ✅ | Serialize.writeInt64 | +| BigInteger | Int128 | ✅ | Serialize.writeInt124 | +| BigInteger | Int256 | ✅ | Serialize.writeInt256 | +| int/Integer | UInt8 | ✅ | Serialize.writeUInt8 | +| int/Integer | UInt16 | ✅ | Serialize.writeUInt16 | +| long/Long | UInt32 | ✅ | Serialize.writeUInt32 | +| long/Long | UInt64 | ✅ | Serialize.writeUInt64 | +| BigInteger | UInt128 | ✅ | Serialize.writeUInt128 | +| BigInteger | UInt256 | ✅ | Serialize.writeUInt256 | +| BigDecimal | Decimal | ✅ | Serialize.writeDecimal | +| BigDecimal | Decimal32 | ✅ | Serialize.writeDecimal | +| BigDecimal | Decimal64 | ✅ | Serialize.writeDecimal | +| BigDecimal | Decimal128 | ✅ | Serialize.writeDecimal | +| BigDecimal | Decimal256 | ✅ | Serialize.writeDecimal | +| float/Float | Float | ✅ | Serialize.writeFloat32 | +| double/Double | Double | ✅ | Serialize.writeFloat64 | +| boolean/Boolean | Boolean | ✅ | Serialize.writeBoolean | +| String | String | ✅ | Serialize.writeString | +| String | FixedString | ✅ | Serialize.writeFixedString | +| LocalDate | Date | ✅ | Serialize.writeDate | +| LocalDate | Date32 | ✅ | Serialize.writeDate32 | +| LocalDateTime | DateTime | ✅ | Serialize.writeDateTime | +| LocalDateTime | DateTime64 | ✅ | Serialize.writeDateTime64 | +| int/Integer | Time | ❌ | N/A | +| long/Long | Time64 | ❌ | N/A | +| byte/Byte | Enum8 | ✅ | Serialize.writeInt8 | +| int/Integer | Enum16 | ✅ | Serialize.writeInt16 | +| java.util.UUID | UUID | ✅ | Serialize.writeIntUUID | +| String | JSON | ❌ | N/A | +| Array | Array | ❌ | N/A | +| Map | Map | ❌ | N/A | +| Tuple | Map | ❌ | N/A | +| Object | Variant | ❌ | N/A | + +* A ZoneId must also be provided when performing date operations. +* Precision and scale must also be provided when performing decimal operations. ## Configuration Options diff --git a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java index 2873279..e53f660 100644 --- a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java +++ b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java @@ -46,6 +46,56 @@ public class ClickHouseSinkTests extends FlinkClusterTests { static final int STREAM_PARALLELISM = 5; + private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) { + String createTable = createSimplePOJOTableSQL(database, tableName); + return createTable.trim().substring(0, createTable.trim().length() - 1) + " " + String.format("SETTINGS parts_to_throw_insert = %d;", parts_to_throw_insert); + } + + private String createSimplePOJOTableSQL(String database, String tableName) { + return "CREATE TABLE `" + database + "`.`" + tableName + "` (" + + "bytePrimitive Int8," + + "byteObject Int8," + + "shortPrimitive Int16," + + "shortObject Int16," + + "intPrimitive Int32," + + "integerObject Int32," + + "longPrimitive Int64," + + "longObject Int64," + + "bigInteger128 Int128," + + "bigInteger256 Int256," + + "uint8Primitive UInt8," + + "uint8Object UInt8," + + "uint16Primitive UInt16," + + "uint16Object UInt16," + + "uint32Primitive UInt32," + + "uint32Object UInt32," + + "uint64Primitive UInt64," + + "uint64Object UInt64," + + "uint128Object UInt128," + + "uint256Object UInt256," + + "decimal Decimal(10,5)," + + "decimal32 Decimal32(9)," + + "decimal64 Decimal64(18)," + + "decimal128 Decimal128(38)," + + "decimal256 Decimal256(76)," + + "floatPrimitive Float," + + "floatObject Float," + + "doublePrimitive Double," + + "doubleObject Double," + + "booleanPrimitive Boolean," + + "booleanObject Boolean," + + "str String," + + "fixedStr FixedString(10)," + + "v_date Date," + + "v_date32 Date32," + + "v_dateTime DateTime," + + "v_dateTime64 DateTime64," + + "uuid UUID," + + ") " + + "ENGINE = MergeTree " + + "ORDER BY (longPrimitive); "; + } + private int executeAsyncJob(StreamExecutionEnvironment env, String tableName, int numIterations, int expectedRows) throws Exception { JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource"); int rows = 0; @@ -197,27 +247,7 @@ void SimplePOJODataTest() throws Exception { String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName); ClickHouseServerForTests.executeSql(dropTable); // create table - String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" + - "bytePrimitive Int8," + - "byteObject Int8," + - "shortPrimitive Int16," + - "shortObject Int16," + - "intPrimitive Int32," + - "integerObject Int32," + - "longPrimitive Int64," + - "longObject Int64," + - "bigInteger128 Int128," + - "bigInteger256 Int256," + - "floatPrimitive Float," + - "floatObject Float," + - "doublePrimitive Double," + - "doubleObject Double," + - "booleanPrimitive Boolean," + - "booleanObject Boolean," + - "str String," + - ") " + - "ENGINE = MergeTree " + - "ORDER BY (longPrimitive); "; + String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName); ClickHouseServerForTests.executeSql(tableSql); @@ -465,28 +495,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception { String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName); ClickHouseServerForTests.executeSql(dropTable); // create table - String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" + - "bytePrimitive Int8," + - "byteObject Int8," + - "shortPrimitive Int16," + - "shortObject Int16," + - "intPrimitive Int32," + - "integerObject Int32," + - "longPrimitive Int64," + - "longObject Int64," + - "bigInteger128 Int128," + - "bigInteger256 Int256," + - "floatPrimitive Float," + - "floatObject Float," + - "doublePrimitive Double," + - "doubleObject Double," + - "booleanPrimitive Boolean," + - "booleanObject Boolean," + - "str String," + - ") " + - "ENGINE = MergeTree " + - "ORDER BY (longPrimitive) " + - "SETTINGS parts_to_throw_insert = 10;"; + String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName, 10); ClickHouseServerForTests.executeSql(tableSql); //ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName)); diff --git a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java index 80d6a3d..cea9544 100644 --- a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java +++ b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java @@ -26,6 +26,28 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException { Serialize.writeInt128(out, input.getBigInteger128(), false, false, ClickHouseDataType.Int128, false, "bigInteger128"); Serialize.writeInt256(out, input.getBigInteger256(), false, false, ClickHouseDataType.Int256, false, "bigInteger256"); + // UIntX + Serialize.writeUInt8(out, input.getUint8Primitive(), false, false, ClickHouseDataType.UInt8, false, "uint8Primitive"); + Serialize.writeUInt8(out, input.getUint8Object(), false, false, ClickHouseDataType.UInt8, false, "uint8Object"); + + Serialize.writeUInt16(out, input.getUint16Primitive(), false, false, ClickHouseDataType.UInt16, false, "uint8Primitive"); + Serialize.writeUInt16(out, input.getUint16Object(), false, false, ClickHouseDataType.UInt16, false, "uint8Object"); + + Serialize.writeUInt32(out, input.getUint32Primitive(), false, false, ClickHouseDataType.UInt32, false, "uint8Primitive"); + Serialize.writeUInt32(out, input.getUint32Object(), false, false, ClickHouseDataType.UInt32, false, "uint8Object"); + + Serialize.writeUInt64(out, input.getUint64Primitive(), false, false, ClickHouseDataType.UInt64, false, "uint8Primitive"); + Serialize.writeUInt64(out, input.getUint64Object(), false, false, ClickHouseDataType.UInt64, false, "uint8Object"); + + Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "bigInteger128"); + Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "bigInteger256"); + + Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal, false, "decimal", 10, 5); + Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal32, false, "decimal32", 9, 1); + Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal64, false, "decimal64", 18, 10); + Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal128, false, "decimal128", 38, 19); + Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal256, false, "decimal256", 76, 39); + Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive"); Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject"); @@ -35,7 +57,14 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException { Serialize.writeBoolean(out, input.isBooleanPrimitive(), false, false, ClickHouseDataType.Bool, false, "booleanPrimitive"); Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject"); - Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "String"); + Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "str"); + Serialize.writeFixedString(out, input.getFixedStr(), false, false, ClickHouseDataType.FixedString, false, "fixedStr", 10); + + Serialize.writeDate(out, input.getDate(), false, false, ClickHouseDataType.Date, false, "v_date"); + Serialize.writeDate32(out, input.getDate32(), false, false, ClickHouseDataType.Date32, false, "v_date32"); + Serialize.writeTimeDate(out, input.getDateTime(), false, false, ClickHouseDataType.DateTime, false, "v_dateTime"); + Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1); + Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid"); } } diff --git a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java index 1f50509..a79940d 100644 --- a/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java +++ b/flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java @@ -1,6 +1,10 @@ package org.apache.flink.connector.clickhouse.sink.pojo; +import java.math.BigDecimal; import java.math.BigInteger; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.UUID; public class SimplePOJO { @@ -16,6 +20,27 @@ public class SimplePOJO { private long longPrimitive; private Long longObject; + private int uint8Primitive; + private Integer uint8Object; + + private int uint16Primitive; + private Integer uint16Object; + + private long uint32Primitive; + private Long uint32Object; + + private long uint64Primitive; + private Long uint64Object; + + private BigInteger uint128Object; + private BigInteger uint256Object; + + private BigDecimal bigDecimal; + private BigDecimal bigDecimal32; + private BigDecimal bigDecimal64; + private BigDecimal bigDecimal128; + private BigDecimal bigDecimal256; + private float floatPrimitive; private Float floatObject; @@ -27,9 +52,18 @@ public class SimplePOJO { private String str; + private String fixedStr; + private BigInteger bigInteger128; private BigInteger bigInteger256; + private LocalDate date; + private LocalDate date32; + private LocalDateTime dateTime; + private LocalDateTime dateTime64; + + private UUID uuid; + public SimplePOJO(int index) { this.bytePrimitive = Byte.MIN_VALUE; this.byteObject = Byte.MAX_VALUE; @@ -43,6 +77,27 @@ public SimplePOJO(int index) { this.longPrimitive = index; this.longObject = Long.MAX_VALUE; + this.uint8Primitive = Byte.MAX_VALUE; + this.uint8Object = (int)Byte.MAX_VALUE; + + this.uint16Primitive = Short.MAX_VALUE; + this.uint16Object = (int)Short.MAX_VALUE; + + this.uint32Primitive = Integer.MAX_VALUE; + this.uint32Object = (long)Integer.MAX_VALUE; + + this.uint64Primitive = Long.MAX_VALUE; + this.uint64Object = Long.MAX_VALUE; + + this.uint128Object = BigInteger.valueOf(index); + this.uint256Object = BigInteger.valueOf(index); + + this.bigDecimal = new BigDecimal(index); + this.bigDecimal32 = new BigDecimal(index); + this.bigDecimal64 = new BigDecimal(index); + this.bigDecimal128 = new BigDecimal(index); + this.bigDecimal256 = new BigDecimal(index); + this.floatPrimitive = Float.MIN_VALUE; this.floatObject = Float.MAX_VALUE; @@ -53,9 +108,17 @@ public SimplePOJO(int index) { this.booleanObject = Boolean.FALSE; this.str = "str" + longPrimitive; + this.fixedStr = (str + "_FixedString").substring(0, 10); this.bigInteger128 = BigInteger.valueOf(longPrimitive); this.bigInteger256 = BigInteger.valueOf(longPrimitive); + + this.date = LocalDate.ofEpochDay(0); + this.date32 = LocalDate.ofEpochDay(0); + this.dateTime = LocalDateTime.now(); + this.dateTime64 = LocalDateTime.now(); + + this.uuid = UUID.randomUUID(); } public byte getBytePrimitive() { @@ -90,6 +153,36 @@ public Long getLongObject() { return longObject; } + public int getUint8Primitive() { return uint8Primitive; } + + public Integer getUint8Object() { return uint8Object; } + + public int getUint16Primitive() { return uint16Primitive; } + + public Integer getUint16Object() { return uint16Object; } + + public long getUint32Primitive() { return uint32Primitive; } + + public Long getUint32Object() { return uint32Object; } + + public long getUint64Primitive() { return uint64Primitive; } + + public Long getUint64Object() { return uint64Object; } + + public BigInteger getUint128Object() { return uint128Object; } + + public BigInteger getUint256Object() { return uint256Object; } + + public BigDecimal getBigDecimal() { return bigDecimal; } + + public BigDecimal getBigDecimal32() { return bigDecimal32; } + + public BigDecimal getBigDecimal64() { return bigDecimal64; } + + public BigDecimal getBigDecimal128() { return bigDecimal128; } + + public BigDecimal getBigDecimal256() { return bigDecimal256; } + public float getFloatPrimitive() { return floatPrimitive; } @@ -115,4 +208,17 @@ public Double getDoubleObject() { public BigInteger getBigInteger128() { return bigInteger128; } public BigInteger getBigInteger256() { return bigInteger256; } + + public String getFixedStr() { return fixedStr; } + + public LocalDate getDate() { return date; } + + public LocalDate getDate32() { return date32; } + + public LocalDateTime getDateTime() { return dateTime; } + + public LocalDateTime getDateTime64() { return dateTime64; } + + public UUID getUuid() { return uuid; } + } diff --git a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java index 8a809cd..e3983f5 100644 --- a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java +++ b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java @@ -10,12 +10,15 @@ import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Method; +import java.math.BigDecimal; import java.math.BigInteger; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.HashMap; import java.util.Map; +import java.util.UUID; public class Serialize { private static final Logger LOG = LoggerFactory.getLogger(Serialize.class); @@ -128,6 +131,43 @@ public static void writeDate(OutputStream out, ZonedDateTime value, boolean defa } } + public static void writeDate32(OutputStream out, LocalDate value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + SerializerUtils.writeDate32(out, value, ZoneId.of("UTC")); // TODO: check + } + } + + public static void writeDate32(OutputStream out, ZonedDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + SerializerUtils.writeDate32(out, value, ZoneId.of("UTC")); // TODO: check + } + } + + // Support for DateTime section + public static void writeTimeDate(OutputStream out, LocalDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + SerializerUtils.writeDateTime(out, value, ZoneId.of("UTC")); // TODO: check + } + } + + public static void writeTimeDate(OutputStream out, ZonedDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + SerializerUtils.writeDateTime(out, value, ZoneId.of("UTC")); // TODO: check + } + } + + public static void writeTimeDate64(OutputStream out, LocalDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, int scale) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + SerializerUtils.writeDateTime64(out, value, scale, ZoneId.of("UTC")); // TODO: check + } + } + + public static void writeTimeDate64(OutputStream out, ZonedDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, int scale) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + SerializerUtils.writeDateTime64(out, value, scale, ZoneId.of("UTC")); // TODO: check + } + } + // clickhouse type String support public static void writeString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { @@ -135,7 +175,8 @@ public static void writeString(OutputStream out, String value, boolean defaultsS } } - public static void writeFixedString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, int size, String column) throws IOException { + // Add a boundary check before inserting + public static void writeFixedString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, int size) throws IOException { if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { BinaryStreamUtils.writeFixedString(out, convertToString(value), size); } @@ -183,6 +224,48 @@ public static void writeInt256(OutputStream out, BigInteger value, boolean defau } } + public static void writeUInt8(OutputStream out, int value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + BinaryStreamUtils.writeUnsignedInt8(out, value); + } + } + + public static void writeUInt16(OutputStream out, int value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + BinaryStreamUtils.writeUnsignedInt16(out, value); + } + } + + public static void writeUInt32(OutputStream out, long value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + BinaryStreamUtils.writeUnsignedInt32(out, value); + } + } + + public static void writeUInt64(OutputStream out, long value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + BinaryStreamUtils.writeUnsignedInt64(out, value); + } + } + + public static void writeUInt128(OutputStream out, BigInteger value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + BinaryStreamUtils.writeUnsignedInt128(out, value); + } + } + + public static void writeUInt256(OutputStream out, BigInteger value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + BinaryStreamUtils.writeUnsignedInt256(out, value); + } + } + // Decimal + public static void writeDecimal(OutputStream out, BigDecimal value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, int precision, int scale) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + BinaryStreamUtils.writeDecimal(out, value, precision, scale); + } + } + // Float32 public static void writeFloat32(OutputStream out, Float value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { @@ -204,4 +287,11 @@ public static void writeBoolean(OutputStream out, Boolean value, boolean default } } + // UUID + public static void writeUUID(OutputStream out, UUID value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException { + if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) { + BinaryStreamUtils.writeUuid(out, value); + } + } + }