Skip to content

Commit cbb1944

Browse files
authored
Merge pull request #21 from ClickHouse/clickhouse-support-for-tests
Clickhouse support for tests
2 parents a384faa + c018cbf commit cbb1944

File tree

8 files changed

+214
-10
lines changed

8 files changed

+214
-10
lines changed

.github/workflows/tests.yaml

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,36 @@ on: [push]
55
jobs:
66
build:
77
runs-on: ubuntu-latest
8-
name: Apache Flink ClickHouse Connector tests
8+
strategy:
9+
fail-fast: false
10+
matrix:
11+
clickhouse: [ "23.7", "24.3", "latest", "cloud" ]
12+
name: Apache Flink ClickHouse Connector tests with ClickHouse ${{ matrix.clickhouse }}
913
steps:
14+
- name: Check for Cloud Credentials
15+
id: check-cloud-credentials
16+
run: |
17+
if [[ "${{ matrix.clickhouse }}" == "cloud" && (-z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}" || -z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}") ]]; then
18+
echo "SKIP_STEP=true" >> $GITHUB_ENV
19+
else
20+
echo "SKIP_STEP=false" >> $GITHUB_ENV
21+
fi
22+
shell: bash
1023
- uses: actions/checkout@v3
24+
if: env.SKIP_STEP != 'true'
1125
- name: Set up JDK 17
26+
if: env.SKIP_STEP != 'true'
1227
uses: actions/setup-java@v3
1328
with:
1429
java-version: '21'
1530
distribution: 'adopt'
1631
architecture: x64
1732
- name: Setup and execute Gradle 'test' task
33+
if: env.SKIP_STEP != 'true'
1834
uses: gradle/gradle-build-action@v2
35+
env:
36+
CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
37+
CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
38+
CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
1939
with:
2040
arguments: test

flink-connector-clickhouse-base/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ extra.apply {
1717
set("clickHouseDriverVersion", "0.8.5")
1818
set("flinkVersion", "2.0.0")
1919
set("log4jVersion","2.17.2")
20+
set("testContainersVersion", "1.21.0")
2021
}
2122

2223

@@ -56,6 +57,9 @@ dependencies {
5657
testImplementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}")
5758
// flink tests
5859
testImplementation("org.apache.flink:flink-test-utils:${project.extra["flinkVersion"]}")
60+
//
61+
testImplementation("org.testcontainers:testcontainers:${project.extra["testContainersVersion"]}")
62+
testImplementation("org.testcontainers:clickhouse:${project.extra["testContainersVersion"]}")
5963
}
6064

6165
// Apply a specific Java toolchain to ease working on different environments.

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/DummyFlinkClusterTest.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.apache.flink.connector.test;
22

33
import org.apache.flink.api.common.functions.MapFunction;
4+
import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests;
5+
import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests;
46
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
57
import org.apache.flink.streaming.util.TestStreamEnvironment;
68
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -9,8 +11,9 @@
911
import java.util.ArrayList;
1012
import java.util.Collections;
1113
import java.util.List;
14+
import java.util.concurrent.ExecutionException;
1215

13-
class DummyFlinkClusterTest extends EmbeddedFlinkClusterForTests {
16+
class DummyFlinkClusterTest extends FlinkClusterTests {
1417
// A simple Collection Sink
1518
private static class CollectSink implements SinkFunction<Long> {
1619

@@ -39,11 +42,19 @@ void testDummyFlinkCluster() throws Exception {
3942
CollectSink.values.clear();
4043

4144
env.fromData(1L, 2L)
45+
.setParallelism(1)
4246
.map(new IncrementMapFunction())
4347
.addSink(new CollectSink());
4448
env.execute("testDummyFlinkCluster");
4549
Assertions.assertEquals(2, CollectSink.values.size());
46-
Assertions.assertEquals(2L, CollectSink.values.get(0));
47-
Assertions.assertEquals(3L, CollectSink.values.get(1));
50+
}
51+
52+
@Test
53+
void testClickHouse() throws ExecutionException, InterruptedException {
54+
String tableName = "clickhouse_test";
55+
String createTableSQl = String.format("CREATE TABLE `%s`.`%s` (order_id UInt64) ENGINE = MergeTree ORDER BY tuple(order_id);", ClickHouseServerForTests.getDataBase(), tableName);
56+
ClickHouseServerForTests.executeSql(createTableSQl);
57+
int rows = ClickHouseServerForTests.countRows(tableName);
58+
Assertions.assertEquals(0, rows);
4859
}
4960
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.apache.flink.connector.test;
2+
3+
import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests;
4+
import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests;
5+
import org.junit.jupiter.api.AfterAll;
6+
import org.junit.jupiter.api.BeforeAll;
7+
8+
public class FlinkClusterTests {
9+
@BeforeAll
10+
static void setUp() throws Exception {
11+
EmbeddedFlinkClusterForTests.setUp();
12+
ClickHouseServerForTests.setUp();
13+
}
14+
15+
@AfterAll
16+
static void tearDown() {
17+
EmbeddedFlinkClusterForTests.tearDown();
18+
ClickHouseServerForTests.tearDown();
19+
}
20+
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package org.apache.flink.connector.test.embedded.clickhouse;
2+
3+
import com.clickhouse.client.api.Client;
4+
import com.clickhouse.client.api.query.GenericRecord;
5+
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import java.util.HashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.concurrent.ExecutionException;
13+
14+
import org.testcontainers.clickhouse.ClickHouseContainer;
15+
16+
public class ClickHouseServerForTests {
17+
18+
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseServerForTests.class);
19+
20+
protected static boolean isCloud = ClickHouseTestHelpers.isCloud();
21+
protected static String database = null;
22+
protected static ClickHouseContainer db = null;
23+
24+
protected static String host = null;
25+
protected static int port = 0;
26+
protected static String username = null;
27+
protected static String password = null;
28+
protected static boolean isSSL = false;
29+
30+
31+
public static void initConfiguration() {
32+
if (isCloud) {
33+
LOG.info("Init ClickHouse Cloud Configuration");
34+
host = System.getenv("CLICKHOUSE_CLOUD_HOST");
35+
port = Integer.parseInt(ClickHouseTestHelpers.HTTPS_PORT);
36+
database = String.format("flink_connector_test_%s", System.currentTimeMillis());
37+
username = ClickHouseTestHelpers.USERNAME_DEFAULT;
38+
password = System.getenv("CLICKHOUSE_CLOUD_PASSWORD");
39+
} else {
40+
LOG.info("Init ClickHouse Docker Configuration");
41+
host = db.getHost();
42+
port = db.getFirstMappedPort();
43+
database = ClickHouseTestHelpers.DATABASE_DEFAULT;
44+
username = db.getUsername();
45+
password = db.getPassword();
46+
}
47+
isSSL = ClickHouseTestHelpers.isCloud();
48+
}
49+
public static void setUp() throws InterruptedException, ExecutionException {
50+
if (!isCloud) {
51+
db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE).withPassword("test_password").withEnv("CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT", "1");
52+
db.start();
53+
}
54+
initConfiguration();
55+
// wakeup cloud
56+
// have a for loop
57+
boolean isLive = false;
58+
int counter = 0;
59+
while (counter < 5) {
60+
isLive = ClickHouseTestHelpers.ping(host, port, isSSL, username, password);
61+
if (isLive) {
62+
String createDatabase = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database);
63+
executeSql(createDatabase);
64+
return;
65+
}
66+
Thread.sleep(2000);
67+
counter++;
68+
}
69+
throw new RuntimeException("Failed to connect to ClickHouse");
70+
}
71+
72+
public static void tearDown() {
73+
if (db != null) {
74+
db.stop();
75+
}
76+
}
77+
78+
public static String getDataBase() { return database; }
79+
80+
public static void executeSql(String sql) throws ExecutionException, InterruptedException {
81+
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
82+
client.execute(sql).get();
83+
}
84+
85+
public static int countRows(String table) throws ExecutionException, InterruptedException {
86+
String countSql = String.format("SELECT COUNT(*) FROM `%s`.`%s`", database, table);
87+
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
88+
List<GenericRecord> countResult = client.queryAll(countSql);
89+
return countResult.get(0).getInteger(1);
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package org.apache.flink.connector.test.embedded.clickhouse;
2+
3+
import com.clickhouse.client.api.Client;
4+
import com.clickhouse.client.api.enums.Protocol;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.util.concurrent.TimeUnit;
9+
10+
public class ClickHouseTestHelpers {
11+
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseTestHelpers.class);
12+
13+
public static final String CLICKHOUSE_VERSION_DEFAULT = "24.3";
14+
public static final String CLICKHOUSE_PROXY_VERSION_DEFAULT = "23.8";
15+
public static final String CLICKHOUSE_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", getClickhouseVersion());
16+
public static final String CLICKHOUSE_FOR_PROXY_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", CLICKHOUSE_PROXY_VERSION_DEFAULT);
17+
18+
public static final String HTTPS_PORT = "8443";
19+
public static final String DATABASE_DEFAULT = "default";
20+
public static final String USERNAME_DEFAULT = "default";
21+
22+
private static final int TIMEOUT_VALUE = 60;
23+
private static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS;
24+
25+
public static String getClickhouseVersion() {
26+
String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION");
27+
if (clickHouseVersion == null) {
28+
clickHouseVersion = CLICKHOUSE_VERSION_DEFAULT;
29+
}
30+
return clickHouseVersion;
31+
}
32+
33+
public static boolean isCloud() {
34+
String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION");
35+
return clickHouseVersion != null && clickHouseVersion.equalsIgnoreCase("cloud");
36+
}
37+
38+
public static Client getClient(String host, int port, boolean ssl, String username, String password) {
39+
return new Client.Builder().addEndpoint(Protocol.HTTP, host, port, ssl)
40+
.setUsername(username)
41+
.setPassword(password)
42+
.setConnectTimeout(TIMEOUT_VALUE, TIMEOUT_UNIT.toChronoUnit())
43+
.build();
44+
}
45+
46+
public static boolean ping(String host, int port, boolean ssl, String username, String password) {
47+
Client client = getClient(host, port, ssl, username, password);
48+
return client.ping();
49+
}
50+
51+
}
Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.apache.flink.connector.test;
1+
package org.apache.flink.connector.test.embedded.flink;
22

33
import org.apache.flink.configuration.Configuration;
44
import org.apache.flink.configuration.RestOptions;
@@ -21,8 +21,7 @@ static int getFromEnvOrDefault(String key, int defaultValue) {
2121
return Integer.parseInt(value);
2222
}
2323

24-
@BeforeAll
25-
static void setUp() throws Exception {
24+
public static void setUp() throws Exception {
2625
Configuration config = new Configuration();
2726
config.set(RestOptions.PORT, REST_PORT); // web UI port (optional)
2827
config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS);
@@ -34,14 +33,13 @@ static void setUp() throws Exception {
3433
.build());
3534
flinkCluster.before();
3635
}
37-
@AfterAll
38-
static void tearDown() {
36+
public static void tearDown() {
3937
if (flinkCluster != null) {
4038
flinkCluster.after();
4139
}
4240
}
4341

44-
protected static MiniClusterWithClientResource getMiniCluster() {
42+
public static MiniClusterWithClientResource getMiniCluster() {
4543
if (flinkCluster == null)
4644
throw new RuntimeException("No MiniCluster available");
4745
return flinkCluster;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Define the root logger with appender X
2+
log4j.rootLogger=INFO, console
3+
#log4j.logger.org.testcontainers=WARN
4+
log4j.logger.com.clickhouse.kafka=DEBUG
5+
6+
log4j.appender.console= org.apache.log4j.ConsoleAppender
7+
log4j.appender.console.layout=org.apache.log4j.PatternLayout
8+
log4j.appender.console.layout.conversionPattern=[%d] %p %C %m%n

0 commit comments

Comments
 (0)