Skip to content

Commit 8670fd4

Browse files
committed
Ading new types to flink 2+ tests
1 parent f456910 commit 8670fd4

File tree

3 files changed

+273
-33
lines changed

3 files changed

+273
-33
lines changed

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,60 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
4646

4747
static final int STREAM_PARALLELISM = 5;
4848

49+
private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
50+
String createTable = createSimplePOJOTableSQL(database, tableName);
51+
return createTable.trim().substring(0, createTable.trim().length() - 1) + " " + String.format("SETTINGS parts_to_throw_insert = %d;", parts_to_throw_insert);
52+
}
53+
54+
private String createSimplePOJOTableSQL(String database, String tableName) {
55+
return "CREATE TABLE `" + database + "`.`" + tableName + "` (" +
56+
"bytePrimitive Int8," +
57+
"byteObject Int8," +
58+
"shortPrimitive Int16," +
59+
"shortObject Int16," +
60+
"intPrimitive Int32," +
61+
"integerObject Int32," +
62+
"longPrimitive Int64," +
63+
"longObject Int64," +
64+
"bigInteger128 Int128," +
65+
"bigInteger256 Int256," +
66+
"uint8Primitive UInt8," +
67+
"uint8Object UInt8," +
68+
"uint16Primitive UInt16," +
69+
"uint16Object UInt16," +
70+
"uint32Primitive UInt32," +
71+
"uint32Object UInt32," +
72+
"uint64Primitive UInt64," +
73+
"uint64Object UInt64," +
74+
"uint128Object UInt128," +
75+
"uint256Object UInt256," +
76+
"decimal Decimal(10,5)," +
77+
"decimal32 Decimal32(9)," +
78+
"decimal64 Decimal64(18)," +
79+
"decimal128 Decimal128(38)," +
80+
"decimal256 Decimal256(76)," +
81+
"floatPrimitive Float," +
82+
"floatObject Float," +
83+
"doublePrimitive Double," +
84+
"doubleObject Double," +
85+
"booleanPrimitive Boolean," +
86+
"booleanObject Boolean," +
87+
"str String," +
88+
"fixedStr FixedString(10)," +
89+
"v_date Date," +
90+
"v_date32 Date32," +
91+
"v_dateTime DateTime," +
92+
"v_dateTime64 DateTime64," +
93+
"uuid UUID," +
94+
"stringList Array(String)," +
95+
"longList Array(Int64)," +
96+
"mapOfStrings Map(String,String)," +
97+
"tupleOfObjects Tuple(String,Int64,Boolean)," +
98+
") " +
99+
"ENGINE = MergeTree " +
100+
"ORDER BY (longPrimitive); ";
101+
}
102+
49103
private int executeAsyncJob(StreamExecutionEnvironment env, String tableName, int numIterations, int expectedRows) throws Exception {
50104
JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource");
51105
int rows = 0;
@@ -197,22 +251,7 @@ void SimplePOJODataTest() throws Exception {
197251
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
198252
ClickHouseServerForTests.executeSql(dropTable);
199253
// create table
200-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
201-
"bytePrimitive Int8," +
202-
"byteObject Int8," +
203-
"shortPrimitive Int16," +
204-
"shortObject Int16," +
205-
"intPrimitive Int32," +
206-
"integerObject Int32," +
207-
"longPrimitive Int64," +
208-
"longObject Int64," +
209-
"floatPrimitive Float," +
210-
"floatObject Float," +
211-
"doublePrimitive Double," +
212-
"doubleObject Double," +
213-
") " +
214-
"ENGINE = MergeTree " +
215-
"ORDER BY (longPrimitive); ";
254+
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName);
216255
ClickHouseServerForTests.executeSql(tableSql);
217256

218257

@@ -458,23 +497,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
458497
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
459498
ClickHouseServerForTests.executeSql(dropTable);
460499
// create table
461-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
462-
"bytePrimitive Int8," +
463-
"byteObject Int8," +
464-
"shortPrimitive Int16," +
465-
"shortObject Int16," +
466-
"intPrimitive Int32," +
467-
"integerObject Int32," +
468-
"longPrimitive Int64," +
469-
"longObject Int64," +
470-
"floatPrimitive Float," +
471-
"floatObject Float," +
472-
"doublePrimitive Double," +
473-
"doubleObject Double," +
474-
") " +
475-
"ENGINE = MergeTree " +
476-
"ORDER BY (longPrimitive) " +
477-
"SETTINGS parts_to_throw_insert = 10;";
500+
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName, 10);
478501
ClickHouseServerForTests.executeSql(tableSql);
479502
//ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName));
480503

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.apache.flink.connector.clickhouse.sink.convertor;
22

3+
import com.clickhouse.data.ClickHouseColumn;
34
import com.clickhouse.data.ClickHouseDataType;
45
import com.clickhouse.utils.Serialize;
56
import org.apache.flink.connector.clickhouse.convertor.POJOConvertor;
@@ -23,10 +24,57 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
2324
Serialize.writeInt64(out, input.getLongPrimitive(), false, false, ClickHouseDataType.Int64, false, "longPrimitive");
2425
Serialize.writeInt64(out, input.getLongObject(), false, false, ClickHouseDataType.Int64, false, "longObject");
2526

27+
Serialize.writeInt128(out, input.getBigInteger128(), false, false, ClickHouseDataType.Int128, false, "bigInteger128");
28+
Serialize.writeInt256(out, input.getBigInteger256(), false, false, ClickHouseDataType.Int256, false, "bigInteger256");
29+
30+
// UIntX
31+
Serialize.writeUInt8(out, input.getUint8Primitive(), false, false, ClickHouseDataType.UInt8, false, "uint8Primitive");
32+
Serialize.writeUInt8(out, input.getUint8Object(), false, false, ClickHouseDataType.UInt8, false, "uint8Object");
33+
34+
Serialize.writeUInt16(out, input.getUint16Primitive(), false, false, ClickHouseDataType.UInt16, false, "uint8Primitive");
35+
Serialize.writeUInt16(out, input.getUint16Object(), false, false, ClickHouseDataType.UInt16, false, "uint8Object");
36+
37+
Serialize.writeUInt32(out, input.getUint32Primitive(), false, false, ClickHouseDataType.UInt32, false, "uint8Primitive");
38+
Serialize.writeUInt32(out, input.getUint32Object(), false, false, ClickHouseDataType.UInt32, false, "uint8Object");
39+
40+
Serialize.writeUInt64(out, input.getUint64Primitive(), false, false, ClickHouseDataType.UInt64, false, "uint8Primitive");
41+
Serialize.writeUInt64(out, input.getUint64Object(), false, false, ClickHouseDataType.UInt64, false, "uint8Object");
42+
43+
Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "bigInteger128");
44+
Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "bigInteger256");
45+
46+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal, false, "decimal", 10, 5);
47+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal32, false, "decimal32", 9, 1);
48+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal64, false, "decimal64", 18, 10);
49+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal128, false, "decimal128", 38, 19);
50+
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal256, false, "decimal256", 76, 39);
51+
2652
Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive");
2753
Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject");
2854

2955
Serialize.writeFloat64(out, input.getDoublePrimitive(), false, false, ClickHouseDataType.Float64, false, "doublePrimitive");
3056
Serialize.writeFloat64(out, input.getDoubleObject(), false, false, ClickHouseDataType.Float64, false, "doubleObject");
57+
58+
Serialize.writeBoolean(out, input.isBooleanPrimitive(), false, false, ClickHouseDataType.Bool, false, "booleanPrimitive");
59+
Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject");
60+
61+
Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "str");
62+
Serialize.writeFixedString(out, input.getFixedStr(), false, false, ClickHouseDataType.FixedString, false, "fixedStr", 10);
63+
64+
Serialize.writeDate(out, input.getDate(), false, false, ClickHouseDataType.Date, false, "v_date");
65+
Serialize.writeDate32(out, input.getDate32(), false, false, ClickHouseDataType.Date32, false, "v_date32");
66+
Serialize.writeTimeDate(out, input.getDateTime(), false, false, ClickHouseDataType.DateTime, false, "v_dateTime");
67+
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);
68+
69+
Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid");
70+
71+
Serialize.writeArray(out, input.getStringList(), ClickHouseColumn.of("stringList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString())));
72+
73+
Serialize.writeArray(out, input.getLongList(), ClickHouseColumn.of("longList", ClickHouseDataType.Array, false, ClickHouseColumn.of("", ClickHouseDataType.Int64.toString())));
74+
75+
Serialize.writeMap(out, input.getMapOfStrings(), ClickHouseColumn.of("mapOfStrings", ClickHouseDataType.Map, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.String.toString())));
76+
77+
Serialize.writeTuple(out, input.getTupleOfObjects(), ClickHouseColumn.of("tupleOfObjects", ClickHouseDataType.Tuple, false, ClickHouseColumn.of("", ClickHouseDataType.String.toString()), ClickHouseColumn.of("", ClickHouseDataType.Int64.toString()), ClickHouseColumn.of("", ClickHouseDataType.Bool.toString()) ));
3178
}
79+
3280
}

flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/SimplePOJO.java

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package org.apache.flink.connector.clickhouse.sink.pojo;
22

3+
import java.math.BigDecimal;
4+
import java.math.BigInteger;
5+
import java.time.LocalDate;
6+
import java.time.LocalDateTime;
7+
import java.util.*;
8+
39
public class SimplePOJO {
410

511
private byte bytePrimitive;
@@ -14,12 +20,57 @@ public class SimplePOJO {
1420
private long longPrimitive;
1521
private Long longObject;
1622

23+
private int uint8Primitive;
24+
private Integer uint8Object;
25+
26+
private int uint16Primitive;
27+
private Integer uint16Object;
28+
29+
private long uint32Primitive;
30+
private Long uint32Object;
31+
32+
private long uint64Primitive;
33+
private Long uint64Object;
34+
35+
private BigInteger uint128Object;
36+
private BigInteger uint256Object;
37+
38+
private BigDecimal bigDecimal;
39+
private BigDecimal bigDecimal32;
40+
private BigDecimal bigDecimal64;
41+
private BigDecimal bigDecimal128;
42+
private BigDecimal bigDecimal256;
43+
1744
private float floatPrimitive;
1845
private Float floatObject;
1946

2047
private double doublePrimitive;
2148
private Double doubleObject;
2249

50+
private boolean booleanPrimitive;
51+
private Boolean booleanObject;
52+
53+
private String str;
54+
55+
private String fixedStr;
56+
57+
private BigInteger bigInteger128;
58+
private BigInteger bigInteger256;
59+
60+
private LocalDate date;
61+
private LocalDate date32;
62+
private LocalDateTime dateTime;
63+
private LocalDateTime dateTime64;
64+
65+
private UUID uuid;
66+
67+
private List<String> stringList;
68+
private List<Long> longList;
69+
70+
private Map<String, String> mapOfStrings;
71+
72+
private List<Object> tupleOfObjects;
73+
2374
public SimplePOJO(int index) {
2475
this.bytePrimitive = Byte.MIN_VALUE;
2576
this.byteObject = Byte.MAX_VALUE;
@@ -33,11 +84,69 @@ public SimplePOJO(int index) {
3384
this.longPrimitive = index;
3485
this.longObject = Long.MAX_VALUE;
3586

87+
this.uint8Primitive = Byte.MAX_VALUE;
88+
this.uint8Object = (int)Byte.MAX_VALUE;
89+
90+
this.uint16Primitive = Short.MAX_VALUE;
91+
this.uint16Object = (int)Short.MAX_VALUE;
92+
93+
this.uint32Primitive = Integer.MAX_VALUE;
94+
this.uint32Object = (long)Integer.MAX_VALUE;
95+
96+
this.uint64Primitive = Long.MAX_VALUE;
97+
this.uint64Object = Long.MAX_VALUE;
98+
99+
this.uint128Object = BigInteger.valueOf(index);
100+
this.uint256Object = BigInteger.valueOf(index);
101+
102+
this.bigDecimal = new BigDecimal(index);
103+
this.bigDecimal32 = new BigDecimal(index);
104+
this.bigDecimal64 = new BigDecimal(index);
105+
this.bigDecimal128 = new BigDecimal(index);
106+
this.bigDecimal256 = new BigDecimal(index);
107+
36108
this.floatPrimitive = Float.MIN_VALUE;
37109
this.floatObject = Float.MAX_VALUE;
38110

39111
this.doublePrimitive = Double.MIN_VALUE;
40112
this.doubleObject = Double.MAX_VALUE;
113+
114+
this.booleanPrimitive = true;
115+
this.booleanObject = Boolean.FALSE;
116+
117+
this.str = "str" + longPrimitive;
118+
this.fixedStr = (str + "_FixedString").substring(0, 10);
119+
120+
this.bigInteger128 = BigInteger.valueOf(longPrimitive);
121+
this.bigInteger256 = BigInteger.valueOf(longPrimitive);
122+
123+
this.date = LocalDate.ofEpochDay(0);
124+
this.date32 = LocalDate.ofEpochDay(0);
125+
this.dateTime = LocalDateTime.now();
126+
this.dateTime64 = LocalDateTime.now();
127+
128+
this.uuid = UUID.randomUUID();
129+
130+
this.stringList = new ArrayList<>();
131+
this.stringList.add("a");
132+
this.stringList.add("b");
133+
this.stringList.add("c");
134+
this.stringList.add("d");
135+
136+
this.longList = new ArrayList<>();
137+
this.longList.add(1L);
138+
this.longList.add(2L);
139+
this.longList.add(3L);
140+
this.longList.add(4L);
141+
142+
this.mapOfStrings = new HashMap<>();
143+
this.mapOfStrings.put("a", "a");
144+
this.mapOfStrings.put("b", "b");
145+
146+
this.tupleOfObjects = new ArrayList<>();
147+
this.tupleOfObjects.add("test");
148+
this.tupleOfObjects.add(1L);
149+
this.tupleOfObjects.add(true);
41150
}
42151

43152
public byte getBytePrimitive() {
@@ -72,6 +181,36 @@ public Long getLongObject() {
72181
return longObject;
73182
}
74183

184+
public int getUint8Primitive() { return uint8Primitive; }
185+
186+
public Integer getUint8Object() { return uint8Object; }
187+
188+
public int getUint16Primitive() { return uint16Primitive; }
189+
190+
public Integer getUint16Object() { return uint16Object; }
191+
192+
public long getUint32Primitive() { return uint32Primitive; }
193+
194+
public Long getUint32Object() { return uint32Object; }
195+
196+
public long getUint64Primitive() { return uint64Primitive; }
197+
198+
public Long getUint64Object() { return uint64Object; }
199+
200+
public BigInteger getUint128Object() { return uint128Object; }
201+
202+
public BigInteger getUint256Object() { return uint256Object; }
203+
204+
public BigDecimal getBigDecimal() { return bigDecimal; }
205+
206+
public BigDecimal getBigDecimal32() { return bigDecimal32; }
207+
208+
public BigDecimal getBigDecimal64() { return bigDecimal64; }
209+
210+
public BigDecimal getBigDecimal128() { return bigDecimal128; }
211+
212+
public BigDecimal getBigDecimal256() { return bigDecimal256; }
213+
75214
public float getFloatPrimitive() {
76215
return floatPrimitive;
77216
}
@@ -87,4 +226,34 @@ public double getDoublePrimitive() {
87226
public Double getDoubleObject() {
88227
return doubleObject;
89228
}
229+
230+
public boolean isBooleanPrimitive() { return booleanPrimitive; }
231+
232+
public Boolean getBooleanObject() { return booleanObject; }
233+
234+
public String getStr() { return str; }
235+
236+
public BigInteger getBigInteger128() { return bigInteger128; }
237+
238+
public BigInteger getBigInteger256() { return bigInteger256; }
239+
240+
public String getFixedStr() { return fixedStr; }
241+
242+
public LocalDate getDate() { return date; }
243+
244+
public LocalDate getDate32() { return date32; }
245+
246+
public LocalDateTime getDateTime() { return dateTime; }
247+
248+
public LocalDateTime getDateTime64() { return dateTime64; }
249+
250+
public UUID getUuid() { return uuid; }
251+
252+
public List<String> getStringList() { return stringList; }
253+
254+
public List<Long> getLongList() { return longList; }
255+
256+
public Map<String, String> getMapOfStrings() { return mapOfStrings; }
257+
258+
public List<Object> getTupleOfObjects() { return tupleOfObjects; }
90259
}

0 commit comments

Comments
 (0)