Skip to content

Commit 9d8d1cf

Browse files
committed
Tests for optimizer
1 parent 405e2d7 commit 9d8d1cf

File tree

5 files changed

+28
-12
lines changed

5 files changed

+28
-12
lines changed

mysql_ch_replicator/clickhouse_api.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,12 @@ def drop_database(self, db_name):
176176
def create_database(self, db_name):
177177
self.cursor.execute(f'CREATE DATABASE {db_name}')
178178

179-
def select(self, table_name, where=None):
179+
def select(self, table_name, where=None, final=None):
180180
query = f'SELECT * FROM {table_name}'
181181
if where:
182182
query += f' WHERE {where}'
183+
if final is not None:
184+
query += f' SETTINGS final = {int(final)};'
183185
result = self.client.query(query)
184186
rows = result.result_rows
185187
columns = result.column_names

mysql_ch_replicator/db_optimizer.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
import time
44
from logging import getLogger
55

6-
from config import Settings
7-
from mysql_api import MySQLApi
8-
from clickhouse_api import ClickhouseApi
6+
from .config import Settings
7+
from .mysql_api import MySQLApi
8+
from .clickhouse_api import ClickhouseApi
9+
from .utils import GracefulKiller
910

1011

1112
logger = getLogger(__name__)
@@ -40,7 +41,7 @@ class DbOptimizer:
4041
def __init__(self, config: Settings):
4142
self.state = State(os.path.join(
4243
config.binlog_replicator.data_dir,
43-
''
44+
'db_optimizer.bin',
4445
))
4546
self.config = config
4647
self.mysql_api = MySQLApi(
@@ -72,6 +73,7 @@ def optimize_table(self, db_name, table_name):
7273
f'OPTIMIZE TABLE {db_name}.{table_name} FINAL SETTINGS mutations_sync = 2'
7374
)
7475
logger.info('Optimize finished')
76+
self.state.last_process_time[db_name] = time.time()
7577

7678
def optimize_database(self, db_name):
7779
self.mysql_api.set_database(db_name)
@@ -87,15 +89,15 @@ def optimize_database(self, db_name):
8789
self.optimize_table(db_name, table)
8890

8991
def run(self):
90-
while True:
91-
time.sleep(999)
92-
92+
logger.info('running optimizer')
93+
killer = GracefulKiller()
9394
try:
94-
while True:
95+
while not killer.kill_now:
9596
db_to_optimize = self.select_db_to_optimize()
9697
if db_to_optimize is None:
9798
time.sleep(min(120, self.config.optimize_interval))
9899
continue
99100
self.optimize_database(db_name=db_to_optimize)
100101
except Exception as e:
101102
logger.error(f'error {e}', exc_info=True)
103+
logger.info('optimizer stopped')

mysql_ch_replicator/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def main():
133133
parser.add_argument(
134134
"mode", help="run mode",
135135
type=str,
136-
choices=["run_all", "binlog_replicator", "db_replicator", "monitoring"])
136+
choices=["run_all", "binlog_replicator", "db_replicator", "monitoring", "db_optimizer"])
137137
parser.add_argument("--config", help="config file path", default='config.yaml', type=str)
138138
parser.add_argument("--db", help="source database(s) name", type=str)
139139
parser.add_argument("--target_db", help="target database(s) name, if not set will be same as source", type=str)

test_mysql_ch_replicator.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,19 @@ def test_runner():
344344
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
345345

346346
mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=66 WHERE name='Ivan'", commit=True)
347-
time.sleep(4)
348-
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
347+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 66)
348+
349+
mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=77 WHERE name='Ivan'", commit=True)
350+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 77)
351+
352+
mysql.execute(f"UPDATE {TEST_TABLE_NAME} SET age=88 WHERE name='Ivan'", commit=True)
353+
assert_wait(lambda: ch.select(TEST_TABLE_NAME, "name='Ivan'")[0]['age'] == 88)
354+
355+
mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('Vlad', 99);", commit=True)
356+
357+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 4)
358+
359+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)
349360

350361
run_all_runner.stop()
351362

tests_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ binlog_replicator:
1717

1818
databases: '*test*'
1919
log_level: 'debug'
20+
optimize_interval: 3

0 commit comments

Comments
 (0)