Skip to content

Commit 3ef0858

Browse files
authored
Merge pull request #56 from ClickHouse/add-support-array-map-tuple
Add support array map tuple
2 parents 8e414f0 + 35c0c69 commit 3ef0858

File tree

10 files changed

+357
-37
lines changed

10 files changed

+357
-37
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
## 0.1.1
2+
* Add support for Array, Map, Tuple #57
23
* Add support in FixedString, Date/Date32/DateTime/DateTime64, Uint8/16/32/64/128/256, Decimal, UUID
34
## 0.1.0
45
* ClickHouse Sink supports Apache Flink 1.17+

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ Planned for a future release — a complete end-to-end example will be added onc
167167
| int/Integer | Enum16 || Serialize.writeInt16 |
168168
| java.util.UUID | UUID || Serialize.writeIntUUID |
169169
| String | JSON || N/A |
170-
| Array<Type> | Array<Type> | | N/A |
171-
| Map<K,V> | Map<K,V> | | N/A |
172-
| Tuple<Type,..> | Map<T1,T2,..> | | N/A |
170+
| Array<Type> | Array<Type> | | Serialize.writeArray |
171+
| Map<K,V> | Map<K,V> | | Serialize.writeMap |
172+
| Tuple<Type,..> | Tuple<T1,T2,..> | | Serialize.writeTuple |
173173
| Object | Variant || N/A |
174174

175175
* A ZoneId must also be provided when performing date operations.

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ private String createSimplePOJOTableSQL(String database, String tableName) {
9191
"v_dateTime DateTime," +
9292
"v_dateTime64 DateTime64," +
9393
"uuid UUID," +
94+
"stringList Array(String)," +
95+
"longList Array(Int64)," +
96+
"mapOfStrings Map(String,String)," +
97+
"tupleOfObjects Tuple(String,Int64,Boolean)," +
9498
") " +
9599
"ENGINE = MergeTree " +
96100
"ORDER BY (longPrimitive); ";
@@ -283,6 +287,7 @@ void SimplePOJODataTest() throws Exception {
283287
simplePOJOs.sinkTo(simplePOJOSink);
284288
int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
285289
Assertions.assertEquals(EXPECTED_ROWS, rows);
290+
// ClickHouseServerForTests.showData(tableName);
286291
}
287292

288293
@Test
@@ -532,6 +537,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
532537
simplePOJOs.sinkTo(simplePOJOSink);
533538
int rows = executeAsyncJob(env, tableName, 100, EXPECTED_ROWS);
534539
Assertions.assertEquals(EXPECTED_ROWS, rows);
540+
// ClickHouseServerForTests.showData("simple_too_many_parts_pojo");
535541
//ClickHouseServerForTests.executeSql(String.format("SYSTEM START MERGES `%s.%s`", getDatabase(), tableName));
536542
}
537543

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.apache.flink.connector.clickhouse.sink.convertor;
22

3+
import com.clickhouse.data.ClickHouseColumn;
34
import com.clickhouse.data.ClickHouseDataType;
45
import com.clickhouse.utils.Serialize;
56
import org.apache.flink.connector.clickhouse.convertor.POJOConvertor;
@@ -66,5 +67,14 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
6667
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);
6768

6869
Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid");
70+
71+
Serialize.writeArray(out, input.getStringList(), ClickHouseColumn.of("stringList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString())));
72+
73+
Serialize.writeArray(out, input.getLongList(), ClickHouseColumn.of("longList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.Int64.toString())));
74+
75+
Serialize.writeMap(out, input.getMapOfStrings(), ClickHouseColumn.of("mapOfStrings", ClickHouseDataType.Map, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.String.toString())));
76+
77+
Serialize.writeTuple(out, input.getTupleOfObjects(), ClickHouseColumn.of("tupleOfObjects", ClickHouseDataType.Tuple, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.Int64.toString()), ClickHouseColumn.of("", ClickHouseDataType.Bool.toString()) ));
6978
}
79+
7080
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.math.BigInteger;
55
import java.time.LocalDate;
66
import java.time.LocalDateTime;
7-
import java.util.UUID;
7+
import java.util.*;
88

99
public class SimplePOJO {
1010

@@ -64,6 +64,13 @@ public class SimplePOJO {
6464

6565
private UUID uuid;
6666

67+
private List<String> stringList;
68+
private List<Long> longList;
69+
70+
private Map<String, String> mapOfStrings;
71+
72+
private List<Object> tupleOfObjects;
73+
6774
public SimplePOJO(int index) {
6875
this.bytePrimitive = Byte.MIN_VALUE;
6976
this.byteObject = Byte.MAX_VALUE;
@@ -119,6 +126,27 @@ public SimplePOJO(int index) {
119126
this.dateTime64 = LocalDateTime.now();
120127

121128
this.uuid = UUID.randomUUID();
129+
130+
this.stringList = new ArrayList<>();
131+
this.stringList.add("a");
132+
this.stringList.add("b");
133+
this.stringList.add("c");
134+
this.stringList.add("d");
135+
136+
this.longList = new ArrayList<>();
137+
this.longList.add(1L);
138+
this.longList.add(2L);
139+
this.longList.add(3L);
140+
this.longList.add(4L);
141+
142+
this.mapOfStrings = new HashMap<>();
143+
this.mapOfStrings.put("a", "a");
144+
this.mapOfStrings.put("b", "b");
145+
146+
this.tupleOfObjects = new ArrayList<>();
147+
this.tupleOfObjects.add("test");
148+
this.tupleOfObjects.add(1L);
149+
this.tupleOfObjects.add(true);
122150
}
123151

124152
public byte getBytePrimitive() {
@@ -221,4 +249,11 @@ public Double getDoubleObject() {
221249

222250
public UUID getUuid() { return uuid; }
223251

252+
public List<String> getStringList() { return stringList; }
253+
254+
public List<Long> getLongList() { return longList; }
255+
256+
public Map<String, String> getMapOfStrings() { return mapOfStrings; }
257+
258+
public List<Object> getTupleOfObjects() { return tupleOfObjects; }
224259
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,18 @@ public static void executeSql(String sql) throws ExecutionException, Interrupted
102102
}
103103
}
104104

105+
public static void showData(String tableName) throws ExecutionException, InterruptedException {
106+
String showDataSql = String.format("select * from '%s'", tableName);
107+
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
108+
List<GenericRecord> content = client.queryAll(showDataSql);
109+
for (GenericRecord record : content) {
110+
System.out.println();
111+
for (int i = 0; i< record.getSchema().getColumns().toArray().length; i++) {
112+
System.out.print(" | " + record.getObject(i +1));
113+
}
114+
}
115+
}
116+
105117
public static int countParts(String table) {
106118
String countPartsSql = String.format("SELECT count(*) FROM system.parts WHERE table = '%s' and active = 1", table);
107119
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,60 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
4646

4747
static final int STREAM_PARALLELISM = 5;
4848

49+
private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
50+
String createTable = createSimplePOJOTableSQL(database, tableName);
51+
return createTable.trim().substring(0, createTable.trim().length() - 1) + " " + String.format("SETTINGS parts_to_throw_insert = %d;", parts_to_throw_insert);
52+
}
53+
54+
private String createSimplePOJOTableSQL(String database, String tableName) {
55+
return "CREATE TABLE `" + database + "`.`" + tableName + "` (" +
56+
"bytePrimitive Int8," +
57+
"byteObject Int8," +
58+
"shortPrimitive Int16," +
59+
"shortObject Int16," +
60+
"intPrimitive Int32," +
61+
"integerObject Int32," +
62+
"longPrimitive Int64," +
63+
"longObject Int64," +
64+
"bigInteger128 Int128," +
65+
"bigInteger256 Int256," +
66+
"uint8Primitive UInt8," +
67+
"uint8Object UInt8," +
68+
"uint16Primitive UInt16," +
69+
"uint16Object UInt16," +
70+
"uint32Primitive UInt32," +
71+
"uint32Object UInt32," +
72+
"uint64Primitive UInt64," +
73+
"uint64Object UInt64," +
74+
"uint128Object UInt128," +
75+
"uint256Object UInt256," +
76+
"decimal Decimal(10,5)," +
77+
"decimal32 Decimal32(9)," +
78+
"decimal64 Decimal64(18)," +
79+
"decimal128 Decimal128(38)," +
80+
"decimal256 Decimal256(76)," +
81+
"floatPrimitive Float," +
82+
"floatObject Float," +
83+
"doublePrimitive Double," +
84+
"doubleObject Double," +
85+
"booleanPrimitive Boolean," +
86+
"booleanObject Boolean," +
87+
"str String," +
88+
"fixedStr FixedString(10)," +
89+
"v_date Date," +
90+
"v_date32 Date32," +
91+
"v_dateTime DateTime," +
92+
"v_dateTime64 DateTime64," +
93+
"uuid UUID," +
94+
"stringList Array(String)," +
95+
"longList Array(Int64)," +
96+
"mapOfStrings Map(String,String)," +
97+
"tupleOfObjects Tuple(String,Int64,Boolean)," +
98+
") " +
99+
"ENGINE = MergeTree " +
100+
"ORDER BY (longPrimitive); ";
101+
}
102+
49103
private int executeAsyncJob(StreamExecutionEnvironment env, String tableName, int numIterations, int expectedRows) throws Exception {
50104
JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource");
51105
int rows = 0;
@@ -197,22 +251,7 @@ void SimplePOJODataTest() throws Exception {
197251
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
198252
ClickHouseServerForTests.executeSql(dropTable);
199253
// create table
200-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
201-
"bytePrimitive Int8," +
202-
"byteObject Int8," +
203-
"shortPrimitive Int16," +
204-
"shortObject Int16," +
205-
"intPrimitive Int32," +
206-
"integerObject Int32," +
207-
"longPrimitive Int64," +
208-
"longObject Int64," +
209-
"floatPrimitive Float," +
210-
"floatObject Float," +
211-
"doublePrimitive Double," +
212-
"doubleObject Double," +
213-
") " +
214-
"ENGINE = MergeTree " +
215-
"ORDER BY (longPrimitive); ";
254+
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName);
216255
ClickHouseServerForTests.executeSql(tableSql);
217256

218257

@@ -458,23 +497,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
458497
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
459498
ClickHouseServerForTests.executeSql(dropTable);
460499
// create table
461-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
462-
"bytePrimitive Int8," +
463-
"byteObject Int8," +
464-
"shortPrimitive Int16," +
465-
"shortObject Int16," +
466-
"intPrimitive Int32," +
467-
"integerObject Int32," +
468-
"longPrimitive Int64," +
469-
"longObject Int64," +
470-
"floatPrimitive Float," +
471-
"floatObject Float," +
472-
"doublePrimitive Double," +
473-
"doubleObject Double," +
474-
") " +
475-
"ENGINE = MergeTree " +
476-
"ORDER BY (longPrimitive) " +
477-
"SETTINGS parts_to_throw_insert = 10;";
500+
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName, 10);
478501
ClickHouseServerForTests.executeSql(tableSql);
479502
//ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName));
480503

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.apache.flink.connector.clickhouse.sink.convertor;
22

3+
import com.clickhouse.data.ClickHouseColumn;
34
import com.clickhouse.data.ClickHouseDataType;
45
import com.clickhouse.utils.Serialize;
56
import org.apache.flink.connector.clickhouse.convertor.POJOConvertor;
@@ -23,10 +24,57 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
2324
Serialize.writeInt64(out, input.getLongPrimitive(), false, false, ClickHouseDataType.Int64, false, "longPrimitive");
2425
Serialize.writeInt64(out, input.getLongObject(), false, false, ClickHouseDataType.Int64, false, "longObject");
2526

27+
Serialize.writeInt128(out, input.getBigInteger128(), false, false, ClickHouseDataType.Int128, false, "bigInteger128");
28+
Serialize.writeInt256(out, input.getBigInteger256(), false, false, ClickHouseDataType.Int256, false, "bigInteger256");
29+
30+
// UIntX
31+
Serialize.writeUInt8(out, input.getUint8Primitive(), false, false, ClickHouseDataType.UInt8, false, "uint8Primitive");
32+
Serialize.writeUInt8(out, input.getUint8Object(), false, false, ClickHouseDataType.UInt8, false, "uint8Object");
33+
34+
Serialize.writeUInt16(out, input.getUint16Primitive(), false, false, ClickHouseDataType.UInt16, false, "uint8Primitive");
35+
Serialize.writeUInt16(out, input.getUint16Object(), false, false, ClickHouseDataType.UInt16, false, "uint8Object");
36+
37+
Serialize.writeUInt32(out, input.getUint32Primitive(), false, false, ClickHouseDataType.UInt32, false, "uint8Primitive");
38+
Serialize.writeUInt32(out, input.getUint32Object(), false, false, ClickHouseDataType.UInt32, false, "uint8Object");
39+
40+
Serialize.writeUInt64(out, input.getUint64Primitive(), false, false, ClickHouseDataType.UInt64, false, "uint8Primitive");
41+
Serialize.writeUInt64(out, input.getUint64Object(), false, false, ClickHouseDataType.UInt64, false, "uint8Object");
42+
43+
Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "bigInteger128");
44+
Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "bigInteger256");
45+
46+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal, false, "decimal", 10, 5);
47+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal32, false, "decimal32", 9, 1);
48+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal64, false, "decimal64", 18, 10);
49+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal128, false, "decimal128", 38, 19);
50+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal256, false, "decimal256", 76, 39);
51+
2652
Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive");
2753
Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject");
2854

2955
Serialize.writeFloat64(out, input.getDoublePrimitive(), false, false, ClickHouseDataType.Float64, false, "doublePrimitive");
3056
Serialize.writeFloat64(out, input.getDoubleObject(), false, false, ClickHouseDataType.Float64, false, "doubleObject");
57+
58+
Serialize.writeBoolean(out, input.isBooleanPrimitive(), false, false, ClickHouseDataType.Bool, false, "booleanPrimitive");
59+
Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject");
60+
61+
Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "str");
62+
Serialize.writeFixedString(out, input.getFixedStr(), false, false, ClickHouseDataType.FixedString, false, "fixedStr", 10);
63+
64+
Serialize.writeDate(out, input.getDate(), false, false, ClickHouseDataType.Date, false, "v_date");
65+
Serialize.writeDate32(out, input.getDate32(), false, false, ClickHouseDataType.Date32, false, "v_date32");
66+
Serialize.writeTimeDate(out, input.getDateTime(), false, false, ClickHouseDataType.DateTime, false, "v_dateTime");
67+
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);
68+
69+
Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid");
70+
71+
Serialize.writeArray(out, input.getStringList(), ClickHouseColumn.of("stringList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString())));
72+
73+
Serialize.writeArray(out, input.getLongList(), ClickHouseColumn.of("longList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.Int64.toString())));
74+
75+
Serialize.writeMap(out, input.getMapOfStrings(), ClickHouseColumn.of("mapOfStrings", ClickHouseDataType.Map, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.String.toString())));
76+
77+
Serialize.writeTuple(out, input.getTupleOfObjects(), ClickHouseColumn.of("tupleOfObjects", ClickHouseDataType.Tuple, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.Int64.toString()), ClickHouseColumn.of("", ClickHouseDataType.Bool.toString()) ));
3178
}
79+
3280
}

0 commit comments

Comments
 (0)