Skip to content

Commit 447afa5

Browse files
committed
Merge branch 'master' into support_types_mapping_and_uuid
2 parents e991f4e + 6410a7b commit 447afa5

File tree

5 files changed

+122
-10
lines changed

5 files changed

+122
-10
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ tables: '*'
158158
exclude_databases: ['database_10', 'database_*_42'] # optional
159159
exclude_tables: ['meta_table_*'] # optional
160160

161+
target_databases: # optional
162+
source_db_in_mysql_1: destination_db_in_clickhouse_1
163+
source_db_in_mysql_2: destination_db_in_clickhouse_2
164+
...
165+
161166
log_level: 'info' # optional
162167
optimize_interval: 86400 # optional
163168
auto_restart_interval: 3600 # optional
@@ -187,6 +192,7 @@ types_mapping: # optional
187192
- `tables` - tables to filter, list is also supported
188193
- `exclude_databases` - databases to __exclude__, string or list, eg `'table1*'` or `['table2', 'table3*']`. If same database matches `databases` and `exclude_databases`, exclude has higher priority.
189194
- `exclude_tables` - databases to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority.
195+
- `target_databases` - if you want database in ClickHouse to have different name from MySQL database
190196
- `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
191197
- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This is required to perform all merges guaranteed and avoid increasing of used storage and decreasing performance.
192198
- `auto_restart_interval` - interval (seconds) between automatic db_replicator restart. Default 3600 (1 hour). This is done to reduce memory usage.

mysql_ch_replicator/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ def __init__(self):
111111
self.http_host = ''
112112
self.http_port = 0
113113
self.types_mapping = {}
114+
self.target_databases = {}
114115

115116
def load(self, settings_file):
116117
data = open(settings_file, 'r').read()
@@ -134,6 +135,7 @@ def load(self, settings_file):
134135
self.types_mapping = data.pop('types_mapping', {})
135136
self.http_host = data.pop('http_host', '')
136137
self.http_port = data.pop('http_port', 0)
138+
self.target_databases = data.pop('target_databases', {})
137139

138140
indexes = data.pop('indexes', [])
139141
for index in indexes:
@@ -191,3 +193,5 @@ def validate(self):
191193
self.clickhouse.validate()
192194
self.binlog_replicator.validate()
193195
self.validate_log_level()
196+
if not isinstance(self.target_databases, dict):
197+
raise ValueError(f'wrong target databases {self.target_databases}')

mysql_ch_replicator/db_replicator.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def save(self):
6868
'tables_structure': self.tables_structure,
6969
'tables': self.tables,
7070
'pid': os.getpid(),
71+
'save_time': time.time(),
7172
})
7273
with open(file_name + '.tmp', 'wb') as f:
7374
f.write(data)
@@ -108,7 +109,19 @@ class DbReplicator:
108109
def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False):
109110
self.config = config
110111
self.database = database
111-
self.target_database = target_database or database
112+
113+
# use same as source database by default
114+
self.target_database = database
115+
116+
# use target database from config file if exists
117+
target_database_from_config = config.target_databases.get(database)
118+
if target_database_from_config:
119+
self.target_database = target_database_from_config
120+
121+
# use command line argument if exists
122+
if target_database:
123+
self.target_database = target_database
124+
112125
self.target_database_tmp = self.target_database + '_tmp'
113126
self.initial_only = initial_only
114127

test_mysql_ch_replicator.py

Lines changed: 95 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml'
2424
TEST_DB_NAME = 'replication-test_db'
2525
TEST_DB_NAME_2 = 'replication-test_db_2'
26+
TEST_DB_NAME_2_DESTINATION = 'replication-destination'
27+
2628
TEST_TABLE_NAME = 'test_table'
2729
TEST_TABLE_NAME_2 = 'test_table_2'
2830
TEST_TABLE_NAME_3 = 'test_table_3'
@@ -102,18 +104,18 @@ def test_e2e_regular(config_file):
102104
CREATE TABLE `{TEST_TABLE_NAME}` (
103105
id int NOT NULL AUTO_INCREMENT,
104106
name varchar(255) COMMENT 'Dân tộc, ví dụ: Kinh',
105-
`age x` int COMMENT 'CMND Cũ',
107+
age int COMMENT 'CMND Cũ',
106108
field1 text,
107109
field2 blob,
108110
PRIMARY KEY (id)
109111
);
110112
''')
111113

112114
mysql.execute(
113-
f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');",
115+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, field1, field2) VALUES ('Ivan', 42, 'test1', 'test2');",
114116
commit=True,
115117
)
116-
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`) VALUES ('Peter', 33);", commit=True)
118+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Peter', 33);", commit=True)
117119

118120
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
119121
binlog_replicator_runner.run()
@@ -127,13 +129,13 @@ def test_e2e_regular(config_file):
127129
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
128130
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
129131

130-
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`) VALUES ('Filipp', 50);", commit=True)
132+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('Filipp', 50);", commit=True)
131133
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
132-
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age x'] == 50)
134+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Filipp'")[0]['age'] == 50)
133135

134136

135137
mysql.execute(f"ALTER TABLE `{TEST_TABLE_NAME}` ADD `last_name` varchar(255); ")
136-
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, last_name) VALUES ('Mary', 24, 'Smith');", commit=True)
138+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name) VALUES ('Mary', 24, 'Smith');", commit=True)
137139

138140
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
139141
assert_wait(lambda: ch.select(TEST_TABLE_NAME, where="name='Mary'")[0]['last_name'] == 'Smith')
@@ -148,7 +150,7 @@ def test_e2e_regular(config_file):
148150
)
149151

150152
mysql.execute(
151-
f"INSERT INTO `{TEST_TABLE_NAME}` (name, `age x`, last_name, country) "
153+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, last_name, country) "
152154
f"VALUES ('John', 12, 'Doe', 'USA');", commit=True,
153155
)
154156

@@ -316,6 +318,7 @@ def test_runner():
316318

317319
mysql.drop_database(TEST_DB_NAME_2)
318320
ch.drop_database(TEST_DB_NAME_2)
321+
ch.drop_database(TEST_DB_NAME_2_DESTINATION)
319322

320323
prepare_env(cfg, mysql, ch)
321324

@@ -418,7 +421,7 @@ def test_runner():
418421
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "age=1912")[0]['name'] == 'Hällo')
419422

420423
mysql.create_database(TEST_DB_NAME_2)
421-
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases())
424+
assert_wait(lambda: TEST_DB_NAME_2_DESTINATION in ch.get_databases())
422425

423426
mysql.execute(f'''
424427
CREATE TABLE `group` (
@@ -459,7 +462,7 @@ def test_multi_column_erase():
459462
)
460463

461464
mysql.drop_database(TEST_DB_NAME_2)
462-
ch.drop_database(TEST_DB_NAME_2)
465+
ch.drop_database(TEST_DB_NAME_2_DESTINATION)
463466

464467
prepare_env(cfg, mysql, ch)
465468

@@ -711,6 +714,89 @@ def test_datetime_exception():
711714
binlog_replicator_runner.stop()
712715

713716

717+
def test_performance():
718+
config_file = 'tests_config_perf.yaml'
719+
num_records = 100000
720+
721+
cfg = config.Settings()
722+
cfg.load(config_file)
723+
724+
mysql = mysql_api.MySQLApi(
725+
database=None,
726+
mysql_settings=cfg.mysql,
727+
)
728+
729+
ch = clickhouse_api.ClickhouseApi(
730+
database=TEST_DB_NAME,
731+
clickhouse_settings=cfg.clickhouse,
732+
)
733+
734+
prepare_env(cfg, mysql, ch)
735+
736+
mysql.execute(f'''
737+
CREATE TABLE `{TEST_TABLE_NAME}` (
738+
id int NOT NULL AUTO_INCREMENT,
739+
name varchar(2048),
740+
age int,
741+
PRIMARY KEY (id)
742+
);
743+
''')
744+
745+
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
746+
binlog_replicator_runner.run()
747+
748+
time.sleep(1)
749+
750+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_1', 33);", commit=True)
751+
752+
def _get_last_insert_name():
753+
record = get_last_insert_from_binlog(cfg=cfg, db_name=TEST_DB_NAME)
754+
if record is None:
755+
return None
756+
return record[1].decode('utf-8')
757+
758+
assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5)
759+
760+
binlog_replicator_runner.stop()
761+
762+
time.sleep(1)
763+
764+
print("populating mysql data")
765+
766+
base_value = 'a' * 2000
767+
768+
for i in range(num_records):
769+
if i % 2000 == 0:
770+
print(f'populated {i} elements')
771+
mysql.execute(
772+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
773+
f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0,
774+
)
775+
776+
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)
777+
778+
print("running db_replicator")
779+
t1 = time.time()
780+
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
781+
binlog_replicator_runner.run()
782+
783+
assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_FINAL', retry_interval=0.5, max_wait_time=1000)
784+
t2 = time.time()
785+
786+
binlog_replicator_runner.stop()
787+
788+
time_delta = t2 - t1
789+
rps = num_records / time_delta
790+
791+
print('\n\n')
792+
print("*****************************")
793+
print("records per second:", int(rps))
794+
print("total time (seconds):", round(time_delta, 2))
795+
print("*****************************")
796+
print('\n\n')
797+
798+
799+
714800
def test_different_types_1():
715801
cfg = config.Settings()
716802
cfg.load(CONFIG_FILE)

tests_config.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ log_level: 'debug'
2020
optimize_interval: 3
2121
check_db_updated_interval: 3
2222

23+
target_databases:
24+
replication-test_db_2: replication-destination
25+
2326
indexes:
2427
- databases: '*'
2528
tables: ['group']

0 commit comments

Comments
 (0)