diff --git a/mysql_ch_replicator/db_replicator_realtime.py b/mysql_ch_replicator/db_replicator_realtime.py
index 815fdd9..adb42fb 100644
--- a/mysql_ch_replicator/db_replicator_realtime.py
+++ b/mysql_ch_replicator/db_replicator_realtime.py
@@ -191,6 +191,9 @@ def handle_query_event(self, event: LogEvent):
         if query.lower().startswith('rename table'):
             self.upload_records()
             self.handle_rename_table_query(query, event.db_name)
+        if query.lower().startswith('truncate'):
+            self.upload_records()
+            self.handle_truncate_query(query, event.db_name)
 
     def handle_alter_query(self, query, db_name):
         self.replicator.converter.convert_alter_query(query, db_name)
@@ -253,6 +256,52 @@ def handle_rename_table_query(self, query, db_name):
             ch_clauses.append(f"`{src_db_name}`.`{src_table_name}` TO `{dest_db_name}`.`{dest_table_name}`")
         self.replicator.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}')
 
+    def handle_truncate_query(self, query, db_name):
+        """Replicate a MySQL ``TRUNCATE [TABLE] tbl_name`` statement to ClickHouse.
+
+        Pending buffered inserts/deletes for the table are discarded and the
+        matching ClickHouse table is truncated. Tables that do not match the
+        replication config, or that are not tracked in the replicator state,
+        are skipped.
+
+        Raises:
+            Exception: if the statement is not a TRUNCATE or names no table.
+        """
+        tokens = query.strip().split()
+        if not tokens or tokens[0].lower() != 'truncate':
+            raise Exception('Invalid TRUNCATE query format', query)
+
+        # The TABLE keyword is optional in MySQL, so both `TRUNCATE TABLE t`
+        # and `TRUNCATE t` must be accepted.
+        name_index = 2 if len(tokens) > 1 and tokens[1].lower() == 'table' else 1
+        if len(tokens) <= name_index:
+            raise Exception('Invalid TRUNCATE query format', query)
+
+        # A trailing statement terminator is not part of the identifier.
+        table_token = tokens[name_index].rstrip(';')
+        if not table_token:
+            raise Exception('Invalid TRUNCATE query format', query)
+
+        # Parse database and table name from the token
+        db_name, table_name, matches_config = self.replicator.converter.get_db_and_table_name(table_token, db_name)
+        if not matches_config:
+            return
+
+        # Check if table exists in our tracking
+        if table_name not in self.replicator.state.tables_structure:
+            logger.warning(f'TRUNCATE: Table {table_name} not found in tracked tables, skipping')
+            return
+
+        # Clear any pending records for this table
+        if table_name in self.records_to_insert:
+            self.records_to_insert[table_name].clear()
+        if table_name in self.records_to_delete:
+            self.records_to_delete[table_name].clear()
+
+        # Execute TRUNCATE on ClickHouse
+        logger.info(f'Executing TRUNCATE on ClickHouse table: {db_name}.{table_name}')
+        self.replicator.clickhouse_api.execute_command(f'TRUNCATE TABLE `{db_name}`.`{table_name}`')
+
     def log_stats_if_required(self):
         curr_time = time.time()
         if curr_time - self.last_dump_stats_time < self.STATS_DUMP_INTERVAL:
diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py
index 9980690..2df242a 100644
--- a/test_mysql_ch_replicator.py
+++ b/test_mysql_ch_replicator.py
@@ -2658,3 +2658,92 @@ def test_issue_160_unknown_mysql_type_bug():
     assert mysql_structure.table_name == 'test_table'
     assert len(mysql_structure.fields) == 17  # All columns should be parsed
     assert mysql_structure.primary_keys == ['id', 'col_e']
+
+def test_truncate_operation_bug_issue_155():
+    """
+    Regression test for issue #155: TRUNCATE must be replicated to ClickHouse.
+
+    Before the fix, TRUNCATE was ignored and the ClickHouse table kept its rows.
+    Both the `TRUNCATE TABLE t` and the shorter `TRUNCATE t` forms are covered,
+    and replication is checked to keep working after a truncate.
+    """
+    cfg = config.Settings()
+    cfg.load(CONFIG_FILE)
+
+    mysql = mysql_api.MySQLApi(
+        database=None,
+        mysql_settings=cfg.mysql,
+    )
+
+    ch = clickhouse_api.ClickhouseApi(
+        database=TEST_DB_NAME,
+        clickhouse_settings=cfg.clickhouse,
+    )
+
+    prepare_env(cfg, mysql, ch)
+
+    # Create a test table
+    mysql.execute(f'''
+CREATE TABLE `{TEST_TABLE_NAME}` (
+    id int NOT NULL AUTO_INCREMENT,
+    name varchar(255),
+    age int,
+    PRIMARY KEY (id)
+);
+    ''')
+
+    # Insert test data
+    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Alice', 25);", commit=True)
+    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Bob', 30);", commit=True)
+    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Charlie', 35);", commit=True)
+
+    # Start replication
+    binlog_replicator_runner = BinlogReplicatorRunner()
+    binlog_replicator_runner.run()
+    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
+    db_replicator_runner.run()
+
+    # Wait for initial replication
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
+    ch.execute_command(f'USE `{TEST_DB_NAME}`')
+    assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
+
+    # Verify data is replicated correctly
+    mysql.execute(f"SELECT COUNT(*) FROM `{TEST_TABLE_NAME}`")
+    mysql_count = mysql.cursor.fetchall()[0][0]
+    assert mysql_count == 3
+
+    ch_count = len(ch.select(TEST_TABLE_NAME))
+    assert ch_count == 3
+
+    # Execute TRUNCATE TABLE in MySQL
+    mysql.execute(f"TRUNCATE TABLE `{TEST_TABLE_NAME}`;", commit=True)
+
+    # Verify MySQL table is now empty
+    mysql.execute(f"SELECT COUNT(*) FROM `{TEST_TABLE_NAME}`")
+    mysql_count_after_truncate = mysql.cursor.fetchall()[0][0]
+    assert mysql_count_after_truncate == 0, "MySQL table should be empty after TRUNCATE"
+
+    # Poll for the TRUNCATE to be replicated instead of sleeping a fixed time:
+    # faster when replication is quick, and not timing-dependent when it is slow.
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 0)
+    ch_count_after_truncate = len(ch.select(TEST_TABLE_NAME))
+    assert ch_count_after_truncate == 0, f"ClickHouse table should be empty after TRUNCATE, but contains {ch_count_after_truncate} records"
+
+    # Insert new data to verify replication still works after TRUNCATE
+    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Dave', 40);", commit=True)
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
+
+    # Verify the new record
+    new_record = ch.select(TEST_TABLE_NAME, where="name='Dave'")
+    assert len(new_record) == 1
+    assert new_record[0]['age'] == 40
+
+    # The TABLE keyword is optional in MySQL; the short form must replicate too
+    mysql.execute(f"TRUNCATE `{TEST_TABLE_NAME}`;", commit=True)
+    assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 0)
+
+    # Clean up
+    db_replicator_runner.stop()
+    binlog_replicator_runner.stop()