Skip to content

Commit 288afce

Browse files
author
Hoang Phan
committed
Add mysql source
1 parent f5673b5 commit 288afce

File tree

12 files changed

+693
-0
lines changed

12 files changed

+693
-0
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
**/ref/
2+
**/obj/
3+
**/bin/
4+
DotSettings.user
5+
**DotSettings.user
6+
ca.cert
7+
.idea/
8+
__pycache__/
9+
certificates/
10+
state/

python/sources/mysql_cdc/README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# MySQL CDC
2+
3+
This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication.
4+
5+
## How to run
6+
7+
1. Set up your MySQL database with binary logging enabled
8+
2. Configure environment variables for MySQL connection
9+
3. Install dependencies: `pip install -r requirements.txt`
10+
4. Run: `python main.py`
11+
12+
## Environment variables
13+
14+
The connector uses the following environment variables:
15+
16+
- **output**: Name of the output topic to write into.
17+
- **MYSQL_HOST**: The IP address or fully qualified domain name of your MySQL server.
18+
- **MYSQL_PORT**: The Port number to use for communication with the server (default: 3306).
19+
- **MYSQL_DATABASE**: The name of the database for CDC.
20+
- **MYSQL_USER**: The username that should be used to interact with the database.
21+
- **MYSQL_PASSWORD**: The password for the user configured above.
22+
- **MYSQL_SCHEMA**: The name of the schema/database for CDC (same as MYSQL_DATABASE).
23+
- **MYSQL_TABLE**: The name of the table for CDC.
24+
25+
## Requirements / Prerequisites
26+
27+
- A MySQL Database with binary logging enabled.
28+
- Set `log-bin=mysql-bin` and `binlog-format=ROW` in MySQL configuration.
29+
- MySQL user with `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges.
30+
31+
## Contribute
32+
33+
Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
34+
35+
## Open source
36+
37+
This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.
38+
39+
Please star us and mention us on social to show your appreciation.
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# MySQL CDC Setup
2+
3+
This application implements MySQL CDC using MySQL binary log replication.
4+
5+
## Prerequisites
6+
7+
1. **MySQL Configuration**: Your MySQL server must have binary logging enabled with ROW format:
8+
```ini
9+
# Add to MySQL configuration file (my.cnf or my.ini)
10+
log-bin=mysql-bin
11+
binlog-format=ROW
12+
server-id=1
13+
```
14+
15+
2. **MySQL User Permissions**: The MySQL user needs REPLICATION SLAVE and REPLICATION CLIENT privileges:
16+
```sql
17+
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'your_user'@'%';
18+
GRANT SELECT ON your_database.your_table TO 'your_user'@'%';
19+
FLUSH PRIVILEGES;
20+
```
21+
22+
## Environment Variables
23+
24+
Set the following environment variables:
25+
26+
### MySQL Connection
27+
- `MYSQL_HOST` - MySQL server hostname (e.g., localhost)
28+
- `MYSQL_PORT` - MySQL server port (default: 3306)
29+
- `MYSQL_USER` - MySQL username
30+
- `MYSQL_PASSWORD` - MySQL password
31+
- `MYSQL_DATABASE` - MySQL database name
32+
- `MYSQL_SCHEMA` - MySQL database name (same as MYSQL_DATABASE)
33+
- `MYSQL_TABLE` - Table name to monitor for changes
34+
35+
### Kafka Output (unchanged)
36+
- `output` - Kafka topic name for publishing changes
37+
38+
## Example .env file
39+
40+
```env
41+
# MySQL Connection
42+
MYSQL_HOST=localhost
43+
MYSQL_PORT=3306
44+
MYSQL_USER=replication_user
45+
MYSQL_PASSWORD=your_password
46+
MYSQL_DATABASE=your_database
47+
MYSQL_SCHEMA=your_database
48+
MYSQL_TABLE=your_table
49+
50+
# Kafka Output
51+
output=cdc-changes-topic
52+
```
53+
54+
## Dependencies
55+
56+
Install the required Python packages:
57+
```bash
58+
pip install -r requirements.txt
59+
```
60+
61+
The key MySQL-specific dependencies are:
62+
- `pymysql` - MySQL database connector
63+
- `mysql-replication` - MySQL binary log replication library
64+
65+
## Change Data Format
66+
67+
The MySQL CDC produces change events in the following format:
68+
69+
### INSERT Event
70+
```json
71+
{
72+
"kind": "insert",
73+
"schema": "database_name",
74+
"table": "table_name",
75+
"columnnames": ["col1", "col2"],
76+
"columnvalues": ["value1", "value2"],
77+
"oldkeys": {}
78+
}
79+
```
80+
81+
### UPDATE Event
82+
```json
83+
{
84+
"kind": "update",
85+
"schema": "database_name",
86+
"table": "table_name",
87+
"columnnames": ["col1", "col2"],
88+
"columnvalues": ["new_value1", "new_value2"],
89+
"oldkeys": {
90+
"keynames": ["col1", "col2"],
91+
"keyvalues": ["old_value1", "old_value2"]
92+
}
93+
}
94+
```
95+
96+
### DELETE Event
97+
```json
98+
{
99+
"kind": "delete",
100+
"schema": "database_name",
101+
"table": "table_name",
102+
"columnnames": [],
103+
"columnvalues": [],
104+
"oldkeys": {
105+
"keynames": ["col1", "col2"],
106+
"keyvalues": ["deleted_value1", "deleted_value2"]
107+
}
108+
}
109+
```
110+
111+
## Running the Application
112+
113+
1. Ensure MySQL is configured with binary logging
114+
2. Set environment variables
115+
3. Run the application:
116+
```bash
117+
python main.py
118+
```
119+
120+
The application will:
121+
1. Connect to MySQL and validate binary logging is enabled
122+
2. Create a binary log stream reader
123+
3. Monitor the specified table for changes
124+
4. Buffer changes and publish them to Kafka every 500ms

python/sources/mysql_cdc/app.yaml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
name: video-static-feature
2+
language: Python
3+
variables:
4+
- name: output
5+
inputType: OutputTopic
6+
description: This is the output topic
7+
defaultValue: none
8+
required: true
9+
- name: MYSQL_HOST
10+
inputType: FreeText
11+
description: Host name of MySQL
12+
required: true
13+
- name: MYSQL_PORT
14+
inputType: FreeText
15+
description: Port of MySQL
16+
defaultValue: 3306
17+
required: true
18+
- name: MYSQL_USER
19+
inputType: FreeText
20+
description: Username of MySQL
21+
required: true
22+
- name: MYSQL_PASSWORD
23+
inputType: HiddenText
24+
description: Password of MySQL
25+
required: true
26+
- name: MYSQL_DATABASE
27+
inputType: FreeText
28+
description: Database name of MySQL
29+
required: true
30+
- name: MYSQL_SCHEMA
31+
inputType: FreeText
32+
description: Name of schema/database for CDC
33+
required: true
34+
- name: MYSQL_TABLE
35+
inputType: FreeText
36+
description: Name of table for CDC
37+
required: true
38+
dockerfile: dockerfile
39+
runEntryPoint: main.py
40+
defaultFile: main.py
41+
libraryItemId: mysql-cdc-source
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
FROM python:3.12.5-slim-bookworm
2+
3+
# Set environment variables for non-interactive setup and unbuffered output
4+
ENV DEBIAN_FRONTEND=noninteractive \
5+
PYTHONUNBUFFERED=1 \
6+
PYTHONIOENCODING=UTF-8 \
7+
PYTHONPATH="/app"
8+
9+
# Build argument for setting the main app path
10+
ARG MAINAPPPATH=.
11+
12+
# Set working directory inside the container
13+
WORKDIR /app
14+
15+
# Copy requirements to leverage Docker cache
16+
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"
17+
18+
# Install dependencies without caching
19+
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"
20+
21+
# Copy entire application into container
22+
COPY . .
23+
24+
# Set working directory to main app path
25+
WORKDIR "/app/${MAINAPPPATH}"
26+
27+
# Define the container's startup command
28+
ENTRYPOINT ["python3", "main.py"]
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from datetime import timedelta
2+
import os
3+
import json
4+
5+
6+
def load_config():
7+
driver = os.environ["driver"]
8+
server = os.environ["server"]
9+
user_id = os.environ["userid"]
10+
password = os.environ["password"]
11+
database = os.environ["database"]
12+
table_name = os.environ["table_name"]
13+
last_modified_column = os.environ["last_modified_column"]
14+
time_delta_config = os.environ["time_delta"]
15+
16+
try:
17+
use_utc_for_offset = bool(os.environ["offset_is_utc"])
18+
except Exception as e:
19+
raise Exception("Use UTC For Offset must be True or False", e)
20+
21+
drop_cols = os.getenv("columns_to_drop")
22+
rename_cols = None
23+
passed_rename_cols = os.getenv("columns_to_rename")
24+
25+
try:
26+
poll_interval = int(os.environ["poll_interval_seconds"])
27+
except Exception as e:
28+
raise Exception("Poll Interval must be an integer", e)
29+
30+
if poll_interval < 1:
31+
poll_interval = 1
32+
33+
try:
34+
if passed_rename_cols != None and passed_rename_cols != "":
35+
rename_cols = json.loads(passed_rename_cols)
36+
except Exception as e:
37+
raise Exception("Invalid JSON supplied for column renames", e)
38+
39+
return {
40+
"driver": driver,
41+
"server": server,
42+
"user_id": user_id,
43+
"password": password,
44+
"database": database,
45+
"table_name": table_name,
46+
"last_modified_column": last_modified_column,
47+
"time_delta": make_time_delta_from_config(time_delta_config),
48+
"drop_cols": drop_cols,
49+
"rename_cols": rename_cols,
50+
"use_utc": use_utc_for_offset,
51+
"poll_interval": poll_interval
52+
}
53+
54+
55+
def make_time_delta_from_config(time_delta_config) -> timedelta:
56+
time_delta_values = time_delta_config.split(",")
57+
58+
if len(time_delta_values) != 5:
59+
raise Exception(
60+
"time_delta_config must contain 5 values, one for each of seconds, minutes, hours, days and weeks")
61+
62+
try:
63+
seconds = int(time_delta_values[0])
64+
minutes = int(time_delta_values[1])
65+
hours = int(time_delta_values[2])
66+
days = int(time_delta_values[3])
67+
weeks = int(time_delta_values[4])
68+
return timedelta(seconds = seconds, minutes = minutes, hours = hours, days = days, weeks = weeks)
69+
except TypeError as te:
70+
raise Exception("Unable to cast one of the supplied values to int", te)
71+
except Exception as e:
72+
raise Exception("Something went wrong configuring the time delta", e)
73+
74+
75+
def check_table_exists(conn, table) -> bool:
76+
if not conn.cursor().tables(table).fetchone():
77+
print("Table does not exist")
78+
return False
79+
return True
80+
81+
82+
def check_column_exists(conn, table, column_name) -> bool:
83+
for c in conn.cursor().columns(table = table):
84+
if column_name == c.column_name:
85+
return True
86+
print("Key column [{}] not found in table [{}]".format(column_name, table))
87+
return False
88+
89+
90+
def drop_columns(conn, cols_to_drop, table_data, table_name) -> any:
91+
for col in cols_to_drop:
92+
if check_column_exists(conn, table_name, col):
93+
table_data = table_data.drop(col, 1)
94+
return table_data
95+
96+
97+
def rename_columns(conn, cols_to_rename, table_data, table_name) -> any:
98+
for col in cols_to_rename:
99+
if check_column_exists(conn, table_name, col):
100+
table_data = table_data.rename(columns={col: cols_to_rename[col]})
101+
return table_data

python/sources/mysql_cdc/icon.png

2.31 KB
Loading

0 commit comments

Comments
 (0)