Skip to content

Commit d625225

Browse files
committed
Add support in uint8/16/32/64/128/256
1 parent 2f36900 commit d625225

File tree

5 files changed

+159
-37
lines changed

5 files changed

+159
-37
lines changed

README.md

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -132,43 +132,43 @@ Planned for a future release — a complete end-to-end example will be added onc
132132

133133
## Supported ClickHouse Types
134134

135-
| Java Type | ClickHouse Type | Supported | Serialize Method |
136-
|-----------------|-----------------|-----------|-------------------------|
137-
| byte/Byte | Int8 || Serialize.writeInt8 |
138-
| short/Short | Int16 || Serialize.writeInt16 |
139-
| int/Integer | Int32 || Serialize.writeInt32 |
140-
| long/Long | Int64 || Serialize.writeInt64 |
141-
| BigInteger | Int128 || Serialize.writeInt124 |
142-
| BigInteger | Int256 || Serialize.writeInt256 |
143-
| byte/Byte | UInt8 | | N/A |
144-
| short/Short | UInt16 | | N/A |
145-
| int/Integer | UInt32 | | N/A |
146-
| long/Long | UInt64 | | N/A |
147-
| BigInteger | UInt128 | | N/A |
148-
| BigInteger | UInt256 | | N/A |
149-
| BigDecimal | Decimal || N/A |
150-
| BigDecimal | Decimal32 || N/A |
151-
| BigDecimal | Decimal64 || N/A |
152-
| BigDecimal | Decimal128 || N/A |
153-
| BigDecimal | Decimal256 || N/A |
154-
| float/Float | Float || Serialize.writeFloat32 |
155-
| double/Double | Double || Serialize.writeFloat64 |
156-
| boolean/Boolean | Boolean || Serialize.writeBoolean |
157-
| String | String || Serialize.writeString |
158-
| String | FixedString | | N/A |
159-
| LocalDate | Date | | N/A |
160-
| LocalDate | Date32 | | N/A |
161-
| LocalDateTime | DateTime | | N/A |
162-
| LocalDateTime | DateTime64 | | N/A |
163-
| int/Integer | Time || N/A |
164-
| long/Long | Time64 || N/A |
165-
| byte/Byte | Enum8 || Serialize.writeInt8 |
166-
| int/Integer | Enum16 || Serialize.writeInt16 |
167-
| String | JSON || N/A |
168-
| Array<Type> | Array<Type> || N/A |
169-
| Map<K,V> | Map<K,V> || N/A |
170-
| Tuple<Type,..> | Map<T1,T2,..> || N/A |
171-
| Object | Variant || N/A |
135+
| Java Type | ClickHouse Type | Supported | Serialize Method |
136+
|-----------------|-----------------|-----------|----------------------------|
137+
| byte/Byte | Int8 || Serialize.writeInt8 |
138+
| short/Short | Int16 || Serialize.writeInt16 |
139+
| int/Integer | Int32 || Serialize.writeInt32 |
140+
| long/Long | Int64 || Serialize.writeInt64 |
141+
| BigInteger | Int128 || Serialize.writeInt124 |
142+
| BigInteger | Int256 || Serialize.writeInt256 |
143+
| int/Integer | UInt8 | | Serialize.writeUInt8 |
144+
| int/Integer | UInt16 | | Serialize.writeUInt16 |
145+
| long/Long | UInt32 | | Serialize.writeUInt32 |
146+
| long/Long | UInt64 | | Serialize.writeUInt64 |
147+
| BigInteger | UInt128 | | Serialize.writeUInt128 |
148+
| BigInteger | UInt256 | | Serialize.writeUInt256 |
149+
| BigDecimal | Decimal || N/A |
150+
| BigDecimal | Decimal32 || N/A |
151+
| BigDecimal | Decimal64 || N/A |
152+
| BigDecimal | Decimal128 || N/A |
153+
| BigDecimal | Decimal256 || N/A |
154+
| float/Float | Float || Serialize.writeFloat32 |
155+
| double/Double | Double || Serialize.writeFloat64 |
156+
| boolean/Boolean | Boolean || Serialize.writeBoolean |
157+
| String | String || Serialize.writeString |
158+
| String | FixedString | | Serialize.writeFixedString |
159+
| LocalDate | Date | | Serialize.writeDate |
160+
| LocalDate | Date32 | | Serialize.writeDate32 |
161+
| LocalDateTime | DateTime | | Serialize.writeDateTime |
162+
| LocalDateTime | DateTime64 | | Serialize.writeDateTime64 |
163+
| int/Integer | Time || N/A |
164+
| long/Long | Time64 || N/A |
165+
| byte/Byte | Enum8 || Serialize.writeInt8 |
166+
| int/Integer | Enum16 || Serialize.writeInt16 |
167+
| String | JSON || N/A |
168+
| Array<Type> | Array<Type> || N/A |
169+
| Map<K,V> | Map<K,V> || N/A |
170+
| Tuple<Type,..> | Map<T1,T2,..> || N/A |
171+
| Object | Variant || N/A |
172172

173173
* For date operation need to provide ZoneId.
174174

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,16 @@ void SimplePOJODataTest() throws Exception {
208208
"longObject Int64," +
209209
"bigInteger128 Int128," +
210210
"bigInteger256 Int256," +
211+
"uint8Primitive UInt8," +
212+
"uint8Object UInt8," +
213+
"uint16Primitive UInt16," +
214+
"uint16Object UInt16," +
215+
"uint32Primitive UInt32," +
216+
"uint32Object UInt32," +
217+
"uint64Primitive UInt64," +
218+
"uint64Object UInt64," +
219+
"uint128Object UInt128," +
220+
"uint256Object UInt256," +
211221
"floatPrimitive Float," +
212222
"floatObject Float," +
213223
"doublePrimitive Double," +
@@ -481,6 +491,16 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
481491
"longObject Int64," +
482492
"bigInteger128 Int128," +
483493
"bigInteger256 Int256," +
494+
"uint8Primitive UInt8," +
495+
"uint8Object UInt8," +
496+
"uint16Primitive UInt16," +
497+
"uint16Object UInt16," +
498+
"uint32Primitive UInt32," +
499+
"uint32Object UInt32," +
500+
"uint64Primitive UInt64," +
501+
"uint64Object UInt64," +
502+
"uint128Object UInt128," +
503+
"uint256Object UInt256," +
484504
"floatPrimitive Float," +
485505
"floatObject Float," +
486506
"doublePrimitive Double," +

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,22 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
2626
Serialize.writeInt128(out, input.getBigInteger128(), false, false, ClickHouseDataType.Int128, false, "bigInteger128");
2727
Serialize.writeInt256(out, input.getBigInteger256(), false, false, ClickHouseDataType.Int256, false, "bigInteger256");
2828

29+
// UIntX
30+
Serialize.writeUInt8(out, input.getUint8Primitive(), false, false, ClickHouseDataType.UInt8, false, "uint8Primitive");
31+
Serialize.writeUInt8(out, input.getUint8Object(), false, false, ClickHouseDataType.UInt8, false, "uint8Object");
32+
33+
Serialize.writeUInt16(out, input.getUint16Primitive(), false, false, ClickHouseDataType.UInt16, false, "uint8Primitive");
34+
Serialize.writeUInt16(out, input.getUint16Object(), false, false, ClickHouseDataType.UInt16, false, "uint8Object");
35+
36+
Serialize.writeUInt32(out, input.getUint32Primitive(), false, false, ClickHouseDataType.UInt32, false, "uint8Primitive");
37+
Serialize.writeUInt32(out, input.getUint32Object(), false, false, ClickHouseDataType.UInt32, false, "uint8Object");
38+
39+
Serialize.writeUInt64(out, input.getUint64Primitive(), false, false, ClickHouseDataType.UInt64, false, "uint8Primitive");
40+
Serialize.writeUInt64(out, input.getUint64Object(), false, false, ClickHouseDataType.UInt64, false, "uint8Object");
41+
42+
Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "bigInteger128");
43+
Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "bigInteger256");
44+
2945
Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive");
3046
Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject");
3147

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,21 @@ public class SimplePOJO {
1818
private long longPrimitive;
1919
private Long longObject;
2020

21+
private int uint8Primitive;
22+
private Integer uint8Object;
23+
24+
private int uint16Primitive;
25+
private Integer uint16Object;
26+
27+
private long uint32Primitive;
28+
private Long uint32Object;
29+
30+
private long uint64Primitive;
31+
private Long uint64Object;
32+
33+
private BigInteger uint128Object;
34+
private BigInteger uint256Object;
35+
2136
private float floatPrimitive;
2237
private Float floatObject;
2338

@@ -52,6 +67,21 @@ public SimplePOJO(int index) {
5267
this.longPrimitive = index;
5368
this.longObject = Long.MAX_VALUE;
5469

70+
this.uint8Primitive = Byte.MAX_VALUE;
71+
this.uint8Object = (int)Byte.MAX_VALUE;
72+
73+
this.uint16Primitive = Short.MAX_VALUE;
74+
this.uint16Object = (int)Short.MAX_VALUE;
75+
76+
this.uint32Primitive = Integer.MAX_VALUE;
77+
this.uint32Object = (long)Integer.MAX_VALUE;
78+
79+
this.uint64Primitive = Long.MAX_VALUE;
80+
this.uint64Object = Long.MAX_VALUE;
81+
82+
this.uint128Object = BigInteger.valueOf(index);
83+
this.uint256Object = BigInteger.valueOf(index);
84+
5585
this.floatPrimitive = Float.MIN_VALUE;
5686
this.floatObject = Float.MAX_VALUE;
5787

@@ -106,6 +136,26 @@ public Long getLongObject() {
106136
return longObject;
107137
}
108138

139+
public int getUint8Primitive() { return uint8Primitive; }
140+
141+
public Integer getUint8Object() { return uint8Object; }
142+
143+
public int getUint16Primitive() { return uint16Primitive; }
144+
145+
public Integer getUint16Object() { return uint16Object; }
146+
147+
public long getUint32Primitive() { return uint32Primitive; }
148+
149+
public Long getUint32Object() { return uint32Object; }
150+
151+
public long getUint64Primitive() { return uint64Primitive; }
152+
153+
public Long getUint64Object() { return uint64Object; }
154+
155+
public BigInteger getUint128Object() { return uint128Object; }
156+
157+
public BigInteger getUint256Object() { return uint256Object; }
158+
109159
public float getFloatPrimitive() {
110160
return floatPrimitive;
111161
}

flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,42 @@ public static void writeInt256(OutputStream out, BigInteger value, boolean defau
222222
}
223223
}
224224

225+
public static void writeUInt8(OutputStream out, int value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
226+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
227+
BinaryStreamUtils.writeUnsignedInt8(out, value);
228+
}
229+
}
230+
231+
public static void writeUInt16(OutputStream out, int value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
232+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
233+
BinaryStreamUtils.writeUnsignedInt16(out, value);
234+
}
235+
}
236+
237+
public static void writeUInt32(OutputStream out, long value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
238+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
239+
BinaryStreamUtils.writeUnsignedInt32(out, value);
240+
}
241+
}
242+
243+
public static void writeUInt64(OutputStream out, long value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
244+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
245+
BinaryStreamUtils.writeUnsignedInt64(out, value);
246+
}
247+
}
248+
249+
public static void writeUInt128(OutputStream out, BigInteger value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
250+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
251+
BinaryStreamUtils.writeUnsignedInt128(out, value);
252+
}
253+
}
254+
255+
public static void writeUInt256(OutputStream out, BigInteger value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
256+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
257+
BinaryStreamUtils.writeUnsignedInt256(out, value);
258+
}
259+
}
260+
225261
// Float32
226262
public static void writeFloat32(OutputStream out, Float value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
227263
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {

0 commit comments

Comments
 (0)