
Commit 8fb7a03

Add README.md
1 parent 25b283b commit 8fb7a03

File tree

6 files changed, +120 −193 lines changed

examples/README.md

Lines changed: 1 addition & 193 deletions
@@ -1,194 +1,2 @@

Removed:

# Flink ClickHouse Connector Example

A comprehensive example demonstrating how to use Apache Flink with ClickHouse to process and store COVID-19 epidemiological data.

## 📋 Prerequisites

- **Java 11 or higher**
- **Apache Maven 3.6+** or **Gradle 7.0+**
- **ClickHouse instance** (local or cloud)
- **Internet connection** for downloads
- **Operating System**: Linux, macOS, or Windows with WSL

## 🚀 Quick Start

### 1. Download Apache Flink

```bash
# Download Flink 2.0.0
wget https://dlcdn.apache.org/flink/flink-2.0.0/flink-2.0.0-bin-scala_2.12.tgz

# Alternative with curl (shows progress)
curl -L -# -O https://dlcdn.apache.org/flink/flink-2.0.0/flink-2.0.0-bin-scala_2.12.tgz

# Verify download (optional)
sha512sum flink-2.0.0-bin-scala_2.12.tgz
```

### 2. Extract and Start Flink

```bash
# Extract Flink
tar -xzf flink-2.0.0-bin-scala_2.12.tgz
cd flink-2.0.0

# Start Flink cluster
./bin/start-cluster.sh

# Verify Flink is running
./bin/flink list
# Or check web UI: http://localhost:8081
```

### 3. Build the Connector and Application

```bash
# Build connector (run from connector root directory)
./gradlew publishToMavenLocal

# Verify connector was published
ls ~/.m2/repository/org/apache/flink/connector/clickhouse/

# Build the application (run from maven example folder)
cd examples/maven  # adjust path as needed
mvn clean package -DskipTests

# Verify JAR was created
ls target/covid-1.0-SNAPSHOT.jar
```

## 🗄️ ClickHouse Setup

### Option A: Docker (Recommended for testing)

```bash
# Start ClickHouse with Docker
docker run -d --name clickhouse-server \
  -p 8123:8123 -p 9000:9000 \
  --ulimit nofile=262144:262144 \
  clickhouse/clickhouse-server

# Wait for startup
sleep 10

# Test connection
curl http://localhost:8123/ping
```

### Option B: ClickHouse Cloud

1. Go to [ClickHouse Cloud](https://clickhouse.com/)
2. Create a new service
3. Note down your connection details

### Create Database Table

```sql
-- Connect to ClickHouse first:
--   clickhouse-client            (local install)
--   http://localhost:8123/play   (web interface)

-- Create table
CREATE TABLE IF NOT EXISTS `default`.`covid` (
    date Date,
    location_key LowCardinality(String),
    new_confirmed Int32,
    new_deceased Int32,
    new_recovered Int32,
    new_tested Int32,
    cumulative_confirmed Int32,
    cumulative_deceased Int32,
    cumulative_recovered Int32,
    cumulative_tested Int32
) ENGINE = MergeTree
ORDER BY (location_key, date);
```

## 📊 Download Sample Data

```bash
# Download COVID-19 epidemiological data
wget https://storage.googleapis.com/covid19-open-data/v3/epidemiology.csv

# Alternative with curl
curl -L -# -o epidemiology.csv https://storage.googleapis.com/covid19-open-data/v3/epidemiology.csv

# Check file size and first few lines
ls -lh epidemiology.csv
head -5 epidemiology.csv
```

## ▶️ Run the Application

### Local ClickHouse

```bash
# Navigate to Flink directory
cd flink-2.0.0

# Run the application
./bin/flink run \
  /path/to/your/covid-1.0-SNAPSHOT.jar \
  -input "/path/to/epidemiology.csv" \
  -url "http://localhost:8123/default" \
  -username "default" \
  -password "" \
  -database "default" \
  -table "covid"
```

### ClickHouse Cloud

```bash
./bin/flink run \
  /path/to/your/covid-1.0-SNAPSHOT.jar \
  -input "/path/to/epidemiology.csv" \
  -url "jdbc:clickhouse://your-cluster.clickhouse.cloud:8443/default?ssl=true" \
  -username "your-username" \
  -password "your-password" \
  -database "default" \
  -table "covid"
```

## ✅ Verify Results

```sql
-- Check data was inserted
SELECT COUNT(*) FROM covid;

-- View sample data
SELECT * FROM covid LIMIT 10;

-- Check by country
SELECT location_key, COUNT(*) AS records
FROM covid
GROUP BY location_key
ORDER BY records DESC
LIMIT 10;

-- Analyze data trends
SELECT
    date,
    SUM(new_confirmed) AS global_new_cases,
    SUM(cumulative_confirmed) AS global_total_cases
FROM covid
WHERE date >= '2020-01-01'
GROUP BY date
ORDER BY date
LIMIT 20;
```

## 🔧 Configuration Options

### Application Parameters

| Parameter | Description | Required | Default |
|-----------|-------------|----------|---------|
| `-input` | Path to input CSV file | Yes | - |
| `-url` | ClickHouse URL | Yes | - |
| `-username` | ClickHouse username | Yes | - |
| `-password` | ClickHouse password | No | `""` |
| `-database` | Target database name | Yes | - |
| `-table` | Target table name | Yes | - |

Added:

# Apache Flink Connector ClickHouse Example App

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@

```scala
ThisBuild / scalaVersion := "2.12.17"

name := "csv-flink-example"
organization := "clickhouse"
version := "1.0"

val flinkVersion = "2.0.0"

resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" % "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" % "flink-connector-files" % "2.0.0" % "provided",
  "org.apache.flink.connector" % "clickhouse" % "0.0.1" classifier "all"
)

assembly / assemblyJarName := "flink-covid.jar"

assembly / assemblyExcludedJars := {
  val cp = (assembly / fullClasspath).value
  cp filter { jar =>
    jar.data.getName.contains("flink-") ||
    jar.data.getName.contains("scala-library")
  }
}
```
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@

```properties
sbt.version=1.11.2
```
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@

```scala
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")
```
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@

```properties
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```
Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@

```scala
import com.clickhouse.data.ClickHouseFormat
import org.apache.flink.connector.clickhouse.sink.{ClickHouseAsyncSink, ClickHouseClientConfig}
import org.apache.flink.util.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.base.sink.writer.ElementConverter
import org.apache.flink.connector.clickhouse.convertor.ClickHouseConvertor
import org.apache.flink.connector.clickhouse.data.ClickHousePayload
import org.apache.flink.connector.file.src.reader.TextLineInputFormat
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object Main extends App {

  // Async sink buffering/batching limits
  private val MAX_BATCH_SIZE = 5000
  private val MAX_IN_FLIGHT_REQUESTS = 2
  private val MAX_BUFFERED_REQUESTS = 20000
  private val MAX_BATCH_SIZE_IN_BYTES = 1024 * 1024L
  private val MAX_TIME_IN_BUFFER_MS = 5 * 1000L
  private val MAX_RECORD_SIZE_IN_BYTES = 1000L

  // Connection and job parameters (see the README's parameter table)
  val parameters: ParameterTool = ParameterTool.fromArgs(args)
  val fileFullName = parameters.get("input")
  val url = parameters.get("url")
  val username = parameters.get("username")
  val password = parameters.get("password")
  val database = parameters.get("database")
  val tableName = parameters.get("table")

  val clickHouseClientConfig: ClickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName)

  val convertorString: ElementConverter[String, ClickHousePayload] =
    new ClickHouseConvertor[String](classOf[String])

  val csvSink: ClickHouseAsyncSink[String] = new ClickHouseAsyncSink[String](
    convertorString,
    MAX_BATCH_SIZE,
    MAX_IN_FLIGHT_REQUESTS,
    MAX_BUFFERED_REQUESTS,
    MAX_BATCH_SIZE_IN_BYTES,
    MAX_TIME_IN_BUFFER_MS,
    MAX_RECORD_SIZE_IN_BYTES,
    clickHouseClientConfig
  )
  csvSink.setClickHouseFormat(ClickHouseFormat.CSV)

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(2)

  // Read the CSV file line by line
  val filePath = new Path(fileFullName)
  val source: FileSource[String] = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), filePath)
    .build()

  val lines: DataStreamSource[String] = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks[String](),
    "GzipCsvSource"
  )

  lines.sinkTo(csvSink)
  env.execute("Flink Scala API Read CSV (covid)")
}
```
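The commit adds the sbt project files above but no build instructions for them (the README it rewrites documented only the Maven build). A plausible build-and-submit sequence, assuming sbt is installed, the connector has been published to the local Maven repository via `./gradlew publishToMavenLocal`, and a local Flink 2.0.0 cluster is running; paths and the ClickHouse parameters mirror the README's local example and may need adjusting:

```shell
# From the sbt example's directory: build the fat jar.
# sbt-assembly writes it to target/scala-2.12/flink-covid.jar
# (name set by assemblyJarName in the build definition above).
sbt clean assembly

# From the Flink install directory: submit the job with the
# same parameters the Maven example uses.
./bin/flink run /path/to/target/scala-2.12/flink-covid.jar \
  -input "/path/to/epidemiology.csv" \
  -url "http://localhost:8123/default" \
  -username "default" \
  -password "" \
  -database "default" \
  -table "covid"
```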
