Skip to content

Commit 9d641d9

Browse files
committed
Adding scala tests
1 parent ab73284 commit 9d641d9

File tree

6 files changed

+180
-11
lines changed

6 files changed

+180
-11
lines changed

.github/workflows/tests.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,13 @@ jobs:
3737
CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
3838
CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
3939
with:
40-
arguments: test
40+
arguments: test
41+
- name: Setup and execute Gradle 'runScalaTests' task
42+
if: env.SKIP_STEP != 'true'
43+
uses: gradle/gradle-build-action@v2
44+
env:
45+
CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
46+
CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
47+
CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
48+
with:
49+
arguments: runScalaTests

flink-connector-clickhouse-base/build.gradle.kts

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
*/
55

66
plugins {
7-
kotlin("jvm") version "1.9.0"
7+
scala
88
java
99
}
1010

11+
val scalaVersion = "2.13.12"
12+
1113
repositories {
1214
// Use Maven Central for resolving dependencies.
1315
mavenCentral()
@@ -20,7 +22,6 @@ extra.apply {
2022
set("testContainersVersion", "1.21.0")
2123
}
2224

23-
2425
dependencies {
2526
// Use JUnit Jupiter for testing.
2627
testImplementation(libs.junit.jupiter)
@@ -29,6 +30,8 @@ dependencies {
2930

3031
// This dependency is used by the application.
3132
implementation(libs.guava)
33+
implementation("org.scala-lang:scala-library:$scalaVersion")
34+
implementation("org.scala-lang:scala-compiler:$scalaVersion")
3235
// logger
3336
implementation("org.apache.logging.log4j:log4j-slf4j-impl:${project.extra["log4jVersion"]}")
3437
implementation("org.apache.logging.log4j:log4j-api:${project.extra["log4jVersion"]}")
@@ -60,6 +63,28 @@ dependencies {
6063
//
6164
testImplementation("org.testcontainers:testcontainers:${project.extra["testContainersVersion"]}")
6265
testImplementation("org.testcontainers:clickhouse:${project.extra["testContainersVersion"]}")
66+
testImplementation("org.scalatest:scalatest_2.13:3.2.19")
67+
testRuntimeOnly("org.scalatestplus:junit-4-13_2.13:3.2.18.0")
68+
// testRuntimeOnly("org.pegdown:pegdown:1.6.0") // sometimes required by ScalaTest
69+
}
70+
71+
// Source-set layout for the mixed Java/Scala module. These directories
// match the Gradle scala/java plugin defaults, but are spelled out so the
// layout is explicit for readers.
sourceSets {
    main {
        scala { srcDirs("src/main/scala") }
        java { srcDirs("src/main/java") }
    }
    test {
        scala { srcDirs("src/test/scala") }
        java { srcDirs("src/test/java") }
    }
}
6489

6590
// Apply a specific Java toolchain to ease working on different environments.
@@ -69,7 +94,36 @@ java {
6994
}
7095
}
7196

97+
// JUnit-platform test task: discover the usual *Test/*Tests/*Spec class-name
// patterns and log outcome events.
// NOTE(review): `tasks.named<Test>("test")` below also calls useJUnitPlatform();
// the two configurations are redundant but harmless — consider merging them.
tasks.test {
    useJUnitPlatform()
    include("**/*Test.class", "**/*Tests.class", "**/*Spec.class")
    testLogging {
        events("passed", "failed", "skipped")
        // Uncomment to forward test stdout/stderr to the build log:
        // showStandardStreams = true
    }
}
106+
107+
// Shared scalac settings applied to every Scala compilation task.
tasks.withType<ScalaCompile> {
    with(scalaCompileOptions) {
        encoding = "UTF-8"
        isDeprecation = true // warn when deprecated APIs are used
        additionalParameters = listOf("-feature", "-unchecked")
    }
}
114+
72115
tasks.named<Test>("test") {
73116
// Use JUnit Platform for unit tests.
74117
useJUnitPlatform()
75118
}
119+
120+
// Runs the ScalaTest suites through org.scalatest.tools.Runner directly,
// separate from the JUnit-platform `test` task. The test runtime classpath
// carries an implicit dependency on test compilation, so classes are built
// before this task executes.
tasks.register<JavaExec>("runScalaTests") {
    group = "verification"
    mainClass.set("org.scalatest.tools.Runner")
    classpath = sourceSets["test"].runtimeClasspath
    // -R: runpath with compiled test classes
    //     NOTE(review): hard-codes the build dir — consider layout.buildDirectory
    // -oD: standard-out reporter, with test durations
    // -s:  fully-qualified suite to run
    args(
        "-R", "build/classes/scala/test",
        "-oD",
        "-s", "org.apache.flink.connector.clickhouse.test.scala.ClickHouseSinkTests"
    )
}

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class ClickHouseAsyncSink<InputT> extends AsyncSinkBase<InputT, ClickHous
2323
protected ClickHouseClientConfig clickHouseClientConfig;
2424
protected ClickHouseFormat clickHouseFormat = null;
2525

26-
protected ClickHouseAsyncSink(
26+
public ClickHouseAsyncSink(
2727
ElementConverter<InputT, ClickHousePayload> converter,
2828
int maxBatchSize,
2929
int maxInFlightRequests,

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ void CSVDataTest() throws Exception {
8282
clickHouseClientConfig
8383
);
8484
// in case of just want to forward our data use the appropriate ClickHouse format
85-
csvSink.setClickHouseFormat(ClickHouseFormat.TSV);
85+
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
8686

8787
Path filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz");
8888

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,32 @@
66
import org.junit.jupiter.api.BeforeAll;
77

88
public class FlinkClusterTests {
9+
910
@BeforeAll
10-
static void setUp() throws Exception {
11+
public static void setUp() throws Exception {
1112
EmbeddedFlinkClusterForTests.setUp();
1213
ClickHouseServerForTests.setUp();
1314
}
1415

1516
@AfterAll
16-
static void tearDown() {
17+
public static void tearDown() {
1718
EmbeddedFlinkClusterForTests.tearDown();
1819
ClickHouseServerForTests.tearDown();
1920
}
2021

21-
public String getServerURL() {
22+
public static String getServerURL() {
2223
return ClickHouseServerForTests.getURL();
2324
}
2425

25-
public String getUsername() {
26+
public static String getUsername() {
2627
return ClickHouseServerForTests.getUsername();
2728
}
2829

29-
public String getPassword() {
30+
public static String getPassword() {
3031
return ClickHouseServerForTests.getPassword();
3132
}
3233

33-
public String getDatabase() {
34+
public static String getDatabase() {
3435
return ClickHouseServerForTests.getDatabase();
3536
}
3637
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package org.apache.flink.connector.clickhouse.test.scala
2+
3+
import com.clickhouse.data.ClickHouseFormat
4+
import org.apache.flink.api.common.eventtime.WatermarkStrategy
5+
import org.apache.flink.connector.clickhouse.convertor.ClickHouseConvertor
6+
import org.apache.flink.connector.clickhouse.sink.{ClickHouseAsyncSink, ClickHouseClientConfig}
7+
import org.apache.flink.connector.file.src.FileSource
8+
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
9+
import org.apache.flink.connector.test.FlinkClusterTests
10+
import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests
11+
import org.apache.flink.connector.test.embedded.flink.EmbeddedFlinkClusterForTests
12+
import org.apache.flink.core.execution.JobClient
13+
import org.apache.flink.core.fs.Path
14+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
15+
import org.scalatest.BeforeAndAfterAll
16+
import org.scalatest.funsuite.AnyFunSuite
17+
18+
/**
 * End-to-end sink test: streams a gzipped CSV fixture through a Flink job
 * into ClickHouse and verifies the row count on the server side.
 */
class ClickHouseSinkTests extends AnyFunSuite with BeforeAndAfterAll {

  // Row count of the fixture file epidemiology_top_10000.csv.gz.
  val EXPECTED_ROWS = 10000

  // Bring up the embedded Flink mini-cluster and the ClickHouse test server.
  override def beforeAll(): Unit = FlinkClusterTests.setUp()

  // Tear both embedded services back down after all tests in this suite.
  override def afterAll(): Unit = FlinkClusterTests.tearDown()

  /**
   * Submits the job asynchronously, then polls the row count of `tableName`
   * once per second (at most 10 times) until EXPECTED_ROWS rows are visible.
   * The streaming job never finishes on its own, so it is cancelled before
   * returning the last observed row count.
   */
  @throws[Exception]
  private def executeJob(env: StreamExecutionEnvironment, tableName: String): Int = {
    val jobClient: JobClient = env.executeAsync("Read GZipped CSV with FileSource")
    val maxPolls = 10
    var rows = 0          // use plain Int counters, not boxed java.lang.Integer
    var polls = 0
    var done = false
    // Flag-controlled loop: `break` is not available without scala.util.control.Breaks.
    while (polls < maxPolls && !done) {
      Thread.sleep(1000)
      polls += 1
      rows = ClickHouseServerForTests.countRows(tableName)
      println("Rows: " + rows)
      if (rows == EXPECTED_ROWS) done = true
    }
    // Explicitly cancel the unbounded job (side-effecting call, so keep the parens).
    jobClient.cancel()
    rows
  }

  test("csv data") {
    val tableName = "csv_scala_covid"

    // Drop any leftover table from a previous run, then create it fresh.
    val dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", FlinkClusterTests.getDatabase, tableName)
    ClickHouseServerForTests.executeSql(dropTable)
    val tableSql = "CREATE TABLE `" + FlinkClusterTests.getDatabase + "`.`" + tableName + "` (" +
      "date Date," +
      "location_key LowCardinality(String)," +
      "new_confirmed Int32," +
      "new_deceased Int32," +
      "new_recovered Int32," +
      "new_tested Int32," +
      "cumulative_confirmed Int32," +
      "cumulative_deceased Int32," +
      "cumulative_recovered Int32," +
      "cumulative_tested Int32" +
      ") " +
      "ENGINE = MergeTree " +
      "ORDER BY (location_key, date); "
    ClickHouseServerForTests.executeSql(tableSql)

    val env = EmbeddedFlinkClusterForTests.getMiniCluster.getTestStreamEnvironment
    env.setParallelism(1)

    val clickHouseClientConfig = new ClickHouseClientConfig(
      FlinkClusterTests.getServerURL,
      FlinkClusterTests.getUsername,
      FlinkClusterTests.getPassword,
      FlinkClusterTests.getDatabase,
      tableName
    )
    val convertorString: ClickHouseConvertor[String] = new ClickHouseConvertor(classOf[String])

    // Sink limits in constructor order: maxBatchSize, maxInFlightRequests, then
    // four more numeric limits — NOTE(review): only the first three parameter
    // names are visible here; confirm the rest against ClickHouseAsyncSink.
    val csvSink = new ClickHouseAsyncSink[String](
      convertorString,
      5000,
      2,
      20000,
      1024 * 1024,
      5 * 1000,
      1000,
      clickHouseClientConfig
    )
    // The fixture already contains CSV rows, so forward them as-is in CSV format.
    csvSink.setClickHouseFormat(ClickHouseFormat.CSV)

    val filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz")
    val source = FileSource
      .forRecordStreamFormat(new TextLineInputFormat(), filePath)
      .build()

    env
      .fromSource(source, WatermarkStrategy.noWatermarks(), "GzipCsvSource")
      .sinkTo(csvSink)

    val rows = executeJob(env, tableName)
    assert(rows == EXPECTED_ROWS)
  }

}

0 commit comments

Comments
 (0)