Skip to content

Commit e93d777

Browse files
authored
Prevent binlog removal during initial replication (#5)
1 parent 8bedc8e commit e93d777

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

mysql_ch_replicator/db_replicator.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from .converter import MysqlToClickhouseConverter, strip_sql_name, strip_sql_comments
1414
from .table_structure import TableStructure
1515
from .binlog_replicator import DataReader, LogEvent, EventType
16-
from .utils import GracefulKiller
16+
from .utils import GracefulKiller, touch_all_files
1717

1818

1919
logger = getLogger(__name__)
@@ -89,6 +89,7 @@ class DbReplicator:
8989
INITIAL_REPLICATION_BATCH_SIZE = 50000
9090
SAVE_STATE_INTERVAL = 10
9191
STATS_DUMP_INTERVAL = 60
92+
BINLOG_TOUCH_INTERVAL = 120
9293

9394
DATA_DUMP_INTERVAL = 1
9495
DATA_DUMP_BATCH_SIZE = 10000
@@ -120,6 +121,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
120121
self.records_to_insert = defaultdict(dict) # table_name => {record_id=>record, ...}
121122
self.records_to_delete = defaultdict(set) # table_name => {record_id, ...}
122123
self.last_records_upload_time = 0
124+
self.last_touch_time = 0
123125

124126
def run(self):
125127
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
@@ -156,6 +158,16 @@ def create_initial_structure_table(self, table_name):
156158
self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
157159
self.clickhouse_api.create_table(clickhouse_structure)
158160

161+
def prevent_binlog_removal(self):
162+
if time.time() - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
163+
return
164+
binlog_directory = os.path.join(self.config.binlog_replicator.data_dir, self.database)
165+
logger.info(f'touch binlog {binlog_directory}')
166+
if not os.path.exists(binlog_directory):
167+
return
168+
self.last_touch_time = time.time()
169+
touch_all_files(binlog_directory)
170+
159171
def perform_initial_replication(self):
160172
self.clickhouse_api.database = self.target_database_tmp
161173
logger.info('running initial replication')
@@ -236,6 +248,7 @@ def perform_initial_replication_table(self, table_name):
236248

237249
self.state.initial_replication_max_primary_key = max_primary_key
238250
self.save_state_if_required()
251+
self.prevent_binlog_removal()
239252

240253
def run_realtime_replication(self):
241254
if self.initial_only:

mysql_ch_replicator/utils.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import signal
22
import subprocess
3+
import os
4+
import time
35

6+
from pathlib import Path
47
from logging import getLogger
58

69

@@ -44,4 +47,24 @@ def wait_complete(self):
4447
self.process = None
4548

4649
def __del__(self):
47-
self.stop()
50+
self.stop()
51+
52+
53+
def touch_all_files(directory_path):
54+
dir_path = Path(directory_path)
55+
56+
if not dir_path.exists():
57+
raise FileNotFoundError(f"The directory '{directory_path}' does not exist.")
58+
59+
if not dir_path.is_dir():
60+
raise NotADirectoryError(f"The path '{directory_path}' is not a directory.")
61+
62+
current_time = time.time()
63+
64+
for item in dir_path.iterdir():
65+
if item.is_file():
66+
try:
67+
# Update the modification and access times
68+
os.utime(item, times=(current_time, current_time))
69+
except Exception as e:
70+
logger.warning(f"Failed to touch {item}: {e}")

0 commit comments

Comments
 (0)