diff --git a/mysql_ch_replicator/clickhouse_api.py b/mysql_ch_replicator/clickhouse_api.py index 5a5fe56..e521b43 100644 --- a/mysql_ch_replicator/clickhouse_api.py +++ b/mysql_ch_replicator/clickhouse_api.py @@ -14,7 +14,7 @@ CREATE_TABLE_QUERY = ''' -CREATE TABLE {if_not_exists} {db_name}.{table_name} +CREATE TABLE {if_not_exists} `{db_name}`.`{table_name}` ( {fields}, `_version` UInt64, @@ -26,7 +26,7 @@ ''' DELETE_QUERY = ''' -DELETE FROM {db_name}.{table_name} WHERE ({field_name}) IN ({field_values}) +DELETE FROM `{db_name}`.`{table_name}` WHERE ({field_name}) IN ({field_values}) ''' @@ -126,8 +126,8 @@ def execute_command(self, query): time.sleep(ClickhouseApi.RETRY_INTERVAL) def recreate_database(self): - self.execute_command(f'DROP DATABASE IF EXISTS {self.database}') - self.execute_command(f'CREATE DATABASE {self.database}') + self.execute_command(f'DROP DATABASE IF EXISTS `{self.database}`') + self.execute_command(f'CREATE DATABASE `{self.database}`') def get_last_used_version(self, table_name): return self.tables_last_record_version.get(table_name, 0) @@ -210,9 +210,9 @@ def insert(self, table_name, records, table_structure: TableStructure = None): records_to_insert.append(tuple(record) + (current_version,)) current_version += 1 - full_table_name = table_name + full_table_name = f'`table_name`' if '.' not in full_table_name: - full_table_name = f'{self.database}.{table_name}' + full_table_name = f'`{self.database}`.`{table_name}`' duration = 0.0 for attempt in range(ClickhouseApi.MAX_RETRIES): @@ -258,10 +258,10 @@ def erase(self, table_name, field_name, field_values): ) def drop_database(self, db_name): - self.execute_command(f'DROP DATABASE IF EXISTS {db_name}') + self.execute_command(f'DROP DATABASE IF EXISTS `{db_name}`') def create_database(self, db_name): - self.cursor.execute(f'CREATE DATABASE {db_name}') + self.cursor.execute(f'CREATE DATABASE `{db_name}`') def select(self, table_name, where=None, final=None): query = f'SELECT * FROM {table_name}' @@ -282,7 +282,7 @@ def query(self, query: str): return self.client.query(query) def show_create_table(self, table_name): - return self.client.query(f'SHOW CREATE TABLE {table_name}').result_rows[0][0] + return self.client.query(f'SHOW CREATE TABLE `{table_name}`').result_rows[0][0] def get_system_setting(self, name): results = self.select('system.settings', f"name = '{name}'") diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py index b8babc5..34440c7 100644 --- a/mysql_ch_replicator/converter.py +++ b/mysql_ch_replicator/converter.py @@ -498,7 +498,7 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens): column_after, ) - query = f'ALTER TABLE {db_name}.{table_name} ADD COLUMN {column_name} {column_type_ch}' + query = f'ALTER TABLE `{db_name}`.`{table_name}` ADD COLUMN `{column_name}` {column_type_ch}' if column_first: query += ' FIRST' else: @@ -525,7 +525,7 @@ def __convert_alter_table_drop_column(self, db_name, table_name, tokens): mysql_table_structure.remove_field(field_name=column_name) ch_table_structure.remove_field(field_name=column_name) - query = f'ALTER TABLE {db_name}.{table_name} DROP COLUMN {column_name}' + query = f'ALTER TABLE `{db_name}`.`{table_name}` DROP COLUMN {column_name}' if self.db_replicator: self.db_replicator.clickhouse_api.execute_command(query) @@ -556,7 +556,7 @@ def __convert_alter_table_modify_column(self, db_name, table_name, tokens): TableField(name=column_name, field_type=column_type_ch), ) - query = f'ALTER TABLE {db_name}.{table_name} MODIFY COLUMN {column_name} {column_type_ch}' + query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN `{column_name}` {column_type_ch}' if self.db_replicator: self.db_replicator.clickhouse_api.execute_command(query) @@ -592,7 +592,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens): TableField(name=column_name, field_type=column_type_ch), ) - query = f'ALTER TABLE {db_name}.{table_name} MODIFY COLUMN {column_name} {column_type_ch}' + query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN {column_name} {column_type_ch}' self.db_replicator.clickhouse_api.execute_command(query) if column_name != new_column_name: @@ -602,7 +602,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens): curr_field_mysql.name = new_column_name curr_field_clickhouse.name = new_column_name - query = f'ALTER TABLE {db_name}.{table_name} RENAME COLUMN {column_name} TO {new_column_name}' + query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN {column_name} TO {new_column_name}' self.db_replicator.clickhouse_api.execute_command(query) def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableStructure]: diff --git a/mysql_ch_replicator/db_optimizer.py b/mysql_ch_replicator/db_optimizer.py index 33fda8b..72433d7 100644 --- a/mysql_ch_replicator/db_optimizer.py +++ b/mysql_ch_replicator/db_optimizer.py @@ -71,7 +71,7 @@ def optimize_table(self, db_name, table_name): logger.info(f'Optimizing table {db_name}.{table_name}') t1 = time.time() self.clickhouse_api.execute_command( - f'OPTIMIZE TABLE {db_name}.{table_name} FINAL SETTINGS mutations_sync = 2' + f'OPTIMIZE TABLE `{db_name}`.`{table_name}` FINAL SETTINGS mutations_sync = 2' ) t2 = time.time() logger.info(f'Optimize finished in {int(t2-t1)} seconds') diff --git a/mysql_ch_replicator/db_replicator.py b/mysql_ch_replicator/db_replicator.py index 2fb7528..6110413 100644 --- a/mysql_ch_replicator/db_replicator.py +++ b/mysql_ch_replicator/db_replicator.py @@ -242,15 +242,15 @@ def perform_initial_replication(self): logger.info(f'initial replication - swapping database') if self.target_database in self.clickhouse_api.get_databases(): self.clickhouse_api.execute_command( - f'RENAME DATABASE {self.target_database} TO {self.target_database}_old', + f'RENAME DATABASE `{self.target_database}` TO `{self.target_database}_old`', ) self.clickhouse_api.execute_command( - f'RENAME DATABASE {self.target_database_tmp} TO {self.target_database}', + f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`', ) self.clickhouse_api.drop_database(f'{self.target_database}_old') else: self.clickhouse_api.execute_command( - f'RENAME DATABASE {self.target_database_tmp} TO {self.target_database}', + f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`', ) self.clickhouse_api.database = self.target_database logger.info(f'initial replication - done') @@ -519,7 +519,7 @@ def handle_drop_table_query(self, query, db_name): if table_name in self.state.tables_structure: self.state.tables_structure.pop(table_name) - self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} {db_name}.{table_name}') + self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} `{db_name}`.`{table_name}`') def handle_rename_table_query(self, query, db_name): tokens = query.split() @@ -545,7 +545,7 @@ def handle_rename_table_query(self, query, db_name): if src_table_name in self.state.tables_structure: self.state.tables_structure[dest_table_name] = self.state.tables_structure.pop(src_table_name) - ch_clauses.append(f"{src_db_name}.{src_table_name} TO {dest_db_name}.{dest_table_name}") + ch_clauses.append(f"`{src_db_name}`.`{src_table_name}` TO `{dest_db_name}`.`{dest_table_name}`") self.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}') def log_stats_if_required(self): diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py index 5e2f2da..b8b25c3 100644 --- a/mysql_ch_replicator/mysql_api.py +++ b/mysql_ch_replicator/mysql_api.py @@ -49,7 +49,7 @@ def drop_table(self, table_name): self.cursor.execute(f'DROP TABLE IF EXISTS `{table_name}`') def create_database(self, db_name): - self.cursor.execute(f'CREATE DATABASE {db_name}') + self.cursor.execute(f'CREATE DATABASE `{db_name}`') def execute(self, command, commit=False, args=None): if args: diff --git a/test_mysql_ch_replicator.py b/test_mysql_ch_replicator.py index 6c5478a..e1d78ce 100644 --- a/test_mysql_ch_replicator.py +++ b/test_mysql_ch_replicator.py @@ -19,8 +19,8 @@ CONFIG_FILE = 'tests_config.yaml' CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml' -TEST_DB_NAME = 'replication_test_db' -TEST_DB_NAME_2 = 'replication_test_db_2' +TEST_DB_NAME = 'replication-test_db' +TEST_DB_NAME_2 = 'replication-test_db_2' TEST_TABLE_NAME = 'test_table' TEST_TABLE_NAME_2 = 'test_table_2' TEST_TABLE_NAME_3 = 'test_table_3' @@ -97,7 +97,7 @@ def test_e2e_regular(config_file): prepare_env(cfg, mysql, ch) mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( id int NOT NULL AUTO_INCREMENT, name varchar(255) COMMENT 'Dân tộc, ví dụ: Kinh', age int COMMENT 'CMND Cũ', @@ -108,10 +108,10 @@ def test_e2e_regular(config_file): ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, age, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');", + f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');", commit=True, ) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Peter', 33);", commit=True) binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) binlog_replicator_runner.run() @@ -120,18 +120,18 @@ def test_e2e_regular(config_file): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Filipp', 50);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50) mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` ADD `last_name` varchar(255); ") - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, last_name) VALUES ('Mary', 24, 'Smith');", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name) VALUES ('Mary', 24, 'Smith');", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0]['last_name'] == 'Smith') @@ -141,17 +141,17 @@ def test_e2e_regular(config_file): mysql.execute( - f"ALTER TABLE {TEST_DB_NAME}.{TEST_TABLE_NAME} " + f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` " f"ADD COLUMN country VARCHAR(25) DEFAULT '' NOT NULL AFTER name;" ) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, age, last_name, country) " + f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name, country) " f"VALUES ('John', 12, 'Doe', 'USA');", commit=True, ) mysql.execute( - f"ALTER TABLE {TEST_DB_NAME}.{TEST_TABLE_NAME} " + f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` " f"CHANGE COLUMN country origin VARCHAR(24) DEFAULT '' NOT NULL", ) @@ -159,18 +159,18 @@ def test_e2e_regular(config_file): assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='John'")[0].get('origin') == 'USA') mysql.execute( - f"ALTER TABLE {TEST_DB_NAME}.{TEST_TABLE_NAME} " + f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` " f"CHANGE COLUMN origin country VARCHAR(24) DEFAULT '' NOT NULL", ) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='John'")[0].get('origin') is None) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='John'")[0].get('country') == 'USA') - mysql.execute(f"ALTER TABLE {TEST_DB_NAME}.{TEST_TABLE_NAME} DROP COLUMN country") + mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` DROP COLUMN country") assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='John'")[0].get('country') is None) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0].get('last_name') is None) - mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET last_name = '' WHERE last_name IS NULL;") + mysql.execute(f"UPDATE `{TEST_TABLE_NAME}` SET last_name = '' WHERE last_name IS NULL;") mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` MODIFY `last_name` varchar(1024) NOT NULL") assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0].get('last_name') == '') @@ -187,7 +187,7 @@ def test_e2e_regular(config_file): assert_wait(lambda: TEST_TABLE_NAME_2 in ch.get_tables()) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME_2} (name, age) VALUES ('Ivan', 42);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME_2}` (name, age) VALUES ('Ivan', 42);", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME_2)) == 1) @@ -202,10 +202,10 @@ def test_e2e_regular(config_file): assert_wait(lambda: TEST_TABLE_NAME_3 in ch.get_tables()) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME_3} (name, `age`) VALUES ('Ivan', 42);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME_3}` (name, `age`) VALUES ('Ivan', 42);", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME_3)) == 1) - mysql.execute(f'DROP TABLE {TEST_TABLE_NAME_3}') + mysql.execute(f'DROP TABLE `{TEST_TABLE_NAME_3}`') assert_wait(lambda: TEST_TABLE_NAME_3 not in ch.get_tables()) db_replicator_runner.stop() @@ -228,7 +228,7 @@ def test_e2e_multistatement(): prepare_env(cfg, mysql, ch) mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( id int NOT NULL AUTO_INCREMENT, name varchar(255), age int, @@ -236,7 +236,7 @@ def test_e2e_multistatement(): ); ''') - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Ivan', 42);", commit=True) binlog_replicator_runner = BinlogReplicatorRunner() binlog_replicator_runner.run() @@ -245,14 +245,14 @@ def test_e2e_multistatement(): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` ADD `last_name` varchar(255), ADD COLUMN city varchar(255); ") mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, age, last_name, city) " + f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name, city) " f"VALUES ('Mary', 24, 'Smith', 'London');", commit=True, ) @@ -260,11 +260,11 @@ def test_e2e_multistatement(): assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0].get('last_name') == 'Smith') assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0].get('city') == 'London') - mysql.execute(f"ALTER TABLE {TEST_TABLE_NAME} DROP COLUMN last_name, DROP COLUMN city") + mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` DROP COLUMN last_name, DROP COLUMN city") assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0].get('last_name') is None) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0].get('city') is None) - mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE name='Ivan';", commit=True) + mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE name='Ivan';", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( @@ -318,7 +318,7 @@ def test_runner(): prepare_env(cfg, mysql, ch) mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( id int NOT NULL AUTO_INCREMENT, name varchar(255), age int, @@ -343,8 +343,8 @@ def test_runner(): ''', commit=True) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Ivan', 42, POINT(10.0, 20.0));", commit=True) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Peter', 33, POINT(10.0, 20.0));", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, coordinate) VALUES ('Ivan', 42, POINT(10.0, 20.0));", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, coordinate) VALUES ('Peter', 33, POINT(10.0, 20.0));", commit=True) mysql.execute(f"INSERT INTO `group` (name, age, rate) VALUES ('Peter', 33, 10.2);", commit=True) @@ -353,7 +353,7 @@ def test_runner(): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME};') + ch.execute_command(f'USE `{TEST_DB_NAME}`;') assert_wait(lambda: 'group' in ch.get_tables()) @@ -364,7 +364,7 @@ def test_runner(): assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Xeishfru32', 50, POINT(10.0, 20.0));", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, coordinate) VALUES ('Xeishfru32', 50, POINT(10.0, 20.0));", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Xeishfru32'")[0]['age'] == 50) @@ -375,30 +375,30 @@ def test_runner(): kill_process(binlog_repl_pid) kill_process(db_repl_pid, force=True) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, rate, coordinate) VALUES ('John', 12.5, POINT(10.0, 20.0));", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, rate, coordinate) VALUES ('John', 12.5, POINT(10.0, 20.0));", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4) assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='John'")[0]['rate'] == 12.5) - mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE name='John';", commit=True) + mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE name='John';", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) - mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=66 WHERE name='Ivan'", commit=True) + mysql.execute(f"UPDATE `{TEST_TABLE_NAME}` SET age=66 WHERE name='Ivan'", commit=True) assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 66) - mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=77 WHERE name='Ivan'", commit=True) + mysql.execute(f"UPDATE `{TEST_TABLE_NAME}` SET age=77 WHERE name='Ivan'", commit=True) assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 77) - mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=88 WHERE name='Ivan'", commit=True) + mysql.execute(f"UPDATE `{TEST_TABLE_NAME}` SET age=88 WHERE name='Ivan'", commit=True) assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 88) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES ('Vlad', 99, POINT(10.0, 20.0));", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, coordinate) VALUES ('Vlad', 99, POINT(10.0, 20.0));", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4) mysql.execute( - command=f"INSERT INTO {TEST_TABLE_NAME} (name, age, coordinate) VALUES (%s, %s, POINT(10.0, 20.0));", + command=f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, coordinate) VALUES (%s, %s, POINT(10.0, 20.0));", args=(b'H\xe4llo'.decode('latin-1'), 1912), commit=True, ) @@ -462,7 +462,7 @@ def test_multi_column_erase(): prepare_env(cfg, mysql, ch) mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( departments int(11) NOT NULL, termine int(11) NOT NULL, PRIMARY KEY (departments,termine) @@ -470,26 +470,26 @@ def test_multi_column_erase(): ''') - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (10, 20);", commit=True) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (30, 40);", commit=True) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (50, 60);", commit=True) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (20, 10);", commit=True) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (40, 30);", commit=True) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (60, 50);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine) VALUES (10, 20);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine) VALUES (30, 40);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine) VALUES (50, 60);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine) VALUES (20, 10);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine) VALUES (40, 30);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (departments, termine) VALUES (60, 50);", commit=True) run_all_runner = RunAllRunner(cfg_file=config_file) run_all_runner.run() assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6) - mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=10;", commit=True) - mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=30;", commit=True) - mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=50;", commit=True) + mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=10;", commit=True) + mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=30;", commit=True) + mysql.execute(f"DELETE FROM `{TEST_TABLE_NAME}` WHERE departments=50;", commit=True) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) @@ -516,7 +516,7 @@ def test_initial_only(): prepare_env(cfg, mysql, ch) mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( id int NOT NULL AUTO_INCREMENT, name varchar(255), age int, @@ -524,8 +524,8 @@ def test_initial_only(): ); ''') - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Ivan', 42);", commit=True) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Peter', 33);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Ivan', 42);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Peter', 33);", commit=True) db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, additional_arguments='--initial_only=True') db_replicator_runner.run() @@ -533,12 +533,12 @@ def test_initial_only(): assert TEST_DB_NAME in ch.get_databases() - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert TEST_TABLE_NAME in ch.get_tables() assert len(ch.select(TEST_TABLE_NAME)) == 2 - ch.execute_command(f'DROP DATABASE {TEST_DB_NAME}') + ch.execute_command(f'DROP DATABASE `{TEST_DB_NAME}`') db_replicator_runner.stop() @@ -664,7 +664,7 @@ def test_datetime_exception(): mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( id int NOT NULL AUTO_INCREMENT, name varchar(255), modified_date DateTime(3) NOT NULL, @@ -674,7 +674,7 @@ def test_datetime_exception(): ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date, test_date) " + f"INSERT INTO `{TEST_TABLE_NAME}` (name, modified_date, test_date) " f"VALUES ('Ivan', '0000-00-00 00:00:00', '2015-05-28');", commit=True, ) @@ -686,18 +686,18 @@ def test_datetime_exception(): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date, test_date) " + f"INSERT INTO `{TEST_TABLE_NAME}` (name, modified_date, test_date) " f"VALUES ('Alex', '0000-00-00 00:00:00', '2015-06-02');", commit=True, ) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date, test_date) " + f"INSERT INTO `{TEST_TABLE_NAME}` (name, modified_date, test_date) " f"VALUES ('Givi', '2023-01-08 03:11:09', '2015-06-02');", commit=True, ) @@ -728,7 +728,7 @@ def test_different_types_1(): mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( `id` int unsigned NOT NULL AUTO_INCREMENT, name varchar(255), `employee` int unsigned NOT NULL, @@ -766,7 +766,7 @@ def test_different_types_1(): ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date) VALUES ('Ivan', '0000-00-00 00:00:00');", + f"INSERT INTO `{TEST_TABLE_NAME}` (name, modified_date) VALUES ('Ivan', '0000-00-00 00:00:00');", commit=True, ) @@ -777,17 +777,17 @@ def test_different_types_1(): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date) VALUES ('Alex', '0000-00-00 00:00:00');", + f"INSERT INTO `{TEST_TABLE_NAME}` (name, modified_date) VALUES ('Alex', '0000-00-00 00:00:00');", commit=True, ) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, modified_date) VALUES ('Givi', '2023-01-08 03:11:09');", + f"INSERT INTO `{TEST_TABLE_NAME}` (name, modified_date) VALUES ('Givi', '2023-01-08 03:11:09');", commit=True, ) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) @@ -814,7 +814,7 @@ def test_numeric_types_and_limits(): mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( `id` int unsigned NOT NULL AUTO_INCREMENT, name varchar(255), test1 smallint, @@ -830,7 +830,7 @@ def test_numeric_types_and_limits(): ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, test1, test2, test3, test4, test5, test6, test7, test8) VALUES " + f"INSERT INTO `{TEST_TABLE_NAME}` (name, test1, test2, test3, test4, test5, test6, test7, test8) VALUES " f"('Ivan', -20000, 50000, -30, 100, 16777200, 4294967290, 18446744073709551586, NULL);", commit=True, ) @@ -842,13 +842,13 @@ def test_numeric_types_and_limits(): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, test1, test2, test3, test4, test5, test6, test7, test8) VALUES " + f"INSERT INTO `{TEST_TABLE_NAME}` (name, test1, test2, test3, test4, test5, test6, test7, test8) VALUES " f"('Peter', -10000, 60000, -120, 250, 16777200, 4294967280, 18446744073709551586, NULL);", commit=True, ) @@ -883,7 +883,7 @@ def test_different_types_2(): mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( `id` int unsigned NOT NULL AUTO_INCREMENT, test1 bit(1), test2 point, @@ -895,7 +895,7 @@ def test_different_types_2(): ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test3, test4, test5) VALUES " + f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test3, test4, test5) VALUES " f"(0, POINT(10.0, 20.0), 'azaza', '1,3,5', '2023-08-15 14:30:00');", commit=True, ) @@ -907,13 +907,13 @@ def test_different_types_2(): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (test1, test2, test4, test5) VALUES " + f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2, test4, test5) VALUES " f"(1, POINT(15.0, 14.0), '2,4,5', '2023-08-15 14:40:00');", commit=True, ) @@ -933,7 +933,7 @@ def test_different_types_2(): assert str(value) == '2023-08-15 14:40:00+00:00' mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (test1, test2) VALUES " + f"INSERT INTO `{TEST_TABLE_NAME}` (test1, test2) VALUES " f"(0, NULL);", commit=True, ) @@ -962,7 +962,7 @@ def test_json(): mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( `id` int unsigned NOT NULL AUTO_INCREMENT, name varchar(255), data json, @@ -971,7 +971,7 @@ def test_json(): ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, data) VALUES " + + f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES " + """('Ivan', '{"a": "b", "c": [1,2,3]}');""", commit=True, ) @@ -983,13 +983,13 @@ def test_json(): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, data) VALUES " + + f"INSERT INTO `{TEST_TABLE_NAME}` (name, data) VALUES " + """('Peter', '{"b": "b", "c": [3,2,1]}');""", commit=True, ) @@ -1023,7 +1023,7 @@ def test_string_primary_key(monkeypatch): mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( `id` char(30) NOT NULL, name varchar(255), PRIMARY KEY (id) @@ -1031,12 +1031,12 @@ def test_string_primary_key(monkeypatch): ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " + + f"INSERT INTO `{TEST_TABLE_NAME}` (id, name) VALUES " + """('01', 'Ivan');""", commit=True, ) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " + + f"INSERT INTO `{TEST_TABLE_NAME}` (id, name) VALUES " + """('02', 'Peter');""", commit=True, ) @@ -1048,13 +1048,13 @@ def test_string_primary_key(monkeypatch): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " + + f"INSERT INTO `{TEST_TABLE_NAME}` (id, name) VALUES " + """('03', 'Filipp');""", commit=True, ) @@ -1089,14 +1089,14 @@ def test_if_exists_if_not_exists(monkeypatch): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME} (id int NOT NULL, PRIMARY KEY(id));") - mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} (id int NOT NULL, PRIMARY KEY(id));") - mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME_2} (id int NOT NULL, PRIMARY KEY(id));") + mysql.execute(f"CREATE TABLE IF NOT EXISTS `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id int NOT NULL, PRIMARY KEY(id));") + mysql.execute(f"CREATE TABLE IF NOT EXISTS `{TEST_TABLE_NAME}` (id int NOT NULL, PRIMARY KEY(id));") + mysql.execute(f"CREATE TABLE IF NOT EXISTS `{TEST_DB_NAME}`.{TEST_TABLE_NAME_2} (id int NOT NULL, PRIMARY KEY(id));") mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME_2} (id int NOT NULL, PRIMARY KEY(id));") - mysql.execute(f"DROP TABLE IF EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME};") + mysql.execute(f"DROP TABLE IF EXISTS `{TEST_DB_NAME}`.{TEST_TABLE_NAME};") mysql.execute(f"DROP TABLE IF EXISTS {TEST_TABLE_NAME};") - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME_2 in ch.get_tables()) assert_wait(lambda: TEST_TABLE_NAME not in ch.get_tables()) @@ -1124,13 +1124,13 @@ def test_percona_migration(monkeypatch): prepare_env(cfg, mysql, ch) mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( `id` int NOT NULL, PRIMARY KEY (`id`)); ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (id) VALUES (42)", + f"INSERT INTO `{TEST_TABLE_NAME}` (id) VALUES (42)", commit=True, ) @@ -1141,7 +1141,7 @@ def test_percona_migration(monkeypatch): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) @@ -1170,7 +1170,7 @@ def test_percona_migration(monkeypatch): f"DROP TABLE IF EXISTS `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`;") mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (43, 1)", + f"INSERT INTO `{TEST_TABLE_NAME}` (id, c1) VALUES (43, 1)", commit=True, ) @@ -1199,13 +1199,13 @@ def test_add_column_first_after_and_drop_column(monkeypatch): prepare_env(cfg, mysql, ch) mysql.execute(f''' -CREATE TABLE {TEST_TABLE_NAME} ( +CREATE TABLE `{TEST_TABLE_NAME}` ( `id` int NOT NULL, PRIMARY KEY (`id`)); ''') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (id) VALUES (42)", + f"INSERT INTO `{TEST_TABLE_NAME}` (id) VALUES (42)", commit=True, ) @@ -1216,7 +1216,7 @@ def test_add_column_first_after_and_drop_column(monkeypatch): assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) - ch.execute_command(f'USE {TEST_DB_NAME}') + ch.execute_command(f'USE `{TEST_DB_NAME}`') assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) @@ -1227,9 +1227,9 @@ def test_add_column_first_after_and_drop_column(monkeypatch): # Test add column first mysql.execute( - f"ALTER TABLE {TEST_TABLE_NAME} ADD COLUMN c1 INT FIRST") + f"ALTER TABLE `{TEST_TABLE_NAME}` ADD COLUMN c1 INT FIRST") mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (43, 11)", + f"INSERT INTO `{TEST_TABLE_NAME}` (id, c1) VALUES (43, 11)", commit=True, ) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=43")) == 1) @@ -1237,9 +1237,9 @@ def test_add_column_first_after_and_drop_column(monkeypatch): # Test add column after mysql.execute( - f"ALTER TABLE {TEST_TABLE_NAME} ADD COLUMN c2 INT AFTER c1") + f"ALTER TABLE `{TEST_TABLE_NAME}` ADD COLUMN c2 INT AFTER c1") mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (id, c1, c2) VALUES (44, 111, 222)", + f"INSERT INTO `{TEST_TABLE_NAME}` (id, c1, c2) VALUES (44, 111, 222)", commit=True, ) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=44")) == 1) @@ -1248,9 +1248,9 @@ def test_add_column_first_after_and_drop_column(monkeypatch): # Test drop column mysql.execute( - f"ALTER TABLE {TEST_TABLE_NAME} DROP COLUMN c2") + f"ALTER TABLE `{TEST_TABLE_NAME}` DROP COLUMN c2") mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (45, 1111)", + f"INSERT INTO `{TEST_TABLE_NAME}` (id, c1) VALUES (45, 1111)", commit=True, ) assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=45")) == 1) @@ -1333,7 +1333,7 @@ def test_performance_dbreplicator(): prepare_env(cfg, mysql, ch) mysql.execute(f''' - CREATE TABLE {TEST_TABLE_NAME} ( + CREATE TABLE `{TEST_TABLE_NAME}` ( id int NOT NULL AUTO_INCREMENT, name varchar(2048), age int, @@ -1346,7 +1346,7 @@ def test_performance_dbreplicator(): time.sleep(1) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('TEST_VALUE_1', 33);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_1', 33);", commit=True) def _get_last_insert_name(): record = get_last_insert_from_binlog(cfg=cfg, db_name=TEST_DB_NAME) @@ -1368,11 +1368,11 @@ def _get_last_insert_name(): if i % 2000 == 0: print(f'populated {i} elements') mysql.execute( - f"INSERT INTO {TEST_TABLE_NAME} (name, age) " + f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) " f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0, ) - mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True) + mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True) print("running db_replicator") t1 = time.time()