Skip to content

Commit 2dd0f15

Browse files
add post 'Debezium Sink Connector로 DB 변경사항을 다른 DB에 자동으로 동기화하는 방법'
Signed-off-by: jonghoonpark <dev@jonghoonpark.com>
1 parent ff335cb commit 2dd0f15

File tree

3 files changed

+113
-3
lines changed

3 files changed

+113
-3
lines changed

_posts/2025-08-19-debezium-with-kafka-4.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,17 @@ yml 파일을 작성한 후 podman compose 로 실행 시켰다.
187187
podman compose up -d
188188
```
189189

190-
### mysql table 설정
190+
### mysql 설정
191191

192-
아래와 같이 테이블을 생성해주었다.
192+
참고로 mysql에서 debezium 을 사용할 때는 `binlog_format` 이 반드시 'ROW' 여야 한다. (MySQL 기본 값은 `ROW` 이다. 따라서 별도로 설정하지 않았다면 문제가 없을 것이다.)
193+
194+
아래 명령어로 어떻게 설정되어있는지 확인해볼 수 있다.
195+
196+
```SQL
197+
SHOW VARIABLES LIKE 'binlog_format';
198+
```
199+
200+
테스트를 위해 아래와 같이 테이블을 생성해주었다.
193201

194202
```sql
195203
CREATE TABLE IF NOT EXISTS customers (
@@ -347,7 +355,7 @@ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json"
347355
Update 는 PUT 으로 가능하다.
348356

349357
```bash
350-
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/<connector-name> -d '{
358+
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/<connector-name>/config -d '{
351359
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
352360
"database.hostname": "mysql",
353361
"database.port": "3306",
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
---
2+
layout: "post"
3+
title: "Debezium Sink Connector로 DB 변경사항을 다른 DB에 자동으로 동기화하는 방법"
4+
description:
5+
"Debezium Sink Connector를 사용하여 DB 변경사항을 다른 DB에 자동으로 동기화하는 방법을 설명합니다.\
6+
\ 이 글에서는 Sink Connector의 개념과 설정 방법을 다루며, Kafka Connector API를 이용한 JdbcSinkConnector\
7+
\ 설정 예시를 제공합니다. 이를 통해 애플리케이션에서 직접 이벤트를 소비하지 않고도 데이터 복제 파이프라인을 쉽게 구성할 수 있습니다."
8+
categories:
9+
- "개발"
10+
tags:
11+
- "Debezium"
12+
- "Connector"
13+
- "Kafka"
14+
- "Kafka Connect"
15+
- "Sink"
16+
- "Sink Connector"
17+
- "MySQL"
18+
- "sync"
19+
- "database sync"
20+
date: "2025-08-22 15:00:00 +0000"
21+
toc: true
22+
image:
23+
path: "/assets/thumbnails/2025-08-24-debezium-sink-connector.jpg"
24+
---
25+
26+
# Debezium Sink Connector로 DB 변경사항을 다른 DB에 자동으로 동기화하는 방법
27+
28+
최근 [Debezium 으로 DB 변경사항 캡처하기](https://jonghoonpark.com/2025/08/19/debezium-with-kafka-4) 라는 글을 작성했었다.
29+
해당 글에서는 변경사항을 캡쳐 한 후, 어플리케이션 에서 직접 컨슘하여 활용하는 방법에 대해서 소개했었다.
30+
이번 글에서는 Sink Connector 를 사용하여 캡처된 변경사항을 다른 DB에 동기화 하는 방법에 대해 설명한다.
31+
32+
## Sink Connector
33+
34+
여기서 Sink는 싱크대 할 때의 싱크이다.
35+
36+
싱크대에서 물이 흘러 하수구로 모이는 것처럼, Sink Connector 는 이벤트를 컨슘하여 데이터를 외부 시스템으로 전달해주는 역할을 한다.
37+
38+
참고로 데이터를 produce 하는 커넥터는 Source Sonnector 라고 한다.
39+
40+
Source Connector가 DB 변경사항을 Kafka 토픽으로 보내면, Sink Connector는 해당 토픽에서 데이터를 읽어 다른 DB로 동기화한다.
41+
42+
## 시스템 구성 (compose)
43+
44+
[지난 글에서 작성한 compose.yml](https://jonghoonpark.com/2025/08/19/debezium-with-kafka-4#%EC%8B%9C%EC%8A%A4%ED%85%9C-%EA%B5%AC%EC%84%B1-compose) 에서 데이터 복제를 위한 2번째 mysql 설정만 추가해준다.
45+
46+
```yaml
47+
mysql2:
48+
image: mysql:8.0.32
49+
container_name: mysql_db2
50+
ports:
51+
- "3307:3306"
52+
environment:
53+
- MYSQL_ROOT_PASSWORD=debeziumrootpassword
54+
- MYSQL_USER=mysqluser
55+
- MYSQL_PASSWORD=mysqlpw
56+
- MYSQL_DATABASE=mydb
57+
volumes:
58+
- poc_debezium_mysql_data2:/var/lib/mysql
59+
networks:
60+
- kafka-net
61+
```
62+
63+
volumes 에 poc_debezium_mysql_data2 도 놓치지 말고 추가해준다.
64+
65+
## Connector 설정
66+
67+
Sink Connector도 Source Connector와 동일하게 Kafka Connector API를 사용한다.
68+
69+
JdbcSinkConnector를 커넥터 클래스도 사용한다.
70+
71+
```bash
72+
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
73+
"name": "my-debezium-sink-connector",
74+
"config": {
75+
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
76+
"tasks.max": "1",
77+
"connection.url": "jdbc:mysql://mysql2:3306/mydb?useSSL=false&allowPublicKeyRetrieval=true",
78+
"connection.username": "mysqluser",
79+
"connection.password": "mysqlpw",
80+
"insert.mode": "upsert",
81+
"delete.enabled": "true",
82+
"primary.key.mode": "record_key",
83+
"schema.evolution": "basic",
84+
"use.time.zone": "UTC",
85+
"topics": "dbserver1.mydb.customers",
86+
"table.name.format": "customers"
87+
}
88+
}'
89+
```
90+
91+
table.name.format 을 지정해주지 않으면, 토픽 이름을 기반으로 테이블이 요구된다.
92+
(ex. `dbserver1_mydb_customers`)
93+
94+
정상적으로 connector 가 설정되었다면, 데이터가 복제되는 것을 볼 수 있다.
95+
96+
## 마무리
97+
98+
Debezium Sink Connector를 활용하여 DB 변경사항을 다른 DB에 자동으로 동기화하는 방법을 알아보았다.
99+
100+
애플리케이션에서 직접 이벤트를 컨슘하지 않고도, Kafka Connect의 Sink Connector를 통해 데이터 복제 파이프라인을 손쉽게 구성할 수 있는 것을 확인해볼 수 있었다.
101+
102+
다음 글에서는 **Single Message Transform(SMT)**을 활용해, 이벤트를 필터링 하거나 가공처리 하는 방법에 대해 작성해보도록 하겠다.
1.13 MB
Loading

0 commit comments

Comments
 (0)