Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions mysql_ch_replicator/db_replicator_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ def handle_query_event(self, event: LogEvent):
if query.lower().startswith('rename table'):
self.upload_records()
self.handle_rename_table_query(query, event.db_name)
if query.lower().startswith('truncate'):
self.upload_records()
self.handle_truncate_query(query, event.db_name)

def handle_alter_query(self, query, db_name):
self.replicator.converter.convert_alter_query(query, db_name)
Expand Down Expand Up @@ -253,6 +256,35 @@ def handle_rename_table_query(self, query, db_name):
ch_clauses.append(f"`{src_db_name}`.`{src_table_name}` TO `{dest_db_name}`.`{dest_table_name}`")
self.replicator.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}')

def handle_truncate_query(self, query, db_name):
    """Replicate a MySQL TRUNCATE statement by truncating the ClickHouse table.

    MySQL's grammar is ``TRUNCATE [TABLE] tbl_name``: the ``TABLE`` keyword is
    optional, so both spellings are accepted. A trailing ``;`` statement
    terminator is ignored. Tokens after the table name are ignored.

    Args:
        query: Raw TRUNCATE statement text.
        db_name: Database name handed to the converter together with the
            table token so it can resolve the target database and table.

    Raises:
        Exception: If the statement does not start with TRUNCATE or does not
            name a table.
    """
    # Drop the statement terminator so it never ends up glued to the table token.
    tokens = query.strip().rstrip(';').split()
    if not tokens or tokens[0].lower() != 'truncate':
        raise Exception('Invalid TRUNCATE query format', query)

    # TABLE is a reserved word in MySQL, so an unquoted `table` token right
    # after TRUNCATE can only be the optional keyword, never the table name.
    has_table_keyword = len(tokens) > 1 and tokens[1].lower() == 'table'
    name_index = 2 if has_table_keyword else 1
    if len(tokens) <= name_index:
        raise Exception('Invalid TRUNCATE query format', query)
    table_token = tokens[name_index]

    # Resolve the target database/table; the converter also reports whether
    # the pair matches the replication config.
    db_name, table_name, matches_config = self.replicator.converter.get_db_and_table_name(table_token, db_name)
    if not matches_config:
        return

    # Only tables present in the tracked structures can be truncated.
    if table_name not in self.replicator.state.tables_structure:
        logger.warning(f'TRUNCATE: Table {table_name} not found in tracked tables, skipping')
        return

    # Rows buffered for this table predate the TRUNCATE and must not be
    # written after the table has been emptied.
    for pending in (self.records_to_insert, self.records_to_delete):
        if table_name in pending:
            pending[table_name].clear()

    logger.info(f'Executing TRUNCATE on ClickHouse table: {db_name}.{table_name}')
    self.replicator.clickhouse_api.execute_command(f'TRUNCATE TABLE `{db_name}`.`{table_name}`')

def log_stats_if_required(self):
curr_time = time.time()
if curr_time - self.last_dump_stats_time < self.STATS_DUMP_INTERVAL:
Expand Down
89 changes: 89 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2658,3 +2658,92 @@ def test_issue_160_unknown_mysql_type_bug():
assert mysql_structure.table_name == 'test_table'
assert len(mysql_structure.fields) == 17 # All columns should be parsed
assert mysql_structure.primary_keys == ['id', 'col_e']

def test_truncate_operation_bug_issue_155():
    """
    Regression test for issue #155: TRUNCATE must be replicated to ClickHouse.

    Scenario:
      1. Create a table in MySQL, insert three rows and wait until they show
         up in ClickHouse.
      2. Run TRUNCATE TABLE in MySQL and wait for the ClickHouse table to
         become empty.
      3. Insert one more row to confirm replication keeps working afterwards.

    The test fails if TRUNCATE does not clear the ClickHouse data.
    """
    cfg = config.Settings()
    cfg.load(CONFIG_FILE)

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)

    # Create a test table
    mysql.execute(f'''
    CREATE TABLE `{TEST_TABLE_NAME}` (
        id int NOT NULL AUTO_INCREMENT,
        name varchar(255),
        age int,
        PRIMARY KEY (id)
    );
    ''')

    # Insert test data
    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Alice', 25);", commit=True)
    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Bob', 30);", commit=True)
    mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Charlie', 35);", commit=True)

    # Start replication. Each runner is stopped in a `finally` so a failing
    # assertion does not leave it running.
    binlog_replicator_runner = BinlogReplicatorRunner()
    binlog_replicator_runner.run()
    try:
        db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
        db_replicator_runner.run()
        try:
            # Wait for initial replication
            assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
            ch.execute_command(f'USE `{TEST_DB_NAME}`')
            assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
            assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)

            # Verify data is replicated correctly
            mysql.execute(f"SELECT COUNT(*) FROM `{TEST_TABLE_NAME}`")
            mysql_count = mysql.cursor.fetchall()[0][0]
            assert mysql_count == 3

            ch_count = len(ch.select(TEST_TABLE_NAME))
            assert ch_count == 3

            # Execute TRUNCATE TABLE in MySQL
            mysql.execute(f"TRUNCATE TABLE `{TEST_TABLE_NAME}`;", commit=True)

            # Verify MySQL table is now empty
            mysql.execute(f"SELECT COUNT(*) FROM `{TEST_TABLE_NAME}`")
            mysql_count_after_truncate = mysql.cursor.fetchall()[0][0]
            assert mysql_count_after_truncate == 0, "MySQL table should be empty after TRUNCATE"

            # Poll until ClickHouse is empty instead of sleeping a fixed time:
            # no wasted wait when replication is quick, and no dependence on
            # a hard-coded 5 second window.
            assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 0)

            # Insert new data to verify replication still works after TRUNCATE
            mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Dave', 40);", commit=True)
            assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

            # Verify the new record
            new_record = ch.select(TEST_TABLE_NAME, where="name='Dave'")
            assert len(new_record) == 1
            assert new_record[0]['age'] == 40
        finally:
            db_replicator_runner.stop()
    finally:
        binlog_replicator_runner.stop()