diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml new file mode 100644 index 0000000..b1d9270 --- /dev/null +++ b/.github/workflows/integration-tests.yaml @@ -0,0 +1,69 @@ +name: Integration Tests CI (Java) + +on: + push: + paths-ignore: + - '**.md' + - 'LICENSE' + - 'examples' + +jobs: + test-integration-tests: + runs-on: ubuntu-latest + timeout-minutes: 10 + strategy: + fail-fast: false + matrix: + clickhouse: [ "23.7", "24.3", "latest", "cloud" ] + flink: [ "1.17.2", "1.18.1", "1.19.3", "1.20.2", "2.0.0", "latest"] + steps: + - name: Check for Cloud Credentials + id: check-cloud-credentials + run: | + if [[ "${{ matrix.clickhouse }}" == "cloud" && (-z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}" || -z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}") ]]; then + echo "SKIP_STEP=true" >> $GITHUB_ENV + else + echo "SKIP_STEP=false" >> $GITHUB_ENV + fi + shell: bash + - uses: actions/checkout@v3 + if: env.SKIP_STEP != 'true' + - name: Set up JDK 17 + if: env.SKIP_STEP != 'true' + uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'adopt' + architecture: x64 + - name: Publish locally flink-connector-clickhouse-1.17 + if: env.SKIP_STEP != 'true' + uses: gradle/gradle-build-action@v2 + with: + arguments: :flink-connector-clickhouse-1.17:publishToMavenLocal + - name: Publish locally flink-connector-clickhouse-2.0.0 + if: env.SKIP_STEP != 'true' + uses: gradle/gradle-build-action@v2 + with: + arguments: :flink-connector-clickhouse-2.0.0:publishToMavenLocal + - name: Generate Flink Covid App example 2.X + if: env.SKIP_STEP != 'true' + working-directory: ./examples/maven/flink-v2/covid + run: mvn clean install + - name: Generate Flink Covid App example 1.17+ + if: env.SKIP_STEP != 'true' + working-directory: ./examples/maven/flink-v1.7/covid + run: mvn clean install + - name: Setup and execute Gradle 'integration-test' task + if: env.SKIP_STEP != 'true' + uses: gradle/gradle-build-action@v2 + env: + CLICKHOUSE_VERSION: ${{ matrix.clickhouse }} + CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }} + CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} + FLINK_VERSION: ${{ matrix.flink }} + with: + arguments: :flink-connector-clickhouse-integration:test + + + + diff --git a/.github/workflows/tests-scala.yaml b/.github/workflows/tests-scala.yaml index d3fe425..170abf4 100644 --- a/.github/workflows/tests-scala.yaml +++ b/.github/workflows/tests-scala.yaml @@ -5,6 +5,7 @@ on: [push] jobs: test-flink-17: runs-on: ubuntu-latest + timeout-minutes: 10 strategy: fail-fast: false matrix: @@ -42,6 +43,7 @@ jobs: arguments: :flink-connector-clickhouse-1.17:runScalaTests test-flink-2: runs-on: ubuntu-latest + timeout-minutes: 10 strategy: fail-fast: false matrix: diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 85dc7e4..48c9b70 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -5,6 +5,7 @@ on: [push] jobs: test-flink-17: runs-on: ubuntu-latest + timeout-minutes: 10 strategy: fail-fast: false matrix: @@ -23,11 +24,11 @@ jobs: shell: bash - uses: actions/checkout@v3 if: env.SKIP_STEP != 'true' - - name: Set up JDK 21 + - name: Set up JDK 17 if: env.SKIP_STEP != 'true' uses: actions/setup-java@v3 with: - java-version: '21' + java-version: '17' distribution: 'adopt' architecture: x64 - name: Setup and execute Gradle 'test' task @@ -42,6 +43,7 @@ jobs: arguments: :flink-connector-clickhouse-1.17:test 
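For reference, one cell of the integration matrix in integration-tests.yaml above can be reproduced locally. A sketch, assuming a running Docker daemon (the "cloud" cell additionally needs CLICKHOUSE_CLOUD_HOST and CLICKHOUSE_CLOUD_PASSWORD exported):

    # Publish both connector variants to the local Maven repository
    ./gradlew :flink-connector-clickhouse-1.17:publishToMavenLocal \
              :flink-connector-clickhouse-2.0.0:publishToMavenLocal
    # Build the example jobs that the integration tests submit to Flink
    (cd examples/maven/flink-v2/covid && mvn clean install)
    (cd examples/maven/flink-v1.7/covid && mvn clean install)
    # Run the tests against one ClickHouse/Flink combination from the matrix
    CLICKHOUSE_VERSION=24.3 FLINK_VERSION=2.0.0 \
      ./gradlew :flink-connector-clickhouse-integration:test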
test-flink-2: runs-on: ubuntu-latest + timeout-minutes: 10 strategy: fail-fast: false matrix: @@ -59,11 +61,11 @@ jobs: shell: bash - uses: actions/checkout@v3 if: env.SKIP_STEP != 'true' - - name: Set up JDK 21 + - name: Set up JDK 17 if: env.SKIP_STEP != 'true' uses: actions/setup-java@v3 with: - java-version: '21' + java-version: '17' distribution: 'adopt' architecture: x64 - name: Setup and execute Gradle 'test' task diff --git a/build.gradle.kts b/build.gradle.kts index b760824..de2f187 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -3,13 +3,12 @@ plugins { java signing id("com.gradleup.nmcp") version "0.0.8" - id("com.github.johnrengelman.shadow") version "8.1.1" - + id("com.gradleup.shadow") version "9.0.2" } val sinkVersion by extra("0.0.1") val flinkVersion by extra("1.18.0") -val clickhouseVersion by extra("0.4.6") +val clickhouseVersion by extra("0.9.1") val junitVersion by extra("5.8.2") allprojects { @@ -75,23 +74,4 @@ subprojects { "-s", "org.apache.flink.connector.clickhouse.test.scala.ClickHouseSinkTests" ) } -} - -//sourceSets { -// main { -// scala { -// srcDirs("src/main/scala") -// } -// java { -// srcDirs("src/main/java") -// } -// } -// test { -// scala { -// srcDirs("src/test/scala") -// } -// java { -// srcDirs("src/test/java") -// } -// } -//} +} \ No newline at end of file diff --git a/examples/maven/flink-v1.7/covid/pom.xml b/examples/maven/flink-v1.7/covid/pom.xml new file mode 100644 index 0000000..a6bf8b9 --- /dev/null +++ b/examples/maven/flink-v1.7/covid/pom.xml @@ -0,0 +1,225 @@ + + + 4.0.0 + + com.clickhouse.example.covid + covid + 1.0-SNAPSHOT + jar + + Flink Quickstart Job + + + UTF-8 + 1.17.2 + 11 + 2.12 + ${target.java.version} + ${target.java.version} + 2.24.1 + + + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + + com.clickhouse.flink + flink-connector-clickhouse-1.17 + 0.0.1 + all + + + + + + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + false + + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.clickhouse.example.covid.DataStreamJob + + + + + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-shade-plugin + [3.1.1,) + + shade + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.1,) + + testCompile + compile + + + + + + + + + + + + + + diff --git a/examples/maven/flink-v1.7/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java b/examples/maven/flink-v1.7/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java new file mode 100644 index 0000000..fb258d1 --- /dev/null +++ 
b/examples/maven/flink-v1.7/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.clickhouse.example.covid; + +import com.clickhouse.data.ClickHouseFormat; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.clickhouse.convertor.ClickHouseConvertor; +import org.apache.flink.connector.clickhouse.data.ClickHousePayload; +import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncSink; +import org.apache.flink.connector.clickhouse.sink.ClickHouseClientConfig; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.connector.file.src.reader.TextLineInputFormat; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + + +/** + * Skeleton for a Flink DataStream Job. + * + *
For a tutorial on how to write a Flink application, check the + * tutorials and examples on the Flink Website. + *
To package your application into a JAR file for execution, run + * 'mvn clean package' on the command line. + * + *
If you change the name of the main class (with the public static void main(String[] args)) + * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). + */ +public class DataStreamJob { + + static final int MAX_BATCH_SIZE = 5000; + static final int MAX_IN_FLIGHT_REQUESTS = 2; + static final int MAX_BUFFERED_REQUESTS = 20000; + static final long MAX_BATCH_SIZE_IN_BYTES = 1024 * 1024; + static final long MAX_TIME_IN_BUFFER_MS = 5 * 1000; + static final long MAX_RECORD_SIZE_IN_BYTES = 1000; + + /* + Create covid table before running the example + + CREATE TABLE `default`.`covid` ( + date Date, + location_key LowCardinality(String), + new_confirmed Int32, + new_deceased Int32, + new_recovered Int32, + new_tested Int32, + cumulative_confirmed Int32, + cumulative_deceased Int32, + cumulative_recovered Int32, + cumulative_tested Int32 + ) + ENGINE = MergeTree + ORDER BY (location_key, date); + */ + + public static void main(String[] args) throws Exception { + // Sets up the execution environment, which is the main entry point + ParameterTool parameters = ParameterTool.fromArgs(args); + final String fileFullName = parameters.get("input"); + final String url = parameters.get("url"); + final String username = parameters.get("username"); + final String password = parameters.get("password"); + final String database = parameters.get("database"); + final String tableName = parameters.get("table"); + + ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName); + ElementConverter convertorString = new ClickHouseConvertor<>(String.class); + + ClickHouseAsyncSink csvSink = new ClickHouseAsyncSink<>( + convertorString, + MAX_BATCH_SIZE, + MAX_IN_FLIGHT_REQUESTS, + MAX_BUFFERED_REQUESTS, + MAX_BATCH_SIZE_IN_BYTES, + MAX_TIME_IN_BUFFER_MS, + MAX_RECORD_SIZE_IN_BYTES, + clickHouseClientConfig + ); + csvSink.setClickHouseFormat(ClickHouseFormat.CSV); + + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + + Path filePath = new Path(fileFullName); + FileSource source = FileSource + .forRecordStreamFormat(new TextLineInputFormat(), filePath) + .build(); + + DataStreamSource lines = env.fromSource( + source, + WatermarkStrategy.noWatermarks(), + "GzipCsvSource" + ); + lines.sinkTo(csvSink); + env.execute("Flink Java API Read CSV (covid)"); + } +} diff --git a/examples/maven/covid/src/main/resources/log4j2.properties b/examples/maven/flink-v1.7/covid/src/main/resources/log4j2.properties similarity index 100% rename from examples/maven/covid/src/main/resources/log4j2.properties rename to examples/maven/flink-v1.7/covid/src/main/resources/log4j2.properties diff --git a/examples/maven/covid/pom.xml b/examples/maven/flink-v2/covid/pom.xml similarity index 99% rename from examples/maven/covid/pom.xml rename to examples/maven/flink-v2/covid/pom.xml index d55421a..9128176 100644 --- a/examples/maven/covid/pom.xml +++ b/examples/maven/flink-v2/covid/pom.xml @@ -71,7 +71,7 @@ under the License. 
org.apache.flink flink-connector-files - 2.0.0 + ${flink.version} provided diff --git a/examples/maven/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java b/examples/maven/flink-v2/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java similarity index 100% rename from examples/maven/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java rename to examples/maven/flink-v2/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java diff --git a/examples/maven/flink-v2/covid/src/main/resources/log4j2.properties b/examples/maven/flink-v2/covid/src/main/resources/log4j2.properties new file mode 100644 index 0000000..32c696e --- /dev/null +++ b/examples/maven/flink-v2/covid/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/flink-connector-clickhouse-1.17/build.gradle.kts b/flink-connector-clickhouse-1.17/build.gradle.kts index 5d78887..3f33d3f 100644 --- a/flink-connector-clickhouse-1.17/build.gradle.kts +++ b/flink-connector-clickhouse-1.17/build.gradle.kts @@ -1,6 +1,10 @@ +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar + /* - * This file is the build file of flink-connector-clickhouse-base submodule - * + * Build configuration for Flink 1.17+ ClickHouse Connector + * + * This module provides Apache Flink 1.17+ compatibility for the ClickHouse connector. + * It depends on the flink-connector-clickhouse-base module for shared functionality. 
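+ * Note: the shadowJar task below switches from an exclude list to an include list, so only the + * connector classes, the base module and the ClickHouse client-v2 artifact are bundled, while + * Flink dependencies remain provided at runtime.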
*/ plugins { @@ -8,12 +12,14 @@ plugins { scala java signing + `java-test-fixtures` id("com.gradleup.nmcp") version "0.0.8" - id("com.github.johnrengelman.shadow") version "8.1.1" + id("com.gradleup.shadow") version "9.0.2" } val scalaVersion = "2.13.12" val sinkVersion: String by rootProject.extra +val clickhouseVersion: String by rootProject.extra repositories { mavenCentral() @@ -22,7 +28,6 @@ repositories { val flinkVersion = System.getenv("FLINK_VERSION") ?: "1.17.2" extra.apply { - set("clickHouseDriverVersion", "0.9.1") set("flinkVersion", flinkVersion) set("log4jVersion","2.17.2") set("testContainersVersion", "1.21.0") @@ -46,11 +51,10 @@ dependencies { implementation("org.apache.logging.log4j:log4j-api:${project.extra["log4jVersion"]}") implementation("org.apache.logging.log4j:log4j-1.2-api:${project.extra["log4jVersion"]}") implementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}") - implementation(project(":flink-connector-clickhouse-base")) - testImplementation(project(":flink-connector-clickhouse-base")) + implementation(project(":flink-connector-clickhouse-base")) // ClickHouse Client Libraries - implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}:all") + implementation("com.clickhouse:client-v2:${clickhouseVersion}:all") // Apache Flink Libraries implementation("org.apache.flink:flink-connector-base:${project.extra["flinkVersion"]}") implementation("org.apache.flink:flink-streaming-java:${project.extra["flinkVersion"]}") @@ -73,7 +77,6 @@ dependencies { testImplementation("org.testcontainers:clickhouse:${project.extra["testContainersVersion"]}") testImplementation("org.scalatest:scalatest_2.13:3.2.19") testRuntimeOnly("org.scalatestplus:junit-4-13_2.13:3.2.18.0") -// testRuntimeOnly("org.pegdown:pegdown:1.6.0") // sometimes required by ScalaTest } sourceSets { @@ -95,11 +98,12 @@ sourceSets { } } -tasks.shadowJar { +tasks.named("shadowJar") { archiveClassifier.set("all") - dependencies { - exclude(dependency("org.apache.flink:.*")) + include(dependency("org.apache.flink.connector.clickhouse:.*")) + include(project(":flink-connector-clickhouse-base")) + include(dependency("com.clickhouse:client-v2:${clickhouseVersion}:all")) } mergeServiceFiles() } @@ -110,10 +114,6 @@ val shadowSourcesJar by tasks.registering(Jar::class) { duplicatesStrategy = DuplicatesStrategy.EXCLUDE } -tasks.jar { - enabled = false -} - publishing { publications { create("maven") { diff --git a/flink-connector-clickhouse-2.0.0/build.gradle.kts b/flink-connector-clickhouse-2.0.0/build.gradle.kts index 6f05e34..35085d1 100644 --- a/flink-connector-clickhouse-2.0.0/build.gradle.kts +++ b/flink-connector-clickhouse-2.0.0/build.gradle.kts @@ -1,6 +1,10 @@ +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar + /* - * This file is the build file of flink-connector-clickhouse-base submodule - * + * Build configuration for Flink 2.0.0 ClickHouse Connector + * + * This module provides Apache Flink 2.0.0 compatibility for the ClickHouse connector. + * It depends on the flink-connector-clickhouse-base module for shared functionality. 
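+ * Unlike the 1.17 module, which resolves its Flink version from the FLINK_VERSION environment + * variable, this module pins Flink to 2.0.0.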
*/ plugins { @@ -9,18 +13,18 @@ plugins { java signing id("com.gradleup.nmcp") version "0.0.8" - id("com.github.johnrengelman.shadow") version "8.1.1" + id("com.gradleup.shadow") version "9.0.2" } val scalaVersion = "2.13.12" val sinkVersion: String by rootProject.extra +val clickhouseVersion: String by rootProject.extra // Temporary until we have a Java Client release repositories { mavenCentral() } extra.apply { - set("clickHouseDriverVersion", "0.9.1") set("flinkVersion", "2.0.0") set("log4jVersion","2.17.2") set("testContainersVersion", "1.21.0") @@ -28,6 +32,10 @@ extra.apply { } dependencies { + // Use JUnit Jupiter for testing. + testImplementation(libs.junit.jupiter) + + testRuntimeOnly("org.junit.platform:junit-platform-launcher") implementation("net.bytebuddy:byte-buddy:${project.extra["byteBuddyVersion"]}") implementation("net.bytebuddy:byte-buddy-agent:${project.extra["byteBuddyVersion"]}") @@ -42,13 +50,12 @@ dependencies { implementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}") // ClickHouse Client Libraries - implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}:all") + implementation("com.clickhouse:client-v2:${clickhouseVersion}:all") // Apache Flink Libraries implementation("org.apache.flink:flink-connector-base:${project.extra["flinkVersion"]}") implementation("org.apache.flink:flink-streaming-java:${project.extra["flinkVersion"]}") implementation(project(":flink-connector-clickhouse-base")) - testImplementation(project(":flink-connector-clickhouse-base")) testImplementation("org.apache.flink:flink-connector-files:${project.extra["flinkVersion"]}") testImplementation("org.apache.flink:flink-connector-base:${project.extra["flinkVersion"]}") testImplementation("org.apache.flink:flink-streaming-java:${project.extra["flinkVersion"]}") @@ -88,11 +95,12 @@ sourceSets { } } -tasks.shadowJar { +tasks.named("shadowJar") { archiveClassifier.set("all") - dependencies { - exclude(dependency("org.apache.flink:.*")) + include(dependency("org.apache.flink.connector.clickhouse:.*")) + include(project(":flink-connector-clickhouse-base")) + include(dependency("com.clickhouse:client-v2:${clickhouseVersion}:all")) } mergeServiceFiles() } @@ -103,10 +111,6 @@ val shadowSourcesJar by tasks.registering(Jar::class) { duplicatesStrategy = DuplicatesStrategy.EXCLUDE } -tasks.jar { - enabled = false -} - publishing { publications { create("maven") { diff --git a/flink-connector-clickhouse-base/build.gradle.kts b/flink-connector-clickhouse-base/build.gradle.kts index d7e7b5b..a30c449 100644 --- a/flink-connector-clickhouse-base/build.gradle.kts +++ b/flink-connector-clickhouse-base/build.gradle.kts @@ -7,12 +7,13 @@ plugins { `maven-publish` java signing + `java-test-fixtures` id("com.gradleup.nmcp") version "0.0.8" - id("com.github.johnrengelman.shadow") version "8.1.1" } val scalaVersion = "2.13.12" val sinkVersion: String by rootProject.extra +val clickhouseVersion: String by rootProject.extra // Temporary until we have a Java Client release repositories { // Use Maven Central for resolving dependencies. @@ -20,7 +21,6 @@ repositories { } extra.apply { - set("clickHouseDriverVersion", "0.9.1") set("log4jVersion","2.17.2") set("testContainersVersion", "1.21.0") set("byteBuddyVersion", "1.17.5") @@ -28,8 +28,8 @@ extra.apply { dependencies { // Use JUnit Jupiter for testing. 
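+ // Moved into testFixtures so sibling modules can reuse the shared test harness + // via testFixtures(project(":flink-connector-clickhouse-base"))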
- testImplementation(libs.junit.jupiter) - testRuntimeOnly("org.junit.platform:junit-platform-launcher") + testFixturesImplementation(libs.junit.jupiter) + testFixturesImplementation("org.junit.platform:junit-platform-launcher") // logger implementation("org.apache.logging.log4j:log4j-slf4j-impl:${project.extra["log4jVersion"]}") @@ -38,7 +38,17 @@ dependencies { implementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}") // ClickHouse Client Libraries - implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}:all") + implementation("com.clickhouse:client-v2:${clickhouseVersion}:all") + + // For testing + testFixturesImplementation("com.clickhouse:client-v2:${clickhouseVersion}:all") + testFixturesImplementation("org.testcontainers:testcontainers:${project.extra["testContainersVersion"]}") + testFixturesImplementation("org.testcontainers:clickhouse:${project.extra["testContainersVersion"]}") + testFixturesImplementation("com.squareup.okhttp3:okhttp:5.1.0") + testFixturesImplementation("com.google.code.gson:gson:2.10.1") + testFixturesImplementation("org.scalatest:scalatest_2.13:3.2.19") + testFixturesImplementation("org.scalatestplus:junit-4-13_2.13:3.2.18.0") + } sourceSets { @@ -53,134 +63,3 @@ sourceSets { } } } - -// Apply a specific Java toolchain to ease working on different environments. -//java { -// toolchain { -// languageVersion = JavaLanguageVersion.of(11) -// } -//} - -//tasks.test { -// useJUnitPlatform() -// -// include("**/*Test.class", "**/*Tests.class", "**/*Spec.class") -// testLogging { -// events("passed", "failed", "skipped") -// //showStandardStreams = true - , "standardOut", "standardError" -// } -//} -// -//tasks.withType { -// scalaCompileOptions.apply { -// encoding = "UTF-8" -// isDeprecation = true -// additionalParameters = listOf("-feature", "-unchecked") -// } -//} -// -//tasks.named("test") { -// // Use JUnit Platform for unit tests. 
-// useJUnitPlatform() -//} -// -//tasks.register("runScalaTests") { -// group = "verification" -// mainClass.set("org.scalatest.tools.Runner") -// classpath = sourceSets["test"].runtimeClasspath -// args = listOf( -// "-R", "build/classes/scala/test", -// "-oD", // show durations -// "-s", "org.apache.flink.connector.clickhouse.test.scala.ClickHouseSinkTests" -// ) -//} -// -//tasks.shadowJar { -// archiveClassifier.set("all") -// -// dependencies { -// exclude(dependency("org.apache.flink:.*")) -// } -// mergeServiceFiles() -//} -// -//val shadowSourcesJar by tasks.registering(Jar::class) { -// archiveClassifier.set("all-sources") -// from(sourceSets.main.get().allSource) -// duplicatesStrategy = DuplicatesStrategy.EXCLUDE -//} -// -//tasks.jar { -// enabled = false -//} -// -//publishing { -// publications { -// create("maven") { -// artifact(tasks.shadowJar) -// groupId = "com.clickhouse.flink" -// artifactId = "flink-connector-clickhouse" -// version = sinkVersion -// -// artifact(shadowSourcesJar) -// -// pom { -// name.set("ClickHouse Flink Connector") -// description.set("Official Apache Flink connector for ClickHouse") -// url.set("https://github.com/ClickHouse/flink-connector-clickhouse") -// -// licenses { -// license { -// name.set("The Apache License, Version 2.0") -// url.set("https://github.com/ClickHouse/flink-connector-clickhouse/blob/main/LICENSE") -// } -// } -// -// developers { -// developer { -// id.set("mzitnik") -// name.set("Mark Zitnik") -// email.set("mark@clickhouse.com") -// } -// developer { -// id.set("BentsiLeviav") -// name.set("Bentsi Leviav") -// email.set("bentsi.leviav@clickhouse.com") -// } -// } -// -// scm { -// connection.set("git@github.com:ClickHouse/flink-connector-clickhouse.git") -// url.set("https://github.com/ClickHouse/flink-connector-clickhouse") -// } -// -// organization { -// name.set("ClickHouse") -// url.set("https://clickhouse.com") -// } -// -// issueManagement { -// system.set("GitHub Issues") -// url.set("https://github.com/ClickHouse/flink-connector-clickhouse/issues") -// } -// } -// } -// } -//} -// -//signing { -// val signingKey = System.getenv("SIGNING_KEY") -// val signingPassword = System.getenv("SIGNING_PASSWORD") -// if (signingKey != null && signingPassword != null) { -// useInMemoryPgpKeys(signingKey, signingPassword) -// sign(publishing.publications["maven"]) -// } -//} -// -//nmcp { -// publish("maven") { -// username = System.getenv("NMCP_USERNAME") -// password = System.getenv("NMCP_PASSWORD") -// publicationType = "AUTOMATIC" -// } -//} \ No newline at end of file diff --git a/flink-connector-clickhouse-base/src/testFixtures/java/com/clickhouse/flink/Cluster.java b/flink-connector-clickhouse-base/src/testFixtures/java/com/clickhouse/flink/Cluster.java new file mode 100644 index 0000000..1dfd0b2 --- /dev/null +++ b/flink-connector-clickhouse-base/src/testFixtures/java/com/clickhouse/flink/Cluster.java @@ -0,0 +1,260 @@ +package com.clickhouse.flink; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import okhttp3.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import 
java.util.List; + +public class Cluster { + + private static final Logger LOG = LoggerFactory.getLogger(Cluster.class); + private static final int INTERNAL_REST_PORT = 8081; + private static final int INTERNAL_JOB_MANAGER_RCP_PORT = 6123; + + private GenericContainer containerJobManager; + private List> containerTaskManagerList = new ArrayList<>(); + + public static class Builder { + private String flinkVersion; + private int taskManagers; + private String sourcePath; + private String dataFilename; + private String targetPath; + private Network network; + + public Builder() { + taskManagers = 1; + flinkVersion = "latest"; + sourcePath = null; + dataFilename = null; + targetPath = null; + network = null; + } + public Builder withTaskManagers(int taskManagers) { + this.taskManagers = taskManagers; + return this; + } + + public Builder withFlinkVersion(String flinkVersion) { + this.flinkVersion = flinkVersion; + return this; + } + + public Builder withDataFile(String sourcePath, String dataFilename, String targetPath) { + this.sourcePath = sourcePath; + this.dataFilename = dataFilename; + this.targetPath = targetPath; + return this; + } + + public Builder withNetwork(Network network) { + this.network = network; + return this; + } + + public Cluster build() { + // when we are not specifying a network we should create one + if (network == null) { + network = Network.newNetwork(); + } + Cluster cluster = new Cluster(flinkVersion, taskManagers, sourcePath, dataFilename, targetPath, network); + return cluster; + } + + } + + public Cluster(String flinkVersion, int taskManagers, String sourcePath, String dataFilename, String targetPath, Network network) { + MountableFile mountableFile = MountableFile.forHostPath(sourcePath + dataFilename); + String dataFileInContainer = String.format("%s/%s", targetPath, dataFilename); + String flinkImageTag = String.format("flink:%s", flinkVersion); + LOG.info("Using flink image tag: {}", flinkImageTag); + LOG.info("Data file location in container: {}", dataFileInContainer); + DockerImageName FLINK_IMAGE = DockerImageName.parse(flinkImageTag); + containerJobManager = new GenericContainer<>(FLINK_IMAGE) + .withCommand("jobmanager") + .withNetwork(network) + .withExposedPorts(INTERNAL_REST_PORT, INTERNAL_JOB_MANAGER_RCP_PORT) + .withNetworkAliases("jobmanager") + .withEnv("FLINK_PROPERTIES","jobmanager.rpc.address: jobmanager"); + + if (sourcePath != null) { + containerJobManager.withCopyFileToContainer(mountableFile, dataFileInContainer); + } + for (int i = 0; i < taskManagers; i++) { + GenericContainer containerTaskManager = new GenericContainer<>(FLINK_IMAGE) + .withCommand("taskmanager") + .withNetwork(network) + .dependsOn(containerJobManager) + .withEnv("FLINK_PROPERTIES","jobmanager.rpc.address: jobmanager"); + if (sourcePath != null) { + containerTaskManager.withCopyFileToContainer(mountableFile, dataFileInContainer); + } + containerTaskManagerList.add(containerTaskManager); + } + + LOG.info("Starting JobManager"); + containerJobManager.start(); + LOG.info("Using task managers: {} and starting taskManagers", containerTaskManagerList.size()); + for (int i = 0; i < taskManagers; i++) { + containerTaskManagerList.get(i).start(); + } + // TODO: add strategy for wait + } + + public int getDashboardPort() { + return containerJobManager.getMappedPort(INTERNAL_REST_PORT); + } + + public String getDashboardUrl() { + return String.format("%s:%s",containerJobManager.getContainerIpAddress(), getDashboardPort()); + } + + public GenericContainer getContainerJobManager() { 
+ return containerJobManager; + } + + public List> getContainerTaskManagerList() { + return containerTaskManagerList; + } + + public String uploadJar(String jarFilePath) throws IOException { + File jarFile = new File(jarFilePath); + String clusterURLJarUploadAPI = String.format("http://%s/jars/upload", getDashboardUrl()); + OkHttpClient client = new OkHttpClient(); + RequestBody fileBody = RequestBody.create(jarFile, MediaType.parse("application/java-archive")); + MultipartBody requestBody = new MultipartBody.Builder() + .setType(MultipartBody.FORM) + .addFormDataPart("jarfile", jarFile.getName(), fileBody) + .build(); + Request request = new Request.Builder() + .url(clusterURLJarUploadAPI) + .post(requestBody).build(); + Response response = client.newCall(request).execute(); + if (!response.isSuccessful()) { + LOG.error("uploadJar failed code: {}", response.code()); + return null; + } else { + Gson gson = new Gson(); + String responseBody = response.body().string(); + JsonObject jsonObject = gson.fromJson(responseBody, JsonObject.class); + if (jsonObject.has("status") && + jsonObject.get("status").getAsString().equalsIgnoreCase("success") && + jsonObject.has("filename") ) { + String filename = jsonObject.get("filename").getAsString(); + LOG.info("Uploading jar file: {}", filename); + return filename; + } + return null; + } + + + } + public List listAllJars() throws IOException { + String clusterURLListJars = String.format("http://%s/jars", getDashboardUrl()); + + OkHttpClient client = new OkHttpClient(); + Request request = new Request.Builder() + .url(clusterURLListJars) + .build(); + Response response = client.newCall(request).execute(); + if (!response.isSuccessful()) { + LOG.error("listAllJars failed code: {}", response.code()); + return null; + } else { + Gson gson = new Gson(); + String responseBody = response.body().string(); + JsonObject jsonObject = gson.fromJson(responseBody, JsonObject.class); + List jars = new ArrayList<>(); + if (jsonObject.has("files")) { + JsonArray jsonArray = jsonObject.getAsJsonArray("files"); + for (JsonElement element : jsonArray) { + if (element.getAsJsonObject().has("id")) { + jars.add(element.getAsJsonObject().get("id").getAsString()); + LOG.info("listAllJars successful: {}", element.getAsJsonObject().get("id").getAsString()); + } + } + + } + return jars; + } + + + } + + public String runJob(String jarId, String entryClass, int parallelism, String... 
args) throws IOException { + String programArg = String.join(",", args); + String clusterURLJarRunAPI = String.format("http://%s/jars/%s/run?programArg=%s", getDashboardUrl(), jarId, programArg); + RequestBody body = RequestBody.create( + "", + MediaType.get("application/json; charset=utf-8") + ); + + OkHttpClient client = new OkHttpClient(); + Request request = new Request.Builder() + .url(clusterURLJarRunAPI) + .post(body) + .build(); + + Response response = client.newCall(request).execute(); + if (!response.isSuccessful()) { + LOG.error("runJob failed code: {}", response.code()); + return null; + } else { + Gson gson = new Gson(); + String responseBody = response.body().string(); + JsonObject jsonObject = gson.fromJson(responseBody, JsonObject.class); + if (jsonObject.has("jobid") ) { + String jobid = jsonObject.get("jobid").getAsString(); + LOG.info("runJob successful jobid: {}", jobid); + return jobid; + } + return null; + } + } + + public String jobStatus(String jobId) throws IOException { + String clusterURLJobStatusAPI = String.format("http://%s/jobs/%s", getDashboardUrl(), jobId); + OkHttpClient client = new OkHttpClient(); + Request request = new Request.Builder() + .url(clusterURLJobStatusAPI) + .build(); + Response response = client.newCall(request).execute(); + if (!response.isSuccessful()) { + LOG.error("jobStatus response code: {}", response.code()); + return null; + } else { + Gson gson = new Gson(); + String responseBody = response.body().string(); + JsonObject jsonObject = gson.fromJson(responseBody, JsonObject.class); + if (jsonObject.has("state")) { + String state = jsonObject.get("state").getAsString(); + LOG.info("jobStatus state: {}", state); + return state; + } + return null; + } + } + + public void tearDown() { + if (containerTaskManagerList != null) { + containerTaskManagerList.forEach(GenericContainer::stop); + } + if (containerJobManager != null) { + containerJobManager.stop(); + } + } + +} diff --git a/flink-connector-clickhouse-base/src/testFixtures/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/testFixtures/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java new file mode 100644 index 0000000..49b0f0c --- /dev/null +++ b/flink-connector-clickhouse-base/src/testFixtures/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -0,0 +1,170 @@ +package org.apache.flink.connector.test.embedded.clickhouse; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.metadata.TableSchema; +import com.clickhouse.client.api.query.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.Network; + +import java.util.List; +import java.util.concurrent.ExecutionException; + +public class ClickHouseServerForTests { + + private static final Logger LOG = LoggerFactory.getLogger(ClickHouseServerForTests.class); + + protected static boolean isCloud = ClickHouseTestHelpers.isCloud(); + protected static String database = null; + protected static ClickHouseContainer db = null; + + protected static String host = null; + protected static int port = 0; + protected static String username = null; + protected static String password = null; + protected static boolean isSSL = false; + + + public static void initConfiguration() { + if (isCloud) { + LOG.info("Init ClickHouse Cloud Configuration"); + host = 
System.getenv("CLICKHOUSE_CLOUD_HOST"); + port = Integer.parseInt(ClickHouseTestHelpers.HTTPS_PORT); + database = String.format("flink_connector_test_%s", System.currentTimeMillis()); + username = ClickHouseTestHelpers.USERNAME_DEFAULT; + password = System.getenv("CLICKHOUSE_CLOUD_PASSWORD"); + } else { + LOG.info("Init ClickHouse Docker Configuration"); + host = db.getHost(); + port = db.getFirstMappedPort(); + database = ClickHouseTestHelpers.DATABASE_DEFAULT; + username = db.getUsername(); + password = db.getPassword(); + } + isSSL = ClickHouseTestHelpers.isCloud(); + } + + public static void setUp() throws InterruptedException, ExecutionException { + setUp(true); + } + + public static void setUp(boolean bridge) throws InterruptedException, ExecutionException { + if (!isCloud) { + db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE) + .withPassword("test_password") + .withEnv("CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT", "1"); + if (!bridge) { + db.withNetworkAliases("clickhouse") + .withNetwork(Network.newNetwork()); + + } + db.start(); + } + initConfiguration(); + // wakeup cloud + // have a for loop + boolean isLive = false; + int counter = 0; + while (counter < 5) { + isLive = ClickHouseTestHelpers.ping(host, port, isSSL, username, password); + if (isLive) { + String createDatabase = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database); + executeSql(createDatabase); + return; + } + Thread.sleep(2000); + counter++; + } + throw new RuntimeException("Failed to connect to ClickHouse"); + } + + public static void tearDown() { + if (db != null) { + db.stop(); + } + } + + public static Network getNetwork() { + if (isCloud()) + return Network.newNetwork(); + return db.getNetwork(); + } + + public static String getDatabase() { return database; } + + public static String getHost() { return host; } + public static int getPort() { return port; } + public static String getUsername() { return username; } + public static String getPassword() { return password; } + + public static String getURL() { + return ClickHouseServerForTests.getURL(host, port); + } + + public static String getAliasURL(String alias, int port) { + return ClickHouseServerForTests.getURL(alias, port); + } + + public static String getURLForCluster() { + if (isCloud) + return getURL(); + else + return getAliasURL("clickhouse", 8123); + } + + public static String getURL(String host, int port) { + if (isCloud) { + return "https://" + host + ":" + port + "/"; + } else { + return "http://" + host + ":" + port + "/"; + } + } + + public static boolean isCloud() { return isCloud; } + + public static void executeSql(String sql) throws ExecutionException, InterruptedException { + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + try { + client.execute(sql).get().close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static int countParts(String table) { + String countPartsSql = String.format("SELECT count(*) FROM system.parts WHERE table = '%s' and active = 1", table); + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + List countResult = client.queryAll(countPartsSql); + return countResult.get(0).getInteger(1); + } + + public static int countMerges(String table) { + String countPartsSql = String.format("SELECT count(*) FROM system.merges WHERE table = '%s'", table); + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + List countResult = 
client.queryAll(countPartsSql); + return countResult.get(0).getInteger(1); + } + + public static int countRows(String table) throws ExecutionException, InterruptedException { + String countSql = String.format("SELECT COUNT(*) FROM `%s`.`%s`", database, table); + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + List countResult = client.queryAll(countSql); + return countResult.get(0).getInteger(1); + } + // http_user_agent + public static String extractProductName(String databaseName, String tableName) { + String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') LIMIT 100", databaseName, databaseName, tableName); + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + List userAgentResult = client.queryAll(extractProductName); + if (!userAgentResult.isEmpty()) { + return userAgentResult.get(0).getString(1); + } + throw new RuntimeException("Query is returning empty result."); + } + + public static TableSchema getTableSchema(String table) throws ExecutionException, InterruptedException { + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + return client.getTableSchema(table, database); + } +} diff --git a/flink-connector-clickhouse-base/src/testFixtures/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java b/flink-connector-clickhouse-base/src/testFixtures/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java new file mode 100644 index 0000000..b079f77 --- /dev/null +++ b/flink-connector-clickhouse-base/src/testFixtures/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseTestHelpers.java @@ -0,0 +1,51 @@ +package org.apache.flink.connector.test.embedded.clickhouse; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.enums.Protocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class ClickHouseTestHelpers { + private static final Logger LOG = LoggerFactory.getLogger(ClickHouseTestHelpers.class); + + public static final String CLICKHOUSE_VERSION_DEFAULT = "24.3"; + public static final String CLICKHOUSE_PROXY_VERSION_DEFAULT = "23.8"; + public static final String CLICKHOUSE_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", getClickhouseVersion()); + public static final String CLICKHOUSE_FOR_PROXY_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", CLICKHOUSE_PROXY_VERSION_DEFAULT); + + public static final String HTTPS_PORT = "8443"; + public static final String DATABASE_DEFAULT = "default"; + public static final String USERNAME_DEFAULT = "default"; + + private static final int TIMEOUT_VALUE = 60; + private static final TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS; + + public static String getClickhouseVersion() { + String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION"); + if (clickHouseVersion == null) { + clickHouseVersion = CLICKHOUSE_VERSION_DEFAULT; + } + return clickHouseVersion; + } + + public static boolean isCloud() { + String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION"); + return clickHouseVersion != null && clickHouseVersion.equalsIgnoreCase("cloud"); + } + + public static Client getClient(String host, int port, boolean ssl, String username, String password) { + return new 
Client.Builder().addEndpoint(Protocol.HTTP, host, port, ssl) + .setUsername(username) + .setPassword(password) + .setConnectTimeout(TIMEOUT_VALUE, TIMEOUT_UNIT.toChronoUnit()) + .build(); + } + + public static boolean ping(String host, int port, boolean ssl, String username, String password) { + Client client = getClient(host, port, ssl, username, password); + return client.ping(); + } + +} diff --git a/flink-connector-clickhouse-base/src/testFixtures/resources/data/100k_epidemiology.csv.gz b/flink-connector-clickhouse-base/src/testFixtures/resources/data/100k_epidemiology.csv.gz new file mode 100644 index 0000000..c24da7b Binary files /dev/null and b/flink-connector-clickhouse-base/src/testFixtures/resources/data/100k_epidemiology.csv.gz differ diff --git a/flink-connector-clickhouse-base/src/test/resources/log4j.properties b/flink-connector-clickhouse-base/src/testFixtures/resources/log4j.properties similarity index 100% rename from flink-connector-clickhouse-base/src/test/resources/log4j.properties rename to flink-connector-clickhouse-base/src/testFixtures/resources/log4j.properties diff --git a/flink-connector-clickhouse-integration/build.gradle.kts b/flink-connector-clickhouse-integration/build.gradle.kts new file mode 100644 index 0000000..06bcbba --- /dev/null +++ b/flink-connector-clickhouse-integration/build.gradle.kts @@ -0,0 +1,60 @@ +/* +`* Build configuration for flink-connector-clickhouse-integrations module + * Contains integration tests for the Flink-ClickHouse connector + */ + +plugins { + `maven-publish` + java + signing + id("com.gradleup.nmcp") version "0.0.8" +} + +val scalaVersion = "2.13.12" +val sinkVersion: String by rootProject.extra +val clickhouseVersion: String by rootProject.extra // Temporary until we have a Java Client release + +repositories { + // Use Maven Central for resolving dependencies. + mavenCentral() +} + +extra.apply { + set("log4jVersion","2.17.2") + set("testContainersVersion", "1.21.0") + set("byteBuddyVersion", "1.17.5") +} + +dependencies { + // Use JUnit Jupiter for testing. 
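+ // The integration tests build on the base module's testFixtures (Cluster, ClickHouseServerForTests)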
+ testImplementation(libs.junit.jupiter) + testRuntimeOnly("org.junit.platform:junit-platform-launcher") + + // logger + implementation("org.apache.logging.log4j:log4j-slf4j-impl:${project.extra["log4jVersion"]}") + implementation("org.apache.logging.log4j:log4j-api:${project.extra["log4jVersion"]}") + implementation("org.apache.logging.log4j:log4j-1.2-api:${project.extra["log4jVersion"]}") + implementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}") + + // ClickHouse Client Libraries + implementation("com.clickhouse:client-v2:${clickhouseVersion}:all") + + // For testing + testImplementation(testFixtures(project(":flink-connector-clickhouse-base"))) + testImplementation("org.testcontainers:testcontainers:${project.extra["testContainersVersion"]}") + testImplementation("com.squareup.okhttp3:okhttp:5.1.0") + testImplementation("com.google.code.gson:gson:2.10.1") +} + +sourceSets { + main { + java { + srcDirs("src/main/java") + } + } + test { + java { + srcDirs("src/test/java") + } + } +} diff --git a/flink-connector-clickhouse-integration/src/test/java/com/clickhouse/flink/integration/FlinkTests.java b/flink-connector-clickhouse-integration/src/test/java/com/clickhouse/flink/integration/FlinkTests.java new file mode 100644 index 0000000..62e464e --- /dev/null +++ b/flink-connector-clickhouse-integration/src/test/java/com/clickhouse/flink/integration/FlinkTests.java @@ -0,0 +1,133 @@ +package com.clickhouse.flink.integration; + +import com.clickhouse.flink.Cluster; +import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests; +import org.junit.Assert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Network; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +public class FlinkTests { + static int NUMBERS_OF_RECORDS = 100000; + static String flinkVersion = "latest"; + @BeforeAll + public static void setUp() throws Exception { + flinkVersion = (System.getenv("FLINK_VERSION") != null ? 
System.getenv("FLINK_VERSION") : "latest"); + System.out.println("FLINK_VERSION: " + flinkVersion); + ClickHouseServerForTests.setUp(false); + } + + @AfterAll + public static void tearDown() throws Exception { + ClickHouseServerForTests.tearDown(); + } + + private String getRoot() throws Exception { + String PWD = System.getenv("PWD"); + if (PWD != null) + return PWD; + String GITHUB_WORKSPACE = System.getenv("GITHUB_WORKSPACE"); + if (GITHUB_WORKSPACE != null) + return GITHUB_WORKSPACE; + else + throw new RuntimeException("Can not get root path"); + } + + private String exampleSubFolder(String flinkVersion) { + if (flinkVersion.equalsIgnoreCase("latest") || flinkVersion.startsWith("2.0")) + return "flink-v2"; + return "flink-v1.7"; + } + + private String getResourcePath(String resourceName) throws URISyntaxException { + URI resourceUri = getClass().getResource("/data/" + resourceName).toURI(); + Path resourcePath = Paths.get(resourceUri); + String dataFileLocation = resourcePath.getParent().toString() + "/"; + return dataFileLocation; + } + + @Test + void testFlinkCluster() throws Exception { + String root = getRoot(); + String exampleSubFolder = exampleSubFolder(flinkVersion); + String jarLocation = String.format("%s/examples/maven/%s/covid/target/covid-1.0-SNAPSHOT.jar", root, exampleSubFolder); + String dataFile = "100k_epidemiology.csv.gz"; + String tableName = "covid"; + + String dataFileLocation = getResourcePath(dataFile); + // Since we have in docker communication + String clickHouseURL = ClickHouseServerForTests.getURLForCluster(); + String username = ClickHouseServerForTests.getUsername(); + String password = ClickHouseServerForTests.getPassword(); + String database = ClickHouseServerForTests.getDatabase(); + + // create table + String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", database, tableName); + ClickHouseServerForTests.executeSql(dropTable); + // create table + String tableSql = "CREATE TABLE `" + database + "`.`" + tableName + "` (" + + "date Date," + + "location_key LowCardinality(String)," + + "new_confirmed Int32," + + "new_deceased Int32," + + "new_recovered Int32," + + "new_tested Int32," + + "cumulative_confirmed Int32," + + "cumulative_deceased Int32," + + "cumulative_recovered Int32," + + "cumulative_tested Int32" + + ") " + + "ENGINE = MergeTree " + + "ORDER BY (location_key, date); "; + + ClickHouseServerForTests.executeSql(tableSql); + + // start on demand flink cluster + Network network = ClickHouseServerForTests.getNetwork(); + + Cluster.Builder builder = new Cluster.Builder() + .withTaskManagers(3) + .withNetwork(network) + .withDataFile(dataFileLocation, dataFile, "/tmp") + .withFlinkVersion(flinkVersion); + + Cluster cluster = builder.build(); + String jarId = cluster.uploadJar(jarLocation); + List jars = cluster.listAllJars(); + String jobId = cluster.runJob(jars.get(0), + "com.clickhouse.example.covid.DataStreamJob", + 1, + "-input", + "/tmp/" + dataFile, + "-url", + clickHouseURL, + "-username", + username, + "-password", + password, + "-database", + database, + "-table", + tableName + ); + String state = cluster.jobStatus(jobId); + while (state.equalsIgnoreCase("RUNNING")) { + state = cluster.jobStatus(jobId); + int count = ClickHouseServerForTests.countRows(tableName); + if (count == NUMBERS_OF_RECORDS) + break; + Thread.sleep(2000); + } + int count = ClickHouseServerForTests.countRows(tableName); + Assert.assertEquals(NUMBERS_OF_RECORDS, count); + // destroy cluster + cluster.tearDown(); + } +} diff --git 
a/flink-connector-clickhouse-integration/src/test/resources/data/100k_epidemiology.csv.gz b/flink-connector-clickhouse-integration/src/test/resources/data/100k_epidemiology.csv.gz new file mode 100644 index 0000000..c24da7b Binary files /dev/null and b/flink-connector-clickhouse-integration/src/test/resources/data/100k_epidemiology.csv.gz differ diff --git a/flink-connector-clickhouse-integration/src/test/resources/log4j.properties b/flink-connector-clickhouse-integration/src/test/resources/log4j.properties new file mode 100644 index 0000000..0b4ff80 --- /dev/null +++ b/flink-connector-clickhouse-integration/src/test/resources/log4j.properties @@ -0,0 +1,8 @@ +# Define the root logger with appender X +log4j.rootLogger=INFO, console +#log4j.logger.org.testcontainers=WARN +log4j.logger.com.clickhouse.kafka=DEBUG + +log4j.appender.console= org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.conversionPattern=[%d] %p %C %m%n \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 9355b41..2a84e18 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.0.0-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/settings.gradle.kts b/settings.gradle.kts index b4e576d..8dc44fd 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -9,5 +9,5 @@ plugins { } rootProject.name = "flink-connector-clickhouse" -include("flink-connector-clickhouse-base", "flink-connector-clickhouse-2.0.0", "flink-connector-clickhouse-1.17") +include("flink-connector-clickhouse-base", "flink-connector-clickhouse-2.0.0", "flink-connector-clickhouse-1.17", "flink-connector-clickhouse-integration")
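Taken together, the new fixtures let a downstream module stand up a Flink session cluster beside a ClickHouse instance and drive a job end to end. A condensed Java sketch of the flow that FlinkTests exercises (the class name and host data path are illustrative; the covid table must be created first, as in the test):

    import com.clickhouse.flink.Cluster;
    import org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests;

    public class FixtureUsageSketch {
        public static void main(String[] args) throws Exception {
            // ClickHouse in Docker unless CLICKHOUSE_VERSION=cloud; bridge=false attaches it to a named network
            ClickHouseServerForTests.setUp(false);

            // The covid target table must exist before the job runs
            // (CREATE TABLE statement omitted here; see FlinkTests.testFlinkCluster above)

            // Flink session cluster on the same Docker network, data file copied into every container
            Cluster cluster = new Cluster.Builder()
                    .withFlinkVersion("2.0.0") // matches the flink-v2 example jar below
                    .withTaskManagers(2)
                    .withNetwork(ClickHouseServerForTests.getNetwork())
                    .withDataFile("/host/path/to/data/", "100k_epidemiology.csv.gz", "/tmp") // host path is illustrative
                    .build();

            // Submit the example job through the wrapped Flink REST API
            cluster.uploadJar("examples/maven/flink-v2/covid/target/covid-1.0-SNAPSHOT.jar");
            String jobId = cluster.runJob(cluster.listAllJars().get(0),
                    "com.clickhouse.example.covid.DataStreamJob", 1,
                    "-input", "/tmp/100k_epidemiology.csv.gz",
                    "-url", ClickHouseServerForTests.getURLForCluster(),
                    "-username", ClickHouseServerForTests.getUsername(),
                    "-password", ClickHouseServerForTests.getPassword(),
                    "-database", ClickHouseServerForTests.getDatabase(),
                    "-table", "covid");
            System.out.println("job state: " + cluster.jobStatus(jobId)); // e.g. RUNNING

            cluster.tearDown();
            ClickHouseServerForTests.tearDown();
        }
    }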