Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .converter import MysqlToClickhouseConverter, strip_sql_name, strip_sql_comments
from .table_structure import TableStructure
from .binlog_replicator import DataReader, LogEvent, EventType
from .utils import GracefulKiller
from .utils import GracefulKiller, touch_all_files


logger = getLogger(__name__)
Expand Down Expand Up @@ -89,6 +89,7 @@ class DbReplicator:
INITIAL_REPLICATION_BATCH_SIZE = 50000
SAVE_STATE_INTERVAL = 10
STATS_DUMP_INTERVAL = 60
BINLOG_TOUCH_INTERVAL = 120

DATA_DUMP_INTERVAL = 1
DATA_DUMP_BATCH_SIZE = 10000
Expand Down Expand Up @@ -120,6 +121,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
self.records_to_insert = defaultdict(dict) # table_name => {record_id=>record, ...}
self.records_to_delete = defaultdict(set) # table_name => {record_id, ...}
self.last_records_upload_time = 0
self.last_touch_time = 0

def run(self):
if self.state.status == Status.RUNNING_REALTIME_REPLICATION:
Expand Down Expand Up @@ -156,6 +158,16 @@ def create_initial_structure_table(self, table_name):
self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
self.clickhouse_api.create_table(clickhouse_structure)

def prevent_binlog_removal(self):
    """Refresh the timestamps of this database's binlog files, at most once per interval.

    The directory is ``<config.binlog_replicator.data_dir>/<database>``.
    The call is a no-op when fewer than ``BINLOG_TOUCH_INTERVAL`` seconds
    have passed since the last successful touch, or when the directory
    does not exist yet.

    NOTE(review): presumably the files are touched so that an age-based
    cleanup of old binlog files does not remove them while a long initial
    replication is still running -- confirm against the binlog
    replicator's retention logic.
    """
    now = time.time()
    if now - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
        return

    binlog_directory = os.path.join(self.config.binlog_replicator.data_dir, self.database)
    if not os.path.exists(binlog_directory):
        # Nothing to touch yet. last_touch_time is left unchanged so the
        # next call retries; we stay silent so that a missing directory
        # does not produce a misleading "touch" log line on every call.
        return

    # Log only when a touch is actually about to happen.
    logger.info(f'touch binlog {binlog_directory}')
    self.last_touch_time = now
    touch_all_files(binlog_directory)

def perform_initial_replication(self):
self.clickhouse_api.database = self.target_database_tmp
logger.info('running initial replication')
Expand Down Expand Up @@ -236,6 +248,7 @@ def perform_initial_replication_table(self, table_name):

self.state.initial_replication_max_primary_key = max_primary_key
self.save_state_if_required()
self.prevent_binlog_removal()

def run_realtime_replication(self):
if self.initial_only:
Expand Down
25 changes: 24 additions & 1 deletion mysql_ch_replicator/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import signal
import subprocess
import os
import time

from pathlib import Path
from logging import getLogger


Expand Down Expand Up @@ -44,4 +47,24 @@ def wait_complete(self):
self.process = None

def __del__(self):
self.stop()
self.stop()


def touch_all_files(directory_path):
    """Set the access and modification time of every regular file in a directory to now.

    Only direct children of ``directory_path`` are considered; entries that
    are not regular files (for example subdirectories) are skipped. All
    files receive the same timestamp, taken once before iterating.

    Args:
        directory_path: Path of the directory whose files should be touched.

    Raises:
        FileNotFoundError: If ``directory_path`` does not exist.
        NotADirectoryError: If ``directory_path`` exists but is not a directory.
    """
    dir_path = Path(directory_path)

    if not dir_path.exists():
        raise FileNotFoundError(f"The directory '{directory_path}' does not exist.")

    if not dir_path.is_dir():
        raise NotADirectoryError(f"The path '{directory_path}' is not a directory.")

    current_time = time.time()

    for item in dir_path.iterdir():
        if not item.is_file():
            continue
        try:
            # Update the modification and access times
            os.utime(item, times=(current_time, current_time))
        except OSError as e:
            # Best effort: a file may vanish or be unwritable between the
            # directory listing and the utime call. Skip it and carry on
            # with the remaining files.
            logger.warning("Failed to touch %s: %s", item, e)
Loading