Skip to content

Commit 7ff5c9e

Browse files
authored
Merge pull request #22 from ClickHouse/implement-sink-writer-api
SinkWriter implementation of String data type
2 parents cbb1944 + 1716d05 commit 7ff5c9e

File tree

25 files changed

+1490
-21
lines changed

25 files changed

+1490
-21
lines changed

.github/workflows/tests-scala.yaml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
name: Apache Flink ClickHouse Connector Tests CI (Scala)
2+
3+
on: [push]
4+
5+
jobs:
6+
build:
7+
runs-on: ubuntu-latest
8+
strategy:
9+
fail-fast: false
10+
matrix:
11+
clickhouse: [ "23.7", "24.3", "latest", "cloud" ]
12+
name: Apache Flink ClickHouse Connector tests with ClickHouse ${{ matrix.clickhouse }}
13+
steps:
14+
- name: Check for Cloud Credentials
15+
id: check-cloud-credentials
16+
run: |
17+
if [[ "${{ matrix.clickhouse }}" == "cloud" && (-z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}" || -z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}") ]]; then
18+
echo "SKIP_STEP=true" >> $GITHUB_ENV
19+
else
20+
echo "SKIP_STEP=false" >> $GITHUB_ENV
21+
fi
22+
shell: bash
23+
- uses: actions/checkout@v3
24+
if: env.SKIP_STEP != 'true'
25+
- name: Set up JDK 21
26+
if: env.SKIP_STEP != 'true'
27+
uses: actions/setup-java@v3
28+
with:
29+
java-version: '21'
30+
distribution: 'adopt'
31+
architecture: x64
32+
- name: Setup and execute Gradle 'runScalaTests' task
33+
if: env.SKIP_STEP != 'true'
34+
uses: gradle/gradle-build-action@v2
35+
env:
36+
CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
37+
CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
38+
CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
39+
with:
40+
arguments: runScalaTests

.github/workflows/tests.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Apache Flink ClickHouse Connector Tests CI
1+
name: Apache Flink ClickHouse Connector Tests CI (Java)
22

33
on: [push]
44

@@ -22,7 +22,7 @@ jobs:
2222
shell: bash
2323
- uses: actions/checkout@v3
2424
if: env.SKIP_STEP != 'true'
25-
- name: Set up JDK 17
25+
- name: Set up JDK 21
2626
if: env.SKIP_STEP != 'true'
2727
uses: actions/setup-java@v3
2828
with:

flink-connector-clickhouse-base/build.gradle.kts

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,42 +4,46 @@
44
*/
55

66
plugins {
7-
kotlin("jvm") version "1.9.0"
7+
scala
88
java
99
}
1010

11+
val scalaVersion = "2.13.12"
12+
1113
repositories {
1214
// Use Maven Central for resolving dependencies.
15+
mavenLocal()
1316
mavenCentral()
1417
}
1518

1619
extra.apply {
17-
set("clickHouseDriverVersion", "0.8.5")
20+
set("clickHouseDriverVersion", "0.8.6")
1821
set("flinkVersion", "2.0.0")
1922
set("log4jVersion","2.17.2")
2023
set("testContainersVersion", "1.21.0")
24+
set("byteBuddyVersion", "1.17.5")
2125
}
2226

23-
2427
dependencies {
2528
// Use JUnit Jupiter for testing.
2629
testImplementation(libs.junit.jupiter)
2730

2831
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
2932

33+
implementation("net.bytebuddy:byte-buddy:${project.extra["byteBuddyVersion"]}")
34+
implementation("net.bytebuddy:byte-buddy-agent:${project.extra["byteBuddyVersion"]}")
3035
// This dependency is used by the application.
3136
implementation(libs.guava)
37+
implementation("org.scala-lang:scala-library:$scalaVersion")
38+
implementation("org.scala-lang:scala-compiler:$scalaVersion")
3239
// logger
3340
implementation("org.apache.logging.log4j:log4j-slf4j-impl:${project.extra["log4jVersion"]}")
3441
implementation("org.apache.logging.log4j:log4j-api:${project.extra["log4jVersion"]}")
3542
implementation("org.apache.logging.log4j:log4j-1.2-api:${project.extra["log4jVersion"]}")
3643
implementation("org.apache.logging.log4j:log4j-core:${project.extra["log4jVersion"]}")
3744

3845
// ClickHouse Client Libraries
39-
implementation("com.clickhouse:clickhouse-client:${project.extra["clickHouseDriverVersion"]}")
40-
implementation("com.clickhouse:clickhouse-http-client:${project.extra["clickHouseDriverVersion"]}")
41-
implementation("com.clickhouse:clickhouse-data:${project.extra["clickHouseDriverVersion"]}")
42-
implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}")
46+
implementation("com.clickhouse:client-v2:${project.extra["clickHouseDriverVersion"]}:all")
4347
// Apache Flink Libraries
4448
implementation("org.apache.flink:flink-connector-base:${project.extra["flinkVersion"]}")
4549
implementation("org.apache.flink:flink-streaming-java:${project.extra["flinkVersion"]}")
@@ -60,6 +64,28 @@ dependencies {
6064
//
6165
testImplementation("org.testcontainers:testcontainers:${project.extra["testContainersVersion"]}")
6266
testImplementation("org.testcontainers:clickhouse:${project.extra["testContainersVersion"]}")
67+
testImplementation("org.scalatest:scalatest_2.13:3.2.19")
68+
testRuntimeOnly("org.scalatestplus:junit-4-13_2.13:3.2.18.0")
69+
// testRuntimeOnly("org.pegdown:pegdown:1.6.0") // sometimes required by ScalaTest
70+
}
71+
72+
sourceSets {
73+
main {
74+
scala {
75+
srcDirs("src/main/scala")
76+
}
77+
java {
78+
srcDirs("src/main/java")
79+
}
80+
}
81+
test {
82+
scala {
83+
srcDirs("src/test/scala")
84+
}
85+
java {
86+
srcDirs("src/test/java")
87+
}
88+
}
6389
}
6490

6591
// Apply a specific Java toolchain to ease working on different environments.
@@ -69,7 +95,36 @@ java {
6995
}
7096
}
7197

98+
tasks.test {
99+
useJUnitPlatform()
100+
101+
include("**/*Test.class", "**/*Tests.class", "**/*Spec.class")
102+
testLogging {
103+
events("passed", "failed", "skipped")
104+
//showStandardStreams = true - , "standardOut", "standardError"
105+
}
106+
}
107+
108+
tasks.withType<ScalaCompile> {
109+
scalaCompileOptions.apply {
110+
encoding = "UTF-8"
111+
isDeprecation = true
112+
additionalParameters = listOf("-feature", "-unchecked")
113+
}
114+
}
115+
72116
tasks.named<Test>("test") {
73117
// Use JUnit Platform for unit tests.
74118
useJUnitPlatform()
75119
}
120+
121+
tasks.register<JavaExec>("runScalaTests") {
122+
group = "verification"
123+
mainClass.set("org.scalatest.tools.Runner")
124+
classpath = sourceSets["test"].runtimeClasspath
125+
args = listOf(
126+
"-R", "build/classes/scala/test",
127+
"-oD", // show durations
128+
"-s", "org.apache.flink.connector.clickhouse.test.scala.ClickHouseSinkTests"
129+
)
130+
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package com.clickhouse.utils;
2+
3+
import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
4+
import com.clickhouse.data.ClickHouseDataType;
5+
import com.clickhouse.data.format.BinaryStreamUtils;
6+
import org.jline.utils.Log;
7+
8+
import java.io.IOException;
9+
import java.io.OutputStream;
10+
import java.lang.reflect.Method;
11+
import java.time.LocalDate;
12+
import java.time.ZoneId;
13+
import java.time.ZonedDateTime;
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
public class Serialize {
18+
19+
public static boolean writePrimitiveValuePreamble(OutputStream out, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
20+
// since it is primitive we always have a value that is not null
21+
if (defaultsSupport) {
22+
// Add indicator since the table has default values
23+
SerializerUtils.writeNonNull(out);
24+
}
25+
// if the column is Nullable need to add an indicator for nullable
26+
if (isNullable) {
27+
SerializerUtils.writeNonNull(out);
28+
}
29+
return true;
30+
}
31+
public static boolean writeValuePreamble(OutputStream out, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column, Object value) throws IOException {
32+
Log.debug("writeValuePreamble[defaultsSupport='%s', isNullable='%s', dataType='%s', column='%s', value='%s']");
33+
if (defaultsSupport) {
34+
if (value != null) {
35+
SerializerUtils.writeNonNull(out);
36+
if (isNullable) {
37+
SerializerUtils.writeNonNull(out);
38+
}
39+
} else {
40+
if (hasDefault) {
41+
SerializerUtils.writeNull(out);
42+
return false;
43+
}
44+
45+
if (isNullable) {
46+
SerializerUtils.writeNonNull(out);
47+
SerializerUtils.writeNull(out);
48+
return false;
49+
}
50+
51+
if (dataType == ClickHouseDataType.Array) {
52+
SerializerUtils.writeNonNull(out);
53+
} else if (dataType != ClickHouseDataType.Dynamic) {
54+
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s' of type '%s'", column, dataType));
55+
}
56+
}
57+
} else if (isNullable) {
58+
if (value == null) {
59+
SerializerUtils.writeNull(out);
60+
return false;
61+
}
62+
63+
SerializerUtils.writeNonNull(out);
64+
} else if (value == null) {
65+
if (dataType == ClickHouseDataType.Array) {
66+
SerializerUtils.writeNonNull(out);
67+
} else if (dataType != ClickHouseDataType.Dynamic) {
68+
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s' of type '%s'", column, dataType));
69+
}
70+
}
71+
return true;
72+
}
73+
74+
public static String convertToString(Object value) {
75+
return java.lang.String.valueOf(value);
76+
}
77+
78+
public static Integer convertToInteger(Object value) {
79+
if (value instanceof Integer) {
80+
return (Integer) value;
81+
} else if (value instanceof Number) {
82+
return ((Number) value).intValue();
83+
} else if (value instanceof String) {
84+
return Integer.parseInt((String) value);
85+
} else if (value instanceof Boolean) {
86+
return ((Boolean) value) ? 1 : 0;
87+
} else {
88+
throw new IllegalArgumentException("Cannot convert object of type " +
89+
value.getClass().getName() + " to Integer: " + value);
90+
}
91+
}
92+
93+
public static Map<ClickHouseDataType, Method> mapClickHouseTypeToMethod() {
94+
Map<ClickHouseDataType, Method> map = new HashMap<>();
95+
for (Method method : Serialize.class.getMethods()) {
96+
String name = method.getName();
97+
if (name.startsWith("write")) {
98+
String chType = name.substring("write".length());
99+
try {
100+
ClickHouseDataType type = ClickHouseDataType.valueOf(chType);
101+
map.put(type, method);
102+
} catch (IllegalArgumentException e) {
103+
System.out.println(e.getMessage());
104+
}
105+
}
106+
}
107+
return map;
108+
}
109+
110+
/**
111+
*
112+
*/
113+
114+
// Method structure write[ClickHouse Type](OutputStream, Java type, ... )
115+
// Date support
116+
public static void writeDate(OutputStream out, LocalDate value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
117+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
118+
SerializerUtils.writeDate(out, value, ZoneId.of("UTC")); // TODO: check
119+
}
120+
}
121+
122+
public static void writeDate(OutputStream out, ZonedDateTime value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
123+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
124+
SerializerUtils.writeDate(out, value, ZoneId.of("UTC")); // TODO: check
125+
}
126+
}
127+
128+
// clickhouse type String support
129+
public static void writeString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
130+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
131+
BinaryStreamUtils.writeString(out, convertToString(value));
132+
}
133+
}
134+
135+
public static void writeFixedString(OutputStream out, String value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, int size, String column) throws IOException {
136+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
137+
BinaryStreamUtils.writeFixedString(out, convertToString(value), size);
138+
}
139+
}
140+
141+
// Int8
142+
public static void writeInt8(OutputStream out, Byte value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
143+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
144+
BinaryStreamUtils.writeInt8(out, convertToInteger(value));
145+
}
146+
}
147+
148+
// Int16
149+
public static void writeInt16(OutputStream out, Short value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
150+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
151+
BinaryStreamUtils.writeInt16(out, convertToInteger(value));
152+
}
153+
}
154+
155+
// Int32
156+
public static void writeInt32(OutputStream out, Integer value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
157+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
158+
BinaryStreamUtils.writeInt32(out, convertToInteger(value));
159+
}
160+
}
161+
162+
// Int64
163+
public static void writeInt64(OutputStream out, Long value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
164+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
165+
BinaryStreamUtils.writeInt64(out, convertToInteger(value));
166+
}
167+
}
168+
169+
// Float32
170+
public static void writeFloat32(OutputStream out, Float value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
171+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
172+
BinaryStreamUtils.writeFloat32(out, value);
173+
}
174+
}
175+
176+
// Float64
177+
public static void writeFloat64(OutputStream out, Double value, boolean defaultsSupport, boolean isNullable, ClickHouseDataType dataType, boolean hasDefault, String column) throws IOException {
178+
if (writeValuePreamble(out, defaultsSupport, isNullable, dataType, hasDefault, column, value)) {
179+
BinaryStreamUtils.writeFloat64(out, value);
180+
}
181+
}
182+
183+
}

0 commit comments

Comments
 (0)