Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions mysql_ch_replicator/clickhouse_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


CREATE_TABLE_QUERY = '''
CREATE TABLE {if_not_exists} {db_name}.{table_name}
CREATE TABLE {if_not_exists} `{db_name}`.`{table_name}`
(
{fields},
`_version` UInt64,
Expand All @@ -26,7 +26,7 @@
'''

DELETE_QUERY = '''
DELETE FROM {db_name}.{table_name} WHERE ({field_name}) IN ({field_values})
DELETE FROM `{db_name}`.`{table_name}` WHERE ({field_name}) IN ({field_values})
'''


Expand Down Expand Up @@ -126,8 +126,8 @@ def execute_command(self, query):
time.sleep(ClickhouseApi.RETRY_INTERVAL)

def recreate_database(self):
self.execute_command(f'DROP DATABASE IF EXISTS {self.database}')
self.execute_command(f'CREATE DATABASE {self.database}')
self.execute_command(f'DROP DATABASE IF EXISTS `{self.database}`')
self.execute_command(f'CREATE DATABASE `{self.database}`')

def get_last_used_version(self, table_name):
return self.tables_last_record_version.get(table_name, 0)
Expand Down Expand Up @@ -210,9 +210,9 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
records_to_insert.append(tuple(record) + (current_version,))
current_version += 1

full_table_name = table_name
full_table_name = f'`table_name`'
if '.' not in full_table_name:
full_table_name = f'{self.database}.{table_name}'
full_table_name = f'`{self.database}`.`{table_name}`'

duration = 0.0
for attempt in range(ClickhouseApi.MAX_RETRIES):
Expand Down Expand Up @@ -258,10 +258,10 @@ def erase(self, table_name, field_name, field_values):
)

def drop_database(self, db_name):
self.execute_command(f'DROP DATABASE IF EXISTS {db_name}')
self.execute_command(f'DROP DATABASE IF EXISTS `{db_name}`')

def create_database(self, db_name):
self.cursor.execute(f'CREATE DATABASE {db_name}')
self.cursor.execute(f'CREATE DATABASE `{db_name}`')

def select(self, table_name, where=None, final=None):
query = f'SELECT * FROM {table_name}'
Expand All @@ -282,7 +282,7 @@ def query(self, query: str):
return self.client.query(query)

def show_create_table(self, table_name):
return self.client.query(f'SHOW CREATE TABLE {table_name}').result_rows[0][0]
return self.client.query(f'SHOW CREATE TABLE `{table_name}`').result_rows[0][0]

def get_system_setting(self, name):
results = self.select('system.settings', f"name = '{name}'")
Expand Down
10 changes: 5 additions & 5 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens):
column_after,
)

query = f'ALTER TABLE {db_name}.{table_name} ADD COLUMN {column_name} {column_type_ch}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` ADD COLUMN `{column_name}` {column_type_ch}'
if column_first:
query += ' FIRST'
else:
Expand All @@ -525,7 +525,7 @@ def __convert_alter_table_drop_column(self, db_name, table_name, tokens):
mysql_table_structure.remove_field(field_name=column_name)
ch_table_structure.remove_field(field_name=column_name)

query = f'ALTER TABLE {db_name}.{table_name} DROP COLUMN {column_name}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` DROP COLUMN {column_name}'
if self.db_replicator:
self.db_replicator.clickhouse_api.execute_command(query)

Expand Down Expand Up @@ -556,7 +556,7 @@ def __convert_alter_table_modify_column(self, db_name, table_name, tokens):
TableField(name=column_name, field_type=column_type_ch),
)

query = f'ALTER TABLE {db_name}.{table_name} MODIFY COLUMN {column_name} {column_type_ch}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN `{column_name}` {column_type_ch}'
if self.db_replicator:
self.db_replicator.clickhouse_api.execute_command(query)

Expand Down Expand Up @@ -592,7 +592,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
TableField(name=column_name, field_type=column_type_ch),
)

query = f'ALTER TABLE {db_name}.{table_name} MODIFY COLUMN {column_name} {column_type_ch}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` MODIFY COLUMN {column_name} {column_type_ch}'
self.db_replicator.clickhouse_api.execute_command(query)

if column_name != new_column_name:
Expand All @@ -602,7 +602,7 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
curr_field_mysql.name = new_column_name
curr_field_clickhouse.name = new_column_name

query = f'ALTER TABLE {db_name}.{table_name} RENAME COLUMN {column_name} TO {new_column_name}'
query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN {column_name} TO {new_column_name}'
self.db_replicator.clickhouse_api.execute_command(query)

def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableStructure]:
Expand Down
2 changes: 1 addition & 1 deletion mysql_ch_replicator/db_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def optimize_table(self, db_name, table_name):
logger.info(f'Optimizing table {db_name}.{table_name}')
t1 = time.time()
self.clickhouse_api.execute_command(
f'OPTIMIZE TABLE {db_name}.{table_name} FINAL SETTINGS mutations_sync = 2'
f'OPTIMIZE TABLE `{db_name}`.`{table_name}` FINAL SETTINGS mutations_sync = 2'
)
t2 = time.time()
logger.info(f'Optimize finished in {int(t2-t1)} seconds')
Expand Down
10 changes: 5 additions & 5 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,15 @@ def perform_initial_replication(self):
logger.info(f'initial replication - swapping database')
if self.target_database in self.clickhouse_api.get_databases():
self.clickhouse_api.execute_command(
f'RENAME DATABASE {self.target_database} TO {self.target_database}_old',
f'RENAME DATABASE `{self.target_database}` TO `{self.target_database}_old`',
)
self.clickhouse_api.execute_command(
f'RENAME DATABASE {self.target_database_tmp} TO {self.target_database}',
f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
)
self.clickhouse_api.drop_database(f'{self.target_database}_old')
else:
self.clickhouse_api.execute_command(
f'RENAME DATABASE {self.target_database_tmp} TO {self.target_database}',
f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
)
self.clickhouse_api.database = self.target_database
logger.info(f'initial replication - done')
Expand Down Expand Up @@ -519,7 +519,7 @@ def handle_drop_table_query(self, query, db_name):

if table_name in self.state.tables_structure:
self.state.tables_structure.pop(table_name)
self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} {db_name}.{table_name}')
self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} `{db_name}`.`{table_name}`')

def handle_rename_table_query(self, query, db_name):
tokens = query.split()
Expand All @@ -545,7 +545,7 @@ def handle_rename_table_query(self, query, db_name):
if src_table_name in self.state.tables_structure:
self.state.tables_structure[dest_table_name] = self.state.tables_structure.pop(src_table_name)

ch_clauses.append(f"{src_db_name}.{src_table_name} TO {dest_db_name}.{dest_table_name}")
ch_clauses.append(f"`{src_db_name}`.`{src_table_name}` TO `{dest_db_name}`.`{dest_table_name}`")
self.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}')

def log_stats_if_required(self):
Expand Down
2 changes: 1 addition & 1 deletion mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def drop_table(self, table_name):
self.cursor.execute(f'DROP TABLE IF EXISTS `{table_name}`')

def create_database(self, db_name):
self.cursor.execute(f'CREATE DATABASE {db_name}')
self.cursor.execute(f'CREATE DATABASE `{db_name}`')

def execute(self, command, commit=False, args=None):
if args:
Expand Down
Loading
Loading