Skip to content

Commit 3349e4c

Browse files
committed
review comments
1 parent e2f4fc6 commit 3349e4c

File tree

16 files changed

+363
-304
lines changed

16 files changed

+363
-304
lines changed

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.util.ArrayList;
2727
import java.util.List;
28+
import java.util.concurrent.ExecutionException;
2829

2930
import static org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests.*;
3031
import static org.apache.flink.connector.clickhouse.sink.ClickHouseSinkTestUtils.*;
@@ -40,8 +41,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
4041
@Test
4142
void CSVDataTest() throws Exception {
4243
String tableName = "csv_covid";
43-
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
44-
ClickHouseServerForTests.executeSql(dropTable);
44+
dropTableIfExists(getDatabase(), tableName);
4545
// create table
4646
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
4747
"date Date," +
@@ -97,24 +97,9 @@ void CSVDataTest() throws Exception {
9797
void CovidPOJODataTest() throws Exception {
9898
String tableName = "covid_pojo";
9999

100-
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
101-
ClickHouseServerForTests.executeSql(dropTable);
100+
dropTableIfExists(getDatabase(), tableName);
102101
// create table
103-
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
104-
"date Date," +
105-
"location_key LowCardinality(String)," +
106-
"new_confirmed Int32," +
107-
"new_deceased Int32," +
108-
"new_recovered Int32," +
109-
"new_tested Int32," +
110-
"cumulative_confirmed Int32," +
111-
"cumulative_deceased Int32," +
112-
"cumulative_recovered Int32," +
113-
"cumulative_tested Int32" +
114-
") " +
115-
"ENGINE = MergeTree " +
116-
"ORDER BY (location_key, date); ";
117-
ClickHouseServerForTests.executeSql(tableSql);
102+
ClickHouseServerForTests.executeSql(CovidPOJO.createTableSql(getDatabase(), tableName));
118103

119104
TableSchema covidTableSchema = ClickHouseServerForTests.getTableSchema(tableName);
120105

@@ -166,8 +151,7 @@ public CovidPOJO map(String value) throws Exception {
166151
void ProductNameTest() throws Exception {
167152
String flinkVersion = System.getenv("FLINK_VERSION") != null ? System.getenv("FLINK_VERSION") : "1.17.2";
168153
String tableName = "product_name_csv_covid";
169-
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
170-
ClickHouseServerForTests.executeSql(dropTable);
154+
dropTableIfExists(getDatabase(), tableName);
171155
// create table
172156
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
173157
"date Date," +
@@ -241,8 +225,7 @@ void ProductNameTest() throws Exception {
241225
@Test
242226
void CSVDataOnFailureDropDataTest() throws Exception {
243227
String tableName = "csv_failure_covid";
244-
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
245-
ClickHouseServerForTests.executeSql(dropTable);
228+
dropTableIfExists(getDatabase(), tableName);
246229
// create table
247230
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
248231
"date Date," +
@@ -304,8 +287,7 @@ void CSVDataOnFailureDropDataTest() throws Exception {
304287
@Test
305288
void CSVDataOnRetryAndDropDataTest() throws Exception {
306289
String tableName = "csv_retry_covid";
307-
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
308-
ClickHouseServerForTests.executeSql(dropTable);
290+
dropTableIfExists(getDatabase(), tableName);
309291
// create table
310292
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
311293
"date Date," +
@@ -371,8 +353,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
371353
return;
372354
String tableName = "simple_too_many_parts_pojo";
373355

374-
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
375-
ClickHouseServerForTests.executeSql(dropTable);
356+
dropTableIfExists(getDatabase(), tableName);
376357
// create table
377358
String tableSql = SimplePOJO.createTableSQL(getDatabase(), tableName, 10);
378359
ClickHouseServerForTests.executeSql(tableSql);
@@ -421,4 +402,9 @@ void CheckClickHouseAlive() {
421402
new ClickHouseClientConfig(getServerURL(), getUsername() + "wrong_username", getPassword(), getDatabase(), "dummy");
422403
});
423404
}
405+
406+
private static void dropTableIfExists(String database, String tableName) throws ExecutionException, InterruptedException {
407+
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", database, tableName);
408+
ClickHouseServerForTests.executeSql(dropTable);
409+
}
424410
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/convertor/SimplePOJOConvertor.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,22 @@ public void instrument(DataWriter dataWriter, SimplePOJO input) throws IOExcepti
3131
dataWriter.writeInt128(input.getBigInteger128(), false, ClickHouseDataType.Int128, false, "bigInteger128");
3232
dataWriter.writeInt256(input.getBigInteger256(), false, ClickHouseDataType.Int256, false, "bigInteger256");
3333

34-
dataWriter.writeUInt8(input.getUint8Primitive_Int(), false, ClickHouseDataType.UInt8, false, "uint8Primitive_Int");
35-
dataWriter.writeUInt8(input.getUint8Object_Int(), false, ClickHouseDataType.UInt8, false, "uint8Object_Int");
34+
dataWriter.writeUInt8(input.getUint8PrimitiveInt(), false, ClickHouseDataType.UInt8, false, "uint8PrimitiveInt");
35+
dataWriter.writeUInt8(input.getUint8ObjectInt(), false, ClickHouseDataType.UInt8, false, "uint8ObjectInt");
3636

37-
dataWriter.writeUInt8(input.getUint8Primitive_Short(), false, ClickHouseDataType.UInt8, false, "uint8Primitive_Short");
38-
dataWriter.writeUInt8(input.getUint8Object_Short(), false, ClickHouseDataType.UInt8, false, "uint8Object_Short");
37+
dataWriter.writeUInt8(input.getUint8PrimitiveShort(), false, ClickHouseDataType.UInt8, false, "uint8PrimitiveShort");
38+
dataWriter.writeUInt8(input.getUint8ObjectShort(), false, ClickHouseDataType.UInt8, false, "uint8ObjectShort");
3939

4040
dataWriter.writeUInt16(input.getUint16Primitive(), false, ClickHouseDataType.UInt16, false, "uint16Primitive");
4141
dataWriter.writeUInt16(input.getUint16Object(), false, ClickHouseDataType.UInt16, false, "uint16Object");
4242

4343
dataWriter.writeUInt32(input.getUint32Primitive(), false, ClickHouseDataType.UInt32, false, "uint32Primitive");
4444
dataWriter.writeUInt32(input.getUint32Object(), false, ClickHouseDataType.UInt32, false, "uint32Object");
4545

46-
dataWriter.writeUInt64(input.getUint64Primitive_Long(), false, ClickHouseDataType.UInt64, false, "uint64Primitive_Long");
47-
dataWriter.writeUInt64(input.getUint64Object_Long(), false, ClickHouseDataType.UInt64, false, "uint64Object_Long");
46+
dataWriter.writeUInt64(input.getUint64PrimitiveLong(), false, ClickHouseDataType.UInt64, false, "uint64PrimitiveLong");
47+
dataWriter.writeUInt64(input.getUint64ObjectLong(), false, ClickHouseDataType.UInt64, false, "uint64ObjectLong");
4848

49-
dataWriter.writeUInt64(input.getUint64Object_BigInt(), false, ClickHouseDataType.UInt64, false, "uint64Object_BigInt");
49+
dataWriter.writeUInt64(input.getUint64ObjectBigInt(), false, ClickHouseDataType.UInt64, false, "uint64ObjectBigInt");
5050

5151
dataWriter.writeUInt128(input.getUint128Object(), false, ClickHouseDataType.UInt128, false, "uint128Object");
5252
dataWriter.writeUInt256(input.getUint256Object(), false, ClickHouseDataType.UInt256, false, "uint256Object");
@@ -72,11 +72,11 @@ public void instrument(DataWriter dataWriter, SimplePOJO input) throws IOExcepti
7272
dataWriter.writeDate(input.getDateObject(), false, ClickHouseDataType.Date, false, "dateObject");
7373
dataWriter.writeDate32(input.getDate32Object(), false, ClickHouseDataType.Date32, false, "date32Object");
7474

75-
dataWriter.writeDateTime(input.getDateTimeObject_Local(), false, ClickHouseDataType.DateTime, false, "dateTimeObject_Local");
76-
dataWriter.writeDateTime64(input.getDateTime64Object_Local(), false, ClickHouseDataType.DateTime64, false, "dateTime64Object_Local", 6);
75+
dataWriter.writeDateTime(input.getDateTimeObjectLocal(), false, ClickHouseDataType.DateTime, false, "dateTimeObjectLocal");
76+
dataWriter.writeDateTime64(input.getDateTime64ObjectLocal(), false, ClickHouseDataType.DateTime64, false, "dateTime64ObjectLocal", 6);
7777

78-
dataWriter.writeDateTime(input.getDateTimeObject_Zoned(), false, ClickHouseDataType.DateTime, false, "dateTimeObject_Zoned");
79-
dataWriter.writeDateTime64(input.getDateTime64Object_Zoned(), false, ClickHouseDataType.DateTime64, false, "dateTime64Object_Zoned", 6);
78+
dataWriter.writeDateTime(input.getDateTimeObjectZoned(), false, ClickHouseDataType.DateTime, false, "dateTimeObjectZoned");
79+
dataWriter.writeDateTime64(input.getDateTime64ObjectZoned(), false, ClickHouseDataType.DateTime64, false, "dateTime64ObjectZoned", 6);
8080

8181
dataWriter.writeUUID(input.getUuid(), false, ClickHouseDataType.UUID, false, "uuid");
8282

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/CovidPOJO.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,21 @@ public Integer getCumulative_tested() {
117117
public void setCumulative_tested(Integer cumulative_tested) {
118118
this.cumulative_tested = cumulative_tested;
119119
}
120+
121+
public static String createTableSql(String database, String tableName) {
122+
return "CREATE TABLE `" + database + "`.`" + tableName + "` (" +
123+
"date Date," +
124+
"location_key LowCardinality(String)," +
125+
"new_confirmed Int32," +
126+
"new_deceased Int32," +
127+
"new_recovered Int32," +
128+
"new_tested Int32," +
129+
"cumulative_confirmed Int32," +
130+
"cumulative_deceased Int32," +
131+
"cumulative_recovered Int32," +
132+
"cumulative_tested Int32" +
133+
") " +
134+
"ENGINE = MergeTree " +
135+
"ORDER BY (location_key, date); ";
136+
}
120137
}

flink-connector-clickhouse-1.17/src/test/java/org/apache/flink/connector/clickhouse/sink/pojo/DateTimePOJO.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,14 @@ public enum DataType {
3030
DATETIME,
3131
DATETIME64
3232
}
33+
34+
public static String createTableSql(String database, String tableName, String dateTimeType) {
35+
return "CREATE TABLE `" + database + "`.`" + tableName + "` (" +
36+
"id String," +
37+
String.format("created_at %s,", dateTimeType) +
38+
"num_logins Int32," +
39+
") " +
40+
"ENGINE = MergeTree " +
41+
"ORDER BY (id); ";
42+
}
3343
}

0 commit comments

Comments
 (0)