Skip to content

Commit 471ac9f

Browse files
committed
Fix TRUNCATE operation not replicated to ClickHouse (issue #155)
- Add TRUNCATE TABLE handling in DbReplicatorRealtime.handle_query_event()
- Implement handle_truncate_query() method to parse and execute TRUNCATE on ClickHouse
- Clear pending insert/delete operations when TRUNCATE is processed
- Add test_truncate_operation_bug_issue_155() to reproduce and verify the fix
- Test confirms TRUNCATE now properly clears data in both MySQL and ClickHouse

Fixes #155
1 parent 2965907 commit 471ac9f

File tree

2 files changed

+121
-0
lines changed

2 files changed

+121
-0
lines changed

mysql_ch_replicator/db_replicator_realtime.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ def handle_query_event(self, event: LogEvent):
191191
if query.lower().startswith('rename table'):
192192
self.upload_records()
193193
self.handle_rename_table_query(query, event.db_name)
194+
if query.lower().startswith('truncate'):
195+
self.upload_records()
196+
self.handle_truncate_query(query, event.db_name)
194197

195198
def handle_alter_query(self, query, db_name):
196199
self.replicator.converter.convert_alter_query(query, db_name)
@@ -253,6 +256,35 @@ def handle_rename_table_query(self, query, db_name):
253256
ch_clauses.append(f"`{src_db_name}`.`{src_table_name}` TO `{dest_db_name}`.`{dest_table_name}`")
254257
self.replicator.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}')
255258

259+
def handle_truncate_query(self, query, db_name):
    """Replicate a MySQL ``TRUNCATE [TABLE] tbl_name`` statement to ClickHouse.

    Args:
        query: Raw statement text taken from the binlog query event.
        db_name: Database the statement was executed in; passed to the
            converter to resolve the table reference.

    Raises:
        Exception: If the statement does not start with ``TRUNCATE`` or
            carries no table name.

    The statement is silently skipped when the converter reports that the
    table does not match the replication config, and skipped with a warning
    when the table is not present in ``state.tables_structure``.
    """
    # Drop a trailing statement terminator so it cannot end up glued to the
    # table token (e.g. "TRUNCATE TABLE `t`;").
    tokens = query.strip().rstrip(';').split()
    if not tokens or tokens[0].lower() != 'truncate':
        raise Exception('Invalid TRUNCATE query format', query)

    # MySQL grammar is TRUNCATE [TABLE] tbl_name: the TABLE keyword is
    # optional, so the table name is either the second or the third token.
    has_table_keyword = len(tokens) > 1 and tokens[1].lower() == 'table'
    name_index = 2 if has_table_keyword else 1
    if len(tokens) <= name_index:
        raise Exception('Invalid TRUNCATE query format', query)
    table_token = tokens[name_index]

    # Resolve the (possibly qualified / quoted) reference to target names.
    target_db, table_name, matches_config = (
        self.replicator.converter.get_db_and_table_name(table_token, db_name)
    )
    if not matches_config:
        return

    # Only tables we already track can be truncated on the ClickHouse side.
    if table_name not in self.replicator.state.tables_structure:
        logger.warning(f'TRUNCATE: Table {table_name} not found in tracked tables, skipping')
        return

    # Discard buffered rows for this table: they predate the TRUNCATE and
    # must not be written after the table has been emptied.
    for pending in (self.records_to_insert, self.records_to_delete):
        if table_name in pending:
            pending[table_name].clear()

    logger.info(f'Executing TRUNCATE on ClickHouse table: {target_db}.{table_name}')
    self.replicator.clickhouse_api.execute_command(
        f'TRUNCATE TABLE `{target_db}`.`{table_name}`'
    )
287+
256288
def log_stats_if_required(self):
257289
curr_time = time.time()
258290
if curr_time - self.last_dump_stats_time < self.STATS_DUMP_INTERVAL:

test_mysql_ch_replicator.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2658,3 +2658,92 @@ def test_issue_160_unknown_mysql_type_bug():
26582658
assert mysql_structure.table_name == 'test_table'
26592659
assert len(mysql_structure.fields) == 17 # All columns should be parsed
26602660
assert mysql_structure.primary_keys == ['id', 'col_e']
2661+
2662+
def test_truncate_operation_bug_issue_155():
    """
    Regression test for issue #155: TRUNCATE must be replicated to ClickHouse.

    Scenario:
      1. Create a table in MySQL, insert three rows and wait until they are
         visible in ClickHouse.
      2. Run TRUNCATE TABLE in MySQL and wait until ClickHouse is empty too.
      3. Insert one more row to confirm replication keeps working afterwards.

    Without the TRUNCATE handling the ClickHouse table keeps its rows and the
    emptiness check in step 2 fails.
    """
    cfg = config.Settings()
    cfg.load(CONFIG_FILE)

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)

    # Create a test table
    mysql.execute(f'''
    CREATE TABLE `{TEST_TABLE_NAME}` (
        id int NOT NULL AUTO_INCREMENT,
        name varchar(255),
        age int,
        PRIMARY KEY (id)
    );
    ''')

    # Insert test data
    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Alice', 25);", commit=True)
    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Bob', 30);", commit=True)
    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Charlie', 35);", commit=True)

    # Start replication
    binlog_replicator_runner = BinlogReplicatorRunner()
    binlog_replicator_runner.run()
    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
    db_replicator_runner.run()

    try:
        # Wait for initial replication
        assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
        ch.execute_command(f'USE `{TEST_DB_NAME}`')
        assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)

        # Verify data is replicated correctly
        mysql.execute(f"SELECT COUNT(*) FROM `{TEST_TABLE_NAME}`")
        mysql_count = mysql.cursor.fetchall()[0][0]
        assert mysql_count == 3

        ch_count = len(ch.select(TEST_TABLE_NAME))
        assert ch_count == 3

        # Execute TRUNCATE TABLE in MySQL
        mysql.execute(f"TRUNCATE TABLE `{TEST_TABLE_NAME}`;", commit=True)

        # Verify MySQL table is now empty
        mysql.execute(f"SELECT COUNT(*) FROM `{TEST_TABLE_NAME}`")
        mysql_count_after_truncate = mysql.cursor.fetchall()[0][0]
        assert mysql_count_after_truncate == 0, "MySQL table should be empty after TRUNCATE"

        # Poll until the TRUNCATE has been applied instead of sleeping for a
        # fixed interval: faster when replication is quick, and consistent
        # with the other waits in this test.
        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 0)

        # Explicit check kept so a failure reports the leftover row count.
        ch_count_after_truncate = len(ch.select(TEST_TABLE_NAME))
        assert ch_count_after_truncate == 0, f"ClickHouse table should be empty after TRUNCATE, but contains {ch_count_after_truncate} records"

        # Insert new data to verify replication still works after TRUNCATE
        mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Dave', 40);", commit=True)
        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

        # Verify the new record
        new_record = ch.select(TEST_TABLE_NAME, where="name='Dave'")
        assert len(new_record) == 1
        assert new_record[0]['age'] == 40
    finally:
        # Always stop the runners, even when an assertion above fails, so
        # they do not outlive this test.
        db_replicator_runner.stop()
        binlog_replicator_runner.stop()

0 commit comments

Comments
 (0)