Closed
117 changes: 66 additions & 51 deletions docs/content/docs/get-started/quickstart/mysql-to-doris.md
@@ -26,29 +26,32 @@ under the License.

# Streaming ELT from MySQL to Doris

This tutorial is to show how to quickly build a Streaming ELT job from MySQL to Doris using Flink CDC, including the
This tutorial is to show how to quickly build a Streaming ELT job from MySQL to Apache Doris using Flink CDC, including the
features of syncing all tables of one database, schema change evolution, and syncing sharded tables into one table.
All exercises in this tutorial are performed in the Flink CDC CLI, and the entire process uses standard SQL syntax,
without a single line of Java/Scala code or IDE installation.

## Preparation
Prepare a Linux or macOS computer with Docker installed.

### Prepare Flink Standalone cluster
### Run a standalone Flink cluster

1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) and unzip it to get the `flink-1.18.0` directory.
Use the following command to navigate to the Flink directory, and set `FLINK_HOME` to the directory where `flink-1.18.0` is located.

```shell
cd flink-1.18.0
```

2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.
2. Enable checkpointing by appending the following parameters to the `conf/flink-conf.yaml` configuration file to perform a checkpoint every 3 seconds.

```yaml
execution.checkpointing.interval: 3000
```

3. Start the Flink cluster using the following command.
3. Put the [MySQL Connector JAR](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar) in the Flink `lib` folder.

4. Start the Flink cluster using the following command.

```shell
./bin/start-cluster.sh
```

@@ -60,7 +63,7 @@ If successfully started, you can access the Flink Web UI at [http://localhost:80

Executing `start-cluster.sh` multiple times can start multiple `TaskManager`s.
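
The preparation steps that come before starting the cluster (setting `FLINK_HOME` and enabling checkpointing) can also be scripted. The following is a minimal sketch, assuming it is run from inside the extracted `flink-1.18.0` directory. The helper name `enable_checkpointing` is hypothetical and not part of Flink.

```shell
# Assumption: the current directory is the extracted flink-1.18.0 directory.
export FLINK_HOME="$(pwd)"

# Hypothetical helper: append the checkpoint interval only if the key is absent,
# so running the script twice does not duplicate the setting.
enable_checkpointing() {
  conf_file="$1"
  if ! grep -q '^execution\.checkpointing\.interval:' "$conf_file"; then
    echo 'execution.checkpointing.interval: 3000' >> "$conf_file"
  fi
}

# Apply it only when the configuration file actually exists.
if [ -f "$FLINK_HOME/conf/flink-conf.yaml" ]; then
  enable_checkpointing "$FLINK_HOME/conf/flink-conf.yaml"
fi
```

Guarding the append with `grep` keeps the configuration file unchanged on repeated runs, which is convenient when the tutorial is restarted from the beginning.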

### Prepare docker compose
### Run Doris and MySQL with Docker Compose
The following tutorial will prepare the required components using `docker-compose`.

1. Host Machine Configuration
@@ -69,21 +72,10 @@ Since `Doris` requires memory mapping support for operation, execute the followi
```shell
sysctl -w vm.max_map_count=2000000
```
Due to the different ways of implementing containers internally on MacOS, it may not be possible to directly modify the value of max_map_count on the host during deployment. You need to create the following containers first:

```shell
docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh
```

After the container has been created successfully, execute the following command:
```shell
sysctl -w vm.max_map_count=2000000
```

Then run `exit` to leave the container, and create the Doris Docker cluster.

On macOS, you can instead set the `privileged` flag for the Doris container directly, as shown in the `docker-compose.yml` below.

2. Start docker compose
Create a `docker-compose.yml` file using the content provided below:
2. Create a `docker-compose.yml` file using the content provided below:

```yaml
version: '2.1'
@@ -94,6 +86,8 @@ Then `exit` exits and creates the Doris Docker cluster.
- "8030:8030"
- "8040:8040"
- "9030:9030"
# Uncomment if running on Mac
# privileged: true
mysql:
image: debezium/example-mysql:1.1
ports:
@@ -104,25 +98,26 @@ Then `exit` exits and creates the Doris Docker cluster.
- MYSQL_PASSWORD=mysqlpw
```

The Docker Compose should include the following services (containers):
- MySQL: include a database named `app_db`
- Doris: to store tables from MySQL
The Docker Compose should include the following services (containers):
- MySQL: include a database named `app_db`
- Doris: to store tables from MySQL

To start all containers, run the following command in the directory that contains the `docker-compose.yml` file.
3. Start all containers by running the following command in the directory that contains the `docker-compose.yml` file:

```shell
docker-compose up -d
```

This command automatically starts all the containers defined in the Docker Compose configuration in a detached mode. Run docker ps to check whether these containers are running properly. You can also visit [http://localhost:8030/](http://localhost:8030/) to check whether Doris is running.
#### Prepare records for MySQL
This command automatically starts all the containers defined in the Docker Compose configuration in a detached mode. Run `docker compose ps` to check whether these containers are running properly. You can also visit [http://localhost:8030/](http://localhost:8030/) to check whether Doris is running.
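
If the Doris container does not come up, one thing worth checking is whether the `vm.max_map_count` setting from step 1 took effect. Below is a small sketch for a Linux host, assuming the value is readable from `/proc/sys/vm/max_map_count`. The function name `map_count_ok` is hypothetical.

```shell
# Required value taken from the sysctl command in this tutorial.
REQUIRED_MAP_COUNT=2000000

# Hypothetical helper: succeeds when the given value meets the requirement.
map_count_ok() {
  [ "$1" -ge "$REQUIRED_MAP_COUNT" ]
}

# On Linux the current value can be read from /proc; fall back to 0 elsewhere.
current="$(cat /proc/sys/vm/max_map_count 2>/dev/null || echo 0)"
if map_count_ok "$current"; then
  echo "vm.max_map_count=${current} is sufficient"
else
  echo "vm.max_map_count=${current} is too low; run: sysctl -w vm.max_map_count=${REQUIRED_MAP_COUNT}"
fi
```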

### Prepare records for MySQL
1. Enter MySQL container

```shell
docker-compose exec mysql mysql -uroot -p123456
```

2. create `app_db` database and `orders`,`products`,`shipments` tables, then insert records
2. Create the `app_db` database and the `orders`, `products`, and `shipments` tables:

Review comment (Contributor):

Should we put the `CREATE TABLE` SQL statements for all of the tables here?

```sql
-- create database
@@ -136,7 +131,11 @@ This command automatically starts all the containers defined in the Docker Compo
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);
```

3. Insert some test records:

```sql
-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
@@ -165,8 +164,10 @@ This command automatically starts all the containers defined in the Docker Compo
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
```

#### Create database in Doris
### Create database in Doris

The Doris connector currently does not support automatic database creation, so you must first create the database that the sink tables will be written to.

1. Open the Doris Web UI.
[http://localhost:8030/](http://localhost:8030/)
The default username is `root`, and the default password is empty.
@@ -181,19 +182,26 @@ This command automatically starts all the containers defined in the Docker Compo

{{< img src="/fig/mysql-doris-tutorial/doris-create-table.png" alt="Doris create table" >}}

## Submit job with Flink CDC CLI
## Set up Flink CDC

1. Download the binary compressed package listed below and extract it to the directory `flink-cdc-3.1.0`:
[flink-cdc-3.1.0-bin.tar.gz](https://www.apache.org/dyn/closer.lua/flink/flink-cdc-3.1.0/flink-cdc-3.1.0-bin.tar.gz)
flink-cdc-3.1.0 directory will contain four directory: `bin`, `lib`, `log`, and `conf`.

2. Download the connector package listed below and move it to the `lib` directory
**Download links are available only for stable releases, SNAPSHOT dependencies need to be built based on master or release branches by yourself.**
After unpacking, the `flink-cdc-3.1.0` directory will contain four directories:
* `bin`
* `lib`
* `log`
* `conf`

2. Download the connector package listed below and move it to the `flink-cdc-3.1.0/lib` directory.
**Please note that you need to move the jar to the lib directory of Flink CDC Home, not to the lib directory of Flink Home.**

- [MySQL pipeline connector 3.1.0](https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.0/flink-cdc-pipeline-connector-mysql-3.1.0.jar)
- [Apache Doris pipeline connector 3.1.0](https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-cdc-pipeline-connector-doris/3.1.0/flink-cdc-pipeline-connector-doris-3.1.0.jar)

You also need to place MySQL connector into Flink `lib` folder or pass it with `--jar` argument, since they're no longer packaged with CDC connectors:
- [MySQL Connector Java](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar)
_NOTE: Download links are available only for stable releases. SNAPSHOT dependencies must be built from the master or release branches yourself._

## Create a Flink CDC configuration

3. Write the task configuration YAML file.
Here is an example file, `mysql-to-doris.yaml`, for synchronizing the entire database:
@@ -222,58 +230,65 @@ This command automatically starts all the containers defined in the Docker Compo

```yaml

pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
parallelism: 1

```

Notice that:
- `tables: app_db.\.*` in the source synchronizes all tables in `app_db` through regular expression matching.
- `table.create.properties.replication_num` in the sink is set because there is only one Doris BE node in the Docker image.
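
To make the `tables` pattern more concrete, here is a rough sketch of what it selects. In the pipeline YAML, the bare dot separates the database name from the table name, and the escaped `\.*` stands for the regular expression `.*` applied to the table name. The table list below is hypothetical and only for illustration, and `grep -E` is used as a stand-in for the connector's own matching.

```shell
# Hypothetical fully qualified table names, for illustration only.
candidates='app_db.orders
app_db.products
app_db.shipments
other_db.orders'

# `app_db.\.*` in the pipeline YAML: database `app_db`, any table name.
# Roughly equivalent POSIX extended regex over "database.table" strings:
printf '%s\n' "$candidates" | grep -E '^app_db\..*$'
```

Only the three `app_db` entries are printed; `other_db.orders` is filtered out because its database name does not match.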

4. Finally, submit job to Flink Standalone cluster using Cli.
```shell
bash bin/flink-cdc.sh mysql-to-doris.yaml
```
## Submit the Flink CDC job

Submit the job to the Flink standalone cluster using the CLI:

```shell
bash bin/flink-cdc.sh mysql-to-doris.yaml
```

After successful submission, the return information is as follows:
```shell
Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris
```
We can find a job named `Sync MySQL Database to Doris` is running through Flink Web UI.

```shell
Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris
```

In the Flink Web UI, we can see that a job named `Sync MySQL Database to Doris` is running.

{{< img src="/fig/mysql-doris-tutorial/mysql-to-doris.png" alt="MySQL-to-Doris" >}}

In the Doris Web UI, we can see that the tables have been created and the data has been inserted.

{{< img src="/fig/mysql-doris-tutorial/doris-display-data.png" alt="Doris display data" >}}

### Synchronize Schema and Data changes
## Synchronize Schema and Data changes

Enter MySQL container

```shell
docker-compose exec mysql mysql -uroot -p123456
```

Then modify the schema and records in MySQL; the tables in Doris will reflect the same changes in real time:
1. insert one record in `orders` from MySQL:
1. Insert one record in `orders` from MySQL:

```sql
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
```

2. add one column in `orders` from MySQL:
2. Add one column in `orders` from MySQL:

```sql
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
```

3. update one record in `orders` from MySQL:
3. Update one record in `orders` from MySQL:

```sql
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
```
4. delete one record in `orders` from MySQL:
4. Delete one record in `orders` from MySQL:

```sql
DELETE FROM app_db.orders WHERE id=2;
```

@@ -285,7 +300,7 @@ Refresh the Doris Web UI every time you execute a step, and you can see that the

Similarly, by modifying the `shipments` and `products` tables, you can also see the results of the synchronized changes in real time in Doris.
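
As an alternative to refreshing the Web UI, the synchronized data can also be inspected from the command line, since Doris accepts MySQL protocol connections on its query port. This is a sketch under a few assumptions: a `mysql` client is installed on the host, port 9030 is published on localhost as in the `docker-compose.yml` above, and the tables land in a Doris database named `app_db`. The helper name `query_doris` is hypothetical.

```shell
# Assumptions: a `mysql` client is available on the host, and the Doris FE
# query port 9030 is published on localhost (see docker-compose.yml).
# The root user has an empty password by default, so no -p flag is passed.
query_doris() {
  mysql -h127.0.0.1 -P9030 -uroot -e "$1"
}

# Usage, once the containers and the pipeline job are running:
#   query_doris 'SELECT * FROM app_db.orders ORDER BY id;'
```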

### Route the changes
## Route the changes
Flink CDC provides the configuration to route the table structure/data of the source table to other table names.
With this ability, we can achieve functions such as table name replacement, database name replacement, and whole-database synchronization.
Here is an example file that uses the `route` feature:
@@ -322,7 +337,7 @@ Here is an example file for using `route` feature:

```yaml

pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
parallelism: 1
```

Using the `route` configuration above, we can synchronize the table schema and data of `app_db.orders` to `ods_db.ods_orders`, thus achieving the function of database migration.