Skip to content

Commit ab4cfcd

Browse files
authored
Http API to restart replication (#64)
1 parent a33d29b commit ab4cfcd

File tree

8 files changed

+685
-134
lines changed

8 files changed

+685
-134
lines changed

mysql_ch_replicator/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ def __init__(self):
108108
self.check_db_updated_interval = 0
109109
self.indexes: list[Index] = []
110110
self.auto_restart_interval = 0
111+
self.http_host = ''
112+
self.http_port = 0
111113

112114
def load(self, settings_file):
113115
data = open(settings_file, 'r').read()
@@ -128,6 +130,9 @@ def load(self, settings_file):
128130
self.auto_restart_interval = data.pop(
129131
'auto_restart_interval', Settings.DEFAULT_AUTO_RESTART_INTERVAL,
130132
)
133+
self.http_host = data.pop('http_host', '')
134+
self.http_port = data.pop('http_port', 0)
135+
131136
indexes = data.pop('indexes', [])
132137
for index in indexes:
133138
self.indexes.append(

mysql_ch_replicator/runner.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import os
22
import time
33
import sys
4+
import threading
5+
from uvicorn import Config, Server
6+
from fastapi import APIRouter, FastAPI
47

58
from logging import getLogger
69

@@ -35,6 +38,10 @@ def __init__(self, db_name, config_file):
3538
super().__init__(f'{sys.argv[0]} --config {config_file} run_all --db {db_name}')
3639

3740

41+
app = FastAPI()
42+
43+
44+
3845
class Runner:
3946
def __init__(self, config: Settings, wait_initial_replication: bool, databases: str):
4047
self.config = config
@@ -43,6 +50,32 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
4350
self.runners: dict[str: DbReplicatorRunner] = {}
4451
self.binlog_runner = None
4552
self.db_optimizer = None
53+
self.http_server = None
54+
self.router = None
55+
self.need_restart_replication = False
56+
self.replication_restarted = False
57+
58+
def run_server(self):
59+
if not self.config.http_host or not self.config.http_port:
60+
logger.info('http server disabled')
61+
return
62+
logger.info('starting http server')
63+
64+
config = Config(app=app, host=self.config.http_host, port=self.config.http_port)
65+
self.router = APIRouter()
66+
self.router.add_api_route("/restart_replication", self.restart_replication, methods=["GET"])
67+
app.include_router(self.router)
68+
69+
self.http_server = Server(config)
70+
self.http_server.run()
71+
72+
def restart_replication(self):
73+
self.replication_restarted = False
74+
self.need_restart_replication = True
75+
while not self.replication_restarted:
76+
logger.info('waiting replication restarted..')
77+
time.sleep(1)
78+
return {"restarted": True}
4679

4780
def is_initial_replication_finished(self, db_name):
4881
state_path = os.path.join(
@@ -61,6 +94,23 @@ def restart_dead_processes(self):
6194
if self.db_optimizer is not None:
6295
self.db_optimizer.restart_dead_process_if_required()
6396

97+
def restart_replication_if_required(self):
98+
if not self.need_restart_replication:
99+
return
100+
logger.info('\n\n\n ====== restarting replication =====')
101+
for db_name, runner in self.runners.items():
102+
logger.info(f'stopping runner {db_name}')
103+
runner.stop()
104+
path = os.path.join(self.config.binlog_replicator.data_dir, db_name, 'state.pckl')
105+
if os.path.exists(path):
106+
logger.debug(f'removing {path}')
107+
os.remove(path)
108+
109+
logger.info('starting replication')
110+
self.restart_dead_processes()
111+
self.need_restart_replication = False
112+
self.replication_restarted = True
113+
64114
def check_databases_updated(self, mysql_api: MySQLApi):
65115
logger.debug('check if databases were created / removed in mysql')
66116
databases = mysql_api.get_databases()
@@ -96,6 +146,9 @@ def run(self):
96146
self.db_optimizer = DbOptimizerRunner(self.config.settings_file)
97147
self.db_optimizer.run()
98148

149+
server_thread = threading.Thread(target=self.run_server, daemon=True)
150+
server_thread.start()
151+
99152
# First - continue replication for DBs that already finished initial replication
100153
for db in databases:
101154
if not self.is_initial_replication_finished(db_name=db):
@@ -124,6 +177,7 @@ def run(self):
124177
last_check_db_updated = time.time()
125178
while not killer.kill_now:
126179
time.sleep(1)
180+
self.restart_replication_if_required()
127181
self.restart_dead_processes()
128182
if time.time() - last_check_db_updated > self.config.check_db_updated_interval:
129183
self.check_databases_updated(mysql_api=mysql_api)
@@ -143,4 +197,9 @@ def run(self):
143197
logger.info(f'stopping replication for {db_name}')
144198
db_replication_runner.stop()
145199

200+
if self.http_server:
201+
self.http_server.should_exit = True
202+
203+
server_thread.join()
204+
146205
logger.info('stopped')

mysql_ch_replicator/utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ def run(self):
4141
self.process = subprocess.Popen(cmd)
4242

4343
def restart_dead_process_if_required(self):
44+
if self.process is None:
45+
logger.warning(f'Restarting stopped process: < {self.cmd} >')
46+
self.run()
47+
return
4448
res = self.process.poll()
4549
if res is None:
4650
# still running

0 commit comments

Comments
 (0)