Skip to content

Commit 17c7318

Browse files
committed
Merge branch 'master' into multi_primary_key
2 parents a6a0161 + 324a78b commit 17c7318

File tree

11 files changed

+219
-15
lines changed

11 files changed

+219
-15
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
[![Release][release-image]][releases]
44
[![License][license-image]][license]
55

6-
[release-image]: https://img.shields.io/badge/release-0.0.33-blue.svg?style=flat
6+
[release-image]: https://img.shields.io/badge/release-0.0.35-blue.svg?style=flat
77
[releases]: https://github.com/bakwc/mysql_ch_replicator/releases
88

99
[license-image]: https://img.shields.io/badge/license-MIT-blue.svg?style=flat
@@ -137,7 +137,8 @@ tables: '*'
137137
exclude_databases: ['database_10', 'database_*_42'] # optional
138138
exclude_tables: ['meta_table_*'] # optional
139139

140-
log_level: 'info' # optional
140+
log_level: 'info' # optional
141+
optimize_interval: 86400 # optional
141142
```
142143
143144
#### Required settings
@@ -152,6 +153,7 @@ log_level: 'info' # optional
152153
- `exclude_databases` - databases to __exclude__, string or list, eg `'table1*'` or `['table2', 'table3*']`. If same database matches `databases` and `exclude_databases`, exclude has higher priority.
153154
- `exclude_tables` - tables to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority.
154155
- `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
156+
- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). Periodic optimization is required to guarantee that all merges are eventually performed, which prevents storage usage from growing and performance from degrading.
155157

156158
Few more tables / dbs examples:
157159

mysql_ch_replicator/clickhouse_api.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
class ClickhouseApi:
3333
MAX_RETRIES = 5
3434
RETRY_INTERVAL = 30
35-
def __init__(self, database: str, clickhouse_settings: ClickhouseSettings):
35+
36+
def __init__(self, database: str | None, clickhouse_settings: ClickhouseSettings):
3637
self.database = database
3738
self.clickhouse_settings = clickhouse_settings
3839
self.client = clickhouse_connect.get_client(
@@ -175,10 +176,12 @@ def drop_database(self, db_name):
175176
def create_database(self, db_name):
176177
self.cursor.execute(f'CREATE DATABASE {db_name}')
177178

178-
def select(self, table_name, where=None):
179+
def select(self, table_name, where=None, final=None):
179180
query = f'SELECT * FROM {table_name}'
180181
if where:
181182
query += f' WHERE {where}'
183+
if final is not None:
184+
query += f' SETTINGS final = {int(final)};'
182185
result = self.client.query(query)
183186
rows = result.result_rows
184187
columns = result.column_names

mysql_ch_replicator/config.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ def validate(self):
8181

8282

8383
class Settings:
84+
DEFAULT_LOG_LEVEL = 'info'
85+
DEFAULT_OPTIMIZE_INTERVAL = 86400
86+
DEFAULT_CHECK_DB_UPDATED_INTERVAL = 120
8487

8588
def __init__(self):
8689
self.mysql = MysqlSettings()
@@ -93,6 +96,8 @@ def __init__(self):
9396
self.settings_file = ''
9497
self.log_level = 'info'
9598
self.debug_log_level = False
99+
self.optimize_interval = 0
100+
self.check_db_updated_interval = 0
96101

97102
def load(self, settings_file):
98103
data = open(settings_file, 'r').read()
@@ -105,7 +110,11 @@ def load(self, settings_file):
105110
self.tables = data.pop('tables', '*')
106111
self.exclude_databases = data.pop('exclude_databases', '')
107112
self.exclude_tables = data.pop('exclude_tables', '')
108-
self.log_level = data.pop('log_level', 'info')
113+
self.log_level = data.pop('log_level', Settings.DEFAULT_LOG_LEVEL)
114+
self.optimize_interval = data.pop('optimize_interval', Settings.DEFAULT_OPTIMIZE_INTERVAL)
115+
self.check_db_updated_interval = data.pop(
116+
'check_db_updated_interval', Settings.DEFAULT_CHECK_DB_UPDATED_INTERVAL,
117+
)
109118
assert isinstance(self.databases, str) or isinstance(self.databases, list)
110119
assert isinstance(self.tables, str) or isinstance(self.tables, list)
111120
self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))

mysql_ch_replicator/converter.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ def convert_type(self, mysql_type, parameters):
194194
return 'String'
195195
if 'varbinary' in mysql_type:
196196
return 'String'
197+
if 'binary' in mysql_type:
198+
return 'String'
197199
raise Exception(f'unknown mysql type "{mysql_type}"')
198200

199201
def convert_field_type(self, mysql_type, mysql_parameters):
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import pickle
2+
import os
3+
import time
4+
from logging import getLogger
5+
6+
from .config import Settings
7+
from .mysql_api import MySQLApi
8+
from .clickhouse_api import ClickhouseApi
9+
from .utils import GracefulKiller
10+
11+
12+
logger = getLogger(__name__)
13+
14+
15+
class State:
    """Optimizer state persisted to disk as a pickle file.

    The only persisted field is ``last_process_time``: a dict mapping a
    database name to the ``time.time()`` value recorded when that database
    was last optimized.
    """

    def __init__(self, file_name):
        """Remember ``file_name`` and load previously saved state, if any."""
        self.file_name = file_name
        self.last_process_time = {}
        self.load()

    def load(self):
        """Load state from ``self.file_name``.

        A missing file is not an error: the state simply stays empty.
        """
        file_name = self.file_name
        if not os.path.exists(file_name):
            return
        # NOTE: pickle can execute arbitrary code while loading; this file
        # must only ever be written by this program (see ``save``).
        with open(file_name, 'rb') as f:
            data = pickle.load(f)
        self.last_process_time = data['last_process_time']

    def save(self):
        """Write state to ``self.file_name`` via a temporary file.

        The data is first written to ``<file_name>.tmp`` and then moved over
        the real file, so a crash mid-write never leaves a truncated state.
        """
        file_name = self.file_name
        tmp_file_name = file_name + '.tmp'
        data = pickle.dumps({
            'last_process_time': self.last_process_time,
        })
        with open(tmp_file_name, 'wb') as f:
            f.write(data)
        # os.replace overwrites an existing target on every platform,
        # whereas os.rename fails on Windows when the target exists.
        os.replace(tmp_file_name, file_name)
38+
39+
40+
class DbOptimizer:
    """Periodically runs ``OPTIMIZE TABLE ... FINAL`` on replicated tables.

    A database is eligible when it exists in MySQL, matches the configured
    database filter, exists in ClickHouse, and was last optimized at least
    ``config.optimize_interval`` seconds ago. The time of the last run per
    database is persisted in ``db_optimizer.bin`` inside the binlog
    replicator data directory.
    """

    def __init__(self, config: 'Settings'):
        """Create the persistent state and the MySQL / ClickHouse API clients."""
        self.state = State(os.path.join(
            config.binlog_replicator.data_dir,
            'db_optimizer.bin',
        ))
        self.config = config
        self.mysql_api = MySQLApi(
            database=None,
            mysql_settings=config.mysql,
        )
        self.clickhouse_api = ClickhouseApi(
            database=None,
            clickhouse_settings=config.clickhouse,
        )

    def select_db_to_optimize(self):
        """Return the first database that is due for optimization, or ``None``."""
        databases = self.mysql_api.get_databases()
        databases = [db for db in databases if self.config.is_database_matches(db)]
        ch_databases = set(self.clickhouse_api.get_databases())

        for db in databases:
            # Not present in ClickHouse, so there is nothing to optimize.
            if db not in ch_databases:
                continue
            last_process_time = self.state.last_process_time.get(db, 0.0)
            if time.time() - last_process_time < self.config.optimize_interval:
                continue
            return db
        return None

    def optimize_table(self, db_name, table_name):
        """Run ``OPTIMIZE TABLE ... FINAL`` for one table and log the duration."""
        logger.info(f'Optimizing table {db_name}.{table_name}')
        t1 = time.time()
        # Identifiers are backtick-quoted so that names containing characters
        # such as '-' or reserved words do not break the statement.
        self.clickhouse_api.execute_command(
            f'OPTIMIZE TABLE `{db_name}`.`{table_name}` FINAL SETTINGS mutations_sync = 2'
        )
        t2 = time.time()
        logger.info(f'Optimize finished in {int(t2-t1)} seconds')

    def optimize_database(self, db_name):
        """Optimize every matching table of ``db_name`` and record the run time.

        Only tables that exist in MySQL, match the configured table filter
        and exist in ClickHouse are optimized.
        """
        self.mysql_api.set_database(db_name)
        tables = self.mysql_api.get_tables()
        tables = [table for table in tables if self.config.is_table_matches(table)]

        self.clickhouse_api.execute_command(f'USE `{db_name}`')
        ch_tables = set(self.clickhouse_api.get_tables())

        for table in tables:
            if table not in ch_tables:
                continue
            self.optimize_table(db_name, table)
        self.state.last_process_time[db_name] = time.time()
        self.state.save()

    def _idle(self, killer):
        """Wait up to ``min(120, optimize_interval)`` seconds, waking early on kill.

        Sleeping in short slices keeps shutdown responsive: a single long
        ``time.sleep`` would delay noticing ``killer.kill_now`` by up to
        two minutes.
        """
        deadline = time.time() + min(120, self.config.optimize_interval)
        while not killer.kill_now:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            time.sleep(min(1.0, remaining))

    def run(self):
        """Main loop: optimize due databases until a stop is requested.

        Any exception is logged and ends the loop; the method then returns
        normally.
        """
        logger.info('running optimizer')
        # presumably GracefulKiller sets kill_now on a termination signal;
        # verify against utils.GracefulKiller
        killer = GracefulKiller()
        try:
            while not killer.kill_now:
                db_to_optimize = self.select_db_to_optimize()
                if db_to_optimize is None:
                    self._idle(killer)
                    continue
                self.optimize_database(db_name=db_to_optimize)
        except Exception as e:
            logger.error(f'error {e}', exc_info=True)
        logger.info('optimizer stopped')

mysql_ch_replicator/main.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from .config import Settings
1010
from .db_replicator import DbReplicator
1111
from .binlog_replicator import BinlogReplicator
12+
from .db_optimizer import DbOptimizer
1213
from .monitoring import Monitoring
1314
from .runner import Runner
1415

@@ -97,6 +98,24 @@ def run_db_replicator(args, config: Settings):
9798
db_replicator.run()
9899

99100

101+
def run_db_optimizer(args, config: Settings):
    """Entry point for the ``db_optimizer`` mode.

    Ensures the binlog replicator data directory exists, configures logging
    to ``db_optimizer.log`` inside it, then runs a ``DbOptimizer`` until it
    returns.
    """
    data_dir = config.binlog_replicator.data_dir
    # exist_ok avoids a FileExistsError when another process creates the
    # same directory between an existence check and the mkdir call.
    os.makedirs(data_dir, exist_ok=True)

    log_file = os.path.join(data_dir, 'db_optimizer.log')

    set_logging_config(f'dbopt {args.db}', log_file=log_file, log_level_str=config.log_level)

    db_optimizer = DbOptimizer(config=config)
    db_optimizer.run()
117+
118+
100119
def run_monitoring(args, config: Settings):
101120
set_logging_config('monitor', log_level_str=config.log_level)
102121
monitoring = Monitoring(args.db or '', config)
@@ -114,7 +133,7 @@ def main():
114133
parser.add_argument(
115134
"mode", help="run mode",
116135
type=str,
117-
choices=["run_all", "binlog_replicator", "db_replicator", "monitoring"])
136+
choices=["run_all", "binlog_replicator", "db_replicator", "monitoring", "db_optimizer"])
118137
parser.add_argument("--config", help="config file path", default='config.yaml', type=str)
119138
parser.add_argument("--db", help="source database(s) name", type=str)
120139
parser.add_argument("--target_db", help="target database(s) name, if not set will be same as source", type=str)
@@ -131,6 +150,8 @@ def main():
131150
run_binlog_replicator(args, config)
132151
if args.mode == 'db_replicator':
133152
run_db_replicator(args, config)
153+
if args.mode == 'db_optimizer':
154+
run_db_optimizer(args, config)
134155
if args.mode == 'monitoring':
135156
run_monitoring(args, config)
136157
if args.mode == 'run_all':

mysql_ch_replicator/mysql_api.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ def __init__(self, database: str, mysql_settings: MysqlSettings):
1717
def close(self):
1818
self.db.close()
1919

20-
def reconnect_if_required(self):
20+
def reconnect_if_required(self, force=False):
2121
curr_time = time.time()
22-
if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL:
22+
if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL and not force:
2323
return
2424
conn_settings = dict(
2525
host=self.mysql_settings.host,
@@ -59,7 +59,7 @@ def set_database(self, database):
5959
self.cursor.execute(f'USE {self.database}')
6060

6161
def get_databases(self):
62-
self.reconnect_if_required()
62+
self.reconnect_if_required(True) # New database appear only after new connection
6363
self.cursor.execute('SHOW DATABASES')
6464
res = self.cursor.fetchall()
6565
tables = [x[0] for x in res]

mysql_ch_replicator/runner.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ def __init__(self, db_name, config_file):
2525
super().__init__(f'{sys.argv[0]} --config {config_file} --db {db_name} db_replicator')
2626

2727

28+
class DbOptimizerRunner(ProcessRunner):
    """Process runner whose command starts this program in ``db_optimizer`` mode."""

    def __init__(self, config_file):
        """Build the command line from ``sys.argv[0]`` and ``config_file``."""
        command = f'{sys.argv[0]} --config {config_file} db_optimizer'
        super().__init__(command)
31+
32+
2833
class RunAllRunner(ProcessRunner):
2934
def __init__(self, db_name, config_file):
3035
super().__init__(f'{sys.argv[0]} --config {config_file} run_all --db {db_name}')
@@ -35,8 +40,9 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
3540
self.config = config
3641
self.databases = databases or config.databases
3742
self.wait_initial_replication = wait_initial_replication
38-
self.runners: dict = {}
43+
self.runners: dict[str: DbReplicatorRunner] = {}
3944
self.binlog_runner = None
45+
self.db_optimizer = None
4046

4147
def is_initial_replication_finished(self, db_name):
4248
state_path = os.path.join(
@@ -52,6 +58,28 @@ def restart_dead_processes(self):
5258
runner.restart_dead_process_if_required()
5359
if self.binlog_runner is not None:
5460
self.binlog_runner.restart_dead_process_if_required()
61+
if self.db_optimizer is not None:
62+
self.db_optimizer.restart_dead_process_if_required()
63+
64+
def check_databases_updated(self, mysql_api: MySQLApi):
    """Sync running replicators with the databases currently present in MySQL.

    Starts a ``DbReplicatorRunner`` for every matching database that has no
    runner yet, and stops and forgets the runner of every database that no
    longer appears in the filtered MySQL database list.
    """
    logger.debug('check if databases were created / removed in mysql')
    databases = mysql_api.get_databases()
    logger.info(f'mysql databases: {databases}')
    databases = [db for db in databases if self.config.is_database_matches(db)]
    logger.info(f'mysql databases filtered: {databases}')
    for db in databases:
        if db in self.runners:
            continue
        logger.info(f'running replication for {db} (database created in mysql)')
        runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file)
        runner.run()

    # Iterate over a snapshot of the keys: entries are removed inside the
    # loop, and mutating a dict while iterating it raises RuntimeError.
    for db in list(self.runners):
        if db in databases:
            continue
        logger.info(f'stop replication for {db} (database removed from mysql)')
        self.runners[db].stop()
        self.runners.pop(db)
5583

5684
def run(self):
5785
mysql_api = MySQLApi(
@@ -65,6 +93,9 @@ def run(self):
6593
self.binlog_runner = BinlogReplicatorRunner(self.config.settings_file)
6694
self.binlog_runner.run()
6795

96+
self.db_optimizer = DbOptimizerRunner(self.config.settings_file)
97+
self.db_optimizer.run()
98+
6899
# First - continue replication for DBs that already finished initial replication
69100
for db in databases:
70101
if not self.is_initial_replication_finished(db_name=db):
@@ -90,16 +121,24 @@ def run(self):
90121

91122
logger.info('all replicators launched')
92123

124+
last_check_db_updated = time.time()
93125
while not killer.kill_now:
94126
time.sleep(1)
95127
self.restart_dead_processes()
128+
if time.time() - last_check_db_updated > self.config.check_db_updated_interval:
129+
self.check_databases_updated(mysql_api=mysql_api)
130+
last_check_db_updated = time.time()
96131

97132
logger.info('stopping runner')
98133

99134
if self.binlog_runner is not None:
100135
logger.info('stopping binlog replication')
101136
self.binlog_runner.stop()
102137

138+
if self.db_optimizer is not None:
139+
logger.info('stopping db_optimizer')
140+
self.db_optimizer.stop()
141+
103142
for db_name, db_replication_runner in self.runners.items():
104143
logger.info(f'stopping replication for {db_name}')
105144
db_replication_runner.stop()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "mysql-ch-replicator"
3-
version = "0.0.33"
3+
version = "0.0.35"
44
description = "Tool for replication of MySQL databases to ClickHouse"
55
authors = ["Filipp Ozinov <[email protected]>"]
66
license = "MIT"

0 commit comments

Comments
 (0)