Skip to content

Commit 507f2e8

Browse files
authored
Add support for "ALTER TABLE t COLUMN c INT FIRST", and fix bug when primary key columns move. (#79)
Id columns always move when adding a first column, but they could also move when adding a column using AFTER, or when dropping a column if the primary key isn't the first column. Probably a rare case, but the result if it happens would be that the column order in the internal state doesn't match the database anymore, which is bad.
1 parent 87f3d1b commit 507f2e8

File tree

3 files changed

+113
-11
lines changed

3 files changed

+113
-11
lines changed

mysql_ch_replicator/converter.py

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -434,11 +434,14 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens):
434434
raise Exception('add multiple columns not implemented', tokens)
435435

436436
column_after = None
437+
column_first = False
437438
if tokens[-2].lower() == 'after':
438439
column_after = strip_sql_name(tokens[-1])
439440
tokens = tokens[:-2]
440441
if len(tokens) < 2:
441442
raise Exception('wrong tokens count', tokens)
443+
elif tokens[-1].lower() == 'first':
444+
column_first = True
442445

443446
column_name = strip_sql_name(tokens[0])
444447
column_type_mysql = tokens[1]
@@ -452,21 +455,32 @@ def __convert_alter_table_add_column(self, db_name, table_name, tokens):
452455
mysql_table_structure: TableStructure = table_structure[0]
453456
ch_table_structure: TableStructure = table_structure[1]
454457

455-
if column_after is None:
456-
column_after = strip_sql_name(mysql_table_structure.fields[-1].name)
458+
if column_first:
459+
mysql_table_structure.add_field_first(
460+
TableField(name=column_name, field_type=column_type_mysql)
461+
)
462+
463+
ch_table_structure.add_field_first(
464+
TableField(name=column_name, field_type=column_type_ch)
465+
)
466+
else:
467+
if column_after is None:
468+
column_after = strip_sql_name(mysql_table_structure.fields[-1].name)
457469

458-
mysql_table_structure.add_field_after(
459-
TableField(name=column_name, field_type=column_type_mysql),
460-
column_after,
461-
)
470+
mysql_table_structure.add_field_after(
471+
TableField(name=column_name, field_type=column_type_mysql),
472+
column_after,
473+
)
462474

463-
ch_table_structure.add_field_after(
464-
TableField(name=column_name, field_type=column_type_ch),
465-
column_after,
466-
)
475+
ch_table_structure.add_field_after(
476+
TableField(name=column_name, field_type=column_type_ch),
477+
column_after,
478+
)
467479

468480
query = f'ALTER TABLE {db_name}.{table_name} ADD COLUMN {column_name} {column_type_ch}'
469-
if column_after is not None:
481+
if column_first:
482+
query += ' FIRST'
483+
else:
470484
query += f' AFTER {column_after}'
471485

472486
if self.db_replicator:

mysql_ch_replicator/table_structure.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ def preprocess(self):
2525
field_names.index(key) for key in self.primary_keys
2626
]
2727

28+
def add_field_first(self, new_field: TableField):
29+
30+
self.fields.insert(0, new_field)
31+
self.preprocess()
32+
2833
def add_field_after(self, new_field: TableField, after: str):
2934

3035
idx_to_insert = None
@@ -36,11 +41,13 @@ def add_field_after(self, new_field: TableField, after: str):
3641
raise Exception('field after not found', after)
3742

3843
self.fields.insert(idx_to_insert, new_field)
44+
self.preprocess()
3945

4046
def remove_field(self, field_name):
4147
for idx, field in enumerate(self.fields):
4248
if field.name == field_name:
4349
del self.fields[idx]
50+
self.preprocess()
4451
return
4552
raise Exception(f'field {field_name} not found')
4653

test_mysql_ch_replicator.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,87 @@ def test_percona_migration(monkeypatch):
11731173
binlog_replicator_runner.stop()
11741174

11751175

1176+
def test_add_column_first_after_and_drop_column(monkeypatch):
1177+
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
1178+
1179+
cfg = config.Settings()
1180+
cfg.load(CONFIG_FILE)
1181+
1182+
mysql = mysql_api.MySQLApi(
1183+
database=None,
1184+
mysql_settings=cfg.mysql,
1185+
)
1186+
1187+
ch = clickhouse_api.ClickhouseApi(
1188+
database=TEST_DB_NAME,
1189+
clickhouse_settings=cfg.clickhouse,
1190+
)
1191+
1192+
prepare_env(cfg, mysql, ch)
1193+
1194+
mysql.execute(f'''
1195+
CREATE TABLE {TEST_TABLE_NAME} (
1196+
`id` int NOT NULL,
1197+
PRIMARY KEY (`id`));
1198+
''')
1199+
1200+
mysql.execute(
1201+
f"INSERT INTO {TEST_TABLE_NAME} (id) VALUES (42)",
1202+
commit=True,
1203+
)
1204+
1205+
binlog_replicator_runner = BinlogReplicatorRunner()
1206+
binlog_replicator_runner.run()
1207+
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
1208+
db_replicator_runner.run()
1209+
1210+
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
1211+
1212+
ch.execute_command(f'USE {TEST_DB_NAME}')
1213+
1214+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
1215+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
1216+
1217+
# Test adding a column as the new first column, after another column, and dropping a column
1218+
# These all move the primary key column to a different index and test the table structure is
1219+
# updated correctly.
1220+
1221+
# Test add column first
1222+
mysql.execute(
1223+
f"ALTER TABLE {TEST_TABLE_NAME} ADD COLUMN c1 INT FIRST")
1224+
mysql.execute(
1225+
f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (43, 11)",
1226+
commit=True,
1227+
)
1228+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=43")) == 1)
1229+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=43")[0]['c1'] == 11)
1230+
1231+
# Test add column after
1232+
mysql.execute(
1233+
f"ALTER TABLE {TEST_TABLE_NAME} ADD COLUMN c2 INT AFTER c1")
1234+
mysql.execute(
1235+
f"INSERT INTO {TEST_TABLE_NAME} (id, c1, c2) VALUES (44, 111, 222)",
1236+
commit=True,
1237+
)
1238+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=44")) == 1)
1239+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=44")[0]['c1'] == 111)
1240+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=44")[0]['c2'] == 222)
1241+
1242+
# Test drop column
1243+
mysql.execute(
1244+
f"ALTER TABLE {TEST_TABLE_NAME} DROP COLUMN c2")
1245+
mysql.execute(
1246+
f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (45, 1111)",
1247+
commit=True,
1248+
)
1249+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, where="id=45")) == 1)
1250+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=45")[0]['c1'] == 1111)
1251+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="id=45")[0].get('c2') is None)
1252+
1253+
db_replicator_runner.stop()
1254+
binlog_replicator_runner.stop()
1255+
1256+
11761257
def test_parse_mysql_table_structure():
11771258
query = "CREATE TABLE IF NOT EXISTS user_preferences_portal (\n\t\t\tid char(36) NOT NULL,\n\t\t\tcategory varchar(50) DEFAULT NULL,\n\t\t\tdeleted tinyint(1) DEFAULT 0,\n\t\t\tdate_entered datetime DEFAULT NULL,\n\t\t\tdate_modified datetime DEFAULT NULL,\n\t\t\tassigned_user_id char(36) DEFAULT NULL,\n\t\t\tcontents longtext DEFAULT NULL\n\t\t ) ENGINE=InnoDB DEFAULT CHARSET=utf8"
11781259

0 commit comments

Comments
 (0)