Skip to content

Commit 17cb16e

Browse files
authored
Support ALTER CHANGE (#2)
* Fixed state save order (prevent skipping schema migration in case of exception) * Support for ALTER table CHANGE column
1 parent 4418832 commit 17cb16e

File tree

5 files changed

+76
-5
lines changed

5 files changed

+76
-5
lines changed

docker-compose-tests.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ services:
1111
- CLICKHOUSE_ADMIN_PASSWORD=admin
1212
- CLICKHOUSE_ADMIN_USER=default
1313
- CLICKHOUSE_HTTP_PORT=9123
14-
network_mode: host
14+
networks:
15+
default:
16+
ports:
17+
- 9123:9123
1518
volumes:
1619
- ./tests_override.xml:/bitnami/clickhouse/etc/conf.d/override.xml:ro
1720

mysql_ch_replicator/converter.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,11 @@ def convert_alter_query(self, mysql_query, db_name):
205205
if op_name == 'alter':
206206
continue
207207

208-
raise Exception('not implement')
208+
if op_name == 'change':
209+
self.__convert_alter_table_change_column(db_name, table_name, tokens)
210+
continue
211+
212+
raise Exception(f'operation {op_name} not implement, query: {subquery}')
209213

210214
def __convert_alter_table_add_column(self, db_name, table_name, tokens):
211215
if len(tokens) < 2:
@@ -306,6 +310,51 @@ def __convert_alter_table_modify_column(self, db_name, table_name, tokens):
306310
if self.db_replicator:
307311
self.db_replicator.clickhouse_api.execute_command(query)
308312

313+
def __convert_alter_table_change_column(self, db_name, table_name, tokens):
314+
if len(tokens) < 3:
315+
raise Exception('wrong tokens count', tokens)
316+
317+
if ',' in ' '.join(tokens):
318+
raise Exception('add multiple columns not implemented', tokens)
319+
320+
column_name = strip_sql_name(tokens[0])
321+
new_column_name = strip_sql_name(tokens[1])
322+
column_type_mysql = tokens[2]
323+
column_type_mysql_parameters = ' '.join(tokens[3:])
324+
325+
column_type_ch = self.convert_field_type(column_type_mysql, column_type_mysql_parameters)
326+
327+
# update table structure
328+
if self.db_replicator:
329+
table_structure = self.db_replicator.state.tables_structure[table_name]
330+
mysql_table_structure: TableStructure = table_structure[0]
331+
ch_table_structure: TableStructure = table_structure[1]
332+
333+
current_column_type_ch = ch_table_structure.get_field(column_name).field_type
334+
335+
if current_column_type_ch != column_type_ch:
336+
337+
mysql_table_structure.update_field(
338+
TableField(name=column_name, field_type=column_type_mysql),
339+
)
340+
341+
ch_table_structure.update_field(
342+
TableField(name=column_name, field_type=column_type_ch),
343+
)
344+
345+
query = f'ALTER TABLE {db_name}.{table_name} MODIFY COLUMN {column_name} {column_type_ch}'
346+
self.db_replicator.clickhouse_api.execute_command(query)
347+
348+
if column_name != new_column_name:
349+
curr_field_mysql = mysql_table_structure.get_field(column_name)
350+
curr_field_clickhouse = ch_table_structure.get_field(column_name)
351+
352+
curr_field_mysql.name = new_column_name
353+
curr_field_clickhouse.name = new_column_name
354+
355+
query = f'ALTER TABLE {db_name}.{table_name} RENAME COLUMN {column_name} TO {new_column_name}'
356+
self.db_replicator.clickhouse_api.execute_command(query)
357+
309358
def parse_create_table_query(self, mysql_query) -> tuple:
310359
mysql_table_structure = self.parse_mysql_table_structure(mysql_query)
311360
ch_table_structure = self.convert_table_structure(mysql_table_structure)

mysql_ch_replicator/db_replicator.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,6 @@ def handle_event(self, event: LogEvent):
274274
return
275275

276276
logger.debug(f'processing event {event.transaction_id}')
277-
self.stats.events_count += 1
278-
self.stats.last_transaction = event.transaction_id
279-
self.state.last_processed_transaction_non_uploaded = event.transaction_id
280277

281278
event_handlers = {
282279
EventType.ADD_EVENT.value: self.handle_insert_event,
@@ -286,6 +283,10 @@ def handle_event(self, event: LogEvent):
286283

287284
event_handlers[event.event_type](event)
288285

286+
self.stats.events_count += 1
287+
self.stats.last_transaction = event.transaction_id
288+
self.state.last_processed_transaction_non_uploaded = event.transaction_id
289+
289290
self.upload_records_if_required(table_name=event.table_name)
290291

291292
self.save_state_if_required()

mysql_ch_replicator/table_structure.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,9 @@ def has_field(self, field_name):
4848
if field.name == field_name:
4949
return True
5050
return False
51+
52+
def get_field(self, field_name):
53+
for field in self.fields:
54+
if field.name == field_name:
55+
return field
56+
return None

test_mysql_ch_replicator.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,19 @@ def test_e2e_regular():
130130
f"VALUES ('John', 12, 'Doe', 'USA');", commit=True,
131131
)
132132

133+
mysql.execute(
134+
f"ALTER TABLE {TEST_DB_NAME}.{TEST_TABLE_NAME} "
135+
f"CHANGE COLUMN country origin VARCHAR(24) DEFAULT '' NOT NULL",
136+
)
137+
133138
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 5)
139+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='John'")[0].get('origin') == 'USA')
140+
141+
mysql.execute(
142+
f"ALTER TABLE {TEST_DB_NAME}.{TEST_TABLE_NAME} "
143+
f"CHANGE COLUMN origin country VARCHAR(24) DEFAULT '' NOT NULL",
144+
)
145+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='John'")[0].get('origin') is None)
134146
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='John'")[0].get('country') == 'USA')
135147

136148
mysql.execute(f"ALTER TABLE {TEST_DB_NAME}.{TEST_TABLE_NAME} DROP COLUMN country")

0 commit comments

Comments
 (0)