2 changes: 2 additions & 0 deletions README.md
@@ -236,6 +236,7 @@ http_port: 9128 # optional
types_mapping: # optional
'char(36)': 'UUID'

ignore_deletes: false # optional, set to true to ignore DELETE operations

```

@@ -259,6 +260,7 @@ types_mapping: # optional
- `indexes` - you may want to add some indexes to accelerate performance, e.g. an ngram index for full-text search, etc. To apply indexes you need to start replication from scratch.
- `http_host`, `http_port` - http endpoint to control replication, use `/docs` for available commands
- `types_mapping` - custom types mapping, e.g. you can map char(36) to UUID instead of String, etc.
- `ignore_deletes` - when set to `true`, DELETE operations in MySQL will be ignored during replication. This creates an append-only model where data is only added, never removed. In this mode, the replicator doesn't create a temporary database and instead replicates directly to the target database.
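
For example, an append-only setup could look like the following (a minimal sketch; the MySQL/ClickHouse connection settings and other options omitted here follow the full config example earlier in this README):

```yaml
# minimal excerpt -- connection and binlog settings omitted,
# see the complete config example above
types_mapping:        # optional
  'char(36)': 'UUID'

ignore_deletes: true  # DELETEs in MySQL are not propagated to ClickHouse
```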

Few more tables / dbs examples:

2 changes: 1 addition & 1 deletion mysql_ch_replicator/clickhouse_api.py
@@ -264,7 +264,7 @@ def drop_database(self, db_name):
self.execute_command(f'DROP DATABASE IF EXISTS `{db_name}`')

def create_database(self, db_name):
self.cursor.execute(f'CREATE DATABASE `{db_name}`')
self.execute_command(f'CREATE DATABASE `{db_name}`')

def select(self, table_name, where=None, final=None):
query = f'SELECT * FROM {table_name}'
2 changes: 2 additions & 0 deletions mysql_ch_replicator/config.py
@@ -120,6 +120,7 @@ def __init__(self):
self.types_mapping = {}
self.target_databases = {}
self.initial_replication_threads = 0
self.ignore_deletes = False

def load(self, settings_file):
data = open(settings_file, 'r').read()
@@ -145,6 +146,7 @@ def load(self, settings_file):
self.http_port = data.pop('http_port', 0)
self.target_databases = data.pop('target_databases', {})
self.initial_replication_threads = data.pop('initial_replication_threads', 0)
self.ignore_deletes = data.pop('ignore_deletes', False)

indexes = data.pop('indexes', [])
for index in indexes:
20 changes: 16 additions & 4 deletions mysql_ch_replicator/db_replicator.py
@@ -200,10 +200,22 @@ def run(self):
self.run_realtime_replication()
return

logger.info('recreating database')
self.clickhouse_api.database = self.target_database_tmp
if not self.is_parallel_worker:
self.clickhouse_api.recreate_database()
# If ignore_deletes is enabled, we don't create a temporary DB and don't swap DBs
# We replicate directly into the target DB
if self.config.ignore_deletes:
logger.info(f'using existing database (ignore_deletes=True)')
self.clickhouse_api.database = self.target_database
self.target_database_tmp = self.target_database

# Create database if it doesn't exist
if self.target_database not in self.clickhouse_api.get_databases():
logger.info(f'creating database {self.target_database}')
self.clickhouse_api.create_database(db_name=self.target_database)
else:
logger.info('recreating database')
self.clickhouse_api.database = self.target_database_tmp
if not self.is_parallel_worker:
self.clickhouse_api.recreate_database()

self.state.tables = self.mysql_api.get_tables()
self.state.tables = [
29 changes: 16 additions & 13 deletions mysql_ch_replicator/db_replicator_initial.py
@@ -106,19 +106,22 @@ def perform_initial_replication(self):
# Verify table structures after replication but before swapping databases
self.verify_table_structures_after_replication()

logger.info(f'initial replication - swapping database')
if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
self.replicator.clickhouse_api.execute_command(
f'RENAME DATABASE `{self.replicator.target_database}` TO `{self.replicator.target_database}_old`',
)
self.replicator.clickhouse_api.execute_command(
f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
)
self.replicator.clickhouse_api.drop_database(f'{self.replicator.target_database}_old')
else:
self.replicator.clickhouse_api.execute_command(
f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
)
# If ignore_deletes is enabled, we don't swap databases, as we're directly replicating
# to the target database
if not self.replicator.config.ignore_deletes:
logger.info(f'initial replication - swapping database')
if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
self.replicator.clickhouse_api.execute_command(
f'RENAME DATABASE `{self.replicator.target_database}` TO `{self.replicator.target_database}_old`',
)
self.replicator.clickhouse_api.execute_command(
f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
)
self.replicator.clickhouse_api.drop_database(f'{self.replicator.target_database}_old')
else:
self.replicator.clickhouse_api.execute_command(
f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
)
self.replicator.clickhouse_api.database = self.replicator.target_database
logger.info(f'initial replication - done')

11 changes: 11 additions & 0 deletions mysql_ch_replicator/db_replicator_realtime.py
@@ -148,6 +148,17 @@ def handle_erase_event(self, event: LogEvent):
f'table: {event.table_name}, '
f'records: {event.records}',
)

# If ignore_deletes is enabled, skip processing delete events
if self.replicator.config.ignore_deletes:
if self.replicator.config.debug_log_level:
logger.debug(
f'ignoring erase event (ignore_deletes=True): {event.transaction_id}, '
f'table: {event.table_name}, '
f'records: {len(event.records)}',
)
return

self.replicator.stats.erase_events_count += 1
self.replicator.stats.erase_records_count += len(event.records)

142 changes: 142 additions & 0 deletions test_mysql_ch_replicator.py
@@ -2454,3 +2454,145 @@ def test_dynamic_column_addition_user_config():
db_pid = get_db_replicator_pid(cfg, "test_replication")
if db_pid:
kill_process(db_pid)


def test_ignore_deletes():
# Create a temporary config file with ignore_deletes=True
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file:
config_file = temp_config_file.name

# Read the original config
with open(CONFIG_FILE, 'r') as original_config:
config_data = yaml.safe_load(original_config)

# Add ignore_deletes=True
config_data['ignore_deletes'] = True

# Write to the temp file
yaml.dump(config_data, temp_config_file)

try:
cfg = config.Settings()
cfg.load(config_file)

# Verify the ignore_deletes option was set
assert cfg.ignore_deletes is True

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

# Create a table with a composite primary key
mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
departments int(11) NOT NULL,
termine int(11) NOT NULL,
data varchar(255) NOT NULL,
PRIMARY KEY (departments,termine)
)
''')

# Insert initial records
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (10, 20, 'data1');", commit=True)
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (30, 40, 'data2');", commit=True)
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (50, 60, 'data3');", commit=True)

# Run the replicator with ignore_deletes=True
run_all_runner = RunAllRunner(cfg_file=config_file)
run_all_runner.run()

# Wait for replication to complete
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)

# Delete some records from MySQL
mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=10;", commit=True)
mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=30;", commit=True)

# Wait a moment to ensure replication processes the events
time.sleep(5)

# Verify records are NOT deleted in ClickHouse (since ignore_deletes=True)
# The count should still be 3
assert len(ch.select(TEST_TABLE_NAME)) == 3, "Deletions were processed despite ignore_deletes=True"

# Insert a new record and verify it's added
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (70, 80, 'data4');", commit=True)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)

# Verify the new record is correctly added
result = ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80")
assert len(result) == 1
assert result[0]['data'] == 'data4'

# Clean up
run_all_runner.stop()

# Verify no errors occurred
assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
assert('Traceback' not in read_logs(TEST_DB_NAME))

# Additional tests for persistence after restart

# 1. Remove all entries from table in MySQL
mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE 1=1;", commit=True)

# Add a new row in MySQL before starting the replicator
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (110, 120, 'offline_data');", commit=True)

# 2. Wait 5 seconds
time.sleep(5)

# 3. Remove binlog directory (similar to prepare_env, but without removing tables)
if os.path.exists(cfg.binlog_replicator.data_dir):
shutil.rmtree(cfg.binlog_replicator.data_dir)
os.mkdir(cfg.binlog_replicator.data_dir)


# 4. Create and run a new runner
new_runner = RunAllRunner(cfg_file=config_file)
new_runner.run()

# 5. Ensure it has all the previous data (should still be 4 records from before + 1 new offline record)
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5)

# Verify we still have all the old data
assert len(ch.select(TEST_TABLE_NAME, where="departments=10 AND termine=20")) == 1
assert len(ch.select(TEST_TABLE_NAME, where="departments=30 AND termine=40")) == 1
assert len(ch.select(TEST_TABLE_NAME, where="departments=50 AND termine=60")) == 1
assert len(ch.select(TEST_TABLE_NAME, where="departments=70 AND termine=80")) == 1

# Verify the offline data was replicated
assert len(ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")) == 1
offline_data = ch.select(TEST_TABLE_NAME, where="departments=110 AND termine=120")[0]
assert offline_data['data'] == 'offline_data'

# 6. Insert new data and verify it gets added to existing data
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine, data) VALUES (90, 100, 'data5');", commit=True)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6)

# Verify the combined old and new data
result = ch.select(TEST_TABLE_NAME, where="departments=90 AND termine=100")
assert len(result) == 1
assert result[0]['data'] == 'data5'

# Make sure we have all 6 records (4 original + 1 offline + 1 new one)
assert len(ch.select(TEST_TABLE_NAME)) == 6

new_runner.stop()
finally:
# Clean up the temporary config file
os.unlink(config_file)