Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ target_databases: # optional
source_db_in_mysql_2: destination_db_in_clickhouse_2
...

target_tables: # optional
source_db_in_mysql_1.source_table_in_mysql_1: destination_table_name_1
source_db_in_mysql_1.source_table_in_mysql_2: destination_table_name_2
...

log_level: 'info' # optional
optimize_interval: 86400 # optional
auto_restart_interval: 3600 # optional
Expand Down Expand Up @@ -265,6 +270,7 @@ mysql_timezone: 'UTC' # optional, timezone for MySQL timestamp conversion (de
- `exclude_databases` - databases to __exclude__, string or list, eg `'table1*'` or `['table2', 'table3*']`. If same database matches `databases` and `exclude_databases`, exclude has higher priority.
- `exclude_tables` - tables to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority.
- `target_databases` - if you want database in ClickHouse to have different name from MySQL database
- `target_tables` - if you want table in ClickHouse to have different name from MySQL table. Specify as `source_database.source_table: target_table_name`. The target database is determined by existing rules (e.g., `target_databases` mapping). This mapping applies to both initial and realtime replication, including DDL operations like ALTER, DROP, etc.
- `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This is required to perform all merges guaranteed and avoid increasing of used storage and decreasing performance.
- `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage.
Expand Down
8 changes: 6 additions & 2 deletions mysql_ch_replicator/clickhouse_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,12 @@ def erase(self, table_name, field_name, field_values):
def drop_database(self, db_name):
    """Remove the given ClickHouse database, tolerating its absence."""
    command = f'DROP DATABASE IF EXISTS `{db_name}`'
    self.execute_command(command)

def create_database(self, db_name):
self.execute_command(f'CREATE DATABASE `{db_name}`')
def create_database(self, db_name, if_not_exists=False):
    """Create a ClickHouse database.

    When ``if_not_exists`` is True the statement carries IF NOT EXISTS,
    so an already-existing database is not an error.
    """
    prefix = 'IF NOT EXISTS ' if if_not_exists else ''
    self.execute_command(f'CREATE DATABASE {prefix}`{db_name}`')

def drop_table(self, table_name):
    """Drop a table from the currently selected database if it exists."""
    command = f'DROP TABLE IF EXISTS `{self.database}`.`{table_name}`'
    self.execute_command(command)

def select(self, table_name, where=None, final=None):
query = f'SELECT * FROM {table_name}'
Expand Down
31 changes: 31 additions & 0 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def __init__(self):
self.http_port = 0
self.types_mapping = {}
self.target_databases = {}
self.target_tables = {}
self.initial_replication_threads = 0
self.ignore_deletes = False
self.mysql_timezone = 'UTC'
Expand Down Expand Up @@ -174,6 +175,7 @@ def load(self, settings_file):
self.http_host = data.pop('http_host', '')
self.http_port = data.pop('http_port', 0)
self.target_databases = data.pop('target_databases', {})
self.target_tables = data.pop('target_tables', {})
self.initial_replication_threads = data.pop('initial_replication_threads', 0)
self.ignore_deletes = data.pop('ignore_deletes', False)
self.mysql_timezone = data.pop('mysql_timezone', 'UTC')
Expand Down Expand Up @@ -272,13 +274,42 @@ def get_post_initial_replication_commands(self, db_name):
results.extend(cmd_config.commands)
return results

def is_multiple_mysql_dbs_to_single_ch_db(self, mysql_database: str, target_database: str) -> bool:
    """
    Check if multiple MySQL databases are being replicated to the same ClickHouse database.

    Args:
        mysql_database: The MySQL database being replicated (kept for interface
            compatibility; the result depends only on the configured mapping)
        target_database: The ClickHouse target database

    Returns:
        True if more than one entry in ``target_databases`` maps to
        ``target_database``; False otherwise, including when no
        ``target_databases`` mapping is configured at all.
    """
    if not self.target_databases:
        return False

    # Only the mapped ClickHouse names matter; the MySQL-side keys are irrelevant,
    # so iterate values() instead of items() with an unused key variable.
    matches = sum(1 for ch_db in self.target_databases.values() if ch_db == target_database)
    return matches > 1

def get_target_table_name(self, source_database: str, source_table: str) -> str:
    """Resolve the ClickHouse table name for a MySQL table.

    Looks up ``source_database.source_table`` in the ``target_tables``
    mapping; when no override is configured the source table name is
    returned unchanged.
    """
    mapping_key = f'{source_database}.{source_table}'
    if mapping_key in self.target_tables:
        return self.target_tables[mapping_key]
    return source_table

def validate(self):
self.mysql.validate()
self.clickhouse.validate()
self.binlog_replicator.validate()
self.validate_log_level()
if not isinstance(self.target_databases, dict):
raise ValueError(f'wrong target databases {self.target_databases}')
if not isinstance(self.target_tables, dict):
raise ValueError(f'wrong target tables {self.target_tables}')
if not isinstance(self.initial_replication_threads, int):
raise ValueError(f'initial_replication_threads should be an integer, not {type(self.initial_replication_threads)}')
if self.initial_replication_threads < 0:
Expand Down
17 changes: 11 additions & 6 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,8 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens):
column_after,
)

query = f'ALTER TABLE `{db_name}`.`{table_name}` ADD COLUMN `{column_name}` {column_type_ch}'
target_table_name = self.db_replicator.get_target_table_name(table_name) if self.db_replicator else table_name
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` ADD COLUMN `{column_name}` {column_type_ch}'
if column_first:
query += ' FIRST'
else:
Expand All @@ -864,7 +865,8 @@ def __convert_alter_table_drop_column(self, db_name, table_name, tokens):
mysql_table_structure.remove_field(field_name=column_name)
ch_table_structure.remove_field(field_name=column_name)

query = f'ALTER TABLE `{db_name}`.`{table_name}` DROP COLUMN {column_name}'
target_table_name = self.db_replicator.get_target_table_name(table_name) if self.db_replicator else table_name
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` DROP COLUMN {column_name}'
if self.db_replicator:
self.db_replicator.clickhouse_api.execute_command(query)

Expand Down Expand Up @@ -892,7 +894,8 @@ def __convert_alter_table_modify_column(self, db_name, table_name, tokens):
TableField(name=column_name, field_type=column_type_ch),
)

query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN `{column_name}` {column_type_ch}'
target_table_name = self.db_replicator.get_target_table_name(table_name) if self.db_replicator else table_name
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` MODIFY COLUMN `{column_name}` {column_type_ch}'
if self.db_replicator:
self.db_replicator.clickhouse_api.execute_command(query)

Expand All @@ -914,6 +917,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
ch_table_structure: TableStructure = table_structure[1]

current_column_type_ch = ch_table_structure.get_field(column_name).field_type
target_table_name = self.db_replicator.get_target_table_name(table_name)

if current_column_type_ch != column_type_ch:

Expand All @@ -925,7 +929,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
TableField(name=column_name, field_type=column_type_ch),
)

query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN {column_name} {column_type_ch}'
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` MODIFY COLUMN {column_name} {column_type_ch}'
self.db_replicator.clickhouse_api.execute_command(query)

if column_name != new_column_name:
Expand All @@ -935,7 +939,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
curr_field_mysql.name = new_column_name
curr_field_clickhouse.name = new_column_name

query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN {column_name} TO {new_column_name}'
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` RENAME COLUMN {column_name} TO {new_column_name}'
self.db_replicator.clickhouse_api.execute_command(query)

def __convert_alter_table_rename_column(self, db_name, table_name, tokens):
Expand Down Expand Up @@ -981,7 +985,8 @@ def __convert_alter_table_rename_column(self, db_name, table_name, tokens):
ch_table_structure.preprocess()

# Execute the RENAME COLUMN command in ClickHouse
query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN `{old_column_name}` TO `{new_column_name}`'
target_table_name = self.db_replicator.get_target_table_name(table_name) if self.db_replicator else table_name
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` RENAME COLUMN `{old_column_name}` TO `{new_column_name}`'
if self.db_replicator:
self.db_replicator.clickhouse_api.execute_command(query)

Expand Down
28 changes: 23 additions & 5 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
self.state_path = os.path.join(self.config.binlog_replicator.data_dir, self.database, 'state.pckl')
self.is_parallel_worker = False

# Check if multiple MySQL databases are being replicated to the same ClickHouse database
self.is_multi_mysql_to_single_ch = self.config.is_multiple_mysql_dbs_to_single_ch_db(
self.database, self.target_database
)

self.target_database_tmp = self.target_database + '_tmp'
if self.is_parallel_worker:
self.target_database_tmp = self.target_database
Expand All @@ -149,6 +154,11 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
# This must be set here to ensure consistency between first run and resume
if self.config.ignore_deletes:
self.target_database_tmp = self.target_database

# If multiple MySQL databases map to same ClickHouse database, replicate directly
if self.is_multi_mysql_to_single_ch:
self.target_database_tmp = self.target_database
logger.info(f'detected multiple MySQL databases mapping to {self.target_database} - using direct replication')

self.mysql_api = MySQLApi(
database=self.database,
Expand All @@ -174,6 +184,9 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
def create_state(self):
    """Build the persistent replication-state holder backed by self.state_path."""
    state_file = self.state_path
    return State(state_file)

def get_target_table_name(self, source_table: str) -> str:
    """Map a table from this replicator's source MySQL database to its ClickHouse name."""
    source_db = self.database
    return self.config.get_target_table_name(source_db, source_table)

def validate_database_settings(self):
if not self.initial_only:
final_setting = self.clickhouse_api.get_system_setting('final')
Expand Down Expand Up @@ -206,16 +219,21 @@ def run(self):
self.run_realtime_replication()
return

# If ignore_deletes is enabled, we don't create a temporary DB and don't swap DBs
# If ignore_deletes is enabled OR multiple MySQL databases map to same ClickHouse database,
# we don't create a temporary DB and don't swap DBs
# We replicate directly into the target DB
if self.config.ignore_deletes:
logger.info(f'using existing database (ignore_deletes=True)')
if self.config.ignore_deletes or self.is_multi_mysql_to_single_ch:
if self.config.ignore_deletes:
logger.info(f'using existing database (ignore_deletes=True)')
if self.is_multi_mysql_to_single_ch:
logger.info(f'using existing database (multi-mysql-to-single-ch mode)')

self.clickhouse_api.database = self.target_database

# Create database if it doesn't exist
# Create database if it doesn't exist (use IF NOT EXISTS to avoid race condition)
if self.target_database not in self.clickhouse_api.get_databases():
logger.info(f'creating database {self.target_database}')
self.clickhouse_api.create_database(db_name=self.target_database)
self.clickhouse_api.create_database(db_name=self.target_database, if_not_exists=True)
else:
logger.info('recreating database')
self.clickhouse_api.database = self.target_database_tmp
Expand Down
26 changes: 21 additions & 5 deletions mysql_ch_replicator/db_replicator_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def create_initial_structure_table(self, table_name):
self.validate_mysql_structure(mysql_structure)
clickhouse_structure = self.replicator.converter.convert_table_structure(mysql_structure)

target_table_name = self.replicator.get_target_table_name(table_name)
clickhouse_structure.table_name = target_table_name

# Always set if_not_exists to True to prevent errors when tables already exist
clickhouse_structure.if_not_exists = True

Expand All @@ -56,6 +59,11 @@ def create_initial_structure_table(self, table_name):
partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, table_name)

if not self.replicator.is_parallel_worker:
# Drop table if multiple MySQL databases map to same ClickHouse database
if self.replicator.is_multi_mysql_to_single_ch:
logger.info(f'dropping table {target_table_name} before recreating (multi-mysql-to-single-ch mode)')
self.replicator.clickhouse_api.drop_table(target_table_name)

self.replicator.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes, additional_partition_bys=partition_bys)

def validate_mysql_structure(self, mysql_structure: TableStructure):
Expand Down Expand Up @@ -106,9 +114,15 @@ def perform_initial_replication(self):
# Verify table structures after replication but before swapping databases
self.verify_table_structures_after_replication()

# If ignore_deletes is enabled, we don't swap databases, as we're directly replicating
# to the target database
if not self.replicator.config.ignore_deletes:
# Skip database swap if:
# 1. ignore_deletes is enabled - we're replicating directly to target
# 2. Multiple MySQL databases map to same ClickHouse database - we're replicating directly to target
should_skip_db_swap = (
self.replicator.config.ignore_deletes or
self.replicator.is_multi_mysql_to_single_ch
)

if not should_skip_db_swap:
logger.info(f'initial replication - swapping database')
if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
self.replicator.clickhouse_api.execute_command(
Expand Down Expand Up @@ -197,7 +211,8 @@ def perform_initial_replication_table(self, table_name):

if not records:
break
self.replicator.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure)
target_table_name = self.replicator.get_target_table_name(table_name)
self.replicator.clickhouse_api.insert(target_table_name, records, table_structure=clickhouse_table_structure)
for record in records:
record_primary_key = [record[key_idx] for key_idx in primary_key_ids]
if max_primary_key is None:
Expand Down Expand Up @@ -416,8 +431,9 @@ def consolidate_worker_record_versions(self, table_name):
"""
logger.info(f"Getting maximum record version from ClickHouse for table {table_name}")

target_table_name = self.replicator.get_target_table_name(table_name)
# Query ClickHouse for the maximum record version
max_version = self.replicator.clickhouse_api.get_max_record_version(table_name)
max_version = self.replicator.clickhouse_api.get_max_record_version(target_table_name)

if max_version is not None and max_version > 0:
current_version = self.replicator.state.tables_last_record_version.get(table_name, 0)
Expand Down
Loading