diff --git a/README.md b/README.md
index 81077e1..89057d9 100644
--- a/README.md
+++ b/README.md
@@ -236,6 +236,7 @@
http_port: 9128 # optional
types_mapping: # optional
  'char(36)': 'UUID'
+ignore_deletes: false # optional, set to true to ignore DELETE operations
```
@@ -259,6 +260,7 @@ types_mapping: # optional
- `indexes` - you may want to add some indexes to accelerate performance, e.g. an ngram index for full-text search, etc. To apply indexes you need to start replication from scratch.
- `http_host`, `http_port` - http endpoint to control replication, use `/docs` for available commands
- `types_mapping` - custom types mapping, e.g. you can map char(36) to UUID instead of String, etc.
+- `ignore_deletes` - when set to `true`, DELETE operations in MySQL are ignored during replication. This creates an append-only model where data is only added, never removed. In this mode, the replicator doesn't create a temporary database and instead replicates directly into the target database.

A few more tables / dbs examples:

diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py
index 411a21a..c310899 100644
--- a/mysql_ch_replicator/clickhouse_api.py
+++ b/mysql_ch_replicator/clickhouse_api.py
@@ -264,7 +264,7 @@ def drop_database(self, db_name):
        self.execute_command(f'DROP DATABASE IF EXISTS `{db_name}`')

    def create_database(self, db_name):
-        self.cursor.execute(f'CREATE DATABASE `{db_name}`')
+        self.execute_command(f'CREATE DATABASE `{db_name}`')

    def select(self, table_name, where=None, final=None):
        query = f'SELECT * FROM {table_name}'
diff --git a/mysql_ch_replicator/config.py b/mysql_ch_replicator/config.py
index 57c7b3c..148a2b7 100644
--- a/mysql_ch_replicator/config.py
+++ b/mysql_ch_replicator/config.py
@@ -120,6 +120,7 @@ def __init__(self):
        self.types_mapping = {}
        self.target_databases = {}
        self.initial_replication_threads = 0
+        self.ignore_deletes = False

    def load(self, settings_file):
        data = open(settings_file, 'r').read()
@@ -145,6 +146,7 @@ def load(self, settings_file):
        self.http_port = data.pop('http_port', 0)
        self.target_databases = data.pop('target_databases', {})
        self.initial_replication_threads = data.pop('initial_replication_threads', 0)
+        self.ignore_deletes = data.pop('ignore_deletes', False)

        indexes = data.pop('indexes', [])
        for index in indexes:
diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py
index 160a793..2e0b2bb 100644
--- a/mysql_ch_replicator/db_replicator.py
+++ b/mysql_ch_replicator/db_replicator.py
@@ -200,10 +200,22 @@ def run(self):
            self.run_realtime_replication()
            return
-        logger.info('recreating database')
-        self.clickhouse_api.database = self.target_database_tmp
-        if not self.is_parallel_worker:
-            self.clickhouse_api.recreate_database()
+        # If ignore_deletes is enabled, we don't create a temporary DB and don't swap DBs.
+        # We replicate directly into the target DB.
+        if self.config.ignore_deletes:
+            logger.info('using existing database (ignore_deletes=True)')
+            self.clickhouse_api.database = self.target_database
+            self.target_database_tmp = self.target_database
+
+            # Create the database if it doesn't exist
+            if self.target_database not in self.clickhouse_api.get_databases():
+                logger.info(f'creating database {self.target_database}')
+                self.clickhouse_api.create_database(db_name=self.target_database)
+        else:
+            logger.info('recreating database')
+            self.clickhouse_api.database = self.target_database_tmp
+            if not self.is_parallel_worker:
+                self.clickhouse_api.recreate_database()
        self.state.tables =
self.mysql_api.get_tables() self.state.tables = [ diff --git a/mysql_ch_replicator/db_replicator_initial.py b/mysql_ch_replicator/db_replicator_initial.py index 3c83d1f..ecc03d8 100644 --- a/mysql_ch_replicator/db_replicator_initial.py +++ b/mysql_ch_replicator/db_replicator_initial.py @@ -106,19 +106,22 @@ def perform_initial_replication(self): # Verify table structures after replication but before swapping databases self.verify_table_structures_after_replication() - logger.info(f'initial replication - swapping database') - if self.replicator.target_database in self.replicator.clickhouse_api.get_databases(): - self.replicator.clickhouse_api.execute_command( - f'RENAME DATABASE `{self.replicator.target_database}` TO `{self.replicator.target_database}_old`', - ) - self.replicator.clickhouse_api.execute_command( - f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`', - ) - self.replicator.clickhouse_api.drop_database(f'{self.replicator.target_database}_old') - else: - self.replicator.clickhouse_api.execute_command( - f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`', - ) + # If ignore_deletes is enabled, we don't swap databases, as we're directly replicating + # to the target database + if not self.replicator.config.ignore_deletes: + logger.info(f'initial replication - swapping database') + if self.replicator.target_database in self.replicator.clickhouse_api.get_databases(): + self.replicator.clickhouse_api.execute_command( + f'RENAME DATABASE `{self.replicator.target_database}` TO `{self.replicator.target_database}_old`', + ) + self.replicator.clickhouse_api.execute_command( + f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`', + ) + self.replicator.clickhouse_api.drop_database(f'{self.replicator.target_database}_old') + else: + self.replicator.clickhouse_api.execute_command( + f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`', + ) self.replicator.clickhouse_api.database = self.replicator.target_database logger.info(f'initial replication - done') diff --git a/mysql_ch_replicator/db_replicator_realtime.py b/mysql_ch_replicator/db_replicator_realtime.py index 409e55d..0856ba5 100644 --- a/mysql_ch_replicator/db_replicator_realtime.py +++ b/mysql_ch_replicator/db_replicator_realtime.py @@ -148,6 +148,17 @@ def handle_erase_event(self, event: LogEvent): f'table: {event.table_name}, ' f'records: {event.records}', ) + + # If ignore_deletes is enabled, skip processing delete events + if self.replicator.config.ignore_deletes: + if self.replicator.config.debug_log_level: + logger.debug( + f'ignoring erase event (ignore_deletes=True): {event.transaction_id}, ' + f'table: {event.table_name}, ' + f'records: {len(event.records)}', + ) + return + self.replicator.stats.erase_events_count += 1 self.replicator.stats.erase_records_count += len(event.records) diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 918ef1b..deac3f1 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -2454,3 +2454,145 @@ def test_dynamic_column_addition_user_config(): db_pid = get_db_replicator_pid(cfg, "test_replication") if db_pid: kill_process(db_pid) + + +def test_ignore_deletes(): + # Create a temporary config file with ignore_deletes=True + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file: + config_file = temp_config_file.name + + # Read the 
original config
+        with open(CONFIG_FILE, 'r') as original_config:
+            config_data = yaml.safe_load(original_config)
+
+        # Add ignore_deletes=True
+        config_data['ignore_deletes'] = True
+
+        # Write to the temp file
+        yaml.dump(config_data, temp_config_file)
+
+    try:
+        cfg = config.Settings()
+        cfg.load(config_file)
+
+        # Verify the ignore_deletes option was set
+        assert cfg.ignore_deletes is True
+
+        mysql = mysql_api.MySQLApi(
+            database=None,
+            mysql_settings=cfg.mysql,
+        )
+
+        ch = clickhouse_api.ClickhouseApi(
+            database=TEST_DB_NAME,
+            clickhouse_settings=cfg.clickhouse,
+        )
+
+        prepare_env(cfg, mysql, ch)
+
+        # Create a table with a composite primary key
+        mysql.execute(f'''
+        CREATE TABLE `{TEST_TABLE_NAME}` (
+            departments int(11) NOT NULL,
+            termine int(11) NOT NULL,
+            data varchar(255) NOT NULL,
+            PRIMARY KEY (departments,termine)
+        )
+        ''')
+
+        # Insert initial records
+        mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (10, 20, 'data1');", commit=True)
+        mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (30, 40, 'data2');", commit=True)
+        mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (50, 60, 'data3');", commit=True)
+
+        # Run the replicator with ignore_deletes=True
+        run_all_runner = RunAllRunner(cfg_file=config_file)
+        run_all_runner.run()
+
+        # Wait for replication to complete
+        assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
+        ch.execute_command(f'USE `{TEST_DB_NAME}`')
+        assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
+        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
+
+        # Delete some records from MySQL
+        mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=10;", commit=True)
+        mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=30;", commit=True)
+
+        # Wait a moment to ensure replication processes the events
+        time.sleep(5)
+
+        # Verify records are NOT deleted in ClickHouse (since ignore_deletes=True)
+        # The count should still be 3
+        assert len(ch.select(TEST_TABLE_NAME)) == 3, "Deletions were processed despite ignore_deletes=True"
+
+        # Insert a new record and verify it's added
+        mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (70, 80, 'data4');", commit=True)
+        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
+
+        # Verify the new record is correctly added
+        result = ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80")
+        assert len(result) == 1
+        assert result[0]['data'] == 'data4'
+
+        # Clean up
+        run_all_runner.stop()
+
+        # Verify no errors occurred
+        assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
+        assert 'Traceback' not in read_logs(TEST_DB_NAME)
+
+        # Additional tests for persistence after restart
+
+        # 1. Remove all entries from table in MySQL
+        mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE 1=1;", commit=True)
+
+        # Add a new row in MySQL before starting the replicator
+        mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (110, 120, 'offline_data');", commit=True)
+
+        # 2. Wait 5 seconds
+        time.sleep(5)
+
+        # 3. Remove binlog directory (similar to prepare_env, but without removing tables)
+        if os.path.exists(cfg.binlog_replicator.data_dir):
+            shutil.rmtree(cfg.binlog_replicator.data_dir)
+        os.mkdir(cfg.binlog_replicator.data_dir)
+
+        # 4. Create and run a new runner
+        new_runner = RunAllRunner(cfg_file=config_file)
+        new_runner.run()
+
+        # 5.
Ensure it has all the previous data (should still be 4 records from before + 1 new offline record) + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) + ch.execute_command(f'USE `{TEST_DB_NAME}`') + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5) + + # Verify we still have all the old data + assert len(ch.select(TEST_TABLE_NAME, where="departments=10 AND termine=20")) == 1 + assert len(ch.select(TEST_TABLE_NAME, where="departments=30 AND termine=40")) == 1 + assert len(ch.select(TEST_TABLE_NAME, where="departments=50 AND termine=60")) == 1 + assert len(ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80")) == 1 + + # Verify the offline data was replicated + assert len(ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")) == 1 + offline_data = ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")[0] + assert offline_data['data'] == 'offline_data' + + # 6. Insert new data and verify it gets added to existing data + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (90, 100, 'data5');", commit=True) + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6) + + # Verify the combined old and new data + result = ch.select(TEST_TABLE_NAME, where="departments=90 AND termine=100") + assert len(result) == 1 + assert result[0]['data'] == 'data5' + + # Make sure we have all 6 records (4 original + 1 offline + 1 new one) + assert len(ch.select(TEST_TABLE_NAME)) == 6 + + new_runner.stop() + finally: + # Clean up the temporary config file + os.unlink(config_file)
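
Usage sketch (illustrative, not part of the patch): enabling the new flag in a replicator config follows the README example above. Keys shown are the ones documented in this diff; values are examples only, and connection settings are omitted.

```yaml
# minimal excerpt of a replicator config
http_port: 9128        # optional
types_mapping:         # optional
  'char(36)': 'UUID'
ignore_deletes: true   # skip DELETE events entirely: replication becomes
                       # append-only and writes directly into the target
                       # database (no temporary database, no swap)
```

With `ignore_deletes: true`, initial replication skips the `RENAME DATABASE` swap and writes straight into the target database, realtime replication drops erase events, and rows already present in ClickHouse survive a replicator restart; the persistence half of `test_ignore_deletes` above exercises exactly that.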