Skip to content

Commit c5b4e24

Browse files
committed
Add Array support
1 parent 8e414f0 commit c5b4e24

File tree

5 files changed

+47
-0
lines changed

5 files changed

+47
-0
lines changed

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ private String createSimplePOJOTableSQL(String database, String tableName) {
9191
"v_dateTime DateTime," +
9292
"v_dateTime64 DateTime64," +
9393
"uuid UUID," +
94+
"stringList Array(String)," +
95+
"longList Array(Int64)," +
9496
") " +
9597
"ENGINE = MergeTree " +
9698
"ORDER BY (longPrimitive); ";
@@ -532,6 +534,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
532534
simplePOJOs.sinkTo(simplePOJOSink);
533535
int rows = executeAsyncJob(env, tableName, 100, EXPECTED_ROWS);
534536
Assertions.assertEquals(EXPECTED_ROWS, rows);
537+
ClickHouseServerForTests.showData("simple_too_many_parts_pojo");
535538
//ClickHouseServerForTests.executeSql(String.format("SYSTEM START MERGES `%s.%s`", getDatabase(), tableName));
536539
}
537540

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.apache.flink.connector.clickhouse.sink.convertor;
22

3+
import com.clickhouse.data.ClickHouseColumn;
34
import com.clickhouse.data.ClickHouseDataType;
45
import com.clickhouse.utils.Serialize;
56
import org.apache.flink.connector.clickhouse.convertor.POJOConvertor;
@@ -66,5 +67,9 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
6667
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);
6768

6869
Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid");
70+
71+
Serialize.writeArray(out, input.getStringList(), ClickHouseColumn.of("stringList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString())));
72+
73+
Serialize.writeArray(out, input.getLongList(), ClickHouseColumn.of("longList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.Int64.toString())));
6974
}
7075
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import java.math.BigInteger;
55
import java.time.LocalDate;
66
import java.time.LocalDateTime;
7+
import java.util.ArrayList;
8+
import java.util.List;
79
import java.util.UUID;
810

911
public class SimplePOJO {
@@ -64,6 +66,9 @@ public class SimplePOJO {
6466

6567
private UUID uuid;
6668

69+
private List<String> stringList;
70+
private List<Long> longList;
71+
6772
public SimplePOJO(int index) {
6873
this.bytePrimitive = Byte.MIN_VALUE;
6974
this.byteObject = Byte.MAX_VALUE;
@@ -119,6 +124,18 @@ public SimplePOJO(int index) {
119124
this.dateTime64 = LocalDateTime.now();
120125

121126
this.uuid = UUID.randomUUID();
127+
128+
this.stringList = new ArrayList<>();
129+
this.stringList.add("a");
130+
this.stringList.add("b");
131+
this.stringList.add("c");
132+
this.stringList.add("d");
133+
134+
this.longList = new ArrayList<>();
135+
this.longList.add(1L);
136+
this.longList.add(2L);
137+
this.longList.add(3L);
138+
this.longList.add(4L);
122139
}
123140

124141
public byte getBytePrimitive() {
@@ -221,4 +238,8 @@ public Double getDoubleObject() {
221238

222239
public UUID getUuid() { return uuid; }
223240

241+
public List<String> getStringList() { return stringList; }
242+
243+
public List<Long> getLongList() { return longList; }
244+
224245
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,18 @@ public static void executeSql(String sql) throws ExecutionException, Interrupted
102102
}
103103
}
104104

105+
public static void showData(String tableName) throws ExecutionException, InterruptedException {
106+
String showDataSql = String.format("select * from '%s'", tableName);
107+
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
108+
List<GenericRecord> content = client.queryAll(showDataSql);
109+
for (GenericRecord record : content) {
110+
System.out.println();
111+
for (int i = 0; i< record.getSchema().getColumns().toArray().length; i++) {
112+
System.out.print(" | " + record.getObject(i +1));
113+
}
114+
}
115+
}
116+
105117
public static int countParts(String table) {
106118
String countPartsSql = String.format("SELECT count(*) FROM system.parts WHERE table = '%s' and active = 1", table);
107119
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);

flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.clickhouse.utils;
22

33
import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
4+
import com.clickhouse.data.ClickHouseColumn;
45
import com.clickhouse.data.ClickHouseDataType;
56
import com.clickhouse.data.format.BinaryStreamUtils;
67
import org.slf4j.Logger;
@@ -294,4 +295,9 @@ public static void writeUUID(OutputStream out, UUID value, boolean defaultsSuppo
294295
}
295296
}
296297

298+
// Array
299+
public static void writeArray(OutputStream out, Object value, ClickHouseColumn column) throws IOException {
300+
SerializerUtils.serializeArrayData(out, value, column);
301+
}
302+
297303
}

0 commit comments

Comments
 (0)