Skip to content

Commit 6410a7b

Browse files
authored
Option to set another name for destination database (#96)
1 parent ca057c1 commit 6410a7b

File tree

5 files changed

+122
-10
lines changed

5 files changed

+122
-10
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ tables: '*'
158158
exclude_databases: ['database_10', 'database_*_42'] # optional
159159
exclude_tables: ['meta_table_*'] # optional
160160

161+
target_databases: # optional
162+
source_db_in_mysql_1: destination_db_in_clickhouse_1
163+
source_db_in_mysql_2: destination_db_in_clickhouse_2
164+
...
165+
161166
log_level: 'info' # optional
162167
optimize_interval: 86400 # optional
163168
auto_restart_interval: 3600 # optional
@@ -183,6 +188,7 @@ http_port: 9128 # optional
183188
- `tables` - tables to filter, list is also supported
184189
- `exclude_databases` - databases to __exclude__, string or list, eg `'table1*'` or `['table2', 'table3*']`. If same database matches `databases` and `exclude_databases`, exclude has higher priority.
185190
- `exclude_tables` - databases to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority.
191+
- `target_databases` - if you want database in ClickHouse to have different name from MySQL database
186192
- `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
187193
- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This is required to perform all merges guaranteed and avoid increasing of used storage and decreasing performance.
188194
- `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage.

mysql_ch_replicator/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def __init__(self):
110110
self.auto_restart_interval = 0
111111
self.http_host = ''
112112
self.http_port = 0
113+
self.target_databases = {}
113114

114115
def load(self, settings_file):
115116
data = open(settings_file, 'r').read()
@@ -132,6 +133,7 @@ def load(self, settings_file):
132133
)
133134
self.http_host = data.pop('http_host', '')
134135
self.http_port = data.pop('http_port', 0)
136+
self.target_databases = data.pop('target_databases', {})
135137

136138
indexes = data.pop('indexes', [])
137139
for index in indexes:
@@ -189,3 +191,5 @@ def validate(self):
189191
self.clickhouse.validate()
190192
self.binlog_replicator.validate()
191193
self.validate_log_level()
194+
if not isinstance(self.target_databases, dict):
195+
raise ValueError(f'wrong target databases {self.target_databases}')

mysql_ch_replicator/db_replicator.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def save(self):
6868
'tables_structure': self.tables_structure,
6969
'tables': self.tables,
7070
'pid': os.getpid(),
71+
'save_time': time.time(),
7172
})
7273
with open(file_name + '.tmp', 'wb') as f:
7374
f.write(data)
@@ -108,7 +109,19 @@ class DbReplicator:
108109
def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False):
109110
self.config = config
110111
self.database = database
111-
self.target_database = target_database or database
112+
113+
# use same as source database by default
114+
self.target_database = database
115+
116+
# use target database from config file if exists
117+
target_database_from_config = config.target_databases.get(database)
118+
if target_database_from_config:
119+
self.target_database = target_database_from_config
120+
121+
# use command line argument if exists
122+
if target_database:
123+
self.target_database = target_database
124+
112125
self.target_database_tmp = self.target_database + '_tmp'
113126
self.initial_only = initial_only
114127

test_mysql_ch_replicator.py

Lines changed: 95 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml'
2222
TEST_DB_NAME = 'replication-test_db'
2323
TEST_DB_NAME_2 = 'replication-test_db_2'
24+
TEST_DB_NAME_2_DESTINATION = 'replication-destination'
25+
2426
TEST_TABLE_NAME = 'test_table'
2527
TEST_TABLE_NAME_2 = 'test_table_2'
2628
TEST_TABLE_NAME_3 = 'test_table_3'
@@ -100,18 +102,18 @@ def test_e2e_regular(config_file):
100102
CREATE TABLE `{TEST_TABLE_NAME}` (
101103
id int NOT NULL AUTO_INCREMENT,
102104
name varchar(255) COMMENT 'Dân tộc, ví dụ: Kinh',
103-
`age x` int COMMENT 'CMND Cũ',
105+
age int COMMENT 'CMND Cũ',
104106
field1 text,
105107
field2 blob,
106108
PRIMARY KEY (id)
107109
);
108110
''')
109111

110112
mysql.execute(
111-
f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');",
113+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');",
112114
commit=True,
113115
)
114-
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`) VALUES ('Peter', 33);", commit=True)
116+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Peter', 33);", commit=True)
115117

116118
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
117119
binlog_replicator_runner.run()
@@ -125,13 +127,13 @@ def test_e2e_regular(config_file):
125127
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
126128
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
127129

128-
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`) VALUES ('Filipp', 50);", commit=True)
130+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True)
129131
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
130-
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age x'] == 50)
132+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50)
131133

132134

133135
mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` ADD `last_name` varchar(255); ")
134-
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, last_name) VALUES ('Mary', 24, 'Smith');", commit=True)
136+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name) VALUES ('Mary', 24, 'Smith');", commit=True)
135137

136138
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
137139
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0]['last_name'] == 'Smith')
@@ -146,7 +148,7 @@ def test_e2e_regular(config_file):
146148
)
147149

148150
mysql.execute(
149-
f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, last_name, country) "
151+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name, country) "
150152
f"VALUES ('John', 12, 'Doe', 'USA');", commit=True,
151153
)
152154

@@ -314,6 +316,7 @@ def test_runner():
314316

315317
mysql.drop_database(TEST_DB_NAME_2)
316318
ch.drop_database(TEST_DB_NAME_2)
319+
ch.drop_database(TEST_DB_NAME_2_DESTINATION)
317320

318321
prepare_env(cfg, mysql, ch)
319322

@@ -416,7 +419,7 @@ def test_runner():
416419
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo')
417420

418421
mysql.create_database(TEST_DB_NAME_2)
419-
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())
422+
assert_wait(lambda: TEST_DB_NAME_2_DESTINATION in ch.get_databases())
420423

421424
mysql.execute(f'''
422425
CREATE TABLE `group` (
@@ -457,7 +460,7 @@ def test_multi_column_erase():
457460
)
458461

459462
mysql.drop_database(TEST_DB_NAME_2)
460-
ch.drop_database(TEST_DB_NAME_2)
463+
ch.drop_database(TEST_DB_NAME_2_DESTINATION)
461464

462465
prepare_env(cfg, mysql, ch)
463466

@@ -709,6 +712,89 @@ def test_datetime_exception():
709712
binlog_replicator_runner.stop()
710713

711714

715+
def test_performance():
716+
config_file = 'tests_config_perf.yaml'
717+
num_records = 100000
718+
719+
cfg = config.Settings()
720+
cfg.load(config_file)
721+
722+
mysql = mysql_api.MySQLApi(
723+
database=None,
724+
mysql_settings=cfg.mysql,
725+
)
726+
727+
ch = clickhouse_api.ClickhouseApi(
728+
database=TEST_DB_NAME,
729+
clickhouse_settings=cfg.clickhouse,
730+
)
731+
732+
prepare_env(cfg, mysql, ch)
733+
734+
mysql.execute(f'''
735+
CREATE TABLE `{TEST_TABLE_NAME}` (
736+
id int NOT NULL AUTO_INCREMENT,
737+
name varchar(2048),
738+
age int,
739+
PRIMARY KEY (id)
740+
);
741+
''')
742+
743+
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
744+
binlog_replicator_runner.run()
745+
746+
time.sleep(1)
747+
748+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_1', 33);", commit=True)
749+
750+
def _get_last_insert_name():
751+
record = get_last_insert_from_binlog(cfg=cfg, db_name=TEST_DB_NAME)
752+
if record is None:
753+
return None
754+
return record[1].decode('utf-8')
755+
756+
assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5)
757+
758+
binlog_replicator_runner.stop()
759+
760+
time.sleep(1)
761+
762+
print("populating mysql data")
763+
764+
base_value = 'a' * 2000
765+
766+
for i in range(num_records):
767+
if i % 2000 == 0:
768+
print(f'populated {i} elements')
769+
mysql.execute(
770+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
771+
f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0,
772+
)
773+
774+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)
775+
776+
print("running db_replicator")
777+
t1 = time.time()
778+
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
779+
binlog_replicator_runner.run()
780+
781+
assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_FINAL', retry_interval=0.5, max_wait_time=1000)
782+
t2 = time.time()
783+
784+
binlog_replicator_runner.stop()
785+
786+
time_delta = t2 - t1
787+
rps = num_records / time_delta
788+
789+
print('\n\n')
790+
print("*****************************")
791+
print("records per second:", int(rps))
792+
print("total time (seconds):", round(time_delta, 2))
793+
print("*****************************")
794+
print('\n\n')
795+
796+
797+
712798
def test_different_types_1():
713799
cfg = config.Settings()
714800
cfg.load(CONFIG_FILE)

tests_config.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ log_level: 'debug'
2020
optimize_interval: 3
2121
check_db_updated_interval: 3
2222

23+
target_databases:
24+
replication-test_db_2: replication-destination
25+
2326
indexes:
2427
- databases: '*'
2528
tables: ['group']

0 commit comments

Comments
 (0)