Skip to content

Commit 318a239

Browse files
committed
Added useragent to easy identification
1 parent 0e97053 commit 318a239

File tree

4 files changed

+86
-1
lines changed

4 files changed

+86
-1
lines changed

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.clickhouse.client.api.Client;
44
import com.clickhouse.client.api.ClientConfigProperties;
5+
import org.apache.flink.runtime.util.EnvironmentInformation;
56
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
78

@@ -16,6 +17,7 @@ public class ClickHouseClientConfig implements Serializable {
1617
private final String password;
1718
private final String database;
1819
private final String tableName;
20+
private final String fullProductName;
1921
private Boolean supportDefault = null;
2022

2123
public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) {
@@ -24,6 +26,7 @@ public ClickHouseClientConfig(String url, String username, String password, Stri
2426
this.password = password;
2527
this.database = database;
2628
this.tableName = tableName;
29+
this.fullProductName = String.format("Flink-ClickHouse-Sink/%s (fv:flink/%s, lv:scala/%s)", ClickHouseSinkVersion.getVersion(), EnvironmentInformation.getVersion(), EnvironmentInformation.getScalaVersion());
2730
}
2831

2932
public Client createClient(String database) {
@@ -32,6 +35,7 @@ public Client createClient(String database) {
3235
.setUsername(username)
3336
.setPassword(password)
3437
.setDefaultDatabase(database)
38+
.setClientName(fullProductName)
3539
.setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true")
3640
.build();
3741
return client;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.apache.flink.connector.clickhouse.sink;
2+
3+
public class ClickHouseSinkVersion {
4+
public static String getVersion() {
5+
return "0.0.1";
6+
}
7+
}

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,4 +237,67 @@ void SimplePOJODataTest() throws Exception {
237237
int rows = executeJob(env, tableName);
238238
Assertions.assertEquals(EXPECTED_ROWS, rows);
239239
}
240+
241+
@Test
242+
void ProductNameTest() throws Exception {
243+
String tableName = "product_name_csv_covid";
244+
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
245+
ClickHouseServerForTests.executeSql(dropTable);
246+
// create table
247+
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
248+
"date Date," +
249+
"location_key LowCardinality(String)," +
250+
"new_confirmed Int32," +
251+
"new_deceased Int32," +
252+
"new_recovered Int32," +
253+
"new_tested Int32," +
254+
"cumulative_confirmed Int32," +
255+
"cumulative_deceased Int32," +
256+
"cumulative_recovered Int32," +
257+
"cumulative_tested Int32" +
258+
") " +
259+
"ENGINE = MergeTree " +
260+
"ORDER BY (location_key, date); ";
261+
ClickHouseServerForTests.executeSql(tableSql);
262+
263+
final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment();
264+
env.setParallelism(1);
265+
266+
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
267+
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
268+
// create sink
269+
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
270+
convertorString,
271+
5000,
272+
2,
273+
20000,
274+
1024 * 1024,
275+
5 * 1000,
276+
1000,
277+
clickHouseClientConfig
278+
);
279+
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
280+
281+
Path filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz");
282+
283+
FileSource<String> source = FileSource
284+
.forRecordStreamFormat(new TextLineInputFormat(), filePath)
285+
.build();
286+
// read csv data from file
287+
DataStreamSource<String> lines = env.fromSource(
288+
source,
289+
WatermarkStrategy.noWatermarks(),
290+
"GzipCsvSource"
291+
);
292+
lines.sinkTo(csvSink);
293+
int rows = executeJob(env, tableName);
294+
Assertions.assertEquals(EXPECTED_ROWS, rows);
295+
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
296+
// let's wait until data will be available in query log
297+
String productName = ClickHouseServerForTests.extractProductName();
298+
String compareString = String.format("Flink-ClickHouse-Sink/%s (fv:flink/2.0.0, lv:scala/2.12)", ClickHouseSinkVersion.getVersion());
299+
300+
boolean isContains = productName.contains(compareString);
301+
Assertions.assertTrue(isContains);
302+
}
240303
}

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ public static String getURL() {
8888

8989
public static void executeSql(String sql) throws ExecutionException, InterruptedException {
9090
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
91-
client.execute(sql).get();
91+
try {
92+
client.execute(sql).get().close();
93+
} catch (Exception e) {
94+
throw new RuntimeException(e);
95+
}
9296
}
9397

9498
public static int countRows(String table) throws ExecutionException, InterruptedException {
@@ -98,6 +102,13 @@ public static int countRows(String table) throws ExecutionException, Interrupted
98102
return countResult.get(0).getInteger(1);
99103
}
100104

105+
public static String extractProductName() {
106+
String extractProductName = "SELECT http_user_agent FROM system.query_log WHERE type = 'QueryStart' and query_kind = 'Insert' LIMIT 10";
107+
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
108+
List<GenericRecord> userAgentResult = client.queryAll(extractProductName);
109+
return userAgentResult.get(0).getString(1);
110+
}
111+
101112
public static TableSchema getTableSchema(String table) throws ExecutionException, InterruptedException {
102113
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
103114
return client.getTableSchema(table, database);

0 commit comments

Comments
 (0)