Skip to content

Commit 6c47fc3

Browse files
authored
Call OPTIMIZE automatically (bakwc#36)
1 parent cdaf5ba commit 6c47fc3

File tree

8 files changed

+166
-7
lines changed

8 files changed

+166
-7
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ tables: '*'
137137
exclude_databases: ['database_10', 'database_*_42'] # optional
138138
exclude_tables: ['meta_table_*'] # optional
139139

140-
log_level: 'info' # optional
140+
log_level: 'info' # optional
141+
optimize_interval: 86400 # optional
141142
```
142143
143144
#### Required settings
@@ -152,6 +153,7 @@ log_level: 'info' # optional
152153
- `exclude_databases` - databases to __exclude__, string or list, eg `'table1*'` or `['table2', 'table3*']`. If same database matches `databases` and `exclude_databases`, exclude has higher priority.
153154
- `exclude_tables` - tables to __exclude__, string or list. If same table matches `tables` and `exclude_tables`, exclude has higher priority.
154155
- `log_level` - log level, default is `info`, you can set to `debug` to get maximum information (allowed values are `debug`, `info`, `warning`, `error`, `critical`)
156+
- `optimize_interval` - interval (seconds) between automatic `OPTIMIZE table FINAL` calls. Default 86400 (1 day). This is required to guarantee that all merges are eventually performed, which prevents storage usage from growing and query performance from degrading over time.
155157

156158
Few more tables / dbs examples:
157159

mysql_ch_replicator/clickhouse_api.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
class ClickhouseApi:
3333
MAX_RETRIES = 5
3434
RETRY_INTERVAL = 30
35-
def __init__(self, database: str, clickhouse_settings: ClickhouseSettings):
35+
36+
def __init__(self, database: str | None, clickhouse_settings: ClickhouseSettings):
3637
self.database = database
3738
self.clickhouse_settings = clickhouse_settings
3839
self.client = clickhouse_connect.get_client(
@@ -175,10 +176,12 @@ def drop_database(self, db_name):
175176
def create_database(self, db_name):
176177
self.cursor.execute(f'CREATE DATABASE {db_name}')
177178

178-
def select(self, table_name, where=None):
179+
def select(self, table_name, where=None, final=None):
179180
query = f'SELECT * FROM {table_name}'
180181
if where:
181182
query += f' WHERE {where}'
183+
if final is not None:
184+
query += f' SETTINGS final = {int(final)};'
182185
result = self.client.query(query)
183186
rows = result.result_rows
184187
columns = result.column_names

mysql_ch_replicator/config.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ def validate(self):
8181

8282

8383
class Settings:
84+
DEFAULT_LOG_LEVEL = 'info'
85+
DEFAULT_OPTIMIZE_INTERVAL = 86400
8486

8587
def __init__(self):
8688
self.mysql = MysqlSettings()
@@ -93,6 +95,7 @@ def __init__(self):
9395
self.settings_file = ''
9496
self.log_level = 'info'
9597
self.debug_log_level = False
98+
self.optimize_interval = 0
9699

97100
def load(self, settings_file):
98101
data = open(settings_file, 'r').read()
@@ -105,7 +108,8 @@ def load(self, settings_file):
105108
self.tables = data.pop('tables', '*')
106109
self.exclude_databases = data.pop('exclude_databases', '')
107110
self.exclude_tables = data.pop('exclude_tables', '')
108-
self.log_level = data.pop('log_level', 'info')
111+
self.log_level = data.pop('log_level', Settings.DEFAULT_LOG_LEVEL)
112+
self.optimize_interval = data.pop('optimize_interval', Settings.DEFAULT_OPTIMIZE_INTERVAL)
109113
assert isinstance(self.databases, str) or isinstance(self.databases, list)
110114
assert isinstance(self.tables, str) or isinstance(self.tables, list)
111115
self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import pickle
2+
import os
3+
import time
4+
from logging import getLogger
5+
6+
from .config import Settings
7+
from .mysql_api import MySQLApi
8+
from .clickhouse_api import ClickhouseApi
9+
from .utils import GracefulKiller
10+
11+
12+
logger = getLogger(__name__)
13+
14+
15+
class State:
    """Persistent map of database name -> timestamp of its last OPTIMIZE pass.

    Backed by a pickle file; `save` writes a temp file and then renames it
    over the real one, so a crash mid-write never leaves a corrupt file.
    """

    def __init__(self, file_name):
        # Path of the pickle file backing this state.
        self.file_name = file_name
        # db_name -> unix timestamp of the last completed optimize pass.
        self.last_process_time = {}
        self.load()

    def load(self):
        """Restore state from disk; a missing file means a fresh start."""
        file_name = self.file_name
        if not os.path.exists(file_name):
            return
        # NOTE: pickle is acceptable here only because this file is written
        # by this same process; never point it at untrusted data.
        with open(file_name, 'rb') as f:
            data = pickle.loads(f.read())
        self.last_process_time = data['last_process_time']

    def save(self):
        """Atomically persist state: write a temp file, then rename it."""
        file_name = self.file_name
        data = pickle.dumps({
            'last_process_time': self.last_process_time,
        })
        tmp_name = file_name + '.tmp'
        with open(tmp_name, 'wb') as f:
            f.write(data)
        os.rename(tmp_name, file_name)
38+
39+
40+
class DbOptimizer:
    """Background worker that periodically runs ``OPTIMIZE TABLE ... FINAL``
    on every replicated table.

    Forcing merges keeps ClickHouse storage usage and query performance from
    degrading as ReplacingMergeTree parts accumulate. Per-database progress
    is persisted in a :class:`State` file so restarts don't immediately
    re-optimize everything.
    """

    def __init__(self, config: Settings):
        # Keep the state file next to the binlog replicator data.
        self.state = State(os.path.join(
            config.binlog_replicator.data_dir,
            'db_optimizer.bin',
        ))
        self.config = config
        # database=None: both API clients are used across all databases.
        self.mysql_api = MySQLApi(
            database=None,
            mysql_settings=config.mysql,
        )
        self.clickhouse_api = ClickhouseApi(
            database=None,
            clickhouse_settings=config.clickhouse,
        )

    def select_db_to_optimize(self):
        """Return the first replicated database whose last optimize pass is
        older than ``optimize_interval`` seconds, or None if none is due."""
        databases = self.mysql_api.get_databases()
        databases = [db for db in databases if self.config.is_database_matches(db)]
        ch_databases = set(self.clickhouse_api.get_databases())

        for db in databases:
            # Skip databases not (yet) replicated to ClickHouse.
            if db not in ch_databases:
                continue
            last_process_time = self.state.last_process_time.get(db, 0.0)
            if time.time() - last_process_time < self.config.optimize_interval:
                continue
            return db
        return None

    def optimize_table(self, db_name, table_name):
        """Run a blocking OPTIMIZE ... FINAL on a single table and record
        the completion time for its database."""
        logger.info(f'Optimizing table {db_name}.{table_name}')
        # mutations_sync = 2 presumably makes the call wait until the merge
        # completes on all replicas — confirm against ClickHouse docs.
        self.clickhouse_api.execute_command(
            f'OPTIMIZE TABLE {db_name}.{table_name} FINAL SETTINGS mutations_sync = 2'
        )
        logger.info('Optimize finished')
        self.state.last_process_time[db_name] = time.time()

    def optimize_database(self, db_name):
        """Optimize every matching table of ``db_name`` that exists on both
        sides, then persist the updated state."""
        self.mysql_api.set_database(db_name)
        tables = self.mysql_api.get_tables()
        tables = [table for table in tables if self.config.is_table_matches(table)]

        self.clickhouse_api.execute_command(f'USE {db_name}')
        ch_tables = set(self.clickhouse_api.get_tables())

        for table in tables:
            # Only optimize tables that were actually replicated.
            if table not in ch_tables:
                continue
            self.optimize_table(db_name, table)
        self.state.save()

    def run(self):
        """Main loop: keep optimizing due databases until told to stop.

        An error in one pass (e.g. a temporarily unreachable server) is
        logged and retried after a pause instead of killing the loop —
        previously a single transient failure stopped optimization until
        the process was restarted.
        """
        logger.info('running optimizer')
        killer = GracefulKiller()
        while not killer.kill_now:
            try:
                db_to_optimize = self.select_db_to_optimize()
                if db_to_optimize is None:
                    time.sleep(min(120, self.config.optimize_interval))
                    continue
                self.optimize_database(db_name=db_to_optimize)
            except Exception as e:
                logger.error(f'error {e}', exc_info=True)
                time.sleep(min(120, self.config.optimize_interval))
        logger.info('optimizer stopped')

mysql_ch_replicator/main.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from .config import Settings
1010
from .db_replicator import DbReplicator
1111
from .binlog_replicator import BinlogReplicator
12+
from .db_optimizer import DbOptimizer
1213
from .monitoring import Monitoring
1314
from .runner import Runner
1415

@@ -97,6 +98,24 @@ def run_db_replicator(args, config: Settings):
9798
db_replicator.run()
9899

99100

101+
def run_db_optimizer(args, config: Settings):
    """Entry point for the ``db_optimizer`` run mode.

    Sets up file logging under the binlog replicator data dir and runs the
    optimizer loop until the process is terminated.
    """
    data_dir = config.binlog_replicator.data_dir
    # makedirs(..., exist_ok=True) avoids the exists/mkdir race with the
    # other runners creating the same directory, and also creates missing
    # parent directories.
    os.makedirs(data_dir, exist_ok=True)

    log_file = os.path.join(
        data_dir,
        'db_optimizer.log',
    )

    set_logging_config(f'dbopt {args.db}', log_file=log_file, log_level_str=config.log_level)

    db_optimizer = DbOptimizer(
        config=config,
    )
    db_optimizer.run()
117+
118+
100119
def run_monitoring(args, config: Settings):
101120
set_logging_config('monitor', log_level_str=config.log_level)
102121
monitoring = Monitoring(args.db or '', config)
@@ -114,7 +133,7 @@ def main():
114133
parser.add_argument(
115134
"mode", help="run mode",
116135
type=str,
117-
choices=["run_all", "binlog_replicator", "db_replicator", "monitoring"])
136+
choices=["run_all", "binlog_replicator", "db_replicator", "monitoring", "db_optimizer"])
118137
parser.add_argument("--config", help="config file path", default='config.yaml', type=str)
119138
parser.add_argument("--db", help="source database(s) name", type=str)
120139
parser.add_argument("--target_db", help="target database(s) name, if not set will be same as source", type=str)
@@ -131,6 +150,8 @@ def main():
131150
run_binlog_replicator(args, config)
132151
if args.mode == 'db_replicator':
133152
run_db_replicator(args, config)
153+
if args.mode == 'db_optimizer':
154+
run_db_optimizer(args, config)
134155
if args.mode == 'monitoring':
135156
run_monitoring(args, config)
136157
if args.mode == 'run_all':

mysql_ch_replicator/runner.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ def __init__(self, db_name, config_file):
2525
super().__init__(f'{sys.argv[0]} --config {config_file} --db {db_name} db_replicator')
2626

2727

28+
class DbOptimizerRunner(ProcessRunner):
    """Runs the ``db_optimizer`` mode as a child process."""

    def __init__(self, config_file):
        command = f'{sys.argv[0]} --config {config_file} db_optimizer'
        super().__init__(command)
31+
32+
2833
class RunAllRunner(ProcessRunner):
2934
def __init__(self, db_name, config_file):
3035
super().__init__(f'{sys.argv[0]} --config {config_file} run_all --db {db_name}')
@@ -37,6 +42,7 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
3742
self.wait_initial_replication = wait_initial_replication
3843
self.runners: dict = {}
3944
self.binlog_runner = None
45+
self.db_optimizer = None
4046

4147
def is_initial_replication_finished(self, db_name):
4248
state_path = os.path.join(
@@ -65,6 +71,9 @@ def run(self):
6571
self.binlog_runner = BinlogReplicatorRunner(self.config.settings_file)
6672
self.binlog_runner.run()
6773

74+
self.db_optimizer = DbOptimizerRunner(self.config.settings_file)
75+
self.db_optimizer.run()
76+
6877
# First - continue replication for DBs that already finished initial replication
6978
for db in databases:
7079
if not self.is_initial_replication_finished(db_name=db):
@@ -100,6 +109,10 @@ def run(self):
100109
logger.info('stopping binlog replication')
101110
self.binlog_runner.stop()
102111

112+
if self.db_optimizer is not None:
113+
logger.info('stopping db_optimizer')
114+
self.db_optimizer.stop()
115+
103116
for db_name, db_replication_runner in self.runners.items():
104117
logger.info(f'stopping replication for {db_name}')
105118
db_replication_runner.stop()

test_mysql_ch_replicator.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,19 @@ def test_runner():
344344
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
345345

346346
mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=66 WHERE name='Ivan'", commit=True)
347-
time.sleep(4)
348-
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
347+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 66)
348+
349+
mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=77 WHERE name='Ivan'", commit=True)
350+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 77)
351+
352+
mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=88 WHERE name='Ivan'", commit=True)
353+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 88)
354+
355+
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Vlad', 99);", commit=True)
356+
357+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
358+
359+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)
349360

350361
run_all_runner.stop()
351362

tests_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ binlog_replicator:
1717

1818
databases: '*test*'
1919
log_level: 'debug'
20+
optimize_interval: 3

0 commit comments

Comments
 (0)