Skip to content

Commit a384faa

Browse files
authored
Merge pull request #20 from ClickHouse/embedded-for-tests
2 parents 06cda99 + a08d8d3 commit a384faa

File tree

4 files changed

+123
-0
lines changed

4 files changed

+123
-0
lines changed

.github/workflows/tests.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
name: Apache Flink ClickHouse Connector Tests CI
2+
3+
on: [push]
4+
5+
jobs:
6+
build:
7+
runs-on: ubuntu-latest
8+
name: Apache Flink ClickHouse Connector tests
9+
steps:
10+
- uses: actions/checkout@v3
11+
- name: Set up JDK 17
12+
uses: actions/setup-java@v3
13+
with:
14+
java-version: '21'
15+
distribution: 'adopt'
16+
architecture: x64
17+
- name: Setup and execute Gradle 'test' task
18+
uses: gradle/gradle-build-action@v2
19+
with:
20+
arguments: test

flink-connector-clickhouse-base/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
*
44
*/
55

6+
plugins {
7+
kotlin("jvm") version "1.9.0"
8+
java
9+
}
10+
611
repositories {
712
// Use Maven Central for resolving dependencies.
813
mavenCentral()
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package org.apache.flink.connector.test;
2+
3+
import org.apache.flink.api.common.functions.MapFunction;
4+
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
5+
import org.apache.flink.streaming.util.TestStreamEnvironment;
6+
import org.apache.flink.test.util.MiniClusterWithClientResource;
7+
import org.junit.jupiter.api.*;
8+
9+
import java.util.ArrayList;
10+
import java.util.Collections;
11+
import java.util.List;
12+
13+
class DummyFlinkClusterTest extends EmbeddedFlinkClusterForTests {
14+
// A simple Collection Sink
15+
private static class CollectSink implements SinkFunction<Long> {
16+
17+
// must be static
18+
public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());
19+
20+
@Override
21+
public void invoke(Long value, SinkFunction.Context context) throws Exception {
22+
values.add(value);
23+
}
24+
}
25+
26+
public static class IncrementMapFunction implements MapFunction<Long, Long> {
27+
28+
@Override
29+
public Long map(Long record) throws Exception {
30+
return record + 1;
31+
}
32+
}
33+
34+
@Test
35+
void testDummyFlinkCluster() throws Exception {
36+
MiniClusterWithClientResource flinkCluster = EmbeddedFlinkClusterForTests.getMiniCluster();
37+
TestStreamEnvironment env = flinkCluster.getTestStreamEnvironment();
38+
39+
CollectSink.values.clear();
40+
41+
env.fromData(1L, 2L)
42+
.map(new IncrementMapFunction())
43+
.addSink(new CollectSink());
44+
env.execute("testDummyFlinkCluster");
45+
Assertions.assertEquals(2, CollectSink.values.size());
46+
Assertions.assertEquals(2L, CollectSink.values.get(0));
47+
Assertions.assertEquals(3L, CollectSink.values.get(1));
48+
}
49+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package org.apache.flink.connector.test;
2+
3+
import org.apache.flink.configuration.Configuration;
4+
import org.apache.flink.configuration.RestOptions;
5+
import org.apache.flink.configuration.TaskManagerOptions;
6+
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
7+
import org.apache.flink.test.util.MiniClusterWithClientResource;
8+
import org.junit.jupiter.api.AfterAll;
9+
import org.junit.jupiter.api.BeforeAll;
10+
11+
public class EmbeddedFlinkClusterForTests {
12+
13+
static MiniClusterWithClientResource flinkCluster = null;
14+
static int REST_PORT = getFromEnvOrDefault("REST_PORT", 8081);
15+
static int NUM_TASK_SLOTS = getFromEnvOrDefault("NUM_TASK_SLOTS", 2);
16+
static int NUM_TASK_SLOTS_PER_TASK = getFromEnvOrDefault("NUM_TASK_SLOTS_PER_TASK" , 2);
17+
static int NUM_TASK_MANAGERS = getFromEnvOrDefault("NUM_TASK_MANAGERS",3);
18+
19+
static int getFromEnvOrDefault(String key, int defaultValue) {
20+
String value = System.getenv().getOrDefault(key, String.valueOf(defaultValue));
21+
return Integer.parseInt(value);
22+
}
23+
24+
@BeforeAll
25+
static void setUp() throws Exception {
26+
Configuration config = new Configuration();
27+
config.set(RestOptions.PORT, REST_PORT); // web UI port (optional)
28+
config.set(TaskManagerOptions.NUM_TASK_SLOTS, NUM_TASK_SLOTS);
29+
flinkCluster = new MiniClusterWithClientResource(
30+
new MiniClusterResourceConfiguration.Builder()
31+
.setNumberSlotsPerTaskManager(NUM_TASK_SLOTS_PER_TASK)
32+
.setNumberTaskManagers(NUM_TASK_MANAGERS)
33+
.setConfiguration(config)
34+
.build());
35+
flinkCluster.before();
36+
}
37+
@AfterAll
38+
static void tearDown() {
39+
if (flinkCluster != null) {
40+
flinkCluster.after();
41+
}
42+
}
43+
44+
protected static MiniClusterWithClientResource getMiniCluster() {
45+
if (flinkCluster == null)
46+
throw new RuntimeException("No MiniCluster available");
47+
return flinkCluster;
48+
}
49+
}

0 commit comments

Comments
 (0)