From 288afce2fa1d13b606e6406362da3c1d04925f50 Mon Sep 17 00:00:00 2001 From: Hoang Phan Date: Tue, 3 Jun 2025 14:37:50 -0400 Subject: [PATCH 1/4] Add mysql source --- python/sources/mysql_cdc/.gitignore | 10 ++ python/sources/mysql_cdc/README.md | 39 +++++ python/sources/mysql_cdc/README_MYSQL_CDC.md | 124 +++++++++++++++ python/sources/mysql_cdc/app.yaml | 41 +++++ python/sources/mysql_cdc/dockerfile | 28 ++++ python/sources/mysql_cdc/helper_functions.py | 101 ++++++++++++ python/sources/mysql_cdc/icon.png | Bin 0 -> 2367 bytes python/sources/mysql_cdc/library.json | 89 +++++++++++ python/sources/mysql_cdc/main.py | 85 ++++++++++ python/sources/mysql_cdc/mysql_helper.py | 159 +++++++++++++++++++ python/sources/mysql_cdc/requirements.txt | 4 + python/sources/mysql_cdc/setup_logger.py | 13 ++ 12 files changed, 693 insertions(+) create mode 100644 python/sources/mysql_cdc/.gitignore create mode 100644 python/sources/mysql_cdc/README.md create mode 100644 python/sources/mysql_cdc/README_MYSQL_CDC.md create mode 100644 python/sources/mysql_cdc/app.yaml create mode 100644 python/sources/mysql_cdc/dockerfile create mode 100644 python/sources/mysql_cdc/helper_functions.py create mode 100644 python/sources/mysql_cdc/icon.png create mode 100644 python/sources/mysql_cdc/library.json create mode 100644 python/sources/mysql_cdc/main.py create mode 100644 python/sources/mysql_cdc/mysql_helper.py create mode 100644 python/sources/mysql_cdc/requirements.txt create mode 100644 python/sources/mysql_cdc/setup_logger.py diff --git a/python/sources/mysql_cdc/.gitignore b/python/sources/mysql_cdc/.gitignore new file mode 100644 index 000000000..70409a142 --- /dev/null +++ b/python/sources/mysql_cdc/.gitignore @@ -0,0 +1,10 @@ +**/ref/ +**/obj/ +**/bin/ +DotSettings.user +**DotSettings.user +ca.cert +.idea/ +__pycache__/ +certificates/ +state/ \ No newline at end of file diff --git a/python/sources/mysql_cdc/README.md b/python/sources/mysql_cdc/README.md new file mode 100644 index 000000000..586887896 --- /dev/null +++ b/python/sources/mysql_cdc/README.md @@ -0,0 +1,39 @@ +# MySQL CDC + +This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication. + +## How to run + +1. Set up your MySQL database with binary logging enabled +2. Configure environment variables for MySQL connection +3. Install dependencies: `pip install -r requirements.txt` +4. Run: `python main.py` + +## Environment variables + +The connector uses the following environment variables: + +- **output**: Name of the output topic to write into. +- **MYSQL_HOST**: The IP address or fully qualified domain name of your MySQL server. +- **MYSQL_PORT**: The Port number to use for communication with the server (default: 3306). +- **MYSQL_DATABASE**: The name of the database for CDC. +- **MYSQL_USER**: The username that should be used to interact with the database. +- **MYSQL_PASSWORD**: The password for the user configured above. +- **MYSQL_SCHEMA**: The name of the schema/database for CDC (same as MYSQL_DATABASE). +- **MYSQL_TABLE**: The name of the table for CDC. + +## Requirements / Prerequisites + +- A MySQL Database with binary logging enabled. +- Set `log-bin=mysql-bin` and `binlog-format=ROW` in MySQL configuration. +- MySQL user with `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges. + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. 
Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. + +Please star us and mention us on social to show your appreciation. \ No newline at end of file diff --git a/python/sources/mysql_cdc/README_MYSQL_CDC.md b/python/sources/mysql_cdc/README_MYSQL_CDC.md new file mode 100644 index 000000000..fe73f3152 --- /dev/null +++ b/python/sources/mysql_cdc/README_MYSQL_CDC.md @@ -0,0 +1,124 @@ +# MySQL CDC Setup + +This application implements MySQL CDC using MySQL binary log replication. + +## Prerequisites + +1. **MySQL Configuration**: Your MySQL server must have binary logging enabled with ROW format: + ```ini + # Add to MySQL configuration file (my.cnf or my.ini) + log-bin=mysql-bin + binlog-format=ROW + server-id=1 + ``` + +2. **MySQL User Permissions**: The MySQL user needs REPLICATION SLAVE and REPLICATION CLIENT privileges: + ```sql + GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'your_user'@'%'; + GRANT SELECT ON your_database.your_table TO 'your_user'@'%'; + FLUSH PRIVILEGES; + ``` + +## Environment Variables + +Set the following environment variables: + +### MySQL Connection +- `MYSQL_HOST` - MySQL server hostname (e.g., localhost) +- `MYSQL_PORT` - MySQL server port (default: 3306) +- `MYSQL_USER` - MySQL username +- `MYSQL_PASSWORD` - MySQL password +- `MYSQL_DATABASE` - MySQL database name +- `MYSQL_SCHEMA` - MySQL database name (same as MYSQL_DATABASE) +- `MYSQL_TABLE` - Table name to monitor for changes + +### Kafka Output (unchanged) +- `output` - Kafka topic name for publishing changes + +## Example .env file + +```env +# MySQL Connection +MYSQL_HOST=localhost +MYSQL_PORT=3306 +MYSQL_USER=replication_user +MYSQL_PASSWORD=your_password +MYSQL_DATABASE=your_database +MYSQL_SCHEMA=your_database +MYSQL_TABLE=your_table + +# Kafka Output +output=cdc-changes-topic +``` + +## Dependencies + +Install the required Python packages: +```bash +pip install -r requirements.txt +``` + +The key MySQL-specific dependencies are: +- `pymysql` - MySQL database connector +- `mysql-replication` - MySQL binary log replication library + +## Change Data Format + +The MySQL CDC produces change events in the following format: + +### INSERT Event +```json +{ + "kind": "insert", + "schema": "database_name", + "table": "table_name", + "columnnames": ["col1", "col2"], + "columnvalues": ["value1", "value2"], + "oldkeys": {} +} +``` + +### UPDATE Event +```json +{ + "kind": "update", + "schema": "database_name", + "table": "table_name", + "columnnames": ["col1", "col2"], + "columnvalues": ["new_value1", "new_value2"], + "oldkeys": { + "keynames": ["col1", "col2"], + "keyvalues": ["old_value1", "old_value2"] + } +} +``` + +### DELETE Event +```json +{ + "kind": "delete", + "schema": "database_name", + "table": "table_name", + "columnnames": [], + "columnvalues": [], + "oldkeys": { + "keynames": ["col1", "col2"], + "keyvalues": ["deleted_value1", "deleted_value2"] + } +} +``` + +## Running the Application + +1. Ensure MySQL is configured with binary logging +2. Set environment variables +3. Run the application: + ```bash + python main.py + ``` + +The application will: +1. Connect to MySQL and validate binary logging is enabled +2. Create a binary log stream reader +3. Monitor the specified table for changes +4. 
Buffer changes and publish them to Kafka every 500ms \ No newline at end of file diff --git a/python/sources/mysql_cdc/app.yaml b/python/sources/mysql_cdc/app.yaml new file mode 100644 index 000000000..b7fdd2434 --- /dev/null +++ b/python/sources/mysql_cdc/app.yaml @@ -0,0 +1,41 @@ +name: video-static-feature +language: Python +variables: + - name: output + inputType: OutputTopic + description: This is the output topic + defaultValue: none + required: true + - name: MYSQL_HOST + inputType: FreeText + description: Host name of MySQL + required: true + - name: MYSQL_PORT + inputType: FreeText + description: Port of MySQL + defaultValue: 3306 + required: true + - name: MYSQL_USER + inputType: FreeText + description: Username of MySQL + required: true + - name: MYSQL_PASSWORD + inputType: HiddenText + description: Password of MySQL + required: true + - name: MYSQL_DATABASE + inputType: FreeText + description: Database name of MySQL + required: true + - name: MYSQL_SCHEMA + inputType: FreeText + description: Name of schema/database for CDC + required: true + - name: MYSQL_TABLE + inputType: FreeText + description: Name of table for CDC + required: true +dockerfile: dockerfile +runEntryPoint: main.py +defaultFile: main.py +libraryItemId: mysql-cdc-source diff --git a/python/sources/mysql_cdc/dockerfile b/python/sources/mysql_cdc/dockerfile new file mode 100644 index 000000000..692316bbb --- /dev/null +++ b/python/sources/mysql_cdc/dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# Build argument for setting the main app path +ARG MAINAPPPATH=. + +# Set working directory inside the container +WORKDIR /app + +# Copy requirements to leverage Docker cache +COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt" + +# Install dependencies without caching +RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt" + +# Copy entire application into container +COPY . . 
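+
+# Note: with the default MAINAPPPATH of ".", the WORKDIR below resolves back to /app;
+# pass --build-arg MAINAPPPATH=<subdir> at build time to run a main.py from a subdirectory.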
+
+# Set working directory to main app path
+WORKDIR "/app/${MAINAPPPATH}"
+
+# Define the container's startup command
+ENTRYPOINT ["python3", "main.py"]
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/helper_functions.py b/python/sources/mysql_cdc/helper_functions.py
new file mode 100644
index 000000000..779146c61
--- /dev/null
+++ b/python/sources/mysql_cdc/helper_functions.py
@@ -0,0 +1,101 @@
+from datetime import timedelta
+import os
+import json
+
+
+def load_config():
+    driver = os.environ["driver"]
+    server = os.environ["server"]
+    user_id = os.environ["userid"]
+    password = os.environ["password"]
+    database = os.environ["database"]
+    table_name = os.environ["table_name"]
+    last_modified_column = os.environ["last_modified_column"]
+    time_delta_config = os.environ["time_delta"]
+
+    try:
+        # bool() is True for any non-empty string, so compare the text explicitly
+        use_utc_for_offset = os.environ["offset_is_utc"].lower() == "true"
+    except Exception as e:
+        raise Exception("Use UTC For Offset must be True or False", e)
+
+    drop_cols = os.getenv("columns_to_drop")
+    rename_cols = None
+    passed_rename_cols = os.getenv("columns_to_rename")
+
+    try:
+        poll_interval = int(os.environ["poll_interval_seconds"])
+    except Exception as e:
+        raise Exception("Poll Interval must be an integer", e)
+
+    if poll_interval < 1:
+        poll_interval = 1
+
+    try:
+        if passed_rename_cols is not None and passed_rename_cols != "":
+            rename_cols = json.loads(passed_rename_cols)
+    except Exception as e:
+        raise Exception("Invalid JSON supplied for column renames", e)
+
+    return {
+        "driver": driver,
+        "server": server,
+        "user_id": user_id,
+        "password": password,
+        "database": database,
+        "table_name": table_name,
+        "last_modified_column": last_modified_column,
+        "time_delta": make_time_delta_from_config(time_delta_config),
+        "drop_cols": drop_cols,
+        "rename_cols": rename_cols,
+        "use_utc": use_utc_for_offset,
+        "poll_interval": poll_interval
+    }
+
+
+def make_time_delta_from_config(time_delta_config) -> timedelta:
+    time_delta_values = time_delta_config.split(",")
+
+    if len(time_delta_values) != 5:
+        raise Exception(
+            "time_delta_config must contain 5 values, one for each of seconds, minutes, hours, days and weeks")
+
+    try:
+        seconds = int(time_delta_values[0])
+        minutes = int(time_delta_values[1])
+        hours = int(time_delta_values[2])
+        days = int(time_delta_values[3])
+        weeks = int(time_delta_values[4])
+        return timedelta(seconds=seconds, minutes=minutes, hours=hours, days=days, weeks=weeks)
+    except ValueError as ve:
+        # int() raises ValueError (not TypeError) for non-numeric strings
+        raise Exception("Unable to cast one of the supplied values to int", ve)
+    except Exception as e:
+        raise Exception("Something went wrong configuring the time delta", e)
+
+
+def check_table_exists(conn, table) -> bool:
+    # pymysql cursors have no .tables() helper (that is a pyodbc API), so ask MySQL directly
+    with conn.cursor() as cursor:
+        cursor.execute("SHOW TABLES LIKE %s", (table,))
+        if not cursor.fetchone():
+            print("Table does not exist")
+            return False
+    return True
+
+
+def check_column_exists(conn, table, column_name) -> bool:
+    with conn.cursor() as cursor:
+        cursor.execute(f"SHOW COLUMNS FROM `{table}` LIKE %s", (column_name,))
+        if cursor.fetchone():
+            return True
+    print("Key column [{}] not found in table [{}]".format(column_name, table))
+    return False
+
+
+def drop_columns(conn, cols_to_drop, table_data, table_name):
+    for col in cols_to_drop:
+        if check_column_exists(conn, table_name, col):
+            # drop(col, 1) relied on the positional "axis" argument removed in pandas 2.0
+            table_data = table_data.drop(columns=[col])
+    return table_data
+
+
+def rename_columns(conn, cols_to_rename, table_data, table_name):
+    for col in cols_to_rename:
+        if check_column_exists(conn, table_name, col):
+            table_data = table_data.rename(columns={col: cols_to_rename[col]})
+    return table_data
\ No newline at end of file
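The `time_delta` value parsed above is five comma-separated integers in the fixed order seconds, minutes, hours, days, weeks. A quick sketch of how `make_time_delta_from_config` interprets it (the sample value here is made up):

```python
from helper_functions import make_time_delta_from_config

# "30,15,2,1,0" -> 30 seconds, 15 minutes, 2 hours, 1 day, 0 weeks
delta = make_time_delta_from_config("30,15,2,1,0")
print(delta)  # 1 day, 2:15:30
```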
diff --git a/python/sources/mysql_cdc/icon.png b/python/sources/mysql_cdc/icon.png
new file mode 100644
index 0000000000000000000000000000000000000000..725e0be48f0292fa996055f48b72f5ba0be05833
GIT binary patch
literal 2367
[2367 bytes of base85-encoded PNG data omitted]

+        if (current_time - last_flush_time) >= 0.5 and len(buffer) > 0:
+            # If 500ms have passed, produce all buffered messages
+            for message in buffer:
+                producer.produce(topic=output_topic.name,
+                                 key=MYSQL_TABLE_NAME,
+                                 value=json.dumps(message))
+                print("Message sent to Kafka")
+            # Flush the producer to send the messages
+
+            # Clear the buffer
+            buffer = []
+            # Update the last flush time
+            last_flush_time = current_time
+
+        time.sleep(WAIT_INTERVAL)
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except KeyboardInterrupt:
+        logger.info("Exiting.")
+        run = False
+    finally:
+        conn.close()
+        binlog_stream.close()
+        logger.info("Connection to MySQL closed")
+        logger.info("Exiting")
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/mysql_helper.py b/python/sources/mysql_cdc/mysql_helper.py
new file mode 100644
index 000000000..5ecb95e8a
--- /dev/null
+++ b/python/sources/mysql_cdc/mysql_helper.py
@@ -0,0 +1,159 @@
+import pymysql
+import os
+from pymysqlreplication import BinLogStreamReader
+from pymysqlreplication.row_event import (
+    DeleteRowsEvent,
+    UpdateRowsEvent,
+    WriteRowsEvent,
+)
+from setup_logger import logger
+
+def connect_mysql():
+    """Connect to MySQL database"""
+    MYSQL_HOST = os.environ["MYSQL_HOST"]
+    MYSQL_PORT = int(os.environ.get("MYSQL_PORT", 3306))
+    MYSQL_USER = os.environ["MYSQL_USER"]
+    MYSQL_PASSWORD = os.environ["MYSQL_PASSWORD"]
+    MYSQL_DATABASE = os.environ["MYSQL_DATABASE"]
+
+    conn = pymysql.connect(
+        host=MYSQL_HOST,
+        port=MYSQL_PORT,
+        user=MYSQL_USER,
+        password=MYSQL_PASSWORD,
+        database=MYSQL_DATABASE,
+        charset='utf8mb4'
+    )
+    return conn
+
+def run_query(conn, query: str):
+    """Execute a query on MySQL"""
+    with
conn.cursor() as cursor: + cursor.execute(query) + conn.commit() + +def enable_binlog_if_needed(): + """Check and enable binary logging if not already enabled""" + conn = connect_mysql() + try: + with conn.cursor() as cursor: + # Check if binary logging is enabled + cursor.execute("SHOW VARIABLES LIKE 'log_bin'") + result = cursor.fetchone() + + if result and result[1] == 'ON': + logger.info("Binary logging is already enabled") + else: + logger.warning("Binary logging is not enabled. Please enable it in MySQL configuration.") + logger.warning("Add the following to your MySQL config:") + logger.warning("log-bin=mysql-bin") + logger.warning("binlog-format=ROW") + raise Exception("Binary logging must be enabled for CDC") + + # Check binlog format + cursor.execute("SHOW VARIABLES LIKE 'binlog_format'") + result = cursor.fetchone() + + if result and result[1] != 'ROW': + logger.warning(f"Binlog format is {result[1]}, should be ROW for CDC") + logger.warning("Please set binlog_format=ROW in MySQL configuration") + + finally: + conn.close() + +def setup_mysql_cdc(table_name: str): + """Setup MySQL for CDC - mainly validation""" + conn = connect_mysql() + try: + with conn.cursor() as cursor: + # Check if table exists + cursor.execute(f"SHOW TABLES LIKE '{table_name.split('.')[-1]}'") + result = cursor.fetchone() + + if not result: + raise Exception(f"Table {table_name} not found") + + logger.info(f"Table {table_name} found and ready for CDC") + + finally: + conn.close() + +def create_binlog_stream(server_id=1): + """Create and return a MySQL binlog stream reader""" + MYSQL_HOST = os.environ["MYSQL_HOST"] + MYSQL_PORT = int(os.environ.get("MYSQL_PORT", 3306)) + MYSQL_USER = os.environ["MYSQL_USER"] + MYSQL_PASSWORD = os.environ["MYSQL_PASSWORD"] + + mysql_settings = { + "host": MYSQL_HOST, + "port": MYSQL_PORT, + "user": MYSQL_USER, + "passwd": MYSQL_PASSWORD, + } + + stream = BinLogStreamReader( + connection_settings=mysql_settings, + server_id=server_id, + only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent], + resume_stream=True, + blocking=False + ) + + return stream + +def get_changes(stream, schema_name: str, table_name: str): + """Get changes from MySQL binlog stream""" + changes = [] + + # Read available events (non-blocking) + for binlogevent in stream: + # Filter by schema and table + if binlogevent.schema == schema_name and binlogevent.table == table_name: + + if isinstance(binlogevent, WriteRowsEvent): + # INSERT operation + for row in binlogevent.rows: + change = { + "kind": "insert", + "schema": binlogevent.schema, + "table": binlogevent.table, + "columnnames": list(row["values"].keys()), + "columnvalues": list(row["values"].values()), + "oldkeys": {} + } + changes.append(change) + + elif isinstance(binlogevent, UpdateRowsEvent): + # UPDATE operation + for row in binlogevent.rows: + change = { + "kind": "update", + "schema": binlogevent.schema, + "table": binlogevent.table, + "columnnames": list(row["after_values"].keys()), + "columnvalues": list(row["after_values"].values()), + "oldkeys": { + "keynames": list(row["before_values"].keys()), + "keyvalues": list(row["before_values"].values()) + } + } + changes.append(change) + + elif isinstance(binlogevent, DeleteRowsEvent): + # DELETE operation + for row in binlogevent.rows: + change = { + "kind": "delete", + "schema": binlogevent.schema, + "table": binlogevent.table, + "columnnames": [], + "columnvalues": [], + "oldkeys": { + "keynames": list(row["values"].keys()), + "keyvalues": list(row["values"].values()) + } + } + 
changes.append(change)
+
+    return changes
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/requirements.txt b/python/sources/mysql_cdc/requirements.txt
new file mode 100644
index 000000000..529dc4ec2
--- /dev/null
+++ b/python/sources/mysql_cdc/requirements.txt
@@ -0,0 +1,4 @@
+quixstreams==2.9.0
+pymysql
+mysql-replication
+python-dotenv
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/setup_logger.py b/python/sources/mysql_cdc/setup_logger.py
new file mode 100644
index 000000000..24859698d
--- /dev/null
+++ b/python/sources/mysql_cdc/setup_logger.py
@@ -0,0 +1,13 @@
+import logging
+
+# Set up logger
+PROD_ENV = False
+logger = logging.getLogger("MySQL CDC")
+logging.basicConfig()
+
+if PROD_ENV:
+    logger.setLevel(logging.INFO)
+    logger.info("Running in Production Mode...")
+else:
+    logger.setLevel(logging.DEBUG)
+    logger.info("Running in Debug Mode...")
\ No newline at end of file

From 53d008c2d810112edad22d86a3336fa0b931f9af Mon Sep 17 00:00:00 2001
From: Hoang Phan
Date: Tue, 3 Jun 2025 17:05:30 -0400
Subject: [PATCH 2/4] Add support for binlog state

---
 python/sources/mysql_cdc/README.md           | 129 ++++++++++++-
 python/sources/mysql_cdc/README_MYSQL_CDC.md | 184 +++++++++++++++++--
 python/sources/mysql_cdc/app.yaml            |  17 +-
 python/sources/mysql_cdc/main.py             | 106 ++++++++++-
 python/sources/mysql_cdc/mysql_helper.py     | 171 +++++++++++++++--
 5 files changed, 561 insertions(+), 46 deletions(-)

diff --git a/python/sources/mysql_cdc/README.md b/python/sources/mysql_cdc/README.md
index 586887896..4e039a931 100644
--- a/python/sources/mysql_cdc/README.md
+++ b/python/sources/mysql_cdc/README.md
@@ -1,6 +1,14 @@
 # MySQL CDC
 
-This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication.
+This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication. It features **persistent binlog position tracking** to ensure at-least-once delivery with no data loss and automatic recovery after restarts.
+
+## Key Features
+
+- **Persistent Binlog Position**: Automatically saves and resumes from the last processed binlog position
+- **At-Least-Once Delivery**: No data loss during application restarts or failures
+- **Initial Snapshot**: Optionally capture existing data before starting CDC
+- **Automatic Recovery**: Seamlessly resume processing after interruptions
+- **Change Buffering**: Batches changes for efficient Kafka publishing
 
 ## How to run
 
@@ -11,8 +19,7 @@ This connector demonstrates how to capture changes to a MySQL database table (us
 
 ## Environment variables
 
-The connector uses the following environment variables:
-
+### Required MySQL Connection
 - **output**: Name of the output topic to write into.
 - **MYSQL_HOST**: The IP address or fully qualified domain name of your MySQL server.
@@ -22,11 +29,127 @@ The connector uses the following environment variables:
 - **MYSQL_SCHEMA**: The name of the schema/database for CDC (same as MYSQL_DATABASE).
 - **MYSQL_TABLE**: The name of the table for CDC.
 
+### Optional Configuration
+- **MYSQL_SNAPSHOT_HOST**: MySQL host for initial snapshot (defaults to MYSQL_HOST if not set). Use this if you want to perform initial snapshot from a different MySQL instance (e.g., read replica).
+- **INITIAL_SNAPSHOT**: Set to "true" to perform initial snapshot of existing data (default: false).
+- **SNAPSHOT_BATCH_SIZE**: Number of rows to process in each snapshot batch (default: 1000).
+- **FORCE_SNAPSHOT**: Set to "true" to force snapshot even if already completed (default: false).
+
+### State Management
+- **Quix__State__Dir**: Directory for storing application state including binlog positions (default: "state").
+
+## Binlog Position Persistence
+
+The connector automatically tracks the MySQL binlog position and saves it to disk after successful Kafka delivery. This ensures:
+
+- **No data loss** during application restarts
+- **At-least-once processing** of database changes (a crash between Kafka delivery and the position save can replay a few events)
+- **Automatic resumption** from the last processed position
+
+Position files are stored in: `{STATE_DIR}/binlog_position_{schema}_{table}.json`
+
+Example position file:
+```json
+{
+  "log_file": "mysql-bin.000123",
+  "log_pos": 45678,
+  "timestamp": 1704067200.0,
+  "readable_time": "2024-01-01 12:00:00 UTC"
+}
+```
+
+## Initial Snapshot
+
+Enable initial snapshot to capture existing table data before starting CDC:
+
+```env
+INITIAL_SNAPSHOT=true
+SNAPSHOT_BATCH_SIZE=1000
+MYSQL_SNAPSHOT_HOST=replica.mysql.example.com  # Optional: use read replica
+```
+
+The initial snapshot:
+- Processes data in configurable batches to avoid memory issues
+- Sends snapshot records with `"kind": "snapshot_insert"` to distinguish from real inserts
+- Marks completion to avoid re-processing on restart
+- Can be forced to re-run with `FORCE_SNAPSHOT=true`
+
 ## Requirements / Prerequisites
 
 - A MySQL Database with binary logging enabled.
 - Set `log-bin=mysql-bin` and `binlog-format=ROW` in MySQL configuration.
 - MySQL user with `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges.
+- For initial snapshot: `SELECT` privilege on the target table.
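+
+Before deploying, you can sanity-check these prerequisites with a few lines of `pymysql` (the same `SHOW VARIABLES` checks that `enable_binlog_if_needed` runs at startup; host and credentials below are placeholders):
+
+```python
+import pymysql
+
+conn = pymysql.connect(host="localhost", user="cdc_user",
+                       password="secure_password", database="your_database")
+with conn.cursor() as cur:
+    cur.execute("SHOW VARIABLES LIKE 'log_bin'")
+    print(cur.fetchone())   # expect ('log_bin', 'ON')
+    cur.execute("SHOW VARIABLES LIKE 'binlog_format'")
+    print(cur.fetchone())   # expect ('binlog_format', 'ROW')
+conn.close()
+```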
+
+### MySQL Configuration Example
+```ini
+[mysqld]
+server-id = 1
+log_bin = /var/log/mysql/mysql-bin.log
+binlog_expire_logs_seconds = 864000
+max_binlog_size = 100M
+binlog-format = ROW
+binlog_row_metadata = FULL
+binlog_row_image = FULL
+```
+
+### MySQL User Permissions
+```sql
+-- Create replication user
+CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password';
+
+-- Grant replication privileges
+GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
+
+-- Grant select for initial snapshot
+GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%';
+
+FLUSH PRIVILEGES;
+```
+
+## Change Event Format
+
+### INSERT/Snapshot Insert
+```json
+{
+  "kind": "insert",  // or "snapshot_insert" for initial snapshot
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["id", "name"],
+  "columnvalues": [123, "John Doe"],
+  "oldkeys": {}
+}
+```
+
+### UPDATE
+```json
+{
+  "kind": "update",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["id", "name"],
+  "columnvalues": [123, "Jane Doe"],
+  "oldkeys": {
+    "keynames": ["id", "name"],
+    "keyvalues": [123, "John Doe"]
+  }
+}
+```
+
+### DELETE
+```json
+{
+  "kind": "delete",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": [],
+  "columnvalues": [],
+  "oldkeys": {
+    "keynames": ["id", "name"],
+    "keyvalues": [123, "Jane Doe"]
+  }
+}
+```
 
 ## Contribute
 
diff --git a/python/sources/mysql_cdc/README_MYSQL_CDC.md b/python/sources/mysql_cdc/README_MYSQL_CDC.md
index fe73f3152..521d69bcc 100644
--- a/python/sources/mysql_cdc/README_MYSQL_CDC.md
+++ b/python/sources/mysql_cdc/README_MYSQL_CDC.md
@@ -1,21 +1,42 @@
 # MySQL CDC Setup
 
-This application implements MySQL CDC using MySQL binary log replication.
+This application implements MySQL CDC using MySQL binary log replication with **persistent binlog position tracking** for at-least-once delivery and automatic recovery.
+
+## Key Features
+
+- **Persistent Binlog Position**: Automatically saves and resumes from the last processed binlog position
+- **At-Least-Once Delivery**: No data loss during application restarts or failures
+- **Initial Snapshot**: Optionally capture existing data before starting CDC
+- **Automatic Recovery**: Seamlessly resume processing after interruptions
+- **Change Buffering**: Batches changes for efficient Kafka publishing
+- **State Management**: Integrated state persistence for production reliability
 
 ## Prerequisites
 
 1. **MySQL Configuration**: Your MySQL server must have binary logging enabled with ROW format:
    ```ini
    # Add to MySQL configuration file (my.cnf or my.ini)
-   log-bin=mysql-bin
-   binlog-format=ROW
-   server-id=1
+   [mysqld]
+   server-id = 1
+   log_bin = /var/log/mysql/mysql-bin.log
+   binlog_expire_logs_seconds = 864000
+   max_binlog_size = 100M
+   binlog-format = ROW
+   binlog_row_metadata = FULL
+   binlog_row_image = FULL
    ```
 
 2.
**MySQL User Permissions**: The MySQL user needs REPLICATION SLAVE and REPLICATION CLIENT privileges: ```sql - GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'your_user'@'%'; - GRANT SELECT ON your_database.your_table TO 'your_user'@'%'; + -- Create replication user + CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password'; + + -- Grant replication privileges for CDC + GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%'; + + -- Grant select for initial snapshot (if using snapshot feature) + GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%'; + FLUSH PRIVILEGES; ``` @@ -23,7 +44,7 @@ This application implements MySQL CDC using MySQL binary log replication. Set the following environment variables: -### MySQL Connection +### Required MySQL Connection - `MYSQL_HOST` - MySQL server hostname (e.g., localhost) - `MYSQL_PORT` - MySQL server port (default: 3306) - `MYSQL_USER` - MySQL username @@ -32,7 +53,16 @@ Set the following environment variables: - `MYSQL_SCHEMA` - MySQL database name (same as MYSQL_DATABASE) - `MYSQL_TABLE` - Table name to monitor for changes -### Kafka Output (unchanged) +### Optional Configuration +- `MYSQL_SNAPSHOT_HOST` - MySQL host for initial snapshot (defaults to MYSQL_HOST). Use this to snapshot from a read replica +- `INITIAL_SNAPSHOT` - Set to "true" to perform initial snapshot (default: false) +- `SNAPSHOT_BATCH_SIZE` - Rows per snapshot batch (default: 1000) +- `FORCE_SNAPSHOT` - Set to "true" to force re-snapshot (default: false) + +### State Management +- `Quix__State__Dir` - Directory for storing state files (default: "state") + +### Kafka Output - `output` - Kafka topic name for publishing changes ## Example .env file @@ -41,16 +71,80 @@ Set the following environment variables: # MySQL Connection MYSQL_HOST=localhost MYSQL_PORT=3306 -MYSQL_USER=replication_user -MYSQL_PASSWORD=your_password +MYSQL_USER=cdc_user +MYSQL_PASSWORD=secure_password MYSQL_DATABASE=your_database MYSQL_SCHEMA=your_database MYSQL_TABLE=your_table +# Optional: Use read replica for initial snapshot +MYSQL_SNAPSHOT_HOST=replica.mysql.example.com + +# Initial Snapshot Configuration +INITIAL_SNAPSHOT=true +SNAPSHOT_BATCH_SIZE=1000 +FORCE_SNAPSHOT=false + +# State Management +Quix__State__Dir=./state + # Kafka Output output=cdc-changes-topic ``` +## Binlog Position Persistence + +The application automatically tracks MySQL binlog positions and persists them to disk: + +### How it works: +1. **Position Tracking**: Records current binlog file and position during processing +2. **Automatic Saving**: Saves position after successful Kafka delivery +3. **Recovery**: Automatically resumes from last saved position on restart +4. 
**At-Least-Once**: Ensures no data loss; a crash can re-deliver events processed since the last saved position
+
+### Position Storage:
+- Location: `{STATE_DIR}/binlog_position_{schema}_{table}.json`
+- Format:
+  ```json
+  {
+    "log_file": "mysql-bin.000123",
+    "log_pos": 45678,
+    "timestamp": 1704067200.0,
+    "readable_time": "2024-01-01 12:00:00 UTC"
+  }
+  ```
+
+### Benefits:
+- ✅ **No data loss** during application restarts
+- ✅ **At-least-once processing** of database changes
+- ✅ **Automatic recovery** from last processed position
+- ✅ **Production-ready** state management
+
+## Initial Snapshot
+
+Capture existing table data before starting real-time CDC:
+
+### Configuration:
+```env
+INITIAL_SNAPSHOT=true
+SNAPSHOT_BATCH_SIZE=1000
+MYSQL_SNAPSHOT_HOST=replica.mysql.example.com  # Optional
+```
+
+### Features:
+- **Batched Processing**: Configurable batch sizes to handle large tables
+- **Memory Efficient**: Processes data in chunks to avoid memory issues
+- **Read Replica Support**: Use `MYSQL_SNAPSHOT_HOST` to snapshot from replica
+- **Completion Tracking**: Marks snapshot completion to avoid re-processing
+- **Force Re-snapshot**: Use `FORCE_SNAPSHOT=true` to re-run if needed
+
+### Snapshot Process:
+1. Connects to snapshot host (or main host if not specified)
+2. Processes table data in batches
+3. Sends records with `"kind": "snapshot_insert"`
+4. Marks completion in state file
+5. Proceeds to real-time CDC
+
 ## Dependencies
 
 Install the required Python packages:
 ```bash
 pip install -r requirements.txt
 ```
@@ -66,6 +160,18 @@ The key MySQL-specific dependencies are:
 - `pymysql` - MySQL database connector
 - `mysql-replication` - MySQL binary log replication library
 
 ## Change Data Format
 
 The MySQL CDC produces change events in the following format:
 
+### Snapshot Insert Event
+```json
+{
+  "kind": "snapshot_insert",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["col1", "col2"],
+  "columnvalues": ["value1", "value2"],
+  "oldkeys": {}
+}
+```
+
 ### INSERT Event
 ```json
 {
@@ -110,15 +216,57 @@ The MySQL CDC produces change events in the following format:
 
 ## Running the Application
 
-1. Ensure MySQL is configured with binary logging
-2. Set environment variables
-3. Run the application:
+1. **Configure MySQL** with binary logging enabled
+2. **Set environment variables** (see example above)
+3. **Run the application**:
    ```bash
    python main.py
    ```
 
-The application will:
-1. Connect to MySQL and validate binary logging is enabled
-2. Create a binary log stream reader
-3. Monitor the specified table for changes
-4. Buffer changes and publish them to Kafka every 500ms
\ No newline at end of file
+### Application Flow:
+1. **Load State**: Attempts to load saved binlog position
+2. **Initial Snapshot** (if enabled and not completed):
+   - Connects to snapshot host
+   - Processes existing data in batches
+   - Sends snapshot events to Kafka
+   - Marks completion
+3. **Real-time CDC**:
+   - Connects to MySQL binlog stream
+   - Resumes from saved position (or current if first run)
+   - Monitors specified table for changes
+   - Buffers changes and publishes to Kafka every 500ms
+   - Saves binlog position after successful delivery
+4. **Recovery**: On restart, automatically resumes from last saved position
+
+### Monitoring:
+- Check application logs for binlog position updates
+- Monitor state directory for position files
+- Verify Kafka topic for change events
+- Use MySQL's `SHOW MASTER STATUS` to compare positions
+
+## Troubleshooting
+
+### Common Issues:
+
+1. **Binary logging not enabled**:
+   - Error: "Binary logging must be enabled for CDC"
+   - Solution: Enable binlog in MySQL configuration and restart
+
+2.
**Insufficient privileges**: + - Error: Access denied + - Solution: Grant REPLICATION SLAVE, REPLICATION CLIENT privileges + +3. **Position file corruption**: + - Delete position file to restart from current position + - Location: `{STATE_DIR}/binlog_position_{schema}_{table}.json` + +4. **Snapshot issues**: + - Check `MYSQL_SNAPSHOT_HOST` connectivity + - Verify SELECT privileges on target table + - Review batch size for memory constraints + +### Best Practices: +- Use read replicas for initial snapshots on large tables +- Monitor disk space for state directory +- Set appropriate `SNAPSHOT_BATCH_SIZE` based on available memory +- Regularly backup state files for disaster recovery \ No newline at end of file diff --git a/python/sources/mysql_cdc/app.yaml b/python/sources/mysql_cdc/app.yaml index b7fdd2434..93da18b86 100644 --- a/python/sources/mysql_cdc/app.yaml +++ b/python/sources/mysql_cdc/app.yaml @@ -1,4 +1,4 @@ -name: video-static-feature +name: video-cdc-source language: Python variables: - name: output @@ -10,6 +10,10 @@ variables: inputType: FreeText description: Host name of MySQL required: true + - name: MYSQL_SNAPSHOT_HOST + inputType: FreeText + description: Host name of MySQL for snapshot + required: false - name: MYSQL_PORT inputType: FreeText description: Port of MySQL @@ -35,6 +39,17 @@ variables: inputType: FreeText description: Name of table for CDC required: true + - name: INITIAL_SNAPSHOT + inputType: FreeText + description: Whether to perform an initial snapshot + required: false + defaultValue: false + - name: SNAPSHOT_BATCH_SIZE + inputType: FreeText + description: Batch size for initial snapshot + required: false + defaultValue: 1000 + dockerfile: dockerfile runEntryPoint: main.py defaultFile: main.py diff --git a/python/sources/mysql_cdc/main.py b/python/sources/mysql_cdc/main.py index 00a0fb979..c81044c14 100644 --- a/python/sources/mysql_cdc/main.py +++ b/python/sources/mysql_cdc/main.py @@ -3,7 +3,7 @@ import os import json from setup_logger import logger -from mysql_helper import connect_mysql, enable_binlog_if_needed, setup_mysql_cdc, create_binlog_stream, get_changes +from mysql_helper import connect_mysql, enable_binlog_if_needed, setup_mysql_cdc, create_binlog_stream, get_changes, perform_initial_snapshot, save_binlog_position # Load environment variables (useful when working locally) from dotenv import load_dotenv @@ -15,6 +15,50 @@ MYSQL_TABLE_NAME = f"{MYSQL_SCHEMA}.{MYSQL_TABLE}" WAIT_INTERVAL = 0.1 +# Initial snapshot configuration +INITIAL_SNAPSHOT = os.getenv("INITIAL_SNAPSHOT", "false").lower() == "true" +SNAPSHOT_BATCH_SIZE = int(os.getenv("SNAPSHOT_BATCH_SIZE", "1000")) +FORCE_SNAPSHOT = os.getenv("FORCE_SNAPSHOT", "false").lower() == "true" + +# State management - use Quix state dir if available, otherwise default to "state" +STATE_DIR = os.getenv("Quix__State__Dir", "state") +SNAPSHOT_STATE_FILE = os.path.join(STATE_DIR, f"snapshot_completed_{MYSQL_SCHEMA}_{MYSQL_TABLE}.flag") + +def ensure_state_dir(): + """Create state directory if it doesn't exist""" + if not os.path.exists(STATE_DIR): + os.makedirs(STATE_DIR) + logger.info(f"Created state directory: {STATE_DIR}") + +def is_snapshot_completed(): + """Check if initial snapshot has been completed""" + return os.path.exists(SNAPSHOT_STATE_FILE) and not FORCE_SNAPSHOT + +def mark_snapshot_completed(): + """Mark initial snapshot as completed""" + ensure_state_dir() + with open(SNAPSHOT_STATE_FILE, 'w') as f: + f.write(json.dumps({ + "completed_at": time.time(), + "schema": MYSQL_SCHEMA, + 
"table": MYSQL_TABLE, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()) + })) + logger.info(f"Snapshot completion marked in: {SNAPSHOT_STATE_FILE}") + +def get_snapshot_info(): + """Get information about when snapshot was completed""" + if os.path.exists(SNAPSHOT_STATE_FILE): + try: + with open(SNAPSHOT_STATE_FILE, 'r') as f: + return json.loads(f.read()) + except: + return None + return None + +# Create a Quix Application, this manages the connection to the Quix platform +app = Application() + # Connect to MySQL and set up CDC try: enable_binlog_if_needed() @@ -23,15 +67,12 @@ binlog_stream = create_binlog_stream() logger.info("MySQL CDC CONNECTED!") except Exception as e: - logger.info(f"ERROR! - {e}") + logger.error(f"ERROR! - {e}") raise # should the main loop run? run = True -# Create a Quix Application, this manages the connection to the Quix platform -app = Application() - # Create the producer, this is used to write data to the output topic producer = app.get_producer() @@ -47,12 +88,51 @@ def main(): buffer = [] last_flush_time = time.time() + # Perform initial snapshot if enabled and not already completed + if INITIAL_SNAPSHOT: + if is_snapshot_completed(): + snapshot_info = get_snapshot_info() + if FORCE_SNAPSHOT: + logger.info("Initial snapshot already completed but FORCE_SNAPSHOT=true - performing snapshot again...") + else: + logger.info(f"Initial snapshot already completed at {snapshot_info.get('timestamp', 'unknown time')} - skipping") + else: + logger.info("Initial snapshot is enabled and not yet completed - performing snapshot...") + + if not is_snapshot_completed(): + try: + snapshot_changes = perform_initial_snapshot(MYSQL_SCHEMA, MYSQL_TABLE, SNAPSHOT_BATCH_SIZE) + + # Send snapshot data to Kafka immediately + for change in snapshot_changes: + producer.produce(topic=output_topic.name, + key=MYSQL_TABLE_NAME, + value=json.dumps(change)) + + # Flush to ensure all snapshot data is sent + producer.flush() + logger.info(f"Initial snapshot completed - {len(snapshot_changes)} records sent to Kafka") + + # Mark snapshot as completed + mark_snapshot_completed() + + except Exception as e: + logger.error(f"Failed to perform initial snapshot: {e}") + raise + else: + logger.info("Initial snapshot is disabled - starting CDC stream only") + + # Start CDC loop while run: # Get changes from MySQL binlog changes = get_changes(binlog_stream, MYSQL_SCHEMA, MYSQL_TABLE) for change in changes: buffer.append(change) - + + if len(buffer) > 0: + print(f"Buffer length: {len(buffer)}") + print(f"Buffer: {buffer}") + # Check if 500 milliseconds have passed current_time = time.time() if (current_time - last_flush_time) >= 0.5 and len(buffer) > 0: @@ -62,7 +142,13 @@ def main(): key=MYSQL_TABLE_NAME, value=json.dumps(message)) print("Message sent to Kafka") - # Flush the producer to send the messages + + # Flush the producer to send the messages + producer.flush() + + # Save binlog position after successful send + if hasattr(binlog_stream, 'log_file') and hasattr(binlog_stream, 'log_pos'): + save_binlog_position(binlog_stream.log_file, binlog_stream.log_pos) # Clear the buffer buffer = [] @@ -79,7 +165,9 @@ def main(): logger.info("Exiting.") run = False finally: - conn.close() - binlog_stream.close() + if 'conn' in locals(): + conn.close() + if 'binlog_stream' in locals(): + binlog_stream.close() logger.info("Connection to MySQL closed") logger.info("Exiting") \ No newline at end of file diff --git a/python/sources/mysql_cdc/mysql_helper.py 
b/python/sources/mysql_cdc/mysql_helper.py index 5ecb95e8a..3a477da30 100644 --- a/python/sources/mysql_cdc/mysql_helper.py +++ b/python/sources/mysql_cdc/mysql_helper.py @@ -1,5 +1,7 @@ import pymysql import os +import json +import time from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import ( DeleteRowsEvent, @@ -8,9 +10,27 @@ ) from setup_logger import logger -def connect_mysql(): +def serialize_value(value): + """Convert a value to JSON-serializable format""" + if value is None: + return None + elif isinstance(value, (bytes, bytearray)): + # Convert binary data to base64 string + import base64 + return base64.b64encode(value).decode('utf-8') + elif hasattr(value, 'isoformat'): + # Handle datetime, date, time objects + return value.isoformat() + elif isinstance(value, (int, float, str, bool)): + # Already JSON serializable + return value + else: + # Convert other types to string + return str(value) + +def connect_mysql(host=None): """Connect to MySQL database""" - MYSQL_HOST = os.environ["MYSQL_HOST"] + MYSQL_HOST = host or os.environ["MYSQL_HOST"] MYSQL_PORT = int(os.environ.get("MYSQL_PORT", 3306)) MYSQL_USER = os.environ["MYSQL_USER"] MYSQL_PASSWORD = os.environ["MYSQL_PASSWORD"] @@ -78,8 +98,47 @@ def setup_mysql_cdc(table_name: str): finally: conn.close() +# Binlog position management +def get_binlog_position_file(): + """Get the path to the binlog position file""" + state_dir = os.getenv("Quix__State__Dir", "state") + if not os.path.exists(state_dir): + os.makedirs(state_dir) + schema = os.environ["MYSQL_SCHEMA"] + table = os.environ["MYSQL_TABLE"] + return os.path.join(state_dir, f"binlog_position_{schema}_{table}.json") + +def save_binlog_position(log_file, log_pos): + """Save the current binlog position to disk""" + position_file = get_binlog_position_file() + position_data = { + "log_file": log_file, + "log_pos": log_pos, + "timestamp": time.time(), + "readable_time": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()) + } + try: + with open(position_file, 'w') as f: + json.dump(position_data, f, indent=2) + logger.debug(f"Saved binlog position: {log_file}:{log_pos}") + except Exception as e: + logger.error(f"Failed to save binlog position: {e}") + +def load_binlog_position(): + """Load the last saved binlog position from disk""" + position_file = get_binlog_position_file() + if os.path.exists(position_file): + try: + with open(position_file, 'r') as f: + position_data = json.load(f) + logger.info(f"Loaded binlog position: {position_data['log_file']}:{position_data['log_pos']} from {position_data.get('readable_time', 'unknown time')}") + return position_data['log_file'], position_data['log_pos'] + except Exception as e: + logger.error(f"Failed to load binlog position: {e}") + return None, None + def create_binlog_stream(server_id=1): - """Create and return a MySQL binlog stream reader""" + """Create and return a MySQL binlog stream reader with position resumption""" MYSQL_HOST = os.environ["MYSQL_HOST"] MYSQL_PORT = int(os.environ.get("MYSQL_PORT", 3306)) MYSQL_USER = os.environ["MYSQL_USER"] @@ -92,22 +151,39 @@ def create_binlog_stream(server_id=1): "passwd": MYSQL_PASSWORD, } - stream = BinLogStreamReader( - connection_settings=mysql_settings, - server_id=server_id, - only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent], - resume_stream=True, - blocking=False - ) + # Load saved binlog position + log_file, log_pos = load_binlog_position() + + stream_kwargs = { + "connection_settings": mysql_settings, + "server_id": server_id, + 
"only_events": [DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent], + "resume_stream": True, + "blocking": False + } + + # If we have a saved position, use it + if log_file and log_pos: + stream_kwargs["log_file"] = log_file + stream_kwargs["log_pos"] = log_pos + logger.info(f"Resuming binlog stream from saved position: {log_file}:{log_pos}") + else: + logger.info("No saved binlog position found, starting from current position") + stream = BinLogStreamReader(**stream_kwargs) return stream def get_changes(stream, schema_name: str, table_name: str): - """Get changes from MySQL binlog stream""" + """Get changes from MySQL binlog stream and save position after processing""" changes = [] + last_position = None # Read available events (non-blocking) for binlogevent in stream: + # Update position tracking + if hasattr(stream, 'log_file') and hasattr(stream, 'log_pos'): + last_position = (stream.log_file, stream.log_pos) + # Filter by schema and table if binlogevent.schema == schema_name and binlogevent.table == table_name: @@ -119,7 +195,7 @@ def get_changes(stream, schema_name: str, table_name: str): "schema": binlogevent.schema, "table": binlogevent.table, "columnnames": list(row["values"].keys()), - "columnvalues": list(row["values"].values()), + "columnvalues": [serialize_value(v) for v in row["values"].values()], "oldkeys": {} } changes.append(change) @@ -132,10 +208,10 @@ def get_changes(stream, schema_name: str, table_name: str): "schema": binlogevent.schema, "table": binlogevent.table, "columnnames": list(row["after_values"].keys()), - "columnvalues": list(row["after_values"].values()), + "columnvalues": [serialize_value(v) for v in row["after_values"].values()], "oldkeys": { "keynames": list(row["before_values"].keys()), - "keyvalues": list(row["before_values"].values()) + "keyvalues": [serialize_value(v) for v in row["before_values"].values()] } } changes.append(change) @@ -151,9 +227,74 @@ def get_changes(stream, schema_name: str, table_name: str): "columnvalues": [], "oldkeys": { "keynames": list(row["values"].keys()), - "keyvalues": list(row["values"].values()) + "keyvalues": [serialize_value(v) for v in row["values"].values()] } } changes.append(change) + # Save position if we processed any events + if last_position and changes: + save_binlog_position(last_position[0], last_position[1]) + + return changes + +def perform_initial_snapshot(schema_name: str, table_name: str, batch_size: int = 1000): + """Perform initial snapshot of the table and return all existing rows as insert events""" + conn = connect_mysql(os.environ["MYSQL_SNAPSHOT_HOST"]) + changes = [] + + try: + with conn.cursor() as cursor: + # Get total row count for logging + cursor.execute(f"SELECT COUNT(*) FROM `{schema_name}`.`{table_name}`") + total_rows = cursor.fetchone()[0] + logger.info(f"Starting initial snapshot of {schema_name}.{table_name} - {total_rows} rows") + + # Use LIMIT with OFFSET for batching to avoid memory issues with large tables + offset = 0 + processed_rows = 0 + + while True: + # Fetch batch of rows + cursor.execute(f"SELECT * FROM `{schema_name}`.`{table_name}` LIMIT {batch_size} OFFSET {offset}") + rows = cursor.fetchall() + + if not rows: + break + + # Get column names + column_names = [desc[0] for desc in cursor.description] + + # Convert each row to a change event + for row in rows: + # Convert row tuple to dictionary + row_dict = dict(zip(column_names, row)) + + # Convert values to JSON-serializable format + serialized_values = [serialize_value(value) for value in row_dict.values()] + + 
change = { + "kind": "snapshot_insert", # Different kind to distinguish from real inserts + "schema": schema_name, + "table": table_name, + "columnnames": column_names, + "columnvalues": serialized_values, + "oldkeys": {} + } + changes.append(change) + + processed_rows += len(rows) + offset += batch_size + + if processed_rows % 10000 == 0: # Log progress every 10k rows + logger.info(f"Snapshot progress: {processed_rows}/{total_rows} rows processed") + + logger.info(f"Initial snapshot completed: {processed_rows} rows captured") + + except Exception as e: + logger.error(f"Error during initial snapshot: {e}") + raise + finally: + conn.close() + return changes \ No newline at end of file From e08918fad0a12e109d7fc795229e6110d8a335ee Mon Sep 17 00:00:00 2001 From: Hoang Phan Date: Wed, 4 Jun 2025 15:39:58 -0400 Subject: [PATCH 3/4] Change to using stateful source instead --- python/sources/mysql_cdc/app.yaml | 56 ---- python/sources/mysql_cdc/main.py | 338 ++++++++++++++-------- python/sources/mysql_cdc/mysql_helper.py | 2 +- python/sources/mysql_cdc/requirements.txt | 2 +- python/sources/mysql_cdc/setup_logger.py | 2 +- 5 files changed, 214 insertions(+), 186 deletions(-) delete mode 100644 python/sources/mysql_cdc/app.yaml diff --git a/python/sources/mysql_cdc/app.yaml b/python/sources/mysql_cdc/app.yaml deleted file mode 100644 index 93da18b86..000000000 --- a/python/sources/mysql_cdc/app.yaml +++ /dev/null @@ -1,56 +0,0 @@ -name: video-cdc-source -language: Python -variables: - - name: output - inputType: OutputTopic - description: This is the output topic - defaultValue: none - required: true - - name: MYSQL_HOST - inputType: FreeText - description: Host name of MySQL - required: true - - name: MYSQL_SNAPSHOT_HOST - inputType: FreeText - description: Host name of MySQL for snapshot - required: false - - name: MYSQL_PORT - inputType: FreeText - description: Port of MySQL - defaultValue: 3306 - required: true - - name: MYSQL_USER - inputType: FreeText - description: Username of MySQL - required: true - - name: MYSQL_PASSWORD - inputType: HiddenText - description: Password of MySQL - required: true - - name: MYSQL_DATABASE - inputType: FreeText - description: Database name of MySQL - required: true - - name: MYSQL_SCHEMA - inputType: FreeText - description: Name of schema/database for CDC - required: true - - name: MYSQL_TABLE - inputType: FreeText - description: Name of table for CDC - required: true - - name: INITIAL_SNAPSHOT - inputType: FreeText - description: Whether to perform an initial snapshot - required: false - defaultValue: false - - name: SNAPSHOT_BATCH_SIZE - inputType: FreeText - description: Batch size for initial snapshot - required: false - defaultValue: 1000 - -dockerfile: dockerfile -runEntryPoint: main.py -defaultFile: main.py -libraryItemId: mysql-cdc-source diff --git a/python/sources/mysql_cdc/main.py b/python/sources/mysql_cdc/main.py index c81044c14..7d652e2b4 100644 --- a/python/sources/mysql_cdc/main.py +++ b/python/sources/mysql_cdc/main.py @@ -1,173 +1,257 @@ from quixstreams import Application +from quixstreams.sources.base import StatefulSource import time import os import json from setup_logger import logger -from mysql_helper import connect_mysql, enable_binlog_if_needed, setup_mysql_cdc, create_binlog_stream, get_changes, perform_initial_snapshot, save_binlog_position +from mysql_helper import connect_mysql, enable_binlog_if_needed, setup_mysql_cdc, create_binlog_stream, get_changes, perform_initial_snapshot # Load environment variables (useful when working 
locally) from dotenv import load_dotenv load_dotenv() -# Global Variables -MYSQL_SCHEMA = os.environ["MYSQL_SCHEMA"] # MySQL database name -MYSQL_TABLE = os.environ["MYSQL_TABLE"] # MySQL table name -MYSQL_TABLE_NAME = f"{MYSQL_SCHEMA}.{MYSQL_TABLE}" -WAIT_INTERVAL = 0.1 - -# Initial snapshot configuration -INITIAL_SNAPSHOT = os.getenv("INITIAL_SNAPSHOT", "false").lower() == "true" -SNAPSHOT_BATCH_SIZE = int(os.getenv("SNAPSHOT_BATCH_SIZE", "1000")) -FORCE_SNAPSHOT = os.getenv("FORCE_SNAPSHOT", "false").lower() == "true" - -# State management - use Quix state dir if available, otherwise default to "state" -STATE_DIR = os.getenv("Quix__State__Dir", "state") -SNAPSHOT_STATE_FILE = os.path.join(STATE_DIR, f"snapshot_completed_{MYSQL_SCHEMA}_{MYSQL_TABLE}.flag") +class MySqlCdcSource(StatefulSource): + def __init__(self, name: str = "mysql-cdc-source"): + super().__init__(name=name) + + # Load configuration from environment variables + self.mysql_schema = os.environ["MYSQL_SCHEMA"] # MySQL database name + self.mysql_table = os.environ["MYSQL_TABLE"] # MySQL table name + self.mysql_table_name = f"{self.mysql_schema}.{self.mysql_table}" + self.wait_interval = 0.1 + + # Initial snapshot configuration + self.initial_snapshot = os.getenv("INITIAL_SNAPSHOT", "false").lower() == "true" + self.snapshot_batch_size = int(os.getenv("SNAPSHOT_BATCH_SIZE", "1000")) + self.force_snapshot = os.getenv("FORCE_SNAPSHOT", "false").lower() == "true" + + # Connection objects - will be initialized in setup() + self.conn = None + self.binlog_stream = None + + # Message buffering + self.buffer = [] + self.last_flush_time = time.time() + self.flush_interval = 0.5 # 500ms -def ensure_state_dir(): - """Create state directory if it doesn't exist""" - if not os.path.exists(STATE_DIR): - os.makedirs(STATE_DIR) - logger.info(f"Created state directory: {STATE_DIR}") + def setup(self): + """Initialize MySQL connection and CDC setup""" + try: + enable_binlog_if_needed() + setup_mysql_cdc(self.mysql_table) + self.conn = connect_mysql() + self.binlog_stream = create_binlog_stream() + logger.info("MySQL CDC CONNECTED!") + except Exception as e: + logger.error(f"ERROR during MySQL CDC setup - {e}") + raise -def is_snapshot_completed(): - """Check if initial snapshot has been completed""" - return os.path.exists(SNAPSHOT_STATE_FILE) and not FORCE_SNAPSHOT + def is_snapshot_completed(self): + """Check if initial snapshot has been completed using state store""" + snapshot_key = f"snapshot_completed_{self.mysql_schema}_{self.mysql_table}" + return self.state.get(snapshot_key, False) and not self.force_snapshot -def mark_snapshot_completed(): - """Mark initial snapshot as completed""" - ensure_state_dir() - with open(SNAPSHOT_STATE_FILE, 'w') as f: - f.write(json.dumps({ + def mark_snapshot_completed(self): + """Mark initial snapshot as completed in state store""" + snapshot_key = f"snapshot_completed_{self.mysql_schema}_{self.mysql_table}" + snapshot_info = { "completed_at": time.time(), - "schema": MYSQL_SCHEMA, - "table": MYSQL_TABLE, + "schema": self.mysql_schema, + "table": self.mysql_table, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()) - })) - logger.info(f"Snapshot completion marked in: {SNAPSHOT_STATE_FILE}") - -def get_snapshot_info(): - """Get information about when snapshot was completed""" - if os.path.exists(SNAPSHOT_STATE_FILE): - try: - with open(SNAPSHOT_STATE_FILE, 'r') as f: - return json.loads(f.read()) - except: - return None - return None - -# Create a Quix Application, this manages the 
connection to the Quix platform -app = Application() - -# Connect to MySQL and set up CDC -try: - enable_binlog_if_needed() - setup_mysql_cdc(MYSQL_TABLE) - conn = connect_mysql() - binlog_stream = create_binlog_stream() - logger.info("MySQL CDC CONNECTED!") -except Exception as e: - logger.error(f"ERROR! - {e}") - raise + } + self.state.set(snapshot_key, True) + self.state.set(f"snapshot_info_{self.mysql_schema}_{self.mysql_table}", snapshot_info) + logger.info(f"Snapshot completion marked in state store for {self.mysql_table_name}") -# should the main loop run? -run = True + def get_snapshot_info(self): + """Get information about when snapshot was completed""" + info_key = f"snapshot_info_{self.mysql_schema}_{self.mysql_table}" + return self.state.get(info_key, None) -# Create the producer, this is used to write data to the output topic -producer = app.get_producer() + def save_binlog_position(self, log_file, log_pos): + """Save binlog position to state store""" + binlog_key = f"binlog_position_{self.mysql_schema}_{self.mysql_table}" + position_info = { + "log_file": log_file, + "log_pos": log_pos, + "timestamp": time.time() + } + self.state.set(binlog_key, position_info) -# Check the output topic is configured -output_topic_name = os.getenv("output", "") -if output_topic_name == "": - raise ValueError("output_topic environment variable is required") -output_topic = app.topic(output_topic_name) + def get_binlog_position(self): + """Get saved binlog position from state store""" + binlog_key = f"binlog_position_{self.mysql_schema}_{self.mysql_table}" + return self.state.get(binlog_key, None) -# get data from MySQL binlog and publish it to kafka -# to reduce network traffic, we buffer the messages for 500ms -def main(): - buffer = [] - last_flush_time = time.time() - - # Perform initial snapshot if enabled and not already completed - if INITIAL_SNAPSHOT: - if is_snapshot_completed(): - snapshot_info = get_snapshot_info() - if FORCE_SNAPSHOT: + def perform_initial_snapshot_if_needed(self): + """Perform initial snapshot if enabled and not already completed""" + if not self.initial_snapshot: + logger.info("Initial snapshot is disabled - starting CDC stream only") + return + + if self.is_snapshot_completed(): + snapshot_info = self.get_snapshot_info() + if self.force_snapshot: logger.info("Initial snapshot already completed but FORCE_SNAPSHOT=true - performing snapshot again...") else: logger.info(f"Initial snapshot already completed at {snapshot_info.get('timestamp', 'unknown time')} - skipping") + return else: logger.info("Initial snapshot is enabled and not yet completed - performing snapshot...") - - if not is_snapshot_completed(): + + if not self.is_snapshot_completed() or self.force_snapshot: try: - snapshot_changes = perform_initial_snapshot(MYSQL_SCHEMA, MYSQL_TABLE, SNAPSHOT_BATCH_SIZE) + snapshot_changes = perform_initial_snapshot( + self.mysql_schema, + self.mysql_table, + self.snapshot_batch_size + ) # Send snapshot data to Kafka immediately for change in snapshot_changes: - producer.produce(topic=output_topic.name, - key=MYSQL_TABLE_NAME, - value=json.dumps(change)) + msg = self.serialize( + key=self.mysql_table_name, + value=change + ) + self.produce( + key=msg.key, + value=msg.value, + ) - # Flush to ensure all snapshot data is sent - producer.flush() + # Flush to ensure all snapshot data is sent and commit state + self.flush() logger.info(f"Initial snapshot completed - {len(snapshot_changes)} records sent to Kafka") # Mark snapshot as completed - mark_snapshot_completed() + 
+                self.mark_snapshot_completed()
+                # Flush again to save the snapshot completion state
+                self.flush()
             except Exception as e:
                 logger.error(f"Failed to perform initial snapshot: {e}")
                 raise
-    else:
-        logger.info("Initial snapshot is disabled - starting CDC stream only")
 
-    # Start CDC loop
-    while run:
-        # Get changes from MySQL binlog
-        changes = get_changes(binlog_stream, MYSQL_SCHEMA, MYSQL_TABLE)
-        for change in changes:
-            buffer.append(change)
-
-        if len(buffer) > 0:
-            print(f"Buffer length: {len(buffer)}")
-            print(f"Buffer: {buffer}")
-
-        # Check if 500 milliseconds have passed
+    def process_buffered_messages(self):
+        """Process and send buffered messages if flush interval has passed"""
         current_time = time.time()
-        if (current_time - last_flush_time) >= 0.5 and len(buffer) > 0:
-            # If 500ms have passed, produce all buffered messages
-            for message in buffer:
-                producer.produce(topic=output_topic.name,
-                                 key=MYSQL_TABLE_NAME,
-                                 value=json.dumps(message))
-                print("Message sent to Kafka")
+
+        if (current_time - self.last_flush_time) >= self.flush_interval and len(self.buffer) > 0:
+            logger.debug(f"Processing {len(self.buffer)} buffered messages")
 
-            # Flush the producer to send the messages
-            producer.flush()
+            # Send all buffered messages
+            for message in self.buffer:
+                msg = self.serialize(
+                    key=self.mysql_table_name,
+                    value=message
+                )
+                self.produce(
+                    key=msg.key,
+                    value=msg.value,
+                )
 
-            # Save binlog position after successful send
-            if hasattr(binlog_stream, 'log_file') and hasattr(binlog_stream, 'log_pos'):
-                save_binlog_position(binlog_stream.log_file, binlog_stream.log_pos)
-
-            # Clear the buffer
-            buffer = []
-            # Update the last flush time
-            last_flush_time = current_time
+            # Save binlog position if available
+            if hasattr(self.binlog_stream, 'log_file') and hasattr(self.binlog_stream, 'log_pos'):
+                self.save_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos)
+
+            # Flush the producer and commit state changes
+            self.flush()
+
+            # Clear the buffer and update flush time
+            self.buffer = []
+            self.last_flush_time = current_time
+
+            logger.debug("Buffered messages sent and state committed")
 
-        time.sleep(WAIT_INTERVAL)
+    def run(self):
+        """Main CDC loop - runs while self.running is True"""
+        logger.info(f"Starting MySQL CDC source for {self.mysql_table_name}")
+
+        # Perform initial snapshot if needed
+        self.perform_initial_snapshot_if_needed()
+
+        # Log binlog position if available
+        saved_position = self.get_binlog_position()
+        if saved_position:
+            logger.info(f"Resuming from binlog position: {saved_position}")
+
+        # Start CDC loop
+        while self.running:
+            try:
+                # Get changes from MySQL binlog
+                changes = get_changes(self.binlog_stream, self.mysql_schema, self.mysql_table)
+
+                # Add changes to buffer
+                for change in changes:
+                    self.buffer.append(change)
+
+                if len(self.buffer) > 0:
+                    logger.debug(f"Buffer length: {len(self.buffer)}")
+
+                # Process buffered messages if flush interval has passed
+                self.process_buffered_messages()
+
+                # Small sleep to prevent excessive CPU usage
+                time.sleep(self.wait_interval)
+
+            except Exception as e:
+                logger.error(f"Error in CDC loop: {e}")
+                # Keep running unless it's a fatal error
+                time.sleep(1)  # Wait a bit longer on error
 
+    def stop(self):
+        """Clean up resources when stopping"""
+        logger.info("Stopping MySQL CDC source")
+
+        # Process any remaining buffered messages
+        if len(self.buffer) > 0:
+            logger.info(f"Processing {len(self.buffer)} remaining buffered messages")
+            # Reset the flush timer so process_buffered_messages() cannot skip
+            # this final batch just because the 500ms interval has not elapsed
+            self.last_flush_time = 0
+            self.process_buffered_messages()
+
+        # Clean up connections
+        if self.conn:
+            self.conn.close()
+            logger.info("MySQL connection closed")
+
+        if self.binlog_stream:
+            self.binlog_stream.close()
+            logger.info("Binlog stream closed")
+
+        super().stop()
 
-if __name__ == "__main__":
+def main():
+    """Main function to run the MySQL CDC source"""
+    # Create a Quix Application
+    app = Application()
+
+    # Check the output topic is configured
+    output_topic_name = os.getenv("output", "")
+    if output_topic_name == "":
+        raise ValueError('The "output" environment variable is required')
+
+    # Create the MySQL CDC source
+    mysql_source = MySqlCdcSource(name="mysql-cdc-source")
+
+    # Create a StreamingDataFrame from the source
+    sdf = app.dataframe(source=mysql_source)
+
+    # Print messages for debugging (you can replace this with your processing logic)
+    # sdf.print(metadata=True)  # Commented out to reduce verbose output
+
+    # Send CDC data to the output topic (to_topic expects a Topic object, not a name)
+    sdf.to_topic(app.topic(output_topic_name))
+
+    # Run the application
     try:
-        main()
+        logger.info("Starting MySQL CDC application")
+        app.run()
     except KeyboardInterrupt:
-        logger.info("Exiting.")
-        run = False
+        logger.info("Application interrupted by user")
+    except Exception as e:
+        logger.error(f"Application error: {e}")
+        raise
     finally:
-        if 'conn' in locals():
-            conn.close()
-        if 'binlog_stream' in locals():
-            binlog_stream.close()
-            logger.info("Connection to MySQL closed")
-        logger.info("Exiting")
\ No newline at end of file
+        logger.info("MySQL CDC application stopped")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
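For orientation (an editorial aside, not part of the patch): once this source is running, any downstream Quix Streams application can consume the change events it publishes. A minimal consumer sketch, assuming quixstreams 3.x, a locally reachable broker, and the `cdc-changes-topic` name from the example `.env` — all three are assumptions to adjust for your deployment:

```python
from quixstreams import Application

# Assumed broker address, consumer group, and topic name; match your environment
app = Application(broker_address="localhost:9092", consumer_group="cdc-reader")
topic = app.topic("cdc-changes-topic", value_deserializer="json")

sdf = app.dataframe(topic)
# Snapshot rows arrive with kind == "snapshot_insert"; keep only live changes here
sdf = sdf.filter(lambda event: event["kind"] != "snapshot_insert")
sdf.print()

if __name__ == "__main__":
    app.run()
```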
diff --git a/python/sources/mysql_cdc/mysql_helper.py b/python/sources/mysql_cdc/mysql_helper.py
index 3a477da30..a9f24d996 100644
--- a/python/sources/mysql_cdc/mysql_helper.py
+++ b/python/sources/mysql_cdc/mysql_helper.py
@@ -286,7 +286,7 @@ def perform_initial_snapshot(schema_name: str, table_name: str, batch_size: int
             processed_rows += len(rows)
             offset += batch_size
 
-            if processed_rows % 10000 == 0:  # Log progress every 10k rows
+            if processed_rows % 50000 == 0:  # Log progress every 50k rows
                 logger.info(f"Snapshot progress: {processed_rows}/{total_rows} rows processed")
 
     logger.info(f"Initial snapshot completed: {processed_rows} rows captured")
diff --git a/python/sources/mysql_cdc/requirements.txt b/python/sources/mysql_cdc/requirements.txt
index 529dc4ec2..98619bb5b 100644
--- a/python/sources/mysql_cdc/requirements.txt
+++ b/python/sources/mysql_cdc/requirements.txt
@@ -1,4 +1,4 @@
-quixstreams==2.9.0
+quixstreams==3.15.0
 pymysql
 mysql-replication
 python-dotenv
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/setup_logger.py b/python/sources/mysql_cdc/setup_logger.py
index 24859698d..e6713f05b 100644
--- a/python/sources/mysql_cdc/setup_logger.py
+++ b/python/sources/mysql_cdc/setup_logger.py
@@ -1,7 +1,7 @@
 import logging
 
 # Set up logger
-PROD_ENV = False
+PROD_ENV = True
 logger = logging.getLogger("MySQL CDC")
 logging.basicConfig()
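A note on the `PROD_ENV` flip above: the rest of `setup_logger.py` lies outside this hunk, so how the flag is consumed is not visible here. The usual pattern looks like the sketch below; the final `setLevel` line is an assumption, not taken from the diff:

```python
import logging

# Set up logger (first lines mirror the diff; the last line is an assumed pattern)
PROD_ENV = True
logger = logging.getLogger("MySQL CDC")
logging.basicConfig()
logger.setLevel(logging.INFO if PROD_ENV else logging.DEBUG)
```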
From 0bf5e50d17787d0e22799f14de7d9c601e88b38f Mon Sep 17 00:00:00 2001
From: Hoang Phan
Date: Wed, 4 Jun 2025 15:45:11 -0400
Subject: [PATCH 4/4] Update comments

---
 python/sources/mysql_cdc/README.md           |  31 +++---
 python/sources/mysql_cdc/README_MYSQL_CDC.md | 107 ++++++++++---------
 2 files changed, 74 insertions(+), 64 deletions(-)

diff --git a/python/sources/mysql_cdc/README.md b/python/sources/mysql_cdc/README.md
index 4e039a931..6d1bf30d2 100644
--- a/python/sources/mysql_cdc/README.md
+++ b/python/sources/mysql_cdc/README.md
@@ -1,10 +1,11 @@
 # MySQL CDC
 
-This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication. It features **persistent binlog position tracking** to ensure exactly-once processing and automatic recovery after restarts.
+This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication. It's built using **Quix Streams StatefulSource** to ensure exactly-once processing and automatic recovery after restarts.
 
 ## Key Features
 
-- **Persistent Binlog Position**: Automatically saves and resumes from the last processed binlog position
+- **Quix Streams StatefulSource**: Built on Quix Streams' robust stateful source framework
+- **Automatic State Management**: Integrated state store for binlog position and snapshot tracking
 - **Exactly-Once Processing**: No data loss during application restarts or failures
 - **Initial Snapshot**: Optionally capture existing data before starting CDC
 - **Automatic Recovery**: Seamlessly resume processing after interruptions
@@ -35,26 +36,26 @@ This connector demonstrates how to capture changes to a MySQL database table (us
 - **SNAPSHOT_BATCH_SIZE**: Number of rows to process in each snapshot batch (default: 1000).
 - **FORCE_SNAPSHOT**: Set to "true" to force snapshot even if already completed (default: false).
 
-### State Management
-- **Quix__State__Dir**: Directory for storing application state including binlog positions (default: "state").
+## Quix Streams StatefulSource
 
-## Binlog Position Persistence
+The connector uses Quix Streams' `StatefulSource` class which provides:
 
-The connector automatically tracks the MySQL binlog position and saves it to disk after successful Kafka delivery. This ensures:
+- **Automatic State Persistence**: Binlog positions and snapshot status are automatically saved to the state store
+- **Exactly-Once Guarantees**: Built-in mechanisms ensure no data loss or duplication
+- **Fault Tolerance**: Automatic recovery from failures with consistent state
+- **Production-Ready**: Built on Quix Streams' proven architecture
 
-- **No data loss** during application restarts
-- **Exactly-once processing** of database changes
-- **Automatic resumption** from the last processed position
+### State Management:
+- **Binlog Position**: Automatically tracked as `binlog_position_{schema}_{table}`
+- **Snapshot Completion**: Tracked as `snapshot_completed_{schema}_{table}`
+- **Transactional Processing**: State changes are committed atomically with message production
 
-Position files are stored in: `{STATE_DIR}/binlog_position_{schema}_{table}.json`
-
-Example position file:
+Example state data:
 ```json
 {
   "log_file": "mysql-bin.000123",
   "log_pos": 45678,
-  "timestamp": 1704067200.0,
-  "readable_time": "2024-01-01 12:00:00 UTC"
+  "timestamp": 1704067200.0
 }
 ```
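To make the state-management bullets concrete, here is a minimal, self-contained sketch of the `state.get` / `state.set` / `flush` pattern that `MySqlCdcSource` uses, reduced to a toy source. The import path and demo values are assumptions; the real implementation is in `main.py` in this patch:

```python
import time

from quixstreams.sources import StatefulSource  # assumed import path


class PositionDemoSource(StatefulSource):
    """Toy source illustrating the connector's state-store pattern."""

    def run(self):
        key = "binlog_position_demo_schema_demo_table"
        # After a restart this returns the last committed value instead of None
        last = self.state.get(key, None)
        msg = self.serialize(key="demo", value={"resumed_from": last})
        self.produce(key=msg.key, value=msg.value)
        # State updates are committed together with produced messages on flush()
        self.state.set(key, {"log_file": "mysql-bin.000123", "log_pos": 45678,
                             "timestamp": time.time()})
        self.flush()
```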
@@ -71,7 +72,7 @@ MYSQL_SNAPSHOT_HOST=replica.mysql.example.com  # Optional: use read replica
 The initial snapshot:
 - Processes data in configurable batches to avoid memory issues
 - Sends snapshot records with `"kind": "snapshot_insert"` to distinguish from real inserts
-- Marks completion to avoid re-processing on restart
+- Marks completion in the StatefulSource state store to avoid re-processing on restart
 - Can be forced to re-run with `FORCE_SNAPSHOT=true`
 
 ## Requirements / Prerequisites
diff --git a/python/sources/mysql_cdc/README_MYSQL_CDC.md b/python/sources/mysql_cdc/README_MYSQL_CDC.md
index 521d69bcc..8e3c4c6a6 100644
--- a/python/sources/mysql_cdc/README_MYSQL_CDC.md
+++ b/python/sources/mysql_cdc/README_MYSQL_CDC.md
@@ -1,15 +1,16 @@
 # MySQL CDC Setup
 
-This application implements MySQL CDC using MySQL binary log replication with **persistent binlog position tracking** for exactly-once processing and automatic recovery.
+This application implements MySQL CDC using MySQL binary log replication with **Quix Streams StatefulSource** for exactly-once processing and automatic recovery.
 
 ## Key Features
 
-- **Persistent Binlog Position**: Automatically saves and resumes from the last processed binlog position
+- **Quix Streams StatefulSource**: Built on Quix Streams' robust stateful source framework
+- **Automatic State Management**: Integrated state store for binlog position and snapshot tracking
 - **Exactly-Once Processing**: No data loss during application restarts or failures
 - **Initial Snapshot**: Optionally capture existing data before starting CDC
 - **Automatic Recovery**: Seamlessly resume processing after interruptions
 - **Change Buffering**: Batches changes for efficient Kafka publishing
-- **State Management**: Integrated state persistence for production reliability
+- **Built-in Reliability**: Leverages Quix Streams' production-ready state management
 
 ## Prerequisites
 
@@ -59,9 +60,6 @@ Set the following environment variables:
 - `SNAPSHOT_BATCH_SIZE` - Rows per snapshot batch (default: 1000)
 - `FORCE_SNAPSHOT` - Set to "true" to force re-snapshot (default: false)
 
-### State Management
-- `Quix__State__Dir` - Directory for storing state files (default: "state")
-
 ### Kafka Output
 - `output` - Kafka topic name for publishing changes
 
@@ -85,40 +83,47 @@ INITIAL_SNAPSHOT=true
 SNAPSHOT_BATCH_SIZE=1000
 FORCE_SNAPSHOT=false
 
-# State Management
-Quix__State__Dir=./state
-
 # Kafka Output
 output=cdc-changes-topic
 ```
 
-## Binlog Position Persistence
-
-The application automatically tracks MySQL binlog positions and persists them to disk:
-
-### How it works:
-1. **Position Tracking**: Records current binlog file and position during processing
-2. **Automatic Saving**: Saves position after successful Kafka delivery
-3. **Recovery**: Automatically resumes from last saved position on restart
-4. **Exactly-Once**: Ensures no data loss or duplication
-
-### Position Storage:
-- Location: `{STATE_DIR}/binlog_position_{schema}_{table}.json`
-- Format:
-  ```json
-  {
-    "log_file": "mysql-bin.000123",
-    "log_pos": 45678,
-    "timestamp": 1704067200.0,
-    "readable_time": "2024-01-01 12:00:00 UTC"
-  }
-  ```
+## Quix Streams StatefulSource Architecture
+
+The application uses Quix Streams' `StatefulSource` class which provides:
+
+### Built-in State Management:
+- **Automatic Persistence**: State is automatically saved to the configured state store
+- **Exactly-Once Guarantees**: Built-in mechanisms ensure no data loss or duplication
+- **Transactional Processing**: State changes are committed atomically with message production
+- **Fault Tolerance**: Automatic recovery from failures with consistent state
+
+### State Storage:
+The StatefulSource manages two types of state:
+1. **Binlog Position**: `binlog_position_{schema}_{table}`
+   ```json
+   {
+     "log_file": "mysql-bin.000123",
+     "log_pos": 45678,
+     "timestamp": 1704067200.0
+   }
+   ```
+
+2. **Snapshot Completion**: `snapshot_completed_{schema}_{table}`
+   ```json
+   {
+     "completed_at": 1704067200.0,
+     "schema": "database_name",
+     "table": "table_name",
+     "timestamp": "2024-01-01 12:00:00 UTC"
+   }
+   ```
 
 ### Benefits:
-- ✅ **No data loss** during application restarts
-- ✅ **Exactly-once processing** of database changes
-- ✅ **Automatic recovery** from last processed position
-- ✅ **Production-ready** state management
+- ✅ **Production-Ready**: Built on Quix Streams' proven architecture
+- ✅ **No Manual State Management**: Automatic state persistence and recovery
+- ✅ **Exactly-Once Processing**: Guaranteed delivery semantics
+- ✅ **Simplified Operations**: Reduced complexity compared to manual state management
+- ✅ **Scalable**: Can be easily deployed and scaled in production environments
 
 ## Initial Snapshot
 
@@ -135,14 +140,14 @@ MYSQL_SNAPSHOT_HOST=replica.mysql.example.com  # Optional
 - **Batched Processing**: Configurable batch sizes to handle large tables
 - **Memory Efficient**: Processes data in chunks to avoid memory issues
 - **Read Replica Support**: Use `MYSQL_SNAPSHOT_HOST` to snapshot from replica
-- **Completion Tracking**: Marks snapshot completion to avoid re-processing
+- **Completion Tracking**: Marks snapshot completion in the StatefulSource state store
 - **Force Re-snapshot**: Use `FORCE_SNAPSHOT=true` to re-run if needed
 
 ### Snapshot Process:
 1. Connects to snapshot host (or main host if not specified)
 2. Processes table data in batches
 3. Sends records with `"kind": "snapshot_insert"`
-4. Marks completion in state file
+4. Marks completion in the StatefulSource state store
 5. Proceeds to real-time CDC
 
 ## Dependencies
 
@@ -152,7 +157,8 @@ Install the required Python packages:
 pip install -r requirements.txt
 ```
 
-The key MySQL-specific dependencies are:
+The key dependencies are:
+- `quixstreams` - Quix Streams library with StatefulSource support
 - `pymysql` - MySQL database connector
 - `mysql-replication` - MySQL binary log replication library
 
@@ -224,23 +230,24 @@ The MySQL CDC produces change events in the following format:
 ```
 
 ### Application Flow:
-1. **Load State**: Attempts to load saved binlog position
-2. **Initial Snapshot** (if enabled and not completed):
+1. **StatefulSource Initialization**: Quix Streams creates the MySQL CDC source
+2. **State Recovery**: Automatically loads saved binlog position and snapshot status
+3. **Initial Snapshot** (if enabled and not completed):
    - Connects to snapshot host
    - Processes existing data in batches
   - Sends snapshot events to Kafka
-   - Marks completion
-3. **Real-time CDC**:
+   - Marks completion in state store
+4. **Real-time CDC**:
    - Connects to MySQL binlog stream
    - Resumes from saved position (or current if first run)
   - Monitors specified table for changes
   - Buffers changes and publishes to Kafka every 500ms
-   - Saves binlog position after successful delivery
-4. **Recovery**: On restart, automatically resumes from last saved position
+   - Automatically commits state after successful delivery
+5. **Automatic Recovery**: On restart, StatefulSource handles state recovery
 
 ### Monitoring:
 - Check application logs for binlog position updates
-- Monitor state directory for position files
+- Monitor the Quix Streams state store for position and snapshot data
 - Verify Kafka topic for change events
 - Use MySQL's `SHOW MASTER STATUS` to compare positions
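One way to act on the last monitoring bullet: ask MySQL for its current binlog coordinates and compare them with the position recorded in the state store. A sketch, assuming `pymysql` and the connection variables documented above (on MySQL 8.4+ the statement is named `SHOW BINARY LOG STATUS`):

```python
import os

import pymysql

conn = pymysql.connect(
    host=os.environ["MYSQL_HOST"],
    port=int(os.getenv("MYSQL_PORT", "3306")),
    user=os.environ["MYSQL_USER"],
    password=os.environ["MYSQL_PASSWORD"],
)
try:
    with conn.cursor() as cur:
        cur.execute("SHOW MASTER STATUS")  # SHOW BINARY LOG STATUS on MySQL 8.4+
        log_file, log_pos = cur.fetchone()[:2]
        # Compare against the log_file/log_pos saved by the connector
        print(f"server is currently at {log_file}:{log_pos}")
finally:
    conn.close()
```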
@@ -256,9 +263,10 @@ The MySQL CDC produces change events in the following format:
    - Error: Access denied
    - Solution: Grant REPLICATION SLAVE, REPLICATION CLIENT privileges
 
-3. **Position file corruption**:
-   - Delete position file to restart from current position
-   - Location: `{STATE_DIR}/binlog_position_{schema}_{table}.json`
+3. **StatefulSource state issues**:
+   - StatefulSource automatically handles state recovery
+   - Check Quix Streams configuration and state store connectivity
+   - Review application logs for state-related errors
 
 4. **Snapshot issues**:
    - Check `MYSQL_SNAPSHOT_HOST` connectivity
@@ -267,6 +275,7 @@ The MySQL CDC produces change events in the following format:
 
 ### Best Practices:
 - Use read replicas for initial snapshots on large tables
-- Monitor disk space for state directory
+- Configure appropriate Quix Streams state store settings
 - Set appropriate `SNAPSHOT_BATCH_SIZE` based on available memory
-- Regularly backup state files for disaster recovery
\ No newline at end of file
+- Monitor Quix Streams metrics for source performance
+- Ensure proper Kafka connectivity for reliable message delivery
\ No newline at end of file
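A quick end-to-end check (an editorial sketch, not part of the patch) is to issue one change of each kind against the monitored table and watch the output topic. It assumes the documented env vars; the `id` and `name` columns are hypothetical and must match your table's schema:

```python
import os

import pymysql

conn = pymysql.connect(
    host=os.environ["MYSQL_HOST"],
    port=int(os.getenv("MYSQL_PORT", "3306")),
    user=os.environ["MYSQL_USER"],
    password=os.environ["MYSQL_PASSWORD"],
    database=os.environ["MYSQL_DATABASE"],
    autocommit=True,
)
table = os.environ["MYSQL_TABLE"]  # hypothetical columns below: id, name
try:
    with conn.cursor() as cur:
        cur.execute(f"INSERT INTO {table} (id, name) VALUES (999, 'cdc-smoke-test')")
        cur.execute(f"UPDATE {table} SET name = 'updated' WHERE id = 999")
        cur.execute(f"DELETE FROM {table} WHERE id = 999")
finally:
    conn.close()
# Expect one insert, one update and one delete event on the output topic
```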