|
8 | 8 | from mysql_ch_replicator import mysql_api |
9 | 9 | from mysql_ch_replicator import clickhouse_api |
10 | 10 | from mysql_ch_replicator.binlog_replicator import State as BinlogState |
11 | | -from mysql_ch_replicator.db_replicator import State as DbReplicatorState |
| 11 | +from mysql_ch_replicator.db_replicator import State as DbReplicatorState, DbReplicator |
12 | 12 |
|
13 | 13 | from mysql_ch_replicator.runner import ProcessRunner |
14 | 14 |
|
@@ -704,3 +704,62 @@ def test_json(): |
704 | 704 |
|
705 | 705 | assert json.loads(ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['data'])['c'] == [1, 2, 3] |
706 | 706 | assert json.loads(ch.select(TEST_TABLE_NAME, "name='Peter'")[0]['data'])['c'] == [3, 2, 1] |
| 707 | + |
| 708 | + |
| 709 | +def test_string_primary_key(monkeypatch): |
| 710 | + monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1) |
| 711 | + |
| 712 | + cfg = config.Settings() |
| 713 | + cfg.load(CONFIG_FILE) |
| 714 | + |
| 715 | + mysql = mysql_api.MySQLApi( |
| 716 | + database=None, |
| 717 | + mysql_settings=cfg.mysql, |
| 718 | + ) |
| 719 | + |
| 720 | + ch = clickhouse_api.ClickhouseApi( |
| 721 | + database=TEST_DB_NAME, |
| 722 | + clickhouse_settings=cfg.clickhouse, |
| 723 | + ) |
| 724 | + |
| 725 | + prepare_env(cfg, mysql, ch) |
| 726 | + |
| 727 | + mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';") |
| 728 | + |
| 729 | + mysql.execute(f''' |
| 730 | +CREATE TABLE {TEST_TABLE_NAME} ( |
| 731 | + `id` char(30) NOT NULL, |
| 732 | + name varchar(255), |
| 733 | + PRIMARY KEY (id) |
| 734 | +); |
| 735 | + ''') |
| 736 | + |
| 737 | + mysql.execute( |
| 738 | + f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " + |
| 739 | + """('01', 'Ivan');""", |
| 740 | + commit=True, |
| 741 | + ) |
| 742 | + mysql.execute( |
| 743 | + f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " + |
| 744 | + """('02', 'Peter');""", |
| 745 | + commit=True, |
| 746 | + ) |
| 747 | + |
| 748 | + binlog_replicator_runner = BinlogReplicatorRunner() |
| 749 | + binlog_replicator_runner.run() |
| 750 | + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME) |
| 751 | + db_replicator_runner.run() |
| 752 | + |
| 753 | + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) |
| 754 | + |
| 755 | + ch.execute_command(f'USE {TEST_DB_NAME}') |
| 756 | + |
| 757 | + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) |
| 758 | + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2) |
| 759 | + |
| 760 | + mysql.execute( |
| 761 | + f"INSERT INTO {TEST_TABLE_NAME} (id, name) VALUES " + |
| 762 | + """('03', 'Filipp');""", |
| 763 | + commit=True, |
| 764 | + ) |
| 765 | + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3) |
0 commit comments