diff --git a/python/sources/mysql_cdc/.gitignore b/python/sources/mysql_cdc/.gitignore
new file mode 100644
index 00000000..70409a14
--- /dev/null
+++ b/python/sources/mysql_cdc/.gitignore
@@ -0,0 +1,10 @@
+**/ref/
+**/obj/
+**/bin/
+DotSettings.user
+**DotSettings.user
+ca.cert
+.idea/
+__pycache__/
+certificates/
+state/
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/README.md b/python/sources/mysql_cdc/README.md
new file mode 100644
index 00000000..6d1bf30d
--- /dev/null
+++ b/python/sources/mysql_cdc/README.md
@@ -0,0 +1,163 @@
+# MySQL CDC
+
+This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication. It's built using **Quix Streams StatefulSource** to ensure exactly-once processing and automatic recovery after restarts.
+
+## Key Features
+
+- **Quix Streams StatefulSource**: Built on Quix Streams' robust stateful source framework
+- **Automatic State Management**: Integrated state store for binlog position and snapshot tracking
+- **Exactly-Once Processing**: No data loss during application restarts or failures
+- **Initial Snapshot**: Optionally capture existing data before starting CDC
+- **Automatic Recovery**: Seamlessly resume processing after interruptions
+- **Change Buffering**: Batches changes for efficient Kafka publishing
+
+## How to run
+
+1. Set up your MySQL database with binary logging enabled
+2. Configure environment variables for the MySQL connection
+3. Install dependencies: `pip install -r requirements.txt`
+4. Run: `python main.py`
+
+## Environment variables
+
+### Required MySQL Connection
+- **output**: Name of the output topic to write into.
+- **MYSQL_HOST**: The IP address or fully qualified domain name of your MySQL server.
+- **MYSQL_PORT**: The port number to use for communication with the server (default: 3306).
+- **MYSQL_DATABASE**: The name of the database for CDC.
+- **MYSQL_USER**: The username that should be used to interact with the database.
+- **MYSQL_PASSWORD**: The password for the user configured above.
+- **MYSQL_SCHEMA**: The name of the schema/database for CDC (MySQL treats schema and database as synonyms, so set this to the same value as MYSQL_DATABASE).
+- **MYSQL_TABLE**: The name of the table for CDC.
+
+### Optional Configuration
+- **MYSQL_SNAPSHOT_HOST**: MySQL host for the initial snapshot (defaults to MYSQL_HOST if not set). Use this if you want to perform the initial snapshot from a different MySQL instance (e.g., a read replica).
+- **INITIAL_SNAPSHOT**: Set to "true" to perform an initial snapshot of existing data (default: false).
+- **SNAPSHOT_BATCH_SIZE**: Number of rows to process in each snapshot batch (default: 1000).
+- **FORCE_SNAPSHOT**: Set to "true" to force a snapshot even if one has already completed (default: false).
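+
+These flags are read as plain strings; a condensed sketch of how `main.py` interprets them (the `MYSQL_SNAPSHOT_HOST` fallback happens inside the snapshot helper):
+
+```python
+import os
+
+# Any value other than the string "true" (case-insensitive) counts as false.
+initial_snapshot = os.getenv("INITIAL_SNAPSHOT", "false").lower() == "true"
+force_snapshot = os.getenv("FORCE_SNAPSHOT", "false").lower() == "true"
+snapshot_batch_size = int(os.getenv("SNAPSHOT_BATCH_SIZE", "1000"))
+
+# MYSQL_SNAPSHOT_HOST falls back to MYSQL_HOST when unset.
+snapshot_host = os.getenv("MYSQL_SNAPSHOT_HOST") or os.environ["MYSQL_HOST"]
+```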
+
+## Quix Streams StatefulSource
+
+The connector uses Quix Streams' `StatefulSource` class which provides:
+
+- **Automatic State Persistence**: Binlog positions and snapshot status are automatically saved to the state store
+- **Exactly-Once Guarantees**: Built-in mechanisms ensure no data loss or duplication
+- **Fault Tolerance**: Automatic recovery from failures with consistent state
+- **Production-Ready**: Built on Quix Streams' proven architecture
+
+### State Management:
+- **Binlog Position**: Automatically tracked as `binlog_position_{schema}_{table}`
+- **Snapshot Completion**: Tracked as `snapshot_completed_{schema}_{table}`
+- **Transactional Processing**: State changes are committed atomically with message production
+
+Example state data:
+```json
+{
+  "log_file": "mysql-bin.000123",
+  "log_pos": 45678,
+  "timestamp": 1704067200.0
+}
+```
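+
+For illustration, here is a minimal sketch of how a `StatefulSource` subclass reads and writes such state — the `self.state` accessors and `flush()` are the same calls `main.py` uses; the key and values below are placeholders:
+
+```python
+from quixstreams.sources.base import StatefulSource
+
+class PositionDemoSource(StatefulSource):
+    """Sketch: persist a binlog position in the source's state store."""
+
+    def run(self):
+        key = "binlog_position_mydb_mytable"  # hypothetical schema/table pair
+        position = self.state.get(key, None)  # None on the very first run
+        # ... read changes from the database and produce them here ...
+        self.state.set(key, {"log_file": "mysql-bin.000123", "log_pos": 45678})
+        self.flush()  # produces pending messages and commits state together
+```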
+
+## Initial Snapshot
+
+Enable the initial snapshot to capture existing table data before starting CDC:
+
+```env
+INITIAL_SNAPSHOT=true
+SNAPSHOT_BATCH_SIZE=1000
+MYSQL_SNAPSHOT_HOST=replica.mysql.example.com # Optional: use read replica
+```
+
+The initial snapshot:
+- Processes data in configurable batches to avoid memory issues
+- Sends snapshot records with `"kind": "snapshot_insert"` to distinguish them from real inserts
+- Marks completion in the StatefulSource state store to avoid re-processing on restart
+- Can be forced to re-run with `FORCE_SNAPSHOT=true`
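+
+Downstream, the `kind` field is what separates backfill rows from live changes. A small sketch of applying events to an in-memory table (assumes the table has an `id` column; the dict-based sink is illustrative only):
+
+```python
+def apply_event(table: dict, event: dict) -> None:
+    """Apply one change event to an in-memory table keyed by 'id'."""
+    if event["kind"] in ("insert", "snapshot_insert", "update"):
+        row = dict(zip(event["columnnames"], event["columnvalues"]))
+        table[row["id"]] = row
+    elif event["kind"] == "delete":
+        old = dict(zip(event["oldkeys"]["keynames"], event["oldkeys"]["keyvalues"]))
+        table.pop(old["id"], None)
+```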
+
+## Requirements / Prerequisites
+
+- A MySQL database with binary logging enabled.
+- Set `log-bin=mysql-bin` and `binlog-format=ROW` in the MySQL configuration.
+- A MySQL user with `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges.
+- For the initial snapshot: `SELECT` privilege on the target table.
+
+### MySQL Configuration Example
+```ini
+[mysqld]
+server-id = 1
+log_bin = /var/log/mysql/mysql-bin.log
+binlog_expire_logs_seconds = 864000
+max_binlog_size = 100M
+binlog-format = ROW
+binlog_row_metadata = FULL
+binlog_row_image = FULL
+```
+
+### MySQL User Permissions
+```sql
+-- Create replication user
+CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password';
+
+-- Grant replication privileges
+GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
+
+-- Grant select for initial snapshot
+GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%';
+
+FLUSH PRIVILEGES;
+```
+
+## Change Event Format
+
+### INSERT/Snapshot Insert
+Rows captured during the initial snapshot use `"kind": "snapshot_insert"`; live inserts use `"kind": "insert"`. The payload is otherwise identical:
+```json
+{
+  "kind": "insert",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["id", "name"],
+  "columnvalues": [123, "John Doe"],
+  "oldkeys": {}
+}
+```
+
+### UPDATE
+```json
+{
+  "kind": "update",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["id", "name"],
+  "columnvalues": [123, "Jane Doe"],
+  "oldkeys": {
+    "keynames": ["id", "name"],
+    "keyvalues": [123, "John Doe"]
+  }
+}
+```
+
+### DELETE
+```json
+{
+  "kind": "delete",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": [],
+  "columnvalues": [],
+  "oldkeys": {
+    "keynames": ["id", "name"],
+    "keyvalues": [123, "Jane Doe"]
+  }
+}
+```
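+
+To sanity-check the format end to end, a minimal Quix Streams consumer can subscribe to the output topic and print events as they arrive. A sketch, assuming a local broker at `localhost:9092` and the default topic name `mysql-cdc-source`:
+
+```python
+from quixstreams import Application
+
+# Minimal consumer sketch: print CDC events from the output topic.
+app = Application(
+    broker_address="localhost:9092",    # assumption: local development broker
+    consumer_group="cdc-format-check",  # hypothetical consumer group
+    auto_offset_reset="earliest",
+)
+topic = app.topic("mysql-cdc-source", value_deserializer="json")
+sdf = app.dataframe(topic)
+sdf = sdf.update(lambda event: print(event["kind"], event["table"]))
+
+if __name__ == "__main__":
+    app.run()
+```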
+
+## Contribute
+
+Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
+
+## Open source
+
+This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.
+
+Please star us and mention us on social to show your appreciation.
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/README_MYSQL_CDC.md b/python/sources/mysql_cdc/README_MYSQL_CDC.md
new file mode 100644
index 00000000..8e3c4c6a
--- /dev/null
+++ b/python/sources/mysql_cdc/README_MYSQL_CDC.md
@@ -0,0 +1,281 @@
+# MySQL CDC Setup
+
+This application implements MySQL CDC using MySQL binary log replication with **Quix Streams StatefulSource** for exactly-once processing and automatic recovery.
+
+## Key Features
+
+- **Quix Streams StatefulSource**: Built on Quix Streams' robust stateful source framework
+- **Automatic State Management**: Integrated state store for binlog position and snapshot tracking
+- **Exactly-Once Processing**: No data loss during application restarts or failures
+- **Initial Snapshot**: Optionally capture existing data before starting CDC
+- **Automatic Recovery**: Seamlessly resume processing after interruptions
+- **Change Buffering**: Batches changes for efficient Kafka publishing
+- **Built-in Reliability**: Leverages Quix Streams' production-ready state management
+
+## Prerequisites
+
+1. **MySQL Configuration**: Your MySQL server must have binary logging enabled with ROW format:
+   ```ini
+   # Add to MySQL configuration file (my.cnf or my.ini)
+   [mysqld]
+   server-id = 1
+   log_bin = /var/log/mysql/mysql-bin.log
+   binlog_expire_logs_seconds = 864000
+   max_binlog_size = 100M
+   binlog-format = ROW
+   binlog_row_metadata = FULL
+   binlog_row_image = FULL
+   ```
+
+2. **MySQL User Permissions**: The MySQL user needs REPLICATION SLAVE and REPLICATION CLIENT privileges:
+   ```sql
+   -- Create replication user
+   CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password';
+
+   -- Grant replication privileges for CDC
+   GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
+
+   -- Grant select for initial snapshot (if using snapshot feature)
+   GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%';
+
+   FLUSH PRIVILEGES;
+   ```
+
+## Environment Variables
+
+Set the following environment variables:
+
+### Required MySQL Connection
+- `MYSQL_HOST` - MySQL server hostname (e.g., localhost)
+- `MYSQL_PORT` - MySQL server port (default: 3306)
+- `MYSQL_USER` - MySQL username
+- `MYSQL_PASSWORD` - MySQL password
+- `MYSQL_DATABASE` - MySQL database name
+- `MYSQL_SCHEMA` - MySQL database name (same as MYSQL_DATABASE)
+- `MYSQL_TABLE` - Table name to monitor for changes
+
+### Optional Configuration
+- `MYSQL_SNAPSHOT_HOST` - MySQL host for the initial snapshot (defaults to MYSQL_HOST). Use this to snapshot from a read replica
+- `INITIAL_SNAPSHOT` - Set to "true" to perform initial snapshot (default: false)
+- `SNAPSHOT_BATCH_SIZE` - Rows per snapshot batch (default: 1000)
+- `FORCE_SNAPSHOT` - Set to "true" to force re-snapshot (default: false)
+
+### Kafka Output
+- `output` - Kafka topic name for publishing changes
+
+## Example .env file
+
+```env
+# MySQL Connection
+MYSQL_HOST=localhost
+MYSQL_PORT=3306
+MYSQL_USER=cdc_user
+MYSQL_PASSWORD=secure_password
+MYSQL_DATABASE=your_database
+MYSQL_SCHEMA=your_database
+MYSQL_TABLE=your_table
+
+# Optional: Use read replica for initial snapshot
+MYSQL_SNAPSHOT_HOST=replica.mysql.example.com
+
+# Initial Snapshot Configuration
+INITIAL_SNAPSHOT=true
+SNAPSHOT_BATCH_SIZE=1000
+FORCE_SNAPSHOT=false
+
+# Kafka Output
+output=cdc-changes-topic
+```
+
+## Quix Streams StatefulSource Architecture
+
+The application uses Quix Streams' `StatefulSource` class which provides:
+
+### Built-in State Management:
+- **Automatic Persistence**: State is automatically saved to the configured state store
+- **Exactly-Once Guarantees**: Built-in mechanisms ensure no data loss or duplication
+- **Transactional Processing**: State changes are committed atomically with message production
+- **Fault Tolerance**: Automatic recovery from failures with consistent state
+
+### State Storage:
+The StatefulSource manages two types of state:
+1. **Binlog Position**: `binlog_position_{schema}_{table}`
+   ```json
+   {
+     "log_file": "mysql-bin.000123",
+     "log_pos": 45678,
+     "timestamp": 1704067200.0
+   }
+   ```
+
+2. **Snapshot Completion**: `snapshot_completed_{schema}_{table}`
+   ```json
+   {
+     "completed_at": 1704067200.0,
+     "schema": "database_name",
+     "table": "table_name",
+     "timestamp": "2024-01-01 12:00:00 UTC"
+   }
+   ```
+
+### Benefits:
+- ✅ **Production-Ready**: Built on Quix Streams' proven architecture
+- ✅ **No Manual State Management**: Automatic state persistence and recovery
+- ✅ **Exactly-Once Processing**: Guaranteed delivery semantics
+- ✅ **Simplified Operations**: Reduced complexity compared to manual state management
+- ✅ **Scalable**: Can be easily deployed and scaled in production environments
+
+## Initial Snapshot
+
+Capture existing table data before starting real-time CDC:
+
+### Configuration:
+```env
+INITIAL_SNAPSHOT=true
+SNAPSHOT_BATCH_SIZE=1000
+MYSQL_SNAPSHOT_HOST=replica.mysql.example.com # Optional
+```
+
+### Features:
+- **Batched Processing**: Configurable batch sizes to handle large tables
+- **Memory Efficient**: Processes data in chunks to avoid memory issues
+- **Read Replica Support**: Use `MYSQL_SNAPSHOT_HOST` to snapshot from a replica
+- **Completion Tracking**: Marks snapshot completion in the StatefulSource state store
+- **Force Re-snapshot**: Use `FORCE_SNAPSHOT=true` to re-run if needed
+
+### Snapshot Process:
+1. Connects to the snapshot host (or the main host if not specified)
+2. Processes table data in batches
+3. Sends records with `"kind": "snapshot_insert"`
+4. Marks completion in the StatefulSource state store
+5. Proceeds to real-time CDC
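+
+The batching is plain LIMIT/OFFSET pagination; a condensed sketch of the loop inside `mysql_helper.perform_initial_snapshot` (connection handling omitted):
+
+```python
+def iter_snapshot_batches(cursor, schema: str, table: str, batch_size: int = 1000):
+    """Yield batches of rows from the table, as the snapshot helper does."""
+    offset = 0
+    while True:
+        cursor.execute(
+            f"SELECT * FROM `{schema}`.`{table}` LIMIT {batch_size} OFFSET {offset}"
+        )
+        rows = cursor.fetchall()
+        if not rows:
+            break
+        yield rows
+        offset += batch_size
+```
+
+Note that OFFSET scans get slower as the offset grows; for very large tables, keyset pagination over an indexed primary key is a common alternative.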
+
+## Dependencies
+
+Install the required Python packages:
+```bash
+pip install -r requirements.txt
+```
+
+The key dependencies are:
+- `quixstreams` - Quix Streams library with StatefulSource support
+- `pymysql` - MySQL database connector
+- `mysql-replication` - MySQL binary log replication library
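+
+For a feel of the underlying library, here is a standalone `mysql-replication` sketch that prints INSERTs as they land in the binlog (the connector itself configures its reader in `mysql_helper.create_binlog_stream`; host and credentials below reuse the example values from this guide):
+
+```python
+from pymysqlreplication import BinLogStreamReader
+from pymysqlreplication.row_event import WriteRowsEvent
+
+stream = BinLogStreamReader(
+    connection_settings={"host": "localhost", "port": 3306,
+                         "user": "cdc_user", "passwd": "secure_password"},
+    server_id=1001,  # must be unique within the replication topology
+    only_events=[WriteRowsEvent],
+    blocking=True,   # wait for new events instead of returning
+)
+try:
+    for event in stream:
+        for row in event.rows:
+            print(event.schema, event.table, row["values"])
+finally:
+    stream.close()
+```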
+
+## Change Data Format
+
+The MySQL CDC produces change events in the following format:
+
+### Snapshot Insert Event
+```json
+{
+  "kind": "snapshot_insert",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["col1", "col2"],
+  "columnvalues": ["value1", "value2"],
+  "oldkeys": {}
+}
+```
+
+### INSERT Event
+```json
+{
+  "kind": "insert",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["col1", "col2"],
+  "columnvalues": ["value1", "value2"],
+  "oldkeys": {}
+}
+```
+
+### UPDATE Event
+```json
+{
+  "kind": "update",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": ["col1", "col2"],
+  "columnvalues": ["new_value1", "new_value2"],
+  "oldkeys": {
+    "keynames": ["col1", "col2"],
+    "keyvalues": ["old_value1", "old_value2"]
+  }
+}
+```
+
+### DELETE Event
+```json
+{
+  "kind": "delete",
+  "schema": "database_name",
+  "table": "table_name",
+  "columnnames": [],
+  "columnvalues": [],
+  "oldkeys": {
+    "keynames": ["col1", "col2"],
+    "keyvalues": ["deleted_value1", "deleted_value2"]
+  }
+}
+```
+
+## Running the Application
+
+1. **Configure MySQL** with binary logging enabled
+2. **Set environment variables** (see example above)
+3. **Run the application**:
+   ```bash
+   python main.py
+   ```
+
+### Application Flow:
+1. **StatefulSource Initialization**: Quix Streams creates the MySQL CDC source
+2. **State Recovery**: Automatically loads the saved binlog position and snapshot status
+3. **Initial Snapshot** (if enabled and not completed):
+   - Connects to the snapshot host
+   - Processes existing data in batches
+   - Sends snapshot events to Kafka
+   - Marks completion in the state store
+4. **Real-time CDC**:
+   - Connects to the MySQL binlog stream
+   - Resumes from the saved position (or the current one on first run)
+   - Monitors the specified table for changes
+   - Buffers changes and publishes to Kafka every 500ms
+   - Automatically commits state after successful delivery
+5. **Automatic Recovery**: On restart, StatefulSource handles state recovery
+
+### Monitoring:
+- Check application logs for binlog position updates
+- Monitor the Quix Streams state store for position and snapshot data
+- Verify the Kafka topic for change events
+- Use MySQL's `SHOW MASTER STATUS` (renamed to `SHOW BINARY LOG STATUS` in MySQL 8.4) to compare positions
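+
+A small sketch of that comparison, reading the server's binlog head with `pymysql` and the connector's file-based fallback position written by `mysql_helper` (the state path assumes the default `state` directory and the example schema/table names from this guide):
+
+```python
+import json
+import pymysql
+
+conn = pymysql.connect(host="localhost", user="cdc_user",
+                       password="secure_password", database="your_database")
+with conn.cursor() as cursor:
+    cursor.execute("SHOW MASTER STATUS")  # SHOW BINARY LOG STATUS on MySQL 8.4+
+    log_file, log_pos = cursor.fetchone()[:2]
+conn.close()
+
+with open("state/binlog_position_your_database_your_table.json") as f:
+    saved = json.load(f)
+
+print(f"server at {log_file}:{log_pos}, "
+      f"connector at {saved['log_file']}:{saved['log_pos']}")
+```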
"EnvironmentVariable", + "InputType": "FreeText", + "Description": "Name of table for CDC", + "DefaultValue": "", + "Required": true + } + ], + "DeploySettings": { + "DeploymentType": "Service", + "CpuMillicores": 200, + "MemoryInMb": 200, + "Replicas": 1, + "PublicAccess": false, + "ValidateConnection": true + } +} \ No newline at end of file diff --git a/python/sources/mysql_cdc/main.py b/python/sources/mysql_cdc/main.py new file mode 100644 index 00000000..7d652e2b --- /dev/null +++ b/python/sources/mysql_cdc/main.py @@ -0,0 +1,257 @@ +from quixstreams import Application +from quixstreams.sources.base import StatefulSource +import time +import os +import json +from setup_logger import logger +from mysql_helper import connect_mysql, enable_binlog_if_needed, setup_mysql_cdc, create_binlog_stream, get_changes, perform_initial_snapshot + +# Load environment variables (useful when working locally) +from dotenv import load_dotenv +load_dotenv() + +class MySqlCdcSource(StatefulSource): + def __init__(self, name: str = "mysql-cdc-source"): + super().__init__(name=name) + + # Load configuration from environment variables + self.mysql_schema = os.environ["MYSQL_SCHEMA"] # MySQL database name + self.mysql_table = os.environ["MYSQL_TABLE"] # MySQL table name + self.mysql_table_name = f"{self.mysql_schema}.{self.mysql_table}" + self.wait_interval = 0.1 + + # Initial snapshot configuration + self.initial_snapshot = os.getenv("INITIAL_SNAPSHOT", "false").lower() == "true" + self.snapshot_batch_size = int(os.getenv("SNAPSHOT_BATCH_SIZE", "1000")) + self.force_snapshot = os.getenv("FORCE_SNAPSHOT", "false").lower() == "true" + + # Connection objects - will be initialized in setup() + self.conn = None + self.binlog_stream = None + + # Message buffering + self.buffer = [] + self.last_flush_time = time.time() + self.flush_interval = 0.5 # 500ms + + def setup(self): + """Initialize MySQL connection and CDC setup""" + try: + enable_binlog_if_needed() + setup_mysql_cdc(self.mysql_table) + self.conn = connect_mysql() + self.binlog_stream = create_binlog_stream() + logger.info("MySQL CDC CONNECTED!") + except Exception as e: + logger.error(f"ERROR during MySQL CDC setup - {e}") + raise + + def is_snapshot_completed(self): + """Check if initial snapshot has been completed using state store""" + snapshot_key = f"snapshot_completed_{self.mysql_schema}_{self.mysql_table}" + return self.state.get(snapshot_key, False) and not self.force_snapshot + + def mark_snapshot_completed(self): + """Mark initial snapshot as completed in state store""" + snapshot_key = f"snapshot_completed_{self.mysql_schema}_{self.mysql_table}" + snapshot_info = { + "completed_at": time.time(), + "schema": self.mysql_schema, + "table": self.mysql_table, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()) + } + self.state.set(snapshot_key, True) + self.state.set(f"snapshot_info_{self.mysql_schema}_{self.mysql_table}", snapshot_info) + logger.info(f"Snapshot completion marked in state store for {self.mysql_table_name}") + + def get_snapshot_info(self): + """Get information about when snapshot was completed""" + info_key = f"snapshot_info_{self.mysql_schema}_{self.mysql_table}" + return self.state.get(info_key, None) + + def save_binlog_position(self, log_file, log_pos): + """Save binlog position to state store""" + binlog_key = f"binlog_position_{self.mysql_schema}_{self.mysql_table}" + position_info = { + "log_file": log_file, + "log_pos": log_pos, + "timestamp": time.time() + } + self.state.set(binlog_key, position_info) + + 
diff --git a/python/sources/mysql_cdc/helper_functions.py b/python/sources/mysql_cdc/helper_functions.py
new file mode 100644
index 00000000..779146c6
--- /dev/null
+++ b/python/sources/mysql_cdc/helper_functions.py
@@ -0,0 +1,101 @@
+from datetime import timedelta
+from typing import Any
+import os
+import json
+
+# NOTE: these helpers appear to be carried over from a polling SQL connector
+# sample (they read driver/server/poll-interval settings and use a pyodbc-style
+# cursor API). Nothing in main.py imports this module; the CDC source itself
+# relies on mysql_helper.py.
+
+
+def load_config():
+    driver = os.environ["driver"]
+    server = os.environ["server"]
+    user_id = os.environ["userid"]
+    password = os.environ["password"]
+    database = os.environ["database"]
+    table_name = os.environ["table_name"]
+    last_modified_column = os.environ["last_modified_column"]
+    time_delta_config = os.environ["time_delta"]
+
+    # bool() would treat any non-empty string (including "False") as True,
+    # so parse the flag explicitly.
+    offset_is_utc = os.environ["offset_is_utc"].strip().lower()
+    if offset_is_utc not in ("true", "false"):
+        raise Exception("Use UTC For Offset must be True or False")
+    use_utc_for_offset = offset_is_utc == "true"
+
+    drop_cols = os.getenv("columns_to_drop")
+    rename_cols = None
+    passed_rename_cols = os.getenv("columns_to_rename")
+
+    try:
+        poll_interval = int(os.environ["poll_interval_seconds"])
+    except Exception as e:
+        raise Exception("Poll Interval must be an integer", e)
+
+    if poll_interval < 1:
+        poll_interval = 1
+
+    try:
+        if passed_rename_cols is not None and passed_rename_cols != "":
+            rename_cols = json.loads(passed_rename_cols)
+    except Exception as e:
+        raise Exception("Invalid JSON supplied for column renames", e)
+
+    return {
+        "driver": driver,
+        "server": server,
+        "user_id": user_id,
+        "password": password,
+        "database": database,
+        "table_name": table_name,
+        "last_modified_column": last_modified_column,
+        "time_delta": make_time_delta_from_config(time_delta_config),
+        "drop_cols": drop_cols,
+        "rename_cols": rename_cols,
+        "use_utc": use_utc_for_offset,
+        "poll_interval": poll_interval
+    }
+
+
+def make_time_delta_from_config(time_delta_config) -> timedelta:
+    time_delta_values = time_delta_config.split(",")
+
+    if len(time_delta_values) != 5:
+        raise Exception(
+            "time_delta_config must contain 5 values, one for each of seconds, minutes, hours, days and weeks")
+
+    try:
+        seconds = int(time_delta_values[0])
+        minutes = int(time_delta_values[1])
+        hours = int(time_delta_values[2])
+        days = int(time_delta_values[3])
+        weeks = int(time_delta_values[4])
+        return timedelta(seconds=seconds, minutes=minutes, hours=hours, days=days, weeks=weeks)
+    except (TypeError, ValueError) as te:
+        # int() raises ValueError for malformed strings and TypeError for None
+        raise Exception("Unable to cast one of the supplied values to int", te)
+    except Exception as e:
+        raise Exception("Something went wrong configuring the time delta", e)
+
+
+def check_table_exists(conn, table) -> bool:
+    if not conn.cursor().tables(table).fetchone():
+        print("Table does not exist")
+        return False
+    return True
+
+
+def check_column_exists(conn, table, column_name) -> bool:
+    for c in conn.cursor().columns(table=table):
+        if column_name == c.column_name:
+            return True
+    print("Key column [{}] not found in table [{}]".format(column_name, table))
+    return False
+
+
+def drop_columns(conn, cols_to_drop, table_data, table_name) -> Any:
+    for col in cols_to_drop:
+        if check_column_exists(conn, table_name, col):
+            table_data = table_data.drop(columns=[col])
+    return table_data
+
+
+def rename_columns(conn, cols_to_rename, table_data, table_name) -> Any:
+    for col in cols_to_rename:
+        if check_column_exists(conn, table_name, col):
+            table_data = table_data.rename(columns={col: cols_to_rename[col]})
+    return table_data
\ No newline at end of file
logger.error(f"Error in CDC loop: {e}") + # Still continue running unless it's a fatal error + time.sleep(1) # Wait a bit longer on error + + def stop(self): + """Clean up resources when stopping""" + logger.info("Stopping MySQL CDC source") + + # Process any remaining buffered messages + if len(self.buffer) > 0: + logger.info(f"Processing {len(self.buffer)} remaining buffered messages") + self.process_buffered_messages() + + # Clean up connections + if self.conn: + self.conn.close() + logger.info("MySQL connection closed") + + if self.binlog_stream: + self.binlog_stream.close() + logger.info("Binlog stream closed") + + super().stop() + +def main(): + """Main function to run the MySQL CDC source""" + # Create a Quix Application + app = Application() + + # Check the output topic is configured + output_topic_name = os.getenv("output", "") + if output_topic_name == "": + raise ValueError("output_topic environment variable is required") + + # Create the MySQL CDC source + mysql_source = MySqlCdcSource(name="mysql-cdc-source") + + # Create a StreamingDataFrame from the source + sdf = app.dataframe(source=mysql_source) + + # Print messages for debugging (you can replace this with your processing logic) + # sdf.print(metadata=True) # Commented out to reduce verbose output + + # Send CDC data to output topic + sdf.to_topic(output_topic_name) + + # Run the application + try: + logger.info("Starting MySQL CDC application") + app.run() + except KeyboardInterrupt: + logger.info("Application interrupted by user") + except Exception as e: + logger.error(f"Application error: {e}") + raise + finally: + logger.info("MySQL CDC application stopped") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/python/sources/mysql_cdc/mysql_helper.py b/python/sources/mysql_cdc/mysql_helper.py new file mode 100644 index 00000000..a9f24d99 --- /dev/null +++ b/python/sources/mysql_cdc/mysql_helper.py @@ -0,0 +1,300 @@ +import pymysql +import os +import json +import time +from pymysqlreplication import BinLogStreamReader +from pymysqlreplication.row_event import ( + DeleteRowsEvent, + UpdateRowsEvent, + WriteRowsEvent, +) +from setup_logger import logger + +def serialize_value(value): + """Convert a value to JSON-serializable format""" + if value is None: + return None + elif isinstance(value, (bytes, bytearray)): + # Convert binary data to base64 string + import base64 + return base64.b64encode(value).decode('utf-8') + elif hasattr(value, 'isoformat'): + # Handle datetime, date, time objects + return value.isoformat() + elif isinstance(value, (int, float, str, bool)): + # Already JSON serializable + return value + else: + # Convert other types to string + return str(value) + +def connect_mysql(host=None): + """Connect to MySQL database""" + MYSQL_HOST = host or os.environ["MYSQL_HOST"] + MYSQL_PORT = int(os.environ.get("MYSQL_PORT", 3306)) + MYSQL_USER = os.environ["MYSQL_USER"] + MYSQL_PASSWORD = os.environ["MYSQL_PASSWORD"] + MYSQL_DATABASE = os.environ["MYSQL_DATABASE"] + + conn = pymysql.connect( + host=MYSQL_HOST, + port=MYSQL_PORT, + user=MYSQL_USER, + password=MYSQL_PASSWORD, + database=MYSQL_DATABASE, + charset='utf8mb4' + ) + return conn + +def run_query(conn, query: str): + """Execute a query on MySQL""" + with conn.cursor() as cursor: + cursor.execute(query) + conn.commit() + +def enable_binlog_if_needed(): + """Check and enable binary logging if not already enabled""" + conn = connect_mysql() + try: + with conn.cursor() as cursor: + # Check if binary logging is enabled + 
cursor.execute("SHOW VARIABLES LIKE 'log_bin'") + result = cursor.fetchone() + + if result and result[1] == 'ON': + logger.info("Binary logging is already enabled") + else: + logger.warning("Binary logging is not enabled. Please enable it in MySQL configuration.") + logger.warning("Add the following to your MySQL config:") + logger.warning("log-bin=mysql-bin") + logger.warning("binlog-format=ROW") + raise Exception("Binary logging must be enabled for CDC") + + # Check binlog format + cursor.execute("SHOW VARIABLES LIKE 'binlog_format'") + result = cursor.fetchone() + + if result and result[1] != 'ROW': + logger.warning(f"Binlog format is {result[1]}, should be ROW for CDC") + logger.warning("Please set binlog_format=ROW in MySQL configuration") + + finally: + conn.close() + +def setup_mysql_cdc(table_name: str): + """Setup MySQL for CDC - mainly validation""" + conn = connect_mysql() + try: + with conn.cursor() as cursor: + # Check if table exists + cursor.execute(f"SHOW TABLES LIKE '{table_name.split('.')[-1]}'") + result = cursor.fetchone() + + if not result: + raise Exception(f"Table {table_name} not found") + + logger.info(f"Table {table_name} found and ready for CDC") + + finally: + conn.close() + +# Binlog position management +def get_binlog_position_file(): + """Get the path to the binlog position file""" + state_dir = os.getenv("Quix__State__Dir", "state") + if not os.path.exists(state_dir): + os.makedirs(state_dir) + schema = os.environ["MYSQL_SCHEMA"] + table = os.environ["MYSQL_TABLE"] + return os.path.join(state_dir, f"binlog_position_{schema}_{table}.json") + +def save_binlog_position(log_file, log_pos): + """Save the current binlog position to disk""" + position_file = get_binlog_position_file() + position_data = { + "log_file": log_file, + "log_pos": log_pos, + "timestamp": time.time(), + "readable_time": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()) + } + try: + with open(position_file, 'w') as f: + json.dump(position_data, f, indent=2) + logger.debug(f"Saved binlog position: {log_file}:{log_pos}") + except Exception as e: + logger.error(f"Failed to save binlog position: {e}") + +def load_binlog_position(): + """Load the last saved binlog position from disk""" + position_file = get_binlog_position_file() + if os.path.exists(position_file): + try: + with open(position_file, 'r') as f: + position_data = json.load(f) + logger.info(f"Loaded binlog position: {position_data['log_file']}:{position_data['log_pos']} from {position_data.get('readable_time', 'unknown time')}") + return position_data['log_file'], position_data['log_pos'] + except Exception as e: + logger.error(f"Failed to load binlog position: {e}") + return None, None + +def create_binlog_stream(server_id=1): + """Create and return a MySQL binlog stream reader with position resumption""" + MYSQL_HOST = os.environ["MYSQL_HOST"] + MYSQL_PORT = int(os.environ.get("MYSQL_PORT", 3306)) + MYSQL_USER = os.environ["MYSQL_USER"] + MYSQL_PASSWORD = os.environ["MYSQL_PASSWORD"] + + mysql_settings = { + "host": MYSQL_HOST, + "port": MYSQL_PORT, + "user": MYSQL_USER, + "passwd": MYSQL_PASSWORD, + } + + # Load saved binlog position + log_file, log_pos = load_binlog_position() + + stream_kwargs = { + "connection_settings": mysql_settings, + "server_id": server_id, + "only_events": [DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent], + "resume_stream": True, + "blocking": False + } + + # If we have a saved position, use it + if log_file and log_pos: + stream_kwargs["log_file"] = log_file + stream_kwargs["log_pos"] = log_pos + 
logger.info(f"Resuming binlog stream from saved position: {log_file}:{log_pos}") + else: + logger.info("No saved binlog position found, starting from current position") + + stream = BinLogStreamReader(**stream_kwargs) + return stream + +def get_changes(stream, schema_name: str, table_name: str): + """Get changes from MySQL binlog stream and save position after processing""" + changes = [] + last_position = None + + # Read available events (non-blocking) + for binlogevent in stream: + # Update position tracking + if hasattr(stream, 'log_file') and hasattr(stream, 'log_pos'): + last_position = (stream.log_file, stream.log_pos) + + # Filter by schema and table + if binlogevent.schema == schema_name and binlogevent.table == table_name: + + if isinstance(binlogevent, WriteRowsEvent): + # INSERT operation + for row in binlogevent.rows: + change = { + "kind": "insert", + "schema": binlogevent.schema, + "table": binlogevent.table, + "columnnames": list(row["values"].keys()), + "columnvalues": [serialize_value(v) for v in row["values"].values()], + "oldkeys": {} + } + changes.append(change) + + elif isinstance(binlogevent, UpdateRowsEvent): + # UPDATE operation + for row in binlogevent.rows: + change = { + "kind": "update", + "schema": binlogevent.schema, + "table": binlogevent.table, + "columnnames": list(row["after_values"].keys()), + "columnvalues": [serialize_value(v) for v in row["after_values"].values()], + "oldkeys": { + "keynames": list(row["before_values"].keys()), + "keyvalues": [serialize_value(v) for v in row["before_values"].values()] + } + } + changes.append(change) + + elif isinstance(binlogevent, DeleteRowsEvent): + # DELETE operation + for row in binlogevent.rows: + change = { + "kind": "delete", + "schema": binlogevent.schema, + "table": binlogevent.table, + "columnnames": [], + "columnvalues": [], + "oldkeys": { + "keynames": list(row["values"].keys()), + "keyvalues": [serialize_value(v) for v in row["values"].values()] + } + } + changes.append(change) + + # Save position if we processed any events + if last_position and changes: + save_binlog_position(last_position[0], last_position[1]) + + return changes + +def perform_initial_snapshot(schema_name: str, table_name: str, batch_size: int = 1000): + """Perform initial snapshot of the table and return all existing rows as insert events""" + conn = connect_mysql(os.environ["MYSQL_SNAPSHOT_HOST"]) + changes = [] + + try: + with conn.cursor() as cursor: + # Get total row count for logging + cursor.execute(f"SELECT COUNT(*) FROM `{schema_name}`.`{table_name}`") + total_rows = cursor.fetchone()[0] + logger.info(f"Starting initial snapshot of {schema_name}.{table_name} - {total_rows} rows") + + # Use LIMIT with OFFSET for batching to avoid memory issues with large tables + offset = 0 + processed_rows = 0 + + while True: + # Fetch batch of rows + cursor.execute(f"SELECT * FROM `{schema_name}`.`{table_name}` LIMIT {batch_size} OFFSET {offset}") + rows = cursor.fetchall() + + if not rows: + break + + # Get column names + column_names = [desc[0] for desc in cursor.description] + + # Convert each row to a change event + for row in rows: + # Convert row tuple to dictionary + row_dict = dict(zip(column_names, row)) + + # Convert values to JSON-serializable format + serialized_values = [serialize_value(value) for value in row_dict.values()] + + change = { + "kind": "snapshot_insert", # Different kind to distinguish from real inserts + "schema": schema_name, + "table": table_name, + "columnnames": column_names, + "columnvalues": 
diff --git a/python/sources/mysql_cdc/mysql_helper.py b/python/sources/mysql_cdc/mysql_helper.py
new file mode 100644
index 00000000..a9f24d99
--- /dev/null
+++ b/python/sources/mysql_cdc/mysql_helper.py
@@ -0,0 +1,300 @@
+import pymysql
+import os
+import json
+import time
+import base64
+from pymysqlreplication import BinLogStreamReader
+from pymysqlreplication.row_event import (
+    DeleteRowsEvent,
+    UpdateRowsEvent,
+    WriteRowsEvent,
+)
+from setup_logger import logger
+
+def serialize_value(value):
+    """Convert a value to a JSON-serializable format"""
+    if value is None:
+        return None
+    elif isinstance(value, (bytes, bytearray)):
+        # Convert binary data to a base64 string
+        return base64.b64encode(value).decode('utf-8')
+    elif hasattr(value, 'isoformat'):
+        # Handle datetime, date, time objects
+        return value.isoformat()
+    elif isinstance(value, (int, float, str, bool)):
+        # Already JSON serializable
+        return value
+    else:
+        # Convert other types to string
+        return str(value)
+
+def connect_mysql(host=None):
+    """Connect to the MySQL database"""
+    MYSQL_HOST = host or os.environ["MYSQL_HOST"]
+    MYSQL_PORT = int(os.environ.get("MYSQL_PORT", 3306))
+    MYSQL_USER = os.environ["MYSQL_USER"]
+    MYSQL_PASSWORD = os.environ["MYSQL_PASSWORD"]
+    MYSQL_DATABASE = os.environ["MYSQL_DATABASE"]
+
+    conn = pymysql.connect(
+        host=MYSQL_HOST,
+        port=MYSQL_PORT,
+        user=MYSQL_USER,
+        password=MYSQL_PASSWORD,
+        database=MYSQL_DATABASE,
+        charset='utf8mb4'
+    )
+    return conn
+
+def run_query(conn, query: str):
+    """Execute a query on MySQL"""
+    with conn.cursor() as cursor:
+        cursor.execute(query)
+    conn.commit()
+
+def enable_binlog_if_needed():
+    """Check that binary logging is enabled; raise if it is off and warn if the format is not ROW"""
+    conn = connect_mysql()
+    try:
+        with conn.cursor() as cursor:
+            # Check if binary logging is enabled
+            cursor.execute("SHOW VARIABLES LIKE 'log_bin'")
+            result = cursor.fetchone()
+
+            if result and result[1] == 'ON':
+                logger.info("Binary logging is already enabled")
+            else:
+                logger.warning("Binary logging is not enabled. Please enable it in the MySQL configuration.")
+                logger.warning("Add the following to your MySQL config:")
+                logger.warning("log-bin=mysql-bin")
+                logger.warning("binlog-format=ROW")
+                raise Exception("Binary logging must be enabled for CDC")
+
+            # Check binlog format
+            cursor.execute("SHOW VARIABLES LIKE 'binlog_format'")
+            result = cursor.fetchone()
+
+            if result and result[1] != 'ROW':
+                logger.warning(f"Binlog format is {result[1]}, should be ROW for CDC")
+                logger.warning("Please set binlog_format=ROW in the MySQL configuration")
+
+    finally:
+        conn.close()
+
+def setup_mysql_cdc(table_name: str):
+    """Setup MySQL for CDC - mainly validation"""
+    conn = connect_mysql()
+    try:
+        with conn.cursor() as cursor:
+            # Check if the table exists
+            cursor.execute(f"SHOW TABLES LIKE '{table_name.split('.')[-1]}'")
+            result = cursor.fetchone()
+
+            if not result:
+                raise Exception(f"Table {table_name} not found")
+
+            logger.info(f"Table {table_name} found and ready for CDC")
+
+    finally:
+        conn.close()
+
+# Binlog position management
+def get_binlog_position_file():
+    """Get the path to the binlog position file"""
+    state_dir = os.getenv("Quix__State__Dir", "state")
+    os.makedirs(state_dir, exist_ok=True)
+    schema = os.environ["MYSQL_SCHEMA"]
+    table = os.environ["MYSQL_TABLE"]
+    return os.path.join(state_dir, f"binlog_position_{schema}_{table}.json")
+
+def save_binlog_position(log_file, log_pos):
+    """Save the current binlog position to disk"""
+    position_file = get_binlog_position_file()
+    position_data = {
+        "log_file": log_file,
+        "log_pos": log_pos,
+        "timestamp": time.time(),
+        "readable_time": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime())
+    }
+    try:
+        with open(position_file, 'w') as f:
+            json.dump(position_data, f, indent=2)
+        logger.debug(f"Saved binlog position: {log_file}:{log_pos}")
+    except Exception as e:
+        logger.error(f"Failed to save binlog position: {e}")
+
+def load_binlog_position():
+    """Load the last saved binlog position from disk"""
+    position_file = get_binlog_position_file()
+    if os.path.exists(position_file):
+        try:
+            with open(position_file, 'r') as f:
+                position_data = json.load(f)
+            logger.info(f"Loaded binlog position: {position_data['log_file']}:{position_data['log_pos']} from {position_data.get('readable_time', 'unknown time')}")
+            return position_data['log_file'], position_data['log_pos']
+        except Exception as e:
+            logger.error(f"Failed to load binlog position: {e}")
+    return None, None
+
+def create_binlog_stream(server_id=1001):
+    """Create and return a MySQL binlog stream reader with position resumption.
+
+    The server_id must be unique within the replication topology, i.e. different
+    from the MySQL server's own server-id and from any other replica.
+    """
+    MYSQL_HOST = os.environ["MYSQL_HOST"]
+    MYSQL_PORT = int(os.environ.get("MYSQL_PORT", 3306))
+    MYSQL_USER = os.environ["MYSQL_USER"]
+    MYSQL_PASSWORD = os.environ["MYSQL_PASSWORD"]
+
+    mysql_settings = {
+        "host": MYSQL_HOST,
+        "port": MYSQL_PORT,
+        "user": MYSQL_USER,
+        "passwd": MYSQL_PASSWORD,
+    }
+
+    # Load the saved binlog position
+    log_file, log_pos = load_binlog_position()
+
+    stream_kwargs = {
+        "connection_settings": mysql_settings,
+        "server_id": server_id,
+        "only_events": [DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
+        "resume_stream": True,
+        "blocking": False
+    }
+
+    # If we have a saved position, use it
+    if log_file and log_pos:
+        stream_kwargs["log_file"] = log_file
+        stream_kwargs["log_pos"] = log_pos
+        logger.info(f"Resuming binlog stream from saved position: {log_file}:{log_pos}")
+    else:
+        logger.info("No saved binlog position found, starting from current position")
+
+    stream = BinLogStreamReader(**stream_kwargs)
+    return stream
+
+def get_changes(stream, schema_name: str, table_name: str):
+    """Get changes from the MySQL binlog stream and save the position after processing"""
+    changes = []
+    last_position = None
+
+    # Read available events (non-blocking)
+    for binlogevent in stream:
+        # Update position tracking
+        if hasattr(stream, 'log_file') and hasattr(stream, 'log_pos'):
+            last_position = (stream.log_file, stream.log_pos)
+
+        # Filter by schema and table
+        if binlogevent.schema == schema_name and binlogevent.table == table_name:
+
+            if isinstance(binlogevent, WriteRowsEvent):
+                # INSERT operation
+                for row in binlogevent.rows:
+                    change = {
+                        "kind": "insert",
+                        "schema": binlogevent.schema,
+                        "table": binlogevent.table,
+                        "columnnames": list(row["values"].keys()),
+                        "columnvalues": [serialize_value(v) for v in row["values"].values()],
+                        "oldkeys": {}
+                    }
+                    changes.append(change)
+
+            elif isinstance(binlogevent, UpdateRowsEvent):
+                # UPDATE operation
+                for row in binlogevent.rows:
+                    change = {
+                        "kind": "update",
+                        "schema": binlogevent.schema,
+                        "table": binlogevent.table,
+                        "columnnames": list(row["after_values"].keys()),
+                        "columnvalues": [serialize_value(v) for v in row["after_values"].values()],
+                        "oldkeys": {
+                            "keynames": list(row["before_values"].keys()),
+                            "keyvalues": [serialize_value(v) for v in row["before_values"].values()]
+                        }
+                    }
+                    changes.append(change)
+
+            elif isinstance(binlogevent, DeleteRowsEvent):
+                # DELETE operation
+                for row in binlogevent.rows:
+                    change = {
+                        "kind": "delete",
+                        "schema": binlogevent.schema,
+                        "table": binlogevent.table,
+                        "columnnames": [],
+                        "columnvalues": [],
+                        "oldkeys": {
+                            "keynames": list(row["values"].keys()),
+                            "keyvalues": [serialize_value(v) for v in row["values"].values()]
+                        }
+                    }
+                    changes.append(change)
+
+    # Save the position if we processed any events
+    if last_position and changes:
+        save_binlog_position(last_position[0], last_position[1])
+
+    return changes
+
+def perform_initial_snapshot(schema_name: str, table_name: str, batch_size: int = 1000):
+    """Perform an initial snapshot of the table and return all existing rows as insert events"""
+    # MYSQL_SNAPSHOT_HOST is optional; connect_mysql falls back to MYSQL_HOST when it is unset
+    conn = connect_mysql(os.getenv("MYSQL_SNAPSHOT_HOST"))
+    changes = []
+
+    try:
+        with conn.cursor() as cursor:
+            # Get the total row count for logging
+            cursor.execute(f"SELECT COUNT(*) FROM `{schema_name}`.`{table_name}`")
+            total_rows = cursor.fetchone()[0]
+            logger.info(f"Starting initial snapshot of {schema_name}.{table_name} - {total_rows} rows")
+
+            # Use LIMIT with OFFSET for batching to avoid memory issues with large tables
+            offset = 0
+            processed_rows = 0
+
+            while True:
+                # Fetch a batch of rows
+                cursor.execute(f"SELECT * FROM `{schema_name}`.`{table_name}` LIMIT {batch_size} OFFSET {offset}")
+                rows = cursor.fetchall()
+
+                if not rows:
+                    break
+
+                # Get column names
+                column_names = [desc[0] for desc in cursor.description]
+
+                # Convert each row to a change event
+                for row in rows:
+                    # Convert the row tuple to a dictionary
+                    row_dict = dict(zip(column_names, row))
+
+                    # Convert values to a JSON-serializable format
+                    serialized_values = [serialize_value(value) for value in row_dict.values()]
+
+                    change = {
+                        "kind": "snapshot_insert",  # Different kind to distinguish from real inserts
+                        "schema": schema_name,
+                        "table": table_name,
+                        "columnnames": column_names,
+                        "columnvalues": serialized_values,
+                        "oldkeys": {}
+                    }
+                    changes.append(change)
+
+                processed_rows += len(rows)
+                offset += batch_size
+
+                if processed_rows % 50000 == 0:  # Log progress every 50k rows
+                    logger.info(f"Snapshot progress: {processed_rows}/{total_rows} rows processed")
+
+            logger.info(f"Initial snapshot completed: {processed_rows} rows captured")
+
+    except Exception as e:
+        logger.error(f"Error during initial snapshot: {e}")
+        raise
+    finally:
+        conn.close()
+
+    return changes
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/requirements.txt b/python/sources/mysql_cdc/requirements.txt
new file mode 100644
index 00000000..98619bb5
--- /dev/null
+++ b/python/sources/mysql_cdc/requirements.txt
@@ -0,0 +1,4 @@
+quixstreams==3.15.0
+pymysql
+mysql-replication
+python-dotenv
\ No newline at end of file
diff --git a/python/sources/mysql_cdc/setup_logger.py b/python/sources/mysql_cdc/setup_logger.py
new file mode 100644
index 00000000..e6713f05
--- /dev/null
+++ b/python/sources/mysql_cdc/setup_logger.py
@@ -0,0 +1,13 @@
+import logging
+
+# Set up logger
+PROD_ENV = True
+logger = logging.getLogger("MySQL CDC")
+logging.basicConfig()
+
+if PROD_ENV:
+    logger.setLevel(logging.INFO)
+    logger.info("Running in Production Mode...")
+else:
+    logger.setLevel(logging.DEBUG)
+    logger.info("Running in Debug Mode...")
\ No newline at end of file