
Commit 1ad18aa

Add example based on Flink 1.17 & fix workflow
1 parent 86ea06f commit 1ad18aa

File tree

14 files changed (+517, -114 lines)

.github/workflows/integration-tests.yaml

Lines changed: 18 additions & 9 deletions
@@ -29,25 +29,34 @@ jobs:
           java-version: '11'
           distribution: 'adopt'
           architecture: x64
-      - name: Publish locally base
-        if: env.SKIP_STEP != 'true'
-        uses: gradle/gradle-build-action@v2
-        with:
-          arguments: :flink-connector-clickhouse-base:publishToMavenLocal --info --stacktrace --continue
       - name: Publish locally flink-connector-clickhouse-1.17
         if: env.SKIP_STEP != 'true'
         uses: gradle/gradle-build-action@v2
         with:
-          arguments: :flink-connector-clickhouse-1.17:publishToMavenLocal --info --stacktrace --continue
+          arguments: :flink-connector-clickhouse-1.17:publishToMavenLocal
       - name: Publish locally flink-connector-clickhouse-2.0.0
         if: env.SKIP_STEP != 'true'
         uses: gradle/gradle-build-action@v2
         with:
-          arguments: :flink-connector-clickhouse-2.0.0:publishToMavenLocal --info --stacktrace --continue
-      - name: Generate Flink Covid App example
+          arguments: :flink-connector-clickhouse-2.0.0:publishToMavenLocal
+      - name: Generate Flink Covid App example 2.X
+        if: env.SKIP_STEP != 'true'
+        working-directory: ./examples/maven/flink-v2/covid
+        run: mvn clean install
+      - name: Generate Flink Covid App example 1.17+
         if: env.SKIP_STEP != 'true'
-        working-directory: ./examples/maven/covid
+        working-directory: ./examples/maven/flink-v1.7/covid
         run: mvn clean install
+      - name: Setup and execute Gradle 'integration-test' task
+        if: env.SKIP_STEP != 'true'
+        uses: gradle/gradle-build-action@v2
+        env:
+          CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
+          CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
+          CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
+          FLINK_VERSION: ${{ matrix.flink }}
+        with:
+          arguments: :flink-connector-clickhouse-integration:test
build.gradle.kts

Lines changed: 2 additions & 1 deletion
@@ -3,7 +3,8 @@ plugins {
     java
     signing
     id("com.gradleup.nmcp") version "0.0.8"
-    id("com.github.johnrengelman.shadow") version "8.1.1"
+    id("com.gradleup.shadow") version "9.0.2"
+    // id("com.github.johnrengelman.shadow") version "8.1.1"
 
 }
examples/maven/flink-v1.7/covid/pom.xml

Lines changed: 225 additions & 0 deletions

@@ -0,0 +1,225 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.clickhouse.example.covid</groupId>
  <artifactId>covid</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>Flink Quickstart Job</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.17.2</flink.version>
    <target.java.version>11</target.java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.24.1</log4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.clickhouse.flink</groupId>
      <artifactId>flink-connector-clickhouse-1.17</artifactId>
      <version>0.0.1</version>
      <classifier>all</classifier>
    </dependency>

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->

    <!-- Example:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>3.0.0-1.17</version>
    </dependency>
    -->

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>

      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>

      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                  Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.clickhouse.example.covid.DataStreamJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>

        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.1.1,)</versionRange>
                    <goals>
                      <goal>shade</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore/>
                  </action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>testCompile</goal>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore/>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
examples/maven/flink-v1.7/covid/src/main/java/com/clickhouse/example/covid/DataStreamJob.java

Lines changed: 118 additions & 0 deletions

@@ -0,0 +1,118 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.clickhouse.example.covid;

import com.clickhouse.data.ClickHouseFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.clickhouse.convertor.ClickHouseConvertor;
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncSink;
import org.apache.flink.connector.clickhouse.sink.ClickHouseClientConfig;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Skeleton for a Flink DataStream Job.
 *
 * <p>For a tutorial how to write a Flink application, check the
 * tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 */
public class DataStreamJob {

    static final int MAX_BATCH_SIZE = 5000;
    static final int MAX_IN_FLIGHT_REQUESTS = 2;
    static final int MAX_BUFFERED_REQUESTS = 20000;
    static final long MAX_BATCH_SIZE_IN_BYTES = 1024 * 1024;
    static final long MAX_TIME_IN_BUFFER_MS = 5 * 1000;
    static final long MAX_RECORD_SIZE_IN_BYTES = 1000;

    /*
    Create the covid table before running the example:

    CREATE TABLE `default`.`covid` (
        date Date,
        location_key LowCardinality(String),
        new_confirmed Int32,
        new_deceased Int32,
        new_recovered Int32,
        new_tested Int32,
        cumulative_confirmed Int32,
        cumulative_deceased Int32,
        cumulative_recovered Int32,
        cumulative_tested Int32
    )
    ENGINE = MergeTree
    ORDER BY (location_key, date);
    */

    public static void main(String[] args) throws Exception {
        // Sets up the execution environment, which is the main entry point
        ParameterTool parameters = ParameterTool.fromArgs(args);
        final String fileFullName = parameters.get("input");
        final String url = parameters.get("url");
        final String username = parameters.get("username");
        final String password = parameters.get("password");
        final String database = parameters.get("database");
        final String tableName = parameters.get("table");

        ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);
        ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

        ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
                convertorString,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES,
                clickHouseClientConfig
        );
        csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        Path filePath = new Path(fileFullName);
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), filePath)
                .build();

        DataStreamSource<String> lines = env.fromSource(
                source,
                WatermarkStrategy.noWatermarks(),
                "GzipCsvSource"
        );
        lines.sinkTo(csvSink);
        env.execute("Flink Java API Read CSV (covid)");
    }
}
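
For context, a minimal sketch of the same sink wiring without the file source is shown below: it pushes a few in-memory CSV rows into the covid table using only the connector calls that appear in DataStreamJob above. The class name MinimalCsvSinkJob, the endpoint, the credentials and the sample rows are illustrative placeholders and are not part of this commit.

package com.clickhouse.example.covid;

import com.clickhouse.data.ClickHouseFormat;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.clickhouse.convertor.ClickHouseConvertor;
import org.apache.flink.connector.clickhouse.data.ClickHousePayload;
import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncSink;
import org.apache.flink.connector.clickhouse.sink.ClickHouseClientConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal sketch: sink a handful of hard-coded CSV rows into the covid table.
// The connection values are placeholders; the sink wiring mirrors DataStreamJob above.
public class MinimalCsvSinkJob {

    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; substitute real ones before running.
        ClickHouseClientConfig config = new ClickHouseClientConfig(
                "https://localhost:8443", "default", "password", "default", "covid");

        // Convert raw CSV strings into the connector's payload type, as in DataStreamJob.
        ElementConverter<String, ClickHousePayload> converter = new ClickHouseConvertor<>(String.class);

        ClickHouseAsyncSink<String> sink = new ClickHouseAsyncSink<>(
                converter,
                500,            // max batch size (records)
                2,              // max in-flight requests
                2000,           // max buffered requests
                1024 * 1024,    // max batch size in bytes
                5_000,          // max time in buffer (ms)
                1000,           // max record size in bytes
                config);
        sink.setClickHouseFormat(ClickHouseFormat.CSV);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                "2020-03-01,US_NY,100,1,0,500,100,1,0,500",
                "2020-03-02,US_NY,150,2,0,700,250,3,0,1200")
           .sinkTo(sink);
        env.execute("Minimal ClickHouse CSV sink example");
    }
}

To run the packaged example itself, the fat jar produced by mvn clean package would be submitted with --input, --url, --username, --password, --database and --table arguments, matching the keys read via ParameterTool in DataStreamJob.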
