Skip to content

Commit f7c061b

Browse files
authored
Merge pull request #47 from ClickHouse/add-unsupported-types
Add unsupported types - FixedString, Date/Date32/DateTime/DateTime64, Uint8/16/32/64/128/256, Decimal, UUID
2 parents f66fdc7 + 94e5460 commit f7c061b

File tree

6 files changed

+322
-84
lines changed

6 files changed

+322
-84
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1+
## 0.1.1
2+
* Add support in FixedString, Date/Date32/DateTime/DateTime64, Uint8/16/32/64/128/256, Decimal, UUID
13
## 0.1.0
24
* ClickHouse Sink supports Apache Flink 1.17+

README.md

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -132,45 +132,47 @@ Planned for a future release — a complete end-to-end example will be added onc
132132

133133
## Supported ClickHouse Types
134134

135-
| Java Type | ClickHouse Type | Supported | Serialize Method |
136-
|-----------------|-----------------|-----------|-------------------------|
137-
| byte/Byte | Int8 || Serialize.writeInt8 |
138-
| short/Short | Int16 || Serialize.writeInt16 |
139-
| int/Integer | Int32 || Serialize.writeInt32 |
140-
| long/Long | Int64 || Serialize.writeInt64 |
141-
| BigInteger | Int128 || Serialize.writeInt124 |
142-
| BigInteger | Int256 || Serialize.writeInt256 |
143-
| byte/Byte | UInt8 || N/A |
144-
| short/Short | UInt16 || N/A |
145-
| int/Integer | UInt32 || N/A |
146-
| long/Long | UInt64 || N/A |
147-
| BigInteger | UInt128 || N/A |
148-
| BigInteger | UInt256 || N/A |
149-
| BigDecimal | Decimal || N/A |
150-
| BigDecimal | Decimal32 || N/A |
151-
| BigDecimal | Decimal64 || N/A |
152-
| BigDecimal | Decimal128 || N/A |
153-
| BigDecimal | Decimal256 || N/A |
154-
| float/Float | Float || Serialize.writeFloat32 |
155-
| double/Double | Double || Serialize.writeFloat64 |
156-
| boolean/Boolean | Boolean || Serialize.writeBoolean |
157-
| String | String || Serialize.writeString |
158-
| String | FixedString || N/A |
159-
| LocalDate | Date || N/A |
160-
| LocalDate | Date32 || N/A |
161-
| LocalDateTime | DateTime || N/A |
162-
| LocalDateTime | DateTime64 || N/A |
163-
| int/Integer | Time || N/A |
164-
| long/Long | Time64 || N/A |
165-
| byte/Byte | Enum8 || Serialize.writeInt8 |
166-
| int/Integer | Enum16 || Serialize.writeInt16 |
167-
| String | JSON || N/A |
168-
| Array<Type> | Array<Type> || N/A |
169-
| Map<K,V> | Map<K,V> || N/A |
170-
| Tuple<Type,..> | Map<T1,T2,..> || N/A |
171-
| Object | Variant || N/A |
172-
173-
* For date operation need to provide ZoneId.
135+
| Java Type | ClickHouse Type | Supported | Serialize Method |
136+
|-----------------|-----------------|-----------|----------------------------|
137+
| byte/Byte | Int8 || Serialize.writeInt8 |
138+
| short/Short | Int16 || Serialize.writeInt16 |
139+
| int/Integer | Int32 || Serialize.writeInt32 |
140+
| long/Long | Int64 || Serialize.writeInt64 |
141+
| BigInteger | Int128 || Serialize.writeInt124 |
142+
| BigInteger | Int256 || Serialize.writeInt256 |
143+
| int/Integer | UInt8 || Serialize.writeUInt8 |
144+
| int/Integer | UInt16 || Serialize.writeUInt16 |
145+
| long/Long | UInt32 || Serialize.writeUInt32 |
146+
| long/Long | UInt64 || Serialize.writeUInt64 |
147+
| BigInteger | UInt128 || Serialize.writeUInt128 |
148+
| BigInteger | UInt256 || Serialize.writeUInt256 |
149+
| BigDecimal | Decimal || Serialize.writeDecimal |
150+
| BigDecimal | Decimal32 || Serialize.writeDecimal |
151+
| BigDecimal | Decimal64 || Serialize.writeDecimal |
152+
| BigDecimal | Decimal128 || Serialize.writeDecimal |
153+
| BigDecimal | Decimal256 || Serialize.writeDecimal |
154+
| float/Float | Float || Serialize.writeFloat32 |
155+
| double/Double | Double || Serialize.writeFloat64 |
156+
| boolean/Boolean | Boolean || Serialize.writeBoolean |
157+
| String | String || Serialize.writeString |
158+
| String | FixedString || Serialize.writeFixedString |
159+
| LocalDate | Date || Serialize.writeDate |
160+
| LocalDate | Date32 || Serialize.writeDate32 |
161+
| LocalDateTime | DateTime || Serialize.writeDateTime |
162+
| LocalDateTime | DateTime64 || Serialize.writeDateTime64 |
163+
| int/Integer | Time || N/A |
164+
| long/Long | Time64 || N/A |
165+
| byte/Byte | Enum8 || Serialize.writeInt8 |
166+
| int/Integer | Enum16 || Serialize.writeInt16 |
167+
| java.util.UUID | UUID || Serialize.writeIntUUID |
168+
| String | JSON || N/A |
169+
| Array<Type> | Array<Type> || N/A |
170+
| Map<K,V> | Map<K,V> || N/A |
171+
| Tuple<Type,..> | Map<T1,T2,..> || N/A |
172+
| Object | Variant || N/A |
173+
174+
* A ZoneId must also be provided when performing date operations.
175+
* Precision and scale must also be provided when performing decimal operations.
174176

175177
## Configuration Options
176178

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 52 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,56 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
4646

4747
static final int STREAM_PARALLELISM = 5;
4848

49+
private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
50+
String createTable = createSimplePOJOTableSQL(database, tableName);
51+
return createTable.trim().substring(0, createTable.trim().length() - 1) + " " + String.format("SETTINGS parts_to_throw_insert = %d;", parts_to_throw_insert);
52+
}
53+
54+
private String createSimplePOJOTableSQL(String database, String tableName) {
55+
return "CREATE TABLE `" + database + "`.`" + tableName + "` (" +
56+
"bytePrimitive Int8," +
57+
"byteObject Int8," +
58+
"shortPrimitive Int16," +
59+
"shortObject Int16," +
60+
"intPrimitive Int32," +
61+
"integerObject Int32," +
62+
"longPrimitive Int64," +
63+
"longObject Int64," +
64+
"bigInteger128 Int128," +
65+
"bigInteger256 Int256," +
66+
"uint8Primitive UInt8," +
67+
"uint8Object UInt8," +
68+
"uint16Primitive UInt16," +
69+
"uint16Object UInt16," +
70+
"uint32Primitive UInt32," +
71+
"uint32Object UInt32," +
72+
"uint64Primitive UInt64," +
73+
"uint64Object UInt64," +
74+
"uint128Object UInt128," +
75+
"uint256Object UInt256," +
76+
"decimal Decimal(10,5)," +
77+
"decimal32 Decimal32(9)," +
78+
"decimal64 Decimal64(18)," +
79+
"decimal128 Decimal128(38)," +
80+
"decimal256 Decimal256(76)," +
81+
"floatPrimitive Float," +
82+
"floatObject Float," +
83+
"doublePrimitive Double," +
84+
"doubleObject Double," +
85+
"booleanPrimitive Boolean," +
86+
"booleanObject Boolean," +
87+
"str String," +
88+
"fixedStr FixedString(10)," +
89+
"v_date Date," +
90+
"v_date32 Date32," +
91+
"v_dateTime DateTime," +
92+
"v_dateTime64 DateTime64," +
93+
"uuid UUID," +
94+
") " +
95+
"ENGINE = MergeTree " +
96+
"ORDER BY (longPrimitive); ";
97+
}
98+
4999
private int executeAsyncJob(StreamExecutionEnvironment env, String tableName, int numIterations, int expectedRows) throws Exception {
50100
JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource");
51101
int rows = 0;
@@ -197,27 +247,7 @@ void SimplePOJODataTest() throws Exception {
197247
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
198248
ClickHouseServerForTests.executeSql(dropTable);
199249
// create table
200-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
201-
"bytePrimitive Int8," +
202-
"byteObject Int8," +
203-
"shortPrimitive Int16," +
204-
"shortObject Int16," +
205-
"intPrimitive Int32," +
206-
"integerObject Int32," +
207-
"longPrimitive Int64," +
208-
"longObject Int64," +
209-
"bigInteger128 Int128," +
210-
"bigInteger256 Int256," +
211-
"floatPrimitive Float," +
212-
"floatObject Float," +
213-
"doublePrimitive Double," +
214-
"doubleObject Double," +
215-
"booleanPrimitive Boolean," +
216-
"booleanObject Boolean," +
217-
"str String," +
218-
") " +
219-
"ENGINE = MergeTree " +
220-
"ORDER BY (longPrimitive); ";
250+
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName);
221251
ClickHouseServerForTests.executeSql(tableSql);
222252

223253

@@ -465,28 +495,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
465495
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
466496
ClickHouseServerForTests.executeSql(dropTable);
467497
// create table
468-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
469-
"bytePrimitive Int8," +
470-
"byteObject Int8," +
471-
"shortPrimitive Int16," +
472-
"shortObject Int16," +
473-
"intPrimitive Int32," +
474-
"integerObject Int32," +
475-
"longPrimitive Int64," +
476-
"longObject Int64," +
477-
"bigInteger128 Int128," +
478-
"bigInteger256 Int256," +
479-
"floatPrimitive Float," +
480-
"floatObject Float," +
481-
"doublePrimitive Double," +
482-
"doubleObject Double," +
483-
"booleanPrimitive Boolean," +
484-
"booleanObject Boolean," +
485-
"str String," +
486-
") " +
487-
"ENGINE = MergeTree " +
488-
"ORDER BY (longPrimitive) " +
489-
"SETTINGS parts_to_throw_insert = 10;";
498+
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName, 10);
490499
ClickHouseServerForTests.executeSql(tableSql);
491500
//ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName));
492501

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,28 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
2626
Serialize.writeInt128(out, input.getBigInteger128(), false, false, ClickHouseDataType.Int128, false, "bigInteger128");
2727
Serialize.writeInt256(out, input.getBigInteger256(), false, false, ClickHouseDataType.Int256, false, "bigInteger256");
2828

29+
// UIntX
30+
Serialize.writeUInt8(out, input.getUint8Primitive(), false, false, ClickHouseDataType.UInt8, false, "uint8Primitive");
31+
Serialize.writeUInt8(out, input.getUint8Object(), false, false, ClickHouseDataType.UInt8, false, "uint8Object");
32+
33+
Serialize.writeUInt16(out, input.getUint16Primitive(), false, false, ClickHouseDataType.UInt16, false, "uint8Primitive");
34+
Serialize.writeUInt16(out, input.getUint16Object(), false, false, ClickHouseDataType.UInt16, false, "uint8Object");
35+
36+
Serialize.writeUInt32(out, input.getUint32Primitive(), false, false, ClickHouseDataType.UInt32, false, "uint8Primitive");
37+
Serialize.writeUInt32(out, input.getUint32Object(), false, false, ClickHouseDataType.UInt32, false, "uint8Object");
38+
39+
Serialize.writeUInt64(out, input.getUint64Primitive(), false, false, ClickHouseDataType.UInt64, false, "uint8Primitive");
40+
Serialize.writeUInt64(out, input.getUint64Object(), false, false, ClickHouseDataType.UInt64, false, "uint8Object");
41+
42+
Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "bigInteger128");
43+
Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "bigInteger256");
44+
45+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal, false, "decimal", 10, 5);
46+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal32, false, "decimal32", 9, 1);
47+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal64, false, "decimal64", 18, 10);
48+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal128, false, "decimal128", 38, 19);
49+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal256, false, "decimal256", 76, 39);
50+
2951
Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive");
3052
Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject");
3153

@@ -35,7 +57,14 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
3557
Serialize.writeBoolean(out, input.isBooleanPrimitive(), false, false, ClickHouseDataType.Bool, false, "booleanPrimitive");
3658
Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject");
3759

38-
Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "String");
60+
Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "str");
61+
Serialize.writeFixedString(out, input.getFixedStr(), false, false, ClickHouseDataType.FixedString, false, "fixedStr", 10);
62+
63+
Serialize.writeDate(out, input.getDate(), false, false, ClickHouseDataType.Date, false, "v_date");
64+
Serialize.writeDate32(out, input.getDate32(), false, false, ClickHouseDataType.Date32, false, "v_date32");
65+
Serialize.writeTimeDate(out, input.getDateTime(), false, false, ClickHouseDataType.DateTime, false, "v_dateTime");
66+
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);
3967

68+
Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid");
4069
}
4170
}

0 commit comments

Comments
 (0)