Skip to content

Commit e4ae3c6

Browse files
authored
Allow multiple DBs to be replicated into a single one (#206)
1 parent 05cff59 commit e4ae3c6

11 files changed

+506
-28
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,11 @@ target_databases: # optional
226226
source_db_in_mysql_2: destination_db_in_clickhouse_2
227227
...
228228

229+
target_tables: # optional
230+
source_db_in_mysql_1.source_table_in_mysql_1: destination_table_name_1
231+
source_db_in_mysql_1.source_table_in_mysql_2: destination_table_name_2
232+
...
233+
229234
log_level: 'info' # optional
230235
optimize_interval: 86400 # optional
231236
auto_restart_interval: 3600 # optional
@@ -265,6 +270,7 @@ mysql_timezone: 'UTC' # optional, timezone for MySQL timestamp conversion (de
265270
- `exclude_databases` - databases to __exclude__, string or list, e.g. `'db1*'` or `['db2', 'db3*']`. If the same database matches both `databases` and `exclude_databases`, exclude has higher priority.
266271
- `exclude_tables` - tables to __exclude__, string or list. If the same table matches both `tables` and `exclude_tables`, exclude has higher priority.
267272
- `target_databases` - use this if you want a database in ClickHouse to have a different name from the MySQL database
273+
- `target_tables` - use this if you want a table in ClickHouse to have a different name from the MySQL table. Specify as `source_database.source_table: target_table_name`. The target database is determined by existing rules (e.g., `target_databases` mapping). This mapping applies to both initial and realtime replication, including DDL operations like ALTER, DROP, etc.
268274
- `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
269275
- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This is required to guarantee that all merges are performed, which avoids growing storage usage and degrading performance.
270276
- `auto_restart_interval` - interval (seconds) between automatic db_replicator restarts. Default 3600 (1 hour). This is done to reduce memory usage.

mysql_ch_replicator/clickhouse_api.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,12 @@ def erase(self, table_name, field_name, field_values):
282282
def drop_database(self, db_name):
283283
self.execute_command(f'DROP DATABASE IF EXISTS `{db_name}`')
284284

285-
def create_database(self, db_name):
286-
self.execute_command(f'CREATE DATABASE `{db_name}`')
285+
def create_database(self, db_name, if_not_exists=False):
286+
if_not_exists_clause = 'IF NOT EXISTS ' if if_not_exists else ''
287+
self.execute_command(f'CREATE DATABASE {if_not_exists_clause}`{db_name}`')
288+
289+
def drop_table(self, table_name):
290+
self.execute_command(f'DROP TABLE IF EXISTS `{self.database}`.`{table_name}`')
287291

288292
def select(self, table_name, where=None, final=None):
289293
query = f'SELECT * FROM {table_name}'

mysql_ch_replicator/config.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ def __init__(self):
146146
self.http_port = 0
147147
self.types_mapping = {}
148148
self.target_databases = {}
149+
self.target_tables = {}
149150
self.initial_replication_threads = 0
150151
self.ignore_deletes = False
151152
self.mysql_timezone = 'UTC'
@@ -174,6 +175,7 @@ def load(self, settings_file):
174175
self.http_host = data.pop('http_host', '')
175176
self.http_port = data.pop('http_port', 0)
176177
self.target_databases = data.pop('target_databases', {})
178+
self.target_tables = data.pop('target_tables', {})
177179
self.initial_replication_threads = data.pop('initial_replication_threads', 0)
178180
self.ignore_deletes = data.pop('ignore_deletes', False)
179181
self.mysql_timezone = data.pop('mysql_timezone', 'UTC')
@@ -272,13 +274,42 @@ def get_post_initial_replication_commands(self, db_name):
272274
results.extend(cmd_config.commands)
273275
return results
274276

277+
def is_multiple_mysql_dbs_to_single_ch_db(self, mysql_database: str, target_database: str) -> bool:
278+
"""
279+
Check if multiple MySQL databases are being replicated to the same ClickHouse database.
280+
281+
Args:
282+
mysql_database: The MySQL database being replicated
283+
target_database: The ClickHouse target database
284+
285+
Returns:
286+
True if multiple MySQL databases map to the same ClickHouse database
287+
"""
288+
if not self.target_databases:
289+
return False
290+
291+
same_target_count = 0
292+
for mysql_db, ch_db in self.target_databases.items():
293+
if ch_db == target_database:
294+
same_target_count += 1
295+
if same_target_count > 1:
296+
return True
297+
298+
return False
299+
300+
def get_target_table_name(self, source_database: str, source_table: str) -> str:
301+
key = f'{source_database}.{source_table}'
302+
return self.target_tables.get(key, source_table)
303+
275304
def validate(self):
276305
self.mysql.validate()
277306
self.clickhouse.validate()
278307
self.binlog_replicator.validate()
279308
self.validate_log_level()
280309
if not isinstance(self.target_databases, dict):
281310
raise ValueError(f'wrong target databases {self.target_databases}')
311+
if not isinstance(self.target_tables, dict):
312+
raise ValueError(f'wrong target tables {self.target_tables}')
282313
if not isinstance(self.initial_replication_threads, int):
283314
raise ValueError(f'initial_replication_threads should be an integer, not {type(self.initial_replication_threads)}')
284315
if self.initial_replication_threads < 0:

mysql_ch_replicator/converter.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,8 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens):
840840
column_after,
841841
)
842842

843-
query = f'ALTER TABLE `{db_name}`.`{table_name}` ADD COLUMN `{column_name}` {column_type_ch}'
843+
target_table_name = self.db_replicator.get_target_table_name(table_name) if self.db_replicator else table_name
844+
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` ADD COLUMN `{column_name}` {column_type_ch}'
844845
if column_first:
845846
query += ' FIRST'
846847
else:
@@ -864,7 +865,8 @@ def __convert_alter_table_drop_column(self, db_name, table_name, tokens):
864865
mysql_table_structure.remove_field(field_name=column_name)
865866
ch_table_structure.remove_field(field_name=column_name)
866867

867-
query = f'ALTER TABLE `{db_name}`.`{table_name}` DROP COLUMN {column_name}'
868+
target_table_name = self.db_replicator.get_target_table_name(table_name) if self.db_replicator else table_name
869+
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` DROP COLUMN {column_name}'
868870
if self.db_replicator:
869871
self.db_replicator.clickhouse_api.execute_command(query)
870872

@@ -892,7 +894,8 @@ def __convert_alter_table_modify_column(self, db_name, table_name, tokens):
892894
TableField(name=column_name, field_type=column_type_ch),
893895
)
894896

895-
query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN `{column_name}` {column_type_ch}'
897+
target_table_name = self.db_replicator.get_target_table_name(table_name) if self.db_replicator else table_name
898+
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` MODIFY COLUMN `{column_name}` {column_type_ch}'
896899
if self.db_replicator:
897900
self.db_replicator.clickhouse_api.execute_command(query)
898901

@@ -914,6 +917,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
914917
ch_table_structure: TableStructure = table_structure[1]
915918

916919
current_column_type_ch = ch_table_structure.get_field(column_name).field_type
920+
target_table_name = self.db_replicator.get_target_table_name(table_name)
917921

918922
if current_column_type_ch != column_type_ch:
919923

@@ -925,7 +929,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
925929
TableField(name=column_name, field_type=column_type_ch),
926930
)
927931

928-
query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN {column_name} {column_type_ch}'
932+
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` MODIFY COLUMN {column_name} {column_type_ch}'
929933
self.db_replicator.clickhouse_api.execute_command(query)
930934

931935
if column_name != new_column_name:
@@ -935,7 +939,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
935939
curr_field_mysql.name = new_column_name
936940
curr_field_clickhouse.name = new_column_name
937941

938-
query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN {column_name} TO {new_column_name}'
942+
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` RENAME COLUMN {column_name} TO {new_column_name}'
939943
self.db_replicator.clickhouse_api.execute_command(query)
940944

941945
def __convert_alter_table_rename_column(self, db_name, table_name, tokens):
@@ -981,7 +985,8 @@ def __convert_alter_table_rename_column(self, db_name, table_name, tokens):
981985
ch_table_structure.preprocess()
982986

983987
# Execute the RENAME COLUMN command in ClickHouse
984-
query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN `{old_column_name}` TO `{new_column_name}`'
988+
target_table_name = self.db_replicator.get_target_table_name(table_name) if self.db_replicator else table_name
989+
query = f'ALTER TABLE `{db_name}`.`{target_table_name}` RENAME COLUMN `{old_column_name}` TO `{new_column_name}`'
985990
if self.db_replicator:
986991
self.db_replicator.clickhouse_api.execute_command(query)
987992

mysql_ch_replicator/db_replicator.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,11 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
141141
self.state_path = os.path.join(self.config.binlog_replicator.data_dir, self.database, 'state.pckl')
142142
self.is_parallel_worker = False
143143

144+
# Check if multiple MySQL databases are being replicated to the same ClickHouse database
145+
self.is_multi_mysql_to_single_ch = self.config.is_multiple_mysql_dbs_to_single_ch_db(
146+
self.database, self.target_database
147+
)
148+
144149
self.target_database_tmp = self.target_database + '_tmp'
145150
if self.is_parallel_worker:
146151
self.target_database_tmp = self.target_database
@@ -149,6 +154,11 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
149154
# This must be set here to ensure consistency between first run and resume
150155
if self.config.ignore_deletes:
151156
self.target_database_tmp = self.target_database
157+
158+
# If multiple MySQL databases map to same ClickHouse database, replicate directly
159+
if self.is_multi_mysql_to_single_ch:
160+
self.target_database_tmp = self.target_database
161+
logger.info(f'detected multiple MySQL databases mapping to {self.target_database} - using direct replication')
152162

153163
self.mysql_api = MySQLApi(
154164
database=self.database,
@@ -174,6 +184,9 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
174184
def create_state(self):
175185
return State(self.state_path)
176186

187+
def get_target_table_name(self, source_table: str) -> str:
188+
return self.config.get_target_table_name(self.database, source_table)
189+
177190
def validate_database_settings(self):
178191
if not self.initial_only:
179192
final_setting = self.clickhouse_api.get_system_setting('final')
@@ -206,16 +219,21 @@ def run(self):
206219
self.run_realtime_replication()
207220
return
208221

209-
# If ignore_deletes is enabled, we don't create a temporary DB and don't swap DBs
222+
# If ignore_deletes is enabled OR multiple MySQL databases map to same ClickHouse database,
223+
# we don't create a temporary DB and don't swap DBs
210224
# We replicate directly into the target DB
211-
if self.config.ignore_deletes:
212-
logger.info(f'using existing database (ignore_deletes=True)')
225+
if self.config.ignore_deletes or self.is_multi_mysql_to_single_ch:
226+
if self.config.ignore_deletes:
227+
logger.info(f'using existing database (ignore_deletes=True)')
228+
if self.is_multi_mysql_to_single_ch:
229+
logger.info(f'using existing database (multi-mysql-to-single-ch mode)')
230+
213231
self.clickhouse_api.database = self.target_database
214232

215-
# Create database if it doesn't exist
233+
# Create database if it doesn't exist (use IF NOT EXISTS to avoid race condition)
216234
if self.target_database not in self.clickhouse_api.get_databases():
217235
logger.info(f'creating database {self.target_database}')
218-
self.clickhouse_api.create_database(db_name=self.target_database)
236+
self.clickhouse_api.create_database(db_name=self.target_database, if_not_exists=True)
219237
else:
220238
logger.info('recreating database')
221239
self.clickhouse_api.database = self.target_database_tmp

mysql_ch_replicator/db_replicator_initial.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ def create_initial_structure_table(self, table_name):
4848
self.validate_mysql_structure(mysql_structure)
4949
clickhouse_structure = self.replicator.converter.convert_table_structure(mysql_structure)
5050

51+
target_table_name = self.replicator.get_target_table_name(table_name)
52+
clickhouse_structure.table_name = target_table_name
53+
5154
# Always set if_not_exists to True to prevent errors when tables already exist
5255
clickhouse_structure.if_not_exists = True
5356

@@ -56,6 +59,11 @@ def create_initial_structure_table(self, table_name):
5659
partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, table_name)
5760

5861
if not self.replicator.is_parallel_worker:
62+
# Drop table if multiple MySQL databases map to same ClickHouse database
63+
if self.replicator.is_multi_mysql_to_single_ch:
64+
logger.info(f'dropping table {target_table_name} before recreating (multi-mysql-to-single-ch mode)')
65+
self.replicator.clickhouse_api.drop_table(target_table_name)
66+
5967
self.replicator.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes, additional_partition_bys=partition_bys)
6068

6169
def validate_mysql_structure(self, mysql_structure: TableStructure):
@@ -106,9 +114,15 @@ def perform_initial_replication(self):
106114
# Verify table structures after replication but before swapping databases
107115
self.verify_table_structures_after_replication()
108116

109-
# If ignore_deletes is enabled, we don't swap databases, as we're directly replicating
110-
# to the target database
111-
if not self.replicator.config.ignore_deletes:
117+
# Skip database swap if:
118+
# 1. ignore_deletes is enabled - we're replicating directly to target
119+
# 2. Multiple MySQL databases map to same ClickHouse database - we're replicating directly to target
120+
should_skip_db_swap = (
121+
self.replicator.config.ignore_deletes or
122+
self.replicator.is_multi_mysql_to_single_ch
123+
)
124+
125+
if not should_skip_db_swap:
112126
logger.info(f'initial replication - swapping database')
113127
if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
114128
self.replicator.clickhouse_api.execute_command(
@@ -197,7 +211,8 @@ def perform_initial_replication_table(self, table_name):
197211

198212
if not records:
199213
break
200-
self.replicator.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure)
214+
target_table_name = self.replicator.get_target_table_name(table_name)
215+
self.replicator.clickhouse_api.insert(target_table_name, records, table_structure=clickhouse_table_structure)
201216
for record in records:
202217
record_primary_key = [record[key_idx] for key_idx in primary_key_ids]
203218
if max_primary_key is None:
@@ -416,8 +431,9 @@ def consolidate_worker_record_versions(self, table_name):
416431
"""
417432
logger.info(f"Getting maximum record version from ClickHouse for table {table_name}")
418433

434+
target_table_name = self.replicator.get_target_table_name(table_name)
419435
# Query ClickHouse for the maximum record version
420-
max_version = self.replicator.clickhouse_api.get_max_record_version(table_name)
436+
max_version = self.replicator.clickhouse_api.get_max_record_version(target_table_name)
421437

422438
if max_version is not None and max_version > 0:
423439
current_version = self.replicator.state.tables_last_record_version.get(table_name, 0)

0 commit comments

Comments
 (0)