Skip to content

Commit 32b6a18

Browse files
authored
Fixed erase multicolumn primary key (bakwc#54)
1 parent 07c980d commit 32b6a18

File tree

3 files changed

+66
-1
lines changed

3 files changed

+66
-1
lines changed

mysql_ch_replicator/clickhouse_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def insert(self, table_name, records, table_structure: TableStructure = None):
174174

175175
def erase(self, table_name, field_name, field_values):
176176
field_name = ','.join(field_name)
177-
field_values = ', '.join(list(map(str, field_values)))
177+
field_values = ', '.join(f'({v})' for v in field_values)
178178
query = DELETE_QUERY.format(**{
179179
'db_name': self.database,
180180
'table_name': table_name,

test_mysql_ch_replicator.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,69 @@ def test_runner():
401401
run_all_runner.stop()
402402

403403

404+
def read_logs(db_name):
405+
return open(os.path.join('binlog', db_name, 'db_replicator.log')).read()
406+
407+
408+
def test_multi_column_erase():
409+
config_file = CONFIG_FILE
410+
411+
cfg = config.Settings()
412+
cfg.load(config_file)
413+
414+
mysql = mysql_api.MySQLApi(
415+
database=None,
416+
mysql_settings=cfg.mysql,
417+
)
418+
419+
ch = clickhouse_api.ClickhouseApi(
420+
database=TEST_DB_NAME,
421+
clickhouse_settings=cfg.clickhouse,
422+
)
423+
424+
mysql.drop_database(TEST_DB_NAME_2)
425+
ch.drop_database(TEST_DB_NAME_2)
426+
427+
prepare_env(cfg, mysql, ch)
428+
429+
mysql.execute(f'''
430+
CREATE TABLE {TEST_TABLE_NAME} (
431+
departments int(11) NOT NULL,
432+
termine int(11) NOT NULL,
433+
PRIMARY KEY (departments,termine)
434+
)
435+
''')
436+
437+
438+
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (10, 20);", commit=True)
439+
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (30, 40);", commit=True)
440+
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (50, 60);", commit=True)
441+
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (20, 10);", commit=True)
442+
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (40, 30);", commit=True)
443+
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (departments, termine) VALUES (60, 50);", commit=True)
444+
445+
run_all_runner = RunAllRunner(cfg_file=config_file)
446+
run_all_runner.run()
447+
448+
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
449+
450+
ch.execute_command(f'USE {TEST_DB_NAME}')
451+
452+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
453+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 6)
454+
455+
mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=10;", commit=True)
456+
mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=30;", commit=True)
457+
mysql.execute(f"DELETE FROM {TEST_TABLE_NAME} WHERE departments=50;", commit=True)
458+
459+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
460+
461+
run_all_runner.stop()
462+
463+
assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
464+
assert('Traceback' not in read_logs(TEST_DB_NAME))
465+
466+
404467
def test_initial_only():
405468
cfg = config.Settings()
406469
cfg.load(CONFIG_FILE)

tests_config_mariadb.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ binlog_replicator:
1717

1818
databases: '*test*'
1919
log_level: 'debug'
20+
optimize_interval: 3
21+
check_db_updated_interval: 3

0 commit comments

Comments
 (0)