|
| 1 | +--- |
| 2 | +title: 使用 Debezium 从 MySQL 迁移数据 |
| 3 | +--- |
| 4 | + |
| 5 | +在本教程中,您将使用 Debezium 将数据从 MySQL 加载到 Databend。开始之前,请确保您已在环境中成功部署 Databend、MySQL 和 Debezium。 |
| 6 | + |
| 7 | +## 步骤 1. 在 MySQL 中准备数据 {#step-1-prepare-data-in-mysql} |
| 8 | + |
| 9 | +在 MySQL 中创建数据库和表,并插入示例数据。 |
| 10 | + |
| 11 | +```sql |
| 12 | +CREATE DATABASE mydb; |
| 13 | +USE mydb; |
| 14 | + |
| 15 | +CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)); |
| 16 | +ALTER TABLE products AUTO_INCREMENT = 10; |
| 17 | + |
| 18 | +INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), |
| 19 | +(default,"car battery","12V car battery"), |
| 20 | +(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), |
| 21 | +(default,"hammer","12oz carpenter's hammer"), |
| 22 | +(default,"hammer","14oz carpenter's hammer"), |
| 23 | +(default,"hammer","16oz carpenter's hammer"), |
| 24 | +(default,"rocks","box of assorted rocks"), |
| 25 | +(default,"jacket","water-proof black wind breaker"), |
| 26 | +(default,"cloud","test for databend"), |
| 27 | +(default,"spare tire","24 inch spare tire"); |
| 28 | +``` |
| 29 | + |
| 30 | +## 步骤 2. 在 Databend 中创建数据库 {#step-2-create-database-in-databend} |
| 31 | + |
| 32 | +在 Databend 中创建对应的数据库。请注意,您无需创建与 MySQL 对应的表。 |
| 33 | + |
| 34 | +```sql |
| 35 | +CREATE DATABASE debezium; |
| 36 | +``` |
| 37 | + |
| 38 | +## 步骤 3. 创建 application.properties {#step-3-create-applicationproperties} |
| 39 | + |
| 40 | +创建文件 _application.properties_,然后启动 debezium-server-databend。关于如何安装和启动该工具,请参阅[安装 debezium-server-databend](#installing-debezium-server-databend)。 |
| 41 | + |
| 42 | +首次启动时,该工具会使用指定的批处理大小将 MySQL 数据全量同步到 Databend。因此,成功复制后,MySQL 中的数据将出现在 Databend 中。 |
| 43 | + |
| 44 | +```text title='application.properties' |
| 45 | +debezium.sink.type=databend |
| 46 | +debezium.sink.databend.upsert=true |
| 47 | +debezium.sink.databend.upsert-keep-deletes=false |
| 48 | +debezium.sink.databend.database.databaseName=debezium |
| 49 | +debezium.sink.databend.database.url=jdbc:databend://<your-databend-host>:<port> |
| 50 | +debezium.sink.databend.database.username=<your-username> |
| 51 | +debezium.sink.databend.database.password=<your-password> |
| 52 | +debezium.sink.databend.database.primaryKey=id |
| 53 | +debezium.sink.databend.database.tableName=products |
| 54 | +debezium.sink.databend.database.param.ssl=true |
| 55 | +
|
| 56 | +# 启用事件模式 |
| 57 | +debezium.format.value.schemas.enable=true |
| 58 | +debezium.format.key.schemas.enable=true |
| 59 | +debezium.format.value=json |
| 60 | +debezium.format.key=json |
| 61 | +
|
| 62 | +# MySQL 源 |
| 63 | +debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector |
| 64 | +debezium.source.offset.storage.file.filename=data/offsets.dat |
| 65 | +debezium.source.offset.flush.interval.ms=60000 |
| 66 | +
|
| 67 | +debezium.source.database.hostname=127.0.0.1 |
| 68 | +debezium.source.database.port=3306 |
| 69 | +debezium.source.database.user=root |
| 70 | +debezium.source.database.password=123456 |
| 71 | +debezium.source.database.dbname=mydb |
| 72 | +debezium.source.database.server.name=from_mysql |
| 73 | +debezium.source.include.schema.changes=false |
| 74 | +debezium.source.table.include.list=mydb.products |
| 75 | +# debezium.source.database.ssl.mode=required |
| 76 | +# 不使用 Kafka,使用本地文件存储检查点 |
| 77 | +debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory |
| 78 | +debezium.source.database.history.file.filename=data/status.dat |
| 79 | +# 执行事件扁平化。解包消息! |
| 80 | +debezium.transforms=unwrap |
| 81 | +debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState |
| 82 | +debezium.transforms.unwrap.delete.handling.mode=rewrite |
| 83 | +debezium.transforms.unwrap.drop.tombstones=true |
| 84 | +
|
| 85 | +# ############ 设置日志级别 ############ |
| 86 | +quarkus.log.level=INFO |
| 87 | +# 忽略 Jetty 的 warning 级别以下日志,因其较为冗长 |
| 88 | +quarkus.log.category."org.eclipse.jetty".level=WARN |
| 89 | +``` |
| 90 | + |
| 91 | +一切就绪!如果您查询 Databend 中的 products 表,将会看到 MySQL 的数据已成功同步。您可以在 MySQL 中执行插入、更新或删除操作,相应的变更也会实时反映到 Databend 中。 |
0 commit comments