Skip to content

Commit 324a78b

Browse files
authored
Auto-create a new DB in ClickHouse when it is created in MySQL (#37)
1 parent cec65d6 commit 324a78b

File tree

5 files changed

+41
-4
lines changed

5 files changed

+41
-4
lines changed

mysql_ch_replicator/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def validate(self):
8383
class Settings:
8484
DEFAULT_LOG_LEVEL = 'info'
8585
DEFAULT_OPTIMIZE_INTERVAL = 86400
86+
DEFAULT_CHECK_DB_UPDATED_INTERVAL = 120
8687

8788
def __init__(self):
8889
self.mysql = MysqlSettings()
@@ -96,6 +97,7 @@ def __init__(self):
9697
self.log_level = 'info'
9798
self.debug_log_level = False
9899
self.optimize_interval = 0
100+
self.check_db_updated_interval = 0
99101

100102
def load(self, settings_file):
101103
data = open(settings_file, 'r').read()
@@ -110,6 +112,9 @@ def load(self, settings_file):
110112
self.exclude_tables = data.pop('exclude_tables', '')
111113
self.log_level = data.pop('log_level', Settings.DEFAULT_LOG_LEVEL)
112114
self.optimize_interval = data.pop('optimize_interval', Settings.DEFAULT_OPTIMIZE_INTERVAL)
115+
self.check_db_updated_interval = data.pop(
116+
'check_db_updated_interval', Settings.DEFAULT_CHECK_DB_UPDATED_INTERVAL,
117+
)
113118
assert isinstance(self.databases, str) or isinstance(self.databases, list)
114119
assert isinstance(self.tables, str) or isinstance(self.tables, list)
115120
self.binlog_replicator = BinlogReplicatorSettings(**data.pop('binlog_replicator'))

mysql_ch_replicator/mysql_api.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ def __init__(self, database: str, mysql_settings: MysqlSettings):
1717
def close(self):
1818
self.db.close()
1919

20-
def reconnect_if_required(self):
20+
def reconnect_if_required(self, force=False):
2121
curr_time = time.time()
22-
if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL:
22+
if curr_time - self.last_connect_time < MySQLApi.RECONNECT_INTERVAL and not force:
2323
return
2424
conn_settings = dict(
2525
host=self.mysql_settings.host,
@@ -59,7 +59,7 @@ def set_database(self, database):
5959
self.cursor.execute(f'USE {self.database}')
6060

6161
def get_databases(self):
62-
self.reconnect_if_required()
62+
self.reconnect_if_required(True) # New database appear only after new connection
6363
self.cursor.execute('SHOW DATABASES')
6464
res = self.cursor.fetchall()
6565
tables = [x[0] for x in res]

mysql_ch_replicator/runner.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
4040
self.config = config
4141
self.databases = databases or config.databases
4242
self.wait_initial_replication = wait_initial_replication
43-
self.runners: dict = {}
43+
self.runners: dict[str: DbReplicatorRunner] = {}
4444
self.binlog_runner = None
4545
self.db_optimizer = None
4646

@@ -61,6 +61,26 @@ def restart_dead_processes(self):
6161
if self.db_optimizer is not None:
6262
self.db_optimizer.restart_dead_process_if_required()
6363

64+
def check_databases_updated(self, mysql_api: MySQLApi):
    """Bring the set of running replicators in line with the MySQL databases.

    Asks ``mysql_api`` for the current database list, keeps only the names
    accepted by ``self.config.is_database_matches``, then:

    * starts a ``DbReplicatorRunner`` for every matching database that has
      no entry in ``self.runners`` yet;
    * stops and forgets every runner whose database is no longer in the
      filtered list.

    Args:
        mysql_api: API object whose ``get_databases()`` result is used as
            the current list of databases.
    """
    logger.debug('check if databases were created / removed in mysql')
    databases = mysql_api.get_databases()
    logger.info(f'mysql databases: {databases}')
    databases = [db for db in databases if self.config.is_database_matches(db)]
    logger.info(f'mysql databases filtered: {databases}')

    for db in databases:
        if db in self.runners:
            continue
        logger.info(f'running replication for {db} (database created in mysql)')
        runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file)
        runner.run()

    # Iterate over a snapshot of the keys: entries are popped inside the loop,
    # and removing items from a dict while iterating it raises RuntimeError.
    for db in list(self.runners):
        if db in databases:
            continue
        logger.info(f'stop replication for {db} (database removed from mysql)')
        self.runners[db].stop()
        self.runners.pop(db)
83+
6484
def run(self):
6585
mysql_api = MySQLApi(
6686
database=None, mysql_settings=self.config.mysql,
@@ -101,9 +121,13 @@ def run(self):
101121

102122
logger.info('all replicators launched')
103123

124+
last_check_db_updated = time.time()
104125
while not killer.kill_now:
105126
time.sleep(1)
106127
self.restart_dead_processes()
128+
if time.time() - last_check_db_updated > self.config.check_db_updated_interval:
129+
self.check_databases_updated(mysql_api=mysql_api)
130+
last_check_db_updated = time.time()
107131

108132
logger.info('stopping runner')
109133

test_mysql_ch_replicator.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
CONFIG_FILE = 'tests_config.yaml'
1818
CONFIG_FILE_MARIADB = 'tests_config_mariadb.yaml'
1919
TEST_DB_NAME = 'replication_test_db'
20+
TEST_DB_NAME_2 = 'replication_test_db_2'
2021
TEST_TABLE_NAME = 'test_table'
2122
TEST_TABLE_NAME_2 = 'test_table_2'
2223
TEST_TABLE_NAME_3 = 'test_table_3'
@@ -300,6 +301,9 @@ def test_runner():
300301
clickhouse_settings=cfg.clickhouse,
301302
)
302303

304+
mysql.drop_database(TEST_DB_NAME_2)
305+
ch.drop_database(TEST_DB_NAME_2)
306+
303307
prepare_env(cfg, mysql, ch)
304308

305309
mysql.execute(f'''
@@ -358,6 +362,9 @@ def test_runner():
358362

359363
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, final=False)) == 4)
360364

365+
mysql.create_database(TEST_DB_NAME_2)
366+
assert_wait(lambda: TEST_DB_NAME_2 in ch.get_databases(), max_wait_time=5)
367+
361368
run_all_runner.stop()
362369

363370

tests_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ binlog_replicator:
1818
databases: '*test*'
1919
log_level: 'debug'
2020
optimize_interval: 3
21+
check_db_updated_interval: 3

0 commit comments

Comments
 (0)