Skip to content

Commit 2fe0a25

Browse files
committed
add data validation to 1.17 tests
1 parent 10e158e commit 2fe0a25

File tree

10 files changed

+719
-229
lines changed

10 files changed

+719
-229
lines changed

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTestUtils.java

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,58 +8,4 @@ public class ClickHouseSinkTestUtils {
88
public static final long MAX_BATCH_SIZE_IN_BYTES = 1024 * 1024;
99
public static final long MAX_TIME_IN_BUFFER_MS = 5 * 1000;
1010
public static final long MAX_RECORD_SIZE_IN_BYTES = 1000;
11-
12-
public static String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
13-
String createTable = createSimplePOJOTableSQL(database, tableName);
14-
return createTable.trim().substring(0, createTable.trim().length() - 1) + " " + String.format("SETTINGS parts_to_throw_insert = %d;", parts_to_throw_insert);
15-
}
16-
17-
public static String createSimplePOJOTableSQL(String database, String tableName) {
18-
return "CREATE TABLE `" + database + "`.`" + tableName + "` (" +
19-
"bytePrimitive Int8," +
20-
"byteObject Int8," +
21-
"shortPrimitive Int16," +
22-
"shortObject Int16," +
23-
"intPrimitive Int32," +
24-
"integerObject Int32," +
25-
"longPrimitive Int64," +
26-
"longObject Int64," +
27-
"bigInteger128 Int128," +
28-
"bigInteger256 Int256," +
29-
"uint8Primitive UInt8," +
30-
"uint8Object UInt8," +
31-
"uint16Primitive UInt16," +
32-
"uint16Object UInt16," +
33-
"uint32Primitive UInt32," +
34-
"uint32Object UInt32," +
35-
"uint64Primitive UInt64," +
36-
"uint64Object UInt64," +
37-
"uint128Object UInt128," +
38-
"uint256Object UInt256," +
39-
"decimal Decimal(10,5)," +
40-
"decimal32 Decimal32(9)," +
41-
"decimal64 Decimal64(18)," +
42-
"decimal128 Decimal128(38)," +
43-
"decimal256 Decimal256(76)," +
44-
"floatPrimitive Float," +
45-
"floatObject Float," +
46-
"doublePrimitive Double," +
47-
"doubleObject Double," +
48-
"booleanPrimitive Boolean," +
49-
"booleanObject Boolean," +
50-
"str String," +
51-
"fixedStr FixedString(10)," +
52-
"v_date Date," +
53-
"v_date32 Date32," +
54-
"v_dateTime DateTime," +
55-
"v_dateTime64 DateTime64," +
56-
"uuid UUID," +
57-
"stringList Array(String)," +
58-
"longList Array(Int64)," +
59-
"mapOfStrings Map(String,String)," +
60-
"tupleOfObjects Tuple(String,Int64,Boolean)," +
61-
") " +
62-
"ENGINE = MergeTree " +
63-
"ORDER BY (longPrimitive); ";
64-
}
6511
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 1 addition & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
377377
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
378378
ClickHouseServerForTests.executeSql(dropTable);
379379
// create table
380-
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName, 10);
380+
String tableSql = SimplePOJO.createTableSQL(getDatabase(), tableName, 10);
381381
ClickHouseServerForTests.executeSql(tableSql);
382382
//ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName));
383383

@@ -422,63 +422,4 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
422422
void CheckClickHouseAlive() {
423423
Assertions.assertThrows(RuntimeException.class, () -> { new ClickHouseClientConfig(getServerURL(), getUsername() + "wrong_username", getPassword(), getDatabase(), "dummy");});
424424
}
425-
426-
@Test
427-
void SimplePOJOWithJSONDataTest() throws Exception {
428-
Assumptions.assumeTrue(
429-
isCloud() || ClickHouseTestHelpers.getClickhouseVersion().equalsIgnoreCase("latest"));
430-
431-
String tableName = "simple_pojo_with_json_data";
432-
433-
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
434-
ClickHouseServerForTests.executeSql(dropTable);
435-
// create table
436-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
437-
"longPrimitive Int64," +
438-
"jsonPayload JSON," +
439-
") " +
440-
"ENGINE = MergeTree " +
441-
"ORDER BY (longPrimitive); ";
442-
ClickHouseServerForTests.executeSql(tableSql);
443-
444-
TableSchema simplePOJOWithJSONTableSchema = ClickHouseServerForTests.getTableSchema(tableName);
445-
446-
POJOConvertor<SimplePOJOWithJSON> simplePOJOWithJSONConvertor = new SimplePOJOWithJSONConvertor(simplePOJOWithJSONTableSchema.hasDefaults());
447-
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
448-
env.setParallelism(STREAM_PARALLELISM);
449-
450-
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName, true);
451-
clickHouseClientConfig.setSupportDefault(simplePOJOWithJSONTableSchema.hasDefaults());
452-
ElementConverter<SimplePOJOWithJSON, ClickHousePayload> convertorCovid = new ClickHouseConvertor<>(SimplePOJOWithJSON.class, simplePOJOWithJSONConvertor);
453-
454-
ClickHouseAsyncSink<SimplePOJOWithJSON> simplePOJOWithJSONSink = new ClickHouseAsyncSink<>(
455-
convertorCovid,
456-
MAX_BATCH_SIZE,
457-
MAX_IN_FLIGHT_REQUESTS,
458-
MAX_BUFFERED_REQUESTS,
459-
MAX_BATCH_SIZE_IN_BYTES,
460-
MAX_TIME_IN_BUFFER_MS,
461-
MAX_RECORD_SIZE_IN_BYTES,
462-
clickHouseClientConfig
463-
);
464-
465-
List<SimplePOJOWithJSON> simplePOJOWithJSONList = new ArrayList<>();
466-
for (int i = 0; i < EXPECTED_ROWS; i++) {
467-
simplePOJOWithJSONList.add(new SimplePOJOWithJSON(i));
468-
}
469-
// create from list
470-
DataStream<SimplePOJOWithJSON> simplePOJOsWithJSON = env.fromElements(simplePOJOWithJSONList.toArray(new SimplePOJOWithJSON[0]));
471-
// send to a sink
472-
simplePOJOsWithJSON.sinkTo(simplePOJOWithJSONSink);
473-
int rows = executeAsyncJob(env, tableName, 100, EXPECTED_ROWS);
474-
Assertions.assertEquals(EXPECTED_ROWS, rows);
475-
476-
List<GenericRecord> genericRecordList = ClickHouseServerForTests.extractData(getDatabase(), tableName, "longPrimitive", "getSubcolumn(jsonPayload, 'bar') as bar");
477-
for (int j = 0; j < genericRecordList.size(); j++) {
478-
long longPrimitive = simplePOJOWithJSONList.get(j).getLongPrimitive();
479-
String foo = genericRecordList.get(j).getString("bar");
480-
Assertions.assertEquals(longPrimitive, genericRecordList.get(j).getLong("longPrimitive"));
481-
Assertions.assertEquals("foo", foo);
482-
}
483-
}
484425
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import org.apache.flink.connector.clickhouse.sink.pojo.SimplePOJO;
88

99
import java.io.IOException;
10-
import java.io.OutputStream;
1110

1211
public class SimplePOJOConvertor extends POJOConvertor<SimplePOJO> {
1312

@@ -32,44 +31,52 @@ public void instrument(DataWriter dataWriter, SimplePOJO input) throws IOExcepti
3231
dataWriter.writeInt128(input.getBigInteger128(), false, ClickHouseDataType.Int128, false, "bigInteger128");
3332
dataWriter.writeInt256(input.getBigInteger256(), false, ClickHouseDataType.Int256, false, "bigInteger256");
3433

35-
// UIntX
36-
dataWriter.writeUInt8(input.getUint8Primitive(), false, ClickHouseDataType.UInt8, false, "uint8Primitive");
37-
dataWriter.writeUInt8(input.getUint8Object(), false, ClickHouseDataType.UInt8, false, "uint8Object");
34+
dataWriter.writeUInt8(input.getUint8Primitive_Int(), false, ClickHouseDataType.UInt8, false, "uint8Primitive_Int");
35+
dataWriter.writeUInt8(input.getUint8Object_Int(), false, ClickHouseDataType.UInt8, false, "uint8Object_Int");
3836

39-
dataWriter.writeUInt16(input.getUint16Primitive(), false, ClickHouseDataType.UInt16, false, "uint8Primitive");
40-
dataWriter.writeUInt16(input.getUint16Object(), false, ClickHouseDataType.UInt16, false, "uint8Object");
37+
dataWriter.writeUInt8(input.getUint8Primitive_Short(), false, ClickHouseDataType.UInt8, false, "uint8Primitive_Short");
38+
dataWriter.writeUInt8(input.getUint8Object_Short(), false, ClickHouseDataType.UInt8, false, "uint8Object_Short");
4139

42-
dataWriter.writeUInt32(input.getUint32Primitive(), false, ClickHouseDataType.UInt32, false, "uint8Primitive");
43-
dataWriter.writeUInt32(input.getUint32Object(), false, ClickHouseDataType.UInt32, false, "uint8Object");
40+
dataWriter.writeUInt16(input.getUint16Primitive(), false, ClickHouseDataType.UInt16, false, "uint16Primitive");
41+
dataWriter.writeUInt16(input.getUint16Object(), false, ClickHouseDataType.UInt16, false, "uint16Object");
4442

45-
dataWriter.writeUInt64(input.getUint64Primitive(), false, ClickHouseDataType.UInt64, false, "uint8Primitive");
46-
dataWriter.writeUInt64(input.getUint64Object(), false, ClickHouseDataType.UInt64, false, "uint8Object");
43+
dataWriter.writeUInt32(input.getUint32Primitive(), false, ClickHouseDataType.UInt32, false, "uint32Primitive");
44+
dataWriter.writeUInt32(input.getUint32Object(), false, ClickHouseDataType.UInt32, false, "uint32Object");
4745

48-
dataWriter.writeUInt128(input.getUint128Object(), false, ClickHouseDataType.UInt128, false, "bigInteger128");
49-
dataWriter.writeUInt256(input.getUint256Object(), false, ClickHouseDataType.UInt256, false, "bigInteger256");
46+
dataWriter.writeUInt64(input.getUint64Primitive_Long(), false, ClickHouseDataType.UInt64, false, "uint64Primitive_Long");
47+
dataWriter.writeUInt64(input.getUint64Object_Long(), false, ClickHouseDataType.UInt64, false, "uint64Object_Long");
5048

51-
dataWriter.writeDecimal(input.getBigDecimal(), false, ClickHouseDataType.Decimal, false, "decimal", 10, 5);
52-
dataWriter.writeDecimal(input.getBigDecimal(), false, ClickHouseDataType.Decimal32, false, "decimal32", 9, 1);
53-
dataWriter.writeDecimal(input.getBigDecimal(), false, ClickHouseDataType.Decimal64, false, "decimal64", 18, 10);
54-
dataWriter.writeDecimal(input.getBigDecimal(), false, ClickHouseDataType.Decimal128, false, "decimal128", 38, 19);
55-
dataWriter.writeDecimal(input.getBigDecimal(), false, ClickHouseDataType.Decimal256, false, "decimal256", 76, 39);
49+
dataWriter.writeUInt64(input.getUint64Object_BigInt(), false, ClickHouseDataType.UInt64, false, "uint64Object_BigInt");
50+
51+
dataWriter.writeUInt128(input.getUint128Object(), false, ClickHouseDataType.UInt128, false, "uint128Object");
52+
dataWriter.writeUInt256(input.getUint256Object(), false, ClickHouseDataType.UInt256, false, "uint256Object");
53+
54+
dataWriter.writeDecimal(input.getBigDecimal(), false, ClickHouseDataType.Decimal, false, "bigDecimal", 10, 5);
55+
dataWriter.writeDecimal(input.getBigDecimal32(), false, ClickHouseDataType.Decimal32, false, "bigDecimal32", 9, 9);
56+
dataWriter.writeDecimal(input.getBigDecimal64(), false, ClickHouseDataType.Decimal64, false, "bigDecimal64", 18, 18);
57+
dataWriter.writeDecimal(input.getBigDecimal128(), false, ClickHouseDataType.Decimal128, false, "bigDecimal128", 38, 38);
58+
dataWriter.writeDecimal(input.getBigDecimal256(), false, ClickHouseDataType.Decimal256, false, "bigDecimal256", 76, 76);
5659

5760
dataWriter.writeFloat32(input.getFloatPrimitive(), false, ClickHouseDataType.Float32, false, "floatPrimitive");
5861
dataWriter.writeFloat32(input.getFloatObject(), false, ClickHouseDataType.Float32, false, "floatObject");
5962

6063
dataWriter.writeFloat64(input.getDoublePrimitive(), false, ClickHouseDataType.Float64, false, "doublePrimitive");
6164
dataWriter.writeFloat64(input.getDoubleObject(), false, ClickHouseDataType.Float64, false, "doubleObject");
6265

63-
dataWriter.writeBoolean(input.isBooleanPrimitive(), false, ClickHouseDataType.Bool, false, "booleanPrimitive");
66+
dataWriter.writeBoolean(input.getBooleanPrimitive(), false, ClickHouseDataType.Bool, false, "booleanPrimitive");
6467
dataWriter.writeBoolean(input.getBooleanObject(), false, ClickHouseDataType.Bool, false, "booleanObject");
6568

6669
dataWriter.writeString(input.getStr(), false, ClickHouseDataType.String, false, "str");
6770
dataWriter.writeFixedString(input.getFixedStr(), false, ClickHouseDataType.FixedString, false, "fixedStr", 10);
6871

69-
dataWriter.writeDate(input.getDate(), false, ClickHouseDataType.Date, false, "v_date");
70-
dataWriter.writeDate32(input.getDate32(), false, ClickHouseDataType.Date32, false, "v_date32");
71-
dataWriter.writeTimeDate(input.getDateTime(), false, ClickHouseDataType.DateTime, false, "v_dateTime");
72-
dataWriter.writeTimeDate64(input.getDateTime64(), false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);
72+
dataWriter.writeDate(input.getDateObject(), false, ClickHouseDataType.Date, false, "dateObject");
73+
dataWriter.writeDate32(input.getDate32Object(), false, ClickHouseDataType.Date32, false, "date32Object");
74+
75+
dataWriter.writeTimeDate(input.getDateTimeObject_Local(), false, ClickHouseDataType.DateTime, false, "dateTimeObject_Local");
76+
dataWriter.writeTimeDate64(input.getDateTime64Object_Local(), false, ClickHouseDataType.DateTime64, false, "dateTime64Object_Local", 6);
77+
78+
dataWriter.writeTimeDate(input.getDateTimeObject_Zoned(), false, ClickHouseDataType.DateTime, false, "dateTimeObject_Zoned");
79+
dataWriter.writeTimeDate64(input.getDateTime64Object_Zoned(), false, ClickHouseDataType.DateTime64, false, "dateTime64Object_Zoned", 6);
7380

7481
dataWriter.writeUUID(input.getUuid(), false, ClickHouseDataType.UUID, false, "uuid");
7582

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOWithDefaultsConvertor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ public SimplePOJOWithDefaultsConvertor(boolean schemaHasDefaults) {
1515

1616
@Override
1717
public void instrument(DataWriter dataWriter, SimplePOJOWithDefaults input) throws IOException {
18-
dataWriter.writeString(input.getId(), false, ClickHouseDataType.String, false, "id");
19-
dataWriter.writeTimeDate64(input.getCreatedOn(), false, ClickHouseDataType.DateTime64, true, "createdOn", 1);
18+
dataWriter.writeInt32(input.getId(), false, ClickHouseDataType.Int32, false, "id");
19+
dataWriter.writeTimeDate64(input.getCreatedOn(), false, ClickHouseDataType.DateTime64, true, "created_on", 6);
2020
}
21-
2221
}

0 commit comments

Comments
 (0)