Skip to content

Commit e675c9c

Browse files
authored
docs: Migrating from MySQL with Flink CDC (#1602)
* Migrating from MySQL with Flink CDC * Update migrating-from-mysql-with-flink-cdc.md
1 parent b894447 commit e675c9c

File tree

3 files changed

+222
-32
lines changed

3 files changed

+222
-32
lines changed

docs/en/tutorials/migrate/migrating-from-mysql-with-flink-cdc.md

Lines changed: 222 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,99 @@
22
title: Migrating from MySQL with Flink CDC
33
---
44

5-
In this tutorial, you will set up a real-time data loading from MySQL to Databend with the Flink SQL connector for Databend. Before you start, make sure you have successfully set up Databend and MySQL in your environment.
5+
In this tutorial, we'll walk you through the process of migrating from MySQL to Databend Cloud using Apache Flink CDC.
66

7-
1. Create a table in MySQL and populate it with sample data. Then, create a corresponding target table in Databend.
7+
## Before You Start
88

9-
```sql title='In MySQL:'
10-
CREATE DATABASE mydb;
11-
USE mydb;
9+
Before you start, ensure you have the following prerequisites in place:
1210

11+
- [Docker](https://www.docker.com/) is installed on your local machine, as it will be used to launch MySQL.
12+
- Java 8 or 11 is installed on your local machine, as it is required by the [Flink Databend Connector](https://github.com/databendcloud/flink-connector-databend).
13+
- BendSQL is installed on your local machine. See [Installing BendSQL](/guides/sql-clients/bendsql/#installing-bendsql) for instructions on how to install BendSQL using various package managers.
14+
15+
## Step 1: Launch MySQL in Docker
16+
17+
1. Create a configuration file named **mysql.cnf** with the following content, and save this file in a local directory that will be mapped to the MySQL container, e.g., `/Users/eric/Downloads/mysql.cnf`:
18+
19+
```cnf
20+
[mysqld]
21+
# Basic settings
22+
server-id=1
23+
log-bin=mysql-bin
24+
binlog_format=ROW
25+
binlog_row_image=FULL
26+
expire_logs_days=3
27+
28+
# Character set settings
29+
character_set_server=utf8mb4
30+
collation-server=utf8mb4_unicode_ci
31+
32+
# Authentication settings
33+
default-authentication-plugin=mysql_native_password
34+
```
35+
36+
2. Start a MySQL container on your local machine. The command below launches a MySQL container named **mysql-server**, creates a database named **mydb**, and sets the root password to `root`:
37+
38+
```bash
39+
docker run \
40+
--platform linux/amd64 \
41+
--name mysql-server \
42+
-v /Users/eric/Downloads/mysql.cnf:/etc/mysql/conf.d/custom.cnf \
43+
-e MYSQL_ROOT_PASSWORD=root \
44+
-e MYSQL_DATABASE=mydb \
45+
-e MYSQL_ROOT_HOST=% \
46+
-p 3306:3306 \
47+
-d mysql:5.7
48+
```
49+
50+
3. Verify MySQL is running:
51+
52+
```bash
53+
docker ps
54+
```
55+
56+
Check the output for a container named **mysql-server**:
57+
58+
```bash
59+
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
60+
aac4c28be56e mysql:5.7 "docker-entrypoint.s…" 17 hours ago Up 17 hours 0.0.0.0:3306->3306/tcp, 33060/tcp mysql-server
61+
```
62+
63+
## Step 2: Populate MySQL with Sample Data
64+
65+
1. Log in to the MySQL container and enter the password `root` when prompted:
66+
67+
```bash
68+
docker exec -it mysql-server mysql -u root -p
69+
```
70+
71+
```
72+
Enter password:
73+
Welcome to the MySQL monitor. Commands end with ; or \g.
74+
Your MySQL connection id is 71
75+
Server version: 5.7.44-log MySQL Community Server (GPL)
76+
77+
Copyright (c) 2000, 2023, Oracle and/or its affiliates.
78+
79+
Oracle is a registered trademark of Oracle Corporation and/or its
80+
affiliates. Other names may be trademarks of their respective
81+
owners.
82+
83+
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
84+
```
85+
86+
2. Switch to the **mydb** database:
87+
88+
```bash
89+
mysql> USE mydb;
90+
Database changed
91+
```
92+
93+
3. Copy and paste the following SQL to create a table named **products** and insert data:
94+
95+
```sql
1396
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
97+
1498
ALTER TABLE products AUTO_INCREMENT = 10;
1599

16100
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
@@ -25,29 +109,86 @@ INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
25109
(default,"spare tire","24 inch spare tire");
26110
```
27111

28-
```sql title='In Databend:'
29-
CREATE TABLE products (id INT NOT NULL, name VARCHAR(255) NOT NULL, description VARCHAR(512) );
112+
4. Verify the data:
113+
114+
```bash
115+
mysql> select * from products;
116+
+----+--------------------+---------------------------------------------------------+
117+
| id | name | description |
118+
+----+--------------------+---------------------------------------------------------+
119+
| 10 | scooter | Small 2-wheel scooter |
120+
| 11 | car battery | 12V car battery |
121+
| 12 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |
122+
| 13 | hammer | 12oz carpenter's hammer |
123+
| 14 | hammer | 14oz carpenter's hammer |
124+
| 15 | hammer | 16oz carpenter's hammer |
125+
| 16 | rocks | box of assorted rocks |
126+
| 17 | jacket | black wind breaker |
127+
| 18 | cloud | test for databend |
128+
| 19 | spare tire | 24 inch spare tire |
129+
+----+--------------------+---------------------------------------------------------+
130+
10 rows in set (0.01 sec)
30131
```
31132
32-
2. Download [Flink](https://flink.apache.org/downloads/) and the following SQL connectors to your system:
33-
- Flink SQL connector for Databend: [https://github.com/databendcloud/flink-connector-databend/releases](https://github.com/databendcloud/flink-connector-databend/releases)
34-
- Flink SQL connector for MySQL: [https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar)
35-
3. Move the both connector JAR files to the _lib_ folder in your Flink installation directory.
36-
4. Start Flink:
133+
## Step 3: Set Up Target in Databend Cloud
134+
135+
1. Connect to Databend Cloud using BendSQL. If you're unfamiliar with BendSQL, refer to this tutorial: [Connecting to Databend Cloud using BendSQL](../connect/connect-to-databendcloud-bendsql.md).
136+
137+
2. Copy and paste the following SQL to create a target table named **products**:
138+
139+
```sql
140+
CREATE TABLE products (
141+
id INT NOT NULL,
142+
name VARCHAR(255) NOT NULL,
143+
description VARCHAR(512)
144+
);
145+
```
146+
147+
## Step 4: Install Flink CDC
148+
149+
1. Download and extract Flink 1.17.1:
150+
151+
```bash
152+
curl -O https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
153+
tar -xvzf flink-1.17.1-bin-scala_2.12.tgz
154+
cd flink-1.17.1
155+
```
156+
157+
2. Download the Databend and MySQL connectors into the **lib** folder:
158+
159+
```bash
160+
curl -Lo lib/flink-connector-databend.jar https://github.com/databendcloud/flink-connector-databend/releases/latest/download/flink-connector-databend.jar
161+
162+
curl -Lo lib/flink-sql-connector-mysql-cdc-2.4.1.jar https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
163+
```
164+
165+
3. Open the file **flink-conf.yaml** under `flink-1.17.1/conf/`, update `taskmanager.memory.process.size` to `4096m`, and save the file.
166+
167+
```yaml
168+
taskmanager.memory.process.size: 4096m
169+
```
170+
171+
4. Start a Flink cluster:
37172

38173
```shell
39-
cd flink-16.0
40174
./bin/start-cluster.sh
41175
```
42176

43177
You can now open the Apache Flink Dashboard if you go to [http://localhost:8081](http://localhost:8081) in your browser:
44178

45179
![Alt text](/img/load/cdc-dashboard.png)
46180

47-
5. Start the Flink SQL Client:
181+
## Step 5: Start Migration
48182

49-
```shell
183+
1. Start the Flink SQL Client:
184+
185+
```bash
50186
./bin/sql-client.sh
187+
```
188+
189+
You will see the Flink SQL Client startup banner, confirming that the client has launched successfully.
190+
191+
```bash
51192
52193
▒▓██▓██▒
53194
▓████▒▒█▓▒▓███▓▒
@@ -92,49 +233,98 @@ You can now open the Apache Flink Dashboard if you go to [http://localhost:8081]
92233
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
93234
```
94235

95-
6. Set the checkpointing interval to 3 seconds, and create corresponding tables with MySQL and Databend connectors in the Flink SQL Client. For the available connection parameters, see [https://github.com/databendcloud/flink-connector-databend#connector-options](https://github.com/databendcloud/flink-connector-databend#connector-options):
236+
2. Set the checkpointing interval to 3 seconds.
96237

97-
```sql
238+
```bash
98239
Flink SQL> SET execution.checkpointing.interval = 3s;
99-
[INFO] Session property has been set.
240+
```
100241

101-
Flink SQL> CREATE TABLE mysql_products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED)
242+
3. Create corresponding tables with MySQL and Databend connectors in the Flink SQL Client (replace the placeholders with your actual values):
243+
244+
```sql
245+
CREATE TABLE mysql_products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED)
102246
WITH ('connector' = 'mysql-cdc',
103-
'hostname' = 'localhost',
247+
'hostname' = '127.0.0.1',
104248
'port' = '3306',
105249
'username' = 'root',
106-
'password' = '123456',
250+
'password' = 'root',
107251
'database-name' = 'mydb',
108252
'table-name' = 'products',
109253
'server-time-zone' = 'UTC'
110254
);
111-
[INFO] Execute statement succeed.
112255
113-
Flink SQL> CREATE TABLE databend_products (id INT,name String,description String, PRIMARY KEY (`id`) NOT ENFORCED)
256+
CREATE TABLE databend_products (id INT,name String,description String, PRIMARY KEY (`id`) NOT ENFORCED)
114257
WITH ('connector' = 'databend',
115-
'url'='databend://localhost:8000',
116-
'username'='databend',
117-
'password'='databend',
118-
'database-name'='default',
258+
'url'='databend://cloudapp:{password}@{host}:443/{database}?warehouse={warehouse_name}&ssl=true',
259+
'database-name'='{database}',
119260
'table-name'='products',
120-
'sink.batch-size' = '5',
261+
'sink.batch-size' = '1',
121262
'sink.flush-interval' = '1000',
122263
'sink.ignore-delete' = 'false',
123264
'sink.max-retries' = '3');
124-
[INFO] Execute statement succeed.
125265
```
126266

127-
7. In the Flink SQL Client, synchronize the data from the _mysql_products_ table to the _databend_products_ table:
267+
4. In the Flink SQL Client, synchronize the data from the mysql_products table to the databend_products table:
128268

129269
```sql
130270
Flink SQL> INSERT INTO databend_products SELECT * FROM mysql_products;
271+
>
131272
[INFO] Submitting SQL update statement to the cluster...
132273
[INFO] SQL update statement has been successfully submitted to the cluster:
133-
Job ID: b14645f34937c7cf3672ffba35733734
274+
Job ID: 5b505d752b7c211cbdcb5566175b9182
134275
```
135276

136277
You can now see a running job in the Apache Flink Dashboard:
137278

138279
![Alt text](/img/load/cdc-job.png)
139280

140-
You're all set! If you query the _products_ table in Databend, you will see that the data from MySQL has been successfully synchronized. Feel free to perform insertions, updates, or deletions in MySQL, and you will observe the corresponding changes reflected in Databend as well.
281+
You're all set! If you go back to the BendSQL terminal and query the **products** table in Databend Cloud, you will see that the data from MySQL has been successfully synchronized:
282+
283+
```sql
284+
SELECT * FROM products;
285+
286+
┌──────────────────────────────────────────────────────────────────────────────────────┐
287+
│ id │ name │ description │
288+
│ Int32 │ String │ Nullable(String) │
289+
├───────┼────────────────────┼─────────────────────────────────────────────────────────┤
290+
│ 18 │ cloud │ test for databend │
291+
│ 19 │ spare tire │ 24 inch spare tire │
292+
│ 16 │ rocks │ box of assorted rocks │
293+
│ 17 │ jacket │ black wind breaker │
294+
│ 14 │ hammer │ 14oz carpenter's hammer │
295+
│ 15 │ hammer │ 16oz carpenter's hammer │
296+
│ 12 │ 12-pack drill bits │ 12-pack of drill bits with sizes ranging from #40 to #3 │
297+
│ 13 │ hammer │ 12oz carpenter's hammer │
298+
│ 10 │ scooter │ Small 2-wheel scooter │
299+
│ 11 │ car battery │ 12V car battery │
300+
└──────────────────────────────────────────────────────────────────────────────────────┘
301+
```
302+
303+
5. Return to the MySQL terminal and insert a new product:
304+
305+
```sql
306+
INSERT INTO products VALUES (default, "bicycle", "Lightweight road bicycle");
307+
```
308+
309+
Next, in the BendSQL terminal, query the **products** table again to verify the new product has been synced:
310+
311+
```sql
312+
SELECT * FROM products;
313+
314+
┌──────────────────────────────────────────────────────────────────────────────────────┐
315+
│ id │ name │ description │
316+
│ Int32 │ String │ Nullable(String) │
317+
├───────┼────────────────────┼─────────────────────────────────────────────────────────┤
318+
│ 12 │ 12-pack drill bits │ 12-pack of drill bits with sizes ranging from #40 to #3 │
319+
│ 11 │ car battery │ 12V car battery │
320+
│ 14 │ hammer │ 14oz carpenter's hammer │
321+
│ 13 │ hammer │ 12oz carpenter's hammer │
322+
│ 10 │ scooter │ Small 2-wheel scooter │
323+
│ 20 │ bicycle │ Lightweight road bicycle │
324+
│ 19 │ spare tire │ 24 inch spare tire │
325+
│ 16 │ rocks │ box of assorted rocks │
326+
│ 15 │ hammer │ 16oz carpenter's hammer │
327+
│ 18 │ cloud │ test for databend │
328+
│ 17 │ jacket │ black wind breaker │
329+
└──────────────────────────────────────────────────────────────────────────────────────┘
330+
```

static/img/load/cdc-dashboard.png

-418 KB
Loading

static/img/load/cdc-job.png

-448 KB
Loading

0 commit comments

Comments
 (0)