Skip to content

Commit df6cc98

Browse files
committed
Add support Map & Tuple
1 parent c5b4e24 commit df6cc98

File tree

4 files changed

+35
-3
lines changed

4 files changed

+35
-3
lines changed

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ private String createSimplePOJOTableSQL(String database, String tableName) {
9393
"uuid UUID," +
9494
"stringList Array(String)," +
9595
"longList Array(Int64)," +
96+
"mapOfStrings Map(String,String)," +
97+
"tupleOfObjects Tuple(String,Int64,Boolean)," +
9698
") " +
9799
"ENGINE = MergeTree " +
98100
"ORDER BY (longPrimitive); ";
@@ -285,6 +287,7 @@ void SimplePOJODataTest() throws Exception {
285287
simplePOJOs.sinkTo(simplePOJOSink);
286288
int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
287289
Assertions.assertEquals(EXPECTED_ROWS, rows);
290+
ClickHouseServerForTests.showData(tableName);
288291
}
289292

290293
@Test

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,10 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
7171
Serialize.writeArray(out, input.getStringList(), ClickHouseColumn.of("stringList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString())));
7272

7373
Serialize.writeArray(out, input.getLongList(), ClickHouseColumn.of("longList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.Int64.toString())));
74+
75+
Serialize.writeMap(out, input.getMapOfStrings(), ClickHouseColumn.of("mapOfStrings", ClickHouseDataType.Map, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.String.toString())));
76+
77+
Serialize.writeTuple(out, input.getTupleOfObjects(), ClickHouseColumn.of("tupleOfObjects", ClickHouseDataType.Tuple, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.Int64.toString()), ClickHouseColumn.of("", ClickHouseDataType.Bool.toString()) ));
7478
}
79+
7580
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44
import java.math.BigInteger;
55
import java.time.LocalDate;
66
import java.time.LocalDateTime;
7-
import java.util.ArrayList;
8-
import java.util.List;
9-
import java.util.UUID;
7+
import java.util.*;
108

119
public class SimplePOJO {
1210

@@ -69,6 +67,10 @@ public class SimplePOJO {
6967
private List<String> stringList;
7068
private List<Long> longList;
7169

70+
private Map<String, String> mapOfStrings;
71+
72+
private List<Object> tupleOfObjects;
73+
7274
public SimplePOJO(int index) {
7375
this.bytePrimitive = Byte.MIN_VALUE;
7476
this.byteObject = Byte.MAX_VALUE;
@@ -136,6 +138,15 @@ public SimplePOJO(int index) {
136138
this.longList.add(2L);
137139
this.longList.add(3L);
138140
this.longList.add(4L);
141+
142+
this.mapOfStrings = new HashMap<>();
143+
this.mapOfStrings.put("a", "a");
144+
this.mapOfStrings.put("b", "b");
145+
146+
this.tupleOfObjects = new ArrayList<>();
147+
this.tupleOfObjects.add("test");
148+
this.tupleOfObjects.add(1L);
149+
this.tupleOfObjects.add(true);
139150
}
140151

141152
public byte getBytePrimitive() {
@@ -242,4 +253,7 @@ public Double getDoubleObject() {
242253

243254
public List<Long> getLongList() { return longList; }
244255

256+
public Map<String, String> getMapOfStrings() { return mapOfStrings; }
257+
258+
public List<Object> getTupleOfObjects() { return tupleOfObjects; }
245259
}

flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,4 +300,14 @@ public static void writeArray(OutputStream out, Object value, ClickHouseColumn c
300300
SerializerUtils.serializeArrayData(out, value, column);
301301
}
302302

303+
// Map
304+
public static void writeMap(OutputStream out, Object value, ClickHouseColumn column) throws IOException {
305+
SerializerUtils.serializeData(out, value, column);
306+
}
307+
308+
// Tuple
309+
public static void writeTuple(OutputStream out, Object value, ClickHouseColumn column) throws IOException {
310+
SerializerUtils.serializeData(out, value, column);
311+
}
312+
303313
}

0 commit comments

Comments
 (0)