diff --git a/.gitignore b/.gitignore
index 710850f..d2b6968 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,6 +20,13 @@ build/
gradle-app.setting
!gradle-wrapper.jar
+##############################
+## Sbt
+##############################
+project/project/
+project/target/
+target/
+
##############################
## IntelliJ
##############################
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 0a57a25..cb05879 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -34,3 +34,9 @@ Does not require a running ClickHouse server.
./gradlew test
```
+if you want to run scala unit tests
+
+ ```bash
+ cd flink-connector-clickhouse
+ ./gradlew clean runScalaTests
+ ```
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 0000000..dd50526
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,94 @@
+
+

+
ClickHouse Flink Connector
+
+
+Table of Contents
+* [Covid Flink Example](#covid-flink-application-example)
+* [Build Application](#build-covid-application)
+ * [Build Connector](#build-clickhouse-flink-connector)
+ * [Java Application](#java-covid-app)
+ * [Scala Application](#scala-covid-app)
+* [Running Example](#running-the-example)
+ * [Download Data](#download-covid-data)
+ * [Create table](#create-a-destination-covid-table)
+ * [Submit Flink](#submit-flink-job)
+
+# Covid Flink Application example
+
+Read covid data from a file and insert into ClickHouse
+
+### Build Covid Application
+
+#### Build ClickHouse Flink Connector
+If you wish to build the connector locally run before building the example
+```bash
+./gradlew publishToMavenLocal
+```
+
+#### Java Covid App
+
+From the project directory, run the following command, which will create a `covid-1.0-SNAPSHOT.jar` artifact that can be found in your target folder
+
+```bash
+mvn clean package -DskipTests
+```
+
+#### Scala Covid App
+
+From project directory run this will create a `covid.jar` can be found in `target/scala-2.12` folder
+
+Build Covid Scala App
+
+```bash
+sbt clean assembly
+```
+
+## Running the example
+
+- Prepare ClickHouse OSS or [ClickHouse Cloud](https://clickhouse.com/)
+- Flink Cluster or Standalone running
+- Download covid data
+
+### Download covid data
+
+Download covid data set and save it in a location that is accessible to Flink
+
+```bash
+curl -L -# -o epidemiology.csv https://storage.googleapis.com/covid19-open-data/v3/epidemiology.csv
+```
+
+### Create a destination covid table
+
+```sql
+
+CREATE TABLE IF NOT EXISTS `default`.`covid` (
+ date Date,
+ location_key LowCardinality(String),
+ new_confirmed Int32,
+ new_deceased Int32,
+ new_recovered Int32,
+ new_tested Int32,
+ cumulative_confirmed Int32,
+ cumulative_deceased Int32,
+ cumulative_recovered Int32,
+ cumulative_tested Int32
+) ENGINE = MergeTree
+ORDER BY (location_key, date);
+```
+
+### Submit Flink Job
+
+With the Java `covid-1.0-SNAPSHOT.jar` or Scala `covid.jar` built, you can now submit the job to your Flink cluster (or standalone instance)
+
+```bash
+# Run the application
+./bin/flink run \
+ /path/to/your/generated/jar \
+ -input "/path/to/epidemiology.csv" \
+ -url "/url/clickhouse" \
+ -username "default" \
+ -password "" \
+ -database "default" \
+ -table "covid"
+```
diff --git a/examples/maven/covid/pom.xml b/examples/maven/covid/pom.xml
new file mode 100644
index 0000000..d55421a
--- /dev/null
+++ b/examples/maven/covid/pom.xml
@@ -0,0 +1,225 @@
+
+
+ 4.0.0
+
+ com.clickhouse.example.covid
+ covid
+ 1.0-SNAPSHOT
+ jar
+
+ Flink Quickstart Job
+
+
+ UTF-8
+ 2.0.0
+ 11
+ 2.12
+ ${target.java.version}
+ ${target.java.version}
+ 2.24.1
+
+
+
+
+
+ apache.snapshots
+ Apache Development Snapshot Repository
+ https://repository.apache.org/content/repositories/snapshots/
+
+ false
+
+
+ true
+
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-connector-files
+ 2.0.0
+ provided
+
+
+
+ com.clickhouse.flink
+ flink-connector-clickhouse-2.0.0
+ 0.0.1
+ all
+
+
+
+
+
+
+
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ ${log4j.version}
+ runtime
+
+
+ org.apache.logging.log4j
+ log4j-api
+ ${log4j.version}
+ runtime
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j.version}
+ runtime
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+ ${target.java.version}
+ ${target.java.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.1.1
+
+
+
+ package
+
+ shade
+
+
+ false
+
+
+ org.apache.flink:flink-shaded-force-shading
+ com.google.code.findbugs:jsr305
+ org.slf4j:*
+ org.apache.logging.log4j:*
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+ com.clickhouse.example.covid.DataStreamJob
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.eclipse.m2e
+ lifecycle-mapping
+ 1.0.0
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ [3.1.1,)
+
+ shade
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ [3.1,)
+
+ testCompile
+ compile
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/examples/maven/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java b/examples/maven/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java
new file mode 100644
index 0000000..7c069ce
--- /dev/null
+++ b/examples/maven/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.clickhouse.example.covid;
+
+import com.clickhouse.data.ClickHouseFormat;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.clickhouse.convertor.ClickHouseConvertor;
+import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
+import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncSink;
+import org.apache.flink.connector.clickhouse.sink.ClickHouseClientConfig;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.ParameterTool;
+
+/**
+ * Skeleton for a Flink DataStream Job.
+ *
+ * For a tutorial how to write a Flink application, check the
+ * tutorials and examples on the Flink Website.
+ *
+ *
To package your application into a JAR file for execution, run
+ * 'mvn clean package' on the command line.
+ *
+ *
If you change the name of the main class (with the public static void main(String[] args))
+ * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
+ */
+public class DataStreamJob {
+
+ static final int MAX_BATCH_SIZE = 5000;
+ static final int MAX_IN_FLIGHT_REQUESTS = 2;
+ static final int MAX_BUFFERED_REQUESTS = 20000;
+ static final long MAX_BATCH_SIZE_IN_BYTES = 1024 * 1024;
+ static final long MAX_TIME_IN_BUFFER_MS = 5 * 1000;
+ static final long MAX_RECORD_SIZE_IN_BYTES = 1000;
+
+ /*
+ Create covid table before running the example
+
+ CREATE TABLE `default`.`covid` (
+ date Date,
+ location_key LowCardinality(String),
+ new_confirmed Int32,
+ new_deceased Int32,
+ new_recovered Int32,
+ new_tested Int32,
+ cumulative_confirmed Int32,
+ cumulative_deceased Int32,
+ cumulative_recovered Int32,
+ cumulative_tested Int32
+ )
+ ENGINE = MergeTree
+ ORDER BY (location_key, date);
+ */
+
+ public static void main(String[] args) throws Exception {
+ // Sets up the execution environment, which is the main entry point
+ ParameterTool parameters = ParameterTool.fromArgs(args);
+ final String fileFullName = parameters.get("input");
+ final String url = parameters.get("url");
+ final String username = parameters.get("username");
+ final String password = parameters.get("password");
+ final String database = parameters.get("database");
+ final String tableName = parameters.get("table");
+
+ ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
+ ElementConverter convertorString = new ClickHouseConvertor<>(String.class);
+
+ ClickHouseAsyncSink csvSink = new ClickHouseAsyncSink<>(
+ convertorString,
+ MAX_BATCH_SIZE,
+ MAX_IN_FLIGHT_REQUESTS,
+ MAX_BUFFERED_REQUESTS,
+ MAX_BATCH_SIZE_IN_BYTES,
+ MAX_TIME_IN_BUFFER_MS,
+ MAX_RECORD_SIZE_IN_BYTES,
+ clickHouseClientConfig
+ );
+ csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
+
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ Path filePath = new Path(fileFullName);
+ FileSource source = FileSource
+ .forRecordStreamFormat(new TextLineInputFormat(), filePath)
+ .build();
+
+ DataStreamSource lines = env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "GzipCsvSource"
+ );
+ lines.sinkTo(csvSink);
+ env.execute("Flink Java API Read CSV (covid)");
+ }
+}
diff --git a/examples/maven/covid/src/main/resources/log4j2.properties b/examples/maven/covid/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..32c696e
--- /dev/null
+++ b/examples/maven/covid/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/examples/sbt/covid/build.sbt b/examples/sbt/covid/build.sbt
new file mode 100644
index 0000000..69a19fc
--- /dev/null
+++ b/examples/sbt/covid/build.sbt
@@ -0,0 +1,28 @@
+
+
+ThisBuild / scalaVersion := "2.12.17"
+
+name := "covid"
+organization := "clickhouse"
+version := "1.0"
+
+val flinkVersion = "2.0.0"
+
+resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"
+
+libraryDependencies ++= Seq(
+ "org.apache.flink" % "flink-streaming-java" % flinkVersion % "provided",
+ "org.apache.flink" % "flink-clients" % flinkVersion % "provided",
+ "org.apache.flink" % "flink-connector-files" % "2.0.0" % "provided",
+ "org.apache.flink.connector" % "clickhouse" % "0.0.1" classifier "all"
+)
+
+assembly / assemblyJarName := "covid.jar"
+
+assembly / assemblyExcludedJars := {
+ val cp = (assembly / fullClasspath).value
+ cp filter { jar =>
+ jar.data.getName.contains("flink-") ||
+ jar.data.getName.contains("scala-library")
+ }
+}
diff --git a/examples/sbt/covid/project/build.properties b/examples/sbt/covid/project/build.properties
new file mode 100644
index 0000000..bbb0b60
--- /dev/null
+++ b/examples/sbt/covid/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.11.2
diff --git a/examples/sbt/covid/project/plugins.sbt b/examples/sbt/covid/project/plugins.sbt
new file mode 100644
index 0000000..f751db5
--- /dev/null
+++ b/examples/sbt/covid/project/plugins.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")
\ No newline at end of file
diff --git a/examples/sbt/covid/src/main/resources/log4j2.properties b/examples/sbt/covid/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..32c696e
--- /dev/null
+++ b/examples/sbt/covid/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/examples/sbt/covid/src/main/scala/Main.scala b/examples/sbt/covid/src/main/scala/Main.scala
new file mode 100644
index 0000000..3001f7a
--- /dev/null
+++ b/examples/sbt/covid/src/main/scala/Main.scala
@@ -0,0 +1,82 @@
+import com.clickhouse.data.ClickHouseFormat
+import org.apache.flink.connector.clickhouse.sink.{ClickHouseAsyncSink, ClickHouseClientConfig}
+import org.apache.flink.util.ParameterTool
+import org.apache.flink.core.fs.Path
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.connector.base.sink.writer.ElementConverter
+import org.apache.flink.connector.clickhouse.convertor.ClickHouseConvertor
+import org.apache.flink.connector.clickhouse.data.ClickHousePayload
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat
+import org.apache.flink.streaming.api.datastream.DataStreamSource
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+object Main extends App {
+
+ private val MAX_BATCH_SIZE = 5000
+ private val MAX_IN_FLIGHT_REQUESTS = 2
+ private val MAX_BUFFERED_REQUESTS = 20000
+ private val MAX_BATCH_SIZE_IN_BYTES = 1024 * 1024L
+ private val MAX_TIME_IN_BUFFER_MS = 5 * 1000L
+ private val MAX_RECORD_SIZE_IN_BYTES = 1000L
+
+ /*
+ Create covid table before running the example
+ CREATE TABLE `default`.`covid` (
+ date Date,
+ location_key LowCardinality(String),
+ new_confirmed Int32,
+ new_deceased Int32,
+ new_recovered Int32,
+ new_tested Int32,
+ cumulative_confirmed Int32,
+ cumulative_deceased Int32,
+ cumulative_recovered Int32,
+ cumulative_tested Int32
+ )
+ ENGINE = MergeTree
+ ORDER BY (location_key, date);
+ */
+
+ val parameters: ParameterTool = ParameterTool.fromArgs(args)
+ val fileFullName = parameters.get("input")
+ val url = parameters.get("url")
+ val username = parameters.get("username")
+ val password = parameters.get("password")
+ val database = parameters.get("database")
+ val tableName = parameters.get("table")
+
+ val clickHouseClientConfig : ClickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName)
+
+ val convertorString: ElementConverter[String, ClickHousePayload] =
+ new ClickHouseConvertor[String](classOf[String])
+
+ val csvSink: ClickHouseAsyncSink[String] = new ClickHouseAsyncSink[String](
+ convertorString,
+ MAX_BATCH_SIZE,
+ MAX_IN_FLIGHT_REQUESTS,
+ MAX_BUFFERED_REQUESTS,
+ MAX_BATCH_SIZE_IN_BYTES,
+ MAX_TIME_IN_BUFFER_MS,
+ MAX_RECORD_SIZE_IN_BYTES,
+ clickHouseClientConfig
+ )
+ csvSink.setClickHouseFormat(ClickHouseFormat.CSV)
+
+ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(2)
+
+ val filePath = new Path(fileFullName)
+ val source: FileSource[String] = FileSource
+ .forRecordStreamFormat(new TextLineInputFormat(), filePath)
+ .build()
+
+ val lines: DataStreamSource[String] = env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks[String](),
+ "GzipCsvSource"
+ )
+
+ lines.sinkTo(csvSink)
+ env.execute("Flink Scala API Read CSV (covid)")
+}
\ No newline at end of file
diff --git a/flink-connector-clickhouse-1.17/build.gradle.kts b/flink-connector-clickhouse-1.17/build.gradle.kts
index 710a25a..5d78887 100644
--- a/flink-connector-clickhouse-1.17/build.gradle.kts
+++ b/flink-connector-clickhouse-1.17/build.gradle.kts
@@ -16,16 +16,13 @@ val scalaVersion = "2.13.12"
val sinkVersion: String by rootProject.extra
repositories {
- // Use Maven Central for resolving dependencies.
- // mavenLocal()
- maven("https://s01.oss.sonatype.org/content/groups/staging/") // Temporary until we have a Java Client release
mavenCentral()
}
val flinkVersion = System.getenv("FLINK_VERSION") ?: "1.17.2"
extra.apply {
- set("clickHouseDriverVersion", "0.9.0-SNAPSHOT") // Temporary until we have a Java Client release
+ set("clickHouseDriverVersion", "0.9.1")
set("flinkVersion", flinkVersion)
set("log4jVersion","2.17.2")
set("testContainersVersion", "1.21.0")
diff --git a/flink-connector-clickhouse-2.0.0/build.gradle.kts b/flink-connector-clickhouse-2.0.0/build.gradle.kts
index 9d362ef..6f05e34 100644
--- a/flink-connector-clickhouse-2.0.0/build.gradle.kts
+++ b/flink-connector-clickhouse-2.0.0/build.gradle.kts
@@ -16,14 +16,11 @@ val scalaVersion = "2.13.12"
val sinkVersion: String by rootProject.extra
repositories {
- // Use Maven Central for resolving dependencies.
- // mavenLocal()
- maven("https://s01.oss.sonatype.org/content/groups/staging/") // Temporary until we have a Java Client release
mavenCentral()
}
extra.apply {
- set("clickHouseDriverVersion", "0.9.0-SNAPSHOT") // Temporary until we have a Java Client release
+ set("clickHouseDriverVersion", "0.9.1")
set("flinkVersion", "2.0.0")
set("log4jVersion","2.17.2")
set("testContainersVersion", "1.21.0")
diff --git a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
index 1496d82..c755395 100644
--- a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
+++ b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java
@@ -308,7 +308,7 @@ void ProductNameTest() throws Exception {
if (ClickHouseServerForTests.isCloud())
Thread.sleep(5000);
// let's wait until data will be available in query log
- String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName);
+ String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, "Flink-ClickHouse-Sink");
String compareString = String.format("Flink-ClickHouse-Sink/%s (fv:flink/2.0.0, lv:scala/2.12)", ClickHouseSinkVersion.getVersion());
boolean isContains = productName.contains(compareString);
Assertions.assertTrue(isContains, "Expected user agent to contain: " + compareString + " but got: " + productName);
diff --git a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
index 2a72dff..b47c5db 100644
--- a/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
+++ b/flink-connector-clickhouse-2.0.0/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java
@@ -122,13 +122,20 @@ public static int countRows(String table) throws ExecutionException, Interrupted
List countResult = client.queryAll(countSql);
return countResult.get(0).getInteger(1);
}
+
// http_user_agent
- public static String extractProductName(String databaseName, String tableName) {
+ public static String extractProductName(String databaseName, String tableName, String productNameStartWith) {
String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') LIMIT 100", databaseName, databaseName, tableName);
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
List userAgentResult = client.queryAll(extractProductName);
+ String userAgentValue = null;
if (!userAgentResult.isEmpty()) {
- return userAgentResult.get(0).getString(1);
+ for (GenericRecord userAgent : userAgentResult) {
+ userAgentValue = userAgent.getString(1);
+ if (userAgentValue.contains(productNameStartWith))
+ return userAgent.getString(1);
+ }
+ throw new RuntimeException("Can not extract product name from " + userAgentValue);
}
throw new RuntimeException("Query is returning empty result.");
}
diff --git a/flink-connector-clickhouse-base/build.gradle.kts b/flink-connector-clickhouse-base/build.gradle.kts
index 4b4ce0e..d7e7b5b 100644
--- a/flink-connector-clickhouse-base/build.gradle.kts
+++ b/flink-connector-clickhouse-base/build.gradle.kts
@@ -15,14 +15,12 @@ val scalaVersion = "2.13.12"
val sinkVersion: String by rootProject.extra
repositories {
- maven("https://s01.oss.sonatype.org/content/groups/staging/") // Temporary until we have a Java Client release
// Use Maven Central for resolving dependencies.
mavenCentral()
}
extra.apply {
- set("clickHouseDriverVersion", "0.9.0-SNAPSHOT") // Temporary until we have a Java Client release
- set("flinkVersion", "2.0.0")
+ set("clickHouseDriverVersion", "0.9.1")
set("log4jVersion","2.17.2")
set("testContainersVersion", "1.21.0")
set("byteBuddyVersion", "1.17.5")
diff --git a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java
index dc4e833..1743505 100644
--- a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java
+++ b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Serialize.java
@@ -16,8 +16,6 @@
import java.util.HashMap;
import java.util.Map;
-
-
public class Serialize {
private static final Logger LOG = LoggerFactory.getLogger(Serialize.class);
public static boolean writePrimitiveValuePreamble(OutputStream out, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {