Skip to content

Commit e688494

Browse files
committed
Support Percona-style migration to add a column
1 parent 829030f commit e688494

File tree

3 files changed

+130
-22
lines changed

3 files changed

+130
-22
lines changed

mysql_ch_replicator/converter.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,24 @@ def __basic_validate_query(self, mysql_query):
354354
if mysql_query.find(';') != -1:
355355
raise Exception('multi-query statement not supported')
356356
return mysql_query
357+
358+
def get_db_and_table_name(self, token, db_name):
359+
if '.' in token:
360+
db_name, table_name = token.split('.')
361+
else:
362+
table_name = token
363+
db_name = strip_sql_name(db_name)
364+
table_name = strip_sql_name(table_name)
365+
if self.db_replicator:
366+
if db_name == self.db_replicator.database:
367+
db_name = self.db_replicator.target_database
368+
matches_config = (
369+
self.db_replicator.config.is_database_matches(db_name)
370+
and self.db_replicator.config.is_table_matches(table_name))
371+
else:
372+
matches_config = True
373+
374+
return db_name, table_name, matches_config
357375

358376
def convert_alter_query(self, mysql_query, db_name):
359377
mysql_query = self.__basic_validate_query(mysql_query)
@@ -365,21 +383,10 @@ def convert_alter_query(self, mysql_query, db_name):
365383
if tokens[1].lower() != 'table':
366384
raise Exception('wrong query')
367385

368-
table_name = tokens[2]
369-
if table_name.find('.') != -1:
370-
db_name, table_name = table_name.split('.')
386+
db_name, table_name, matches_config = self.get_db_and_table_name(tokens[2], db_name)
371387

372-
if self.db_replicator:
373-
if not self.db_replicator.config.is_database_matches(db_name):
374-
return
375-
if not self.db_replicator.config.is_table_matches(table_name):
376-
return
377-
378-
db_name = strip_sql_name(db_name)
379-
if self.db_replicator and db_name == self.db_replicator.database:
380-
db_name = self.db_replicator.target_database
381-
382-
table_name = strip_sql_name(table_name)
388+
if not matches_config:
389+
return
383390

384391
subqueries = ' '.join(tokens[3:])
385392
subqueries = split_high_level(subqueries, ',')

mysql_ch_replicator/db_replicator.py

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -476,15 +476,17 @@ def handle_query_event(self, event: LogEvent):
476476
if self.config.debug_log_level:
477477
logger.debug(f'processing query event: {event.transaction_id}, query: {event.records}')
478478
query = strip_sql_comments(event.records)
479+
self.upload_records()
479480
if query.lower().startswith('alter'):
480481
self.handle_alter_query(query, event.db_name)
481482
if query.lower().startswith('create table'):
482483
self.handle_create_table_query(query, event.db_name)
483484
if query.lower().startswith('drop table'):
484485
self.handle_drop_table_query(query, event.db_name)
486+
if query.lower().startswith('rename table'):
487+
self.handle_rename_table_query(query, event.db_name)
485488

486489
def handle_alter_query(self, query, db_name):
487-
self.upload_records()
488490
self.converter.convert_alter_query(query, db_name)
489491

490492
def handle_create_table_query(self, query, db_name):
@@ -509,17 +511,41 @@ def handle_drop_table_query(self, query, db_name):
509511
if len(tokens) != 3:
510512
raise Exception('wrong token count', query)
511513

512-
table_name = tokens[2]
513-
if '.' in table_name:
514-
db_name, table_name = table_name.split('.')
515-
if db_name == self.database:
516-
db_name = self.target_database
517-
table_name = strip_sql_name(table_name)
518-
db_name = strip_sql_name(db_name)
514+
db_name, table_name, matches_config = self.converter.get_db_and_table_name(tokens[2], db_name)
515+
if not matches_config:
516+
return
517+
519518
if table_name in self.state.tables_structure:
520519
self.state.tables_structure.pop(table_name)
521520
self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} {db_name}.{table_name}')
522521

522+
def handle_rename_table_query(self, query, db_name):
523+
tokens = query.split()
524+
if tokens[0].lower() != 'rename' or tokens[1].lower() != 'table':
525+
raise Exception('wrong rename table query', query)
526+
527+
ch_clauses = []
528+
for rename_clause in ' '.join(tokens[2:]).split(','):
529+
tokens = rename_clause.split()
530+
531+
if len(tokens) != 3:
532+
raise Exception('wrong token count', query)
533+
if tokens[1].lower() != 'to':
534+
raise Exception('"to" keyword expected', query)
535+
536+
src_db_name, src_table_name, matches_config = self.converter.get_db_and_table_name(tokens[0], db_name)
537+
dest_db_name, dest_table_name, _ = self.converter.get_db_and_table_name(tokens[2], db_name)
538+
if not matches_config:
539+
return
540+
541+
if src_db_name != self.target_database or dest_db_name != self.target_database:
542+
raise Exception('cross databases table renames not implemented', tokens)
543+
if src_table_name in self.state.tables_structure:
544+
self.state.tables_structure[dest_table_name] = self.state.tables_structure.pop(src_table_name)
545+
546+
ch_clauses.append(f"{src_db_name}.{src_table_name} TO {dest_db_name}.{dest_table_name}")
547+
self.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}')
548+
523549
def log_stats_if_required(self):
524550
curr_time = time.time()
525551
if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL:

test_mysql_ch_replicator.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,6 +1098,81 @@ def test_if_exists_if_not_exists(monkeypatch):
10981098
binlog_replicator_runner.stop()
10991099

11001100

1101+
def test_percona_migration(monkeypatch):
1102+
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
1103+
1104+
cfg = config.Settings()
1105+
cfg.load(CONFIG_FILE)
1106+
1107+
mysql = mysql_api.MySQLApi(
1108+
database=None,
1109+
mysql_settings=cfg.mysql,
1110+
)
1111+
1112+
ch = clickhouse_api.ClickhouseApi(
1113+
database=TEST_DB_NAME,
1114+
clickhouse_settings=cfg.clickhouse,
1115+
)
1116+
1117+
prepare_env(cfg, mysql, ch)
1118+
1119+
mysql.execute(f'''
1120+
CREATE TABLE {TEST_TABLE_NAME} (
1121+
`id` int NOT NULL,
1122+
PRIMARY KEY (`id`));
1123+
''')
1124+
1125+
mysql.execute(
1126+
f"INSERT INTO {TEST_TABLE_NAME} (id) VALUES (42)",
1127+
commit=True,
1128+
)
1129+
1130+
binlog_replicator_runner = BinlogReplicatorRunner()
1131+
binlog_replicator_runner.run()
1132+
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
1133+
db_replicator_runner.run()
1134+
1135+
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
1136+
1137+
ch.execute_command(f'USE {TEST_DB_NAME}')
1138+
1139+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
1140+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
1141+
1142+
# Perform 'pt-online-schema-change' style migration to add a column
1143+
# This is a subset of what happens when the following command is run:
1144+
# pt-online-schema-change --alter "ADD COLUMN c1 INT" D=$TEST_DB_NAME,t=$TEST_TABLE_NAME,h=0.0.0.0,P=3306,u=root,p=admin --execute
1145+
mysql.execute(f'''
1146+
CREATE TABLE `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` (
1147+
`id` int NOT NULL,
1148+
PRIMARY KEY (`id`)
1149+
)''')
1150+
1151+
mysql.execute(
1152+
f"ALTER TABLE `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` ADD COLUMN c1 INT;")
1153+
1154+
mysql.execute(
1155+
f"INSERT LOW_PRIORITY IGNORE INTO `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` (`id`) SELECT `id` FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` LOCK IN SHARE MODE;",
1156+
commit=True,
1157+
)
1158+
1159+
mysql.execute(
1160+
f"RENAME TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` TO `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`, `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` TO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}`;")
1161+
1162+
mysql.execute(
1163+
f"DROP TABLE IF EXISTS `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`;")
1164+
1165+
mysql.execute(
1166+
f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (43, 1)",
1167+
commit=True,
1168+
)
1169+
1170+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
1171+
1172+
db_replicator_runner.stop()
1173+
binlog_replicator_runner.stop()
1174+
1175+
11011176
def test_parse_mysql_table_structure():
11021177
query = "CREATE TABLE IF NOT EXISTS user_preferences_portal (\n\t\t\tid char(36) NOT NULL,\n\t\t\tcategory varchar(50) DEFAULT NULL,\n\t\t\tdeleted tinyint(1) DEFAULT 0,\n\t\t\tdate_entered datetime DEFAULT NULL,\n\t\t\tdate_modified datetime DEFAULT NULL,\n\t\t\tassigned_user_id char(36) DEFAULT NULL,\n\t\t\tcontents longtext DEFAULT NULL\n\t\t ) ENGINE=InnoDB DEFAULT CHARSET=utf8"
11031178

0 commit comments

Comments
 (0)