Skip to content

Commit 687df69

Browse files
committed
db_replicator refactor, split into several classes
1 parent 6a6f443 commit 687df69

File tree

2 files changed

+282
-0
lines changed

2 files changed

+282
-0
lines changed

mysql_ch_replicator/common.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from enum import Enum
2+
3+
class Status(Enum):
    """Lifecycle phase of a database replicator process.

    Persisted in the replicator state so a restarted process can tell
    which phase it was in.
    """

    NONE = 0                                # not started yet
    CREATING_INITIAL_STRUCTURES = 1         # creating ClickHouse tables from MySQL schemas
    PERFORMING_INITIAL_REPLICATION = 2      # bulk-copying existing MySQL data
    RUNNING_REALTIME_REPLICATION = 3        # tailing the binlog for live changes
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
import json
2+
import os.path
3+
import hashlib
4+
import time
5+
import sys
6+
import subprocess
7+
from logging import getLogger
8+
from enum import Enum
9+
10+
from .config import Settings
11+
from .mysql_api import MySQLApi
12+
from .clickhouse_api import ClickhouseApi
13+
from .converter import MysqlToClickhouseConverter
14+
from .table_structure import TableStructure
15+
from .utils import touch_all_files
16+
from .common import Status
17+
18+
# Module-level logger named after this module, per standard logging convention.
logger = getLogger(__name__)
19+
20+
class DbReplicatorInitial:
    """Initial (bulk) replication phase of a DbReplicator.

    Creates the ClickHouse table structures from the MySQL schemas and copies
    the existing MySQL data in batches, optionally fanning the copy out to
    multiple parallel worker subprocesses. All external state (MySQL /
    ClickHouse APIs, converter, persisted state, config) is accessed through
    the owning ``replicator`` object.
    """

    # Number of records fetched from MySQL per batch during the bulk copy.
    INITIAL_REPLICATION_BATCH_SIZE = 50000
    # Minimum seconds between persisted state saves (unless forced).
    SAVE_STATE_INTERVAL = 10
    # Minimum seconds between "touch" passes over the binlog directory.
    BINLOG_TOUCH_INTERVAL = 120

    def __init__(self, replicator):
        """
        :param replicator: owning DbReplicator providing config, state,
            mysql_api, clickhouse_api, converter and worker identity.
        """
        self.replicator = replicator
        self.last_touch_time = 0        # last time prevent_binlog_removal ran
        self.last_save_state_time = 0   # last time state was persisted

    def create_initial_structure(self):
        """Create ClickHouse structures for every known table, then persist state."""
        self.replicator.state.status = Status.CREATING_INITIAL_STRUCTURES
        for table in self.replicator.state.tables:
            self.create_initial_structure_table(table)
        self.replicator.state.save()

    def create_initial_structure_table(self, table_name):
        """Parse one MySQL table's schema, convert it and create the ClickHouse table.

        Skips tables excluded by config, or any table other than the requested
        one when running in single-table mode. Parallel workers record the
        structure but do not issue the CREATE TABLE themselves.
        """
        if not self.replicator.config.is_table_matches(table_name):
            return

        if self.replicator.single_table and self.replicator.single_table != table_name:
            return

        mysql_create_statement = self.replicator.mysql_api.get_table_create_statement(table_name)
        mysql_structure = self.replicator.converter.parse_mysql_table_structure(
            mysql_create_statement, required_table_name=table_name,
        )
        self.validate_mysql_structure(mysql_structure)
        clickhouse_structure = self.replicator.converter.convert_table_structure(mysql_structure)

        # Always set if_not_exists to True to prevent errors when tables already exist
        clickhouse_structure.if_not_exists = True

        self.replicator.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
        indexes = self.replicator.config.get_indexes(self.replicator.database, table_name)

        if not self.replicator.is_parallel_worker:
            self.replicator.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)

    def validate_mysql_structure(self, mysql_structure: 'TableStructure'):
        """Warn loudly when any primary-key field is nullable.

        Nullable primary keys may not replicate correctly; this only logs a
        warning, it does not abort.
        """
        for key_idx in mysql_structure.primary_key_ids:
            primary_field = mysql_structure.fields[key_idx]
            if 'not null' not in primary_field.parameters.lower():
                logger.warning('primary key validation failed')
                logger.warning(
                    f'\n\n\n !!! WARNING - PRIMARY KEY NULLABLE (field "{primary_field.name}", table "{mysql_structure.table_name}") !!!\n\n'
                    'There could be errors replicating nullable primary key\n'
                    'Please ensure all tables has NOT NULL parameter for primary key\n'
                    'Or mark tables as skipped, see "exclude_tables" option\n\n\n'
                )

    def prevent_binlog_removal(self):
        """Touch binlog files so they are not garbage-collected during a long copy.

        Rate-limited to once per BINLOG_TOUCH_INTERVAL seconds.
        """
        if time.time() - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
            return
        binlog_directory = os.path.join(self.replicator.config.binlog_replicator.data_dir, self.replicator.database)
        logger.info(f'touch binlog {binlog_directory}')
        if not os.path.exists(binlog_directory):
            return
        self.last_touch_time = time.time()
        touch_all_files(binlog_directory)

    def save_state_if_required(self, force=False):
        """Persist replicator state, at most once per SAVE_STATE_INTERVAL seconds.

        :param force: persist immediately, ignoring the rate limit.
        """
        curr_time = time.time()
        if curr_time - self.last_save_state_time < self.SAVE_STATE_INTERVAL and not force:
            return
        self.last_save_state_time = curr_time
        self.replicator.state.tables_last_record_version = self.replicator.clickhouse_api.tables_last_record_version
        self.replicator.state.save()

    def perform_initial_replication(self):
        """Bulk-copy all tables into a temporary database, then swap it live.

        Resumes from ``state.initial_replication_table`` if a previous run was
        interrupted. The coordinator (non-worker) process atomically renames
        the tmp database over the target one when the copy finishes.
        """
        self.replicator.clickhouse_api.database = self.replicator.target_database_tmp
        logger.info('running initial replication')
        self.replicator.state.status = Status.PERFORMING_INITIAL_REPLICATION
        self.replicator.state.save()
        start_table = self.replicator.state.initial_replication_table
        for table in self.replicator.state.tables:
            # Skip tables before the saved resume point, then clear it.
            if start_table and table != start_table:
                continue
            if self.replicator.single_table and self.replicator.single_table != table:
                continue
            self.perform_initial_replication_table(table)
            start_table = None

        if not self.replicator.is_parallel_worker:
            logger.info('initial replication - swapping database')
            if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
                # Target exists: move it aside, promote tmp, drop the old copy.
                self.replicator.clickhouse_api.execute_command(
                    f'RENAME DATABASE `{self.replicator.target_database}` TO `{self.replicator.target_database}_old`',
                )
                self.replicator.clickhouse_api.execute_command(
                    f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
                )
                self.replicator.clickhouse_api.drop_database(f'{self.replicator.target_database}_old')
            else:
                self.replicator.clickhouse_api.execute_command(
                    f'RENAME DATABASE `{self.replicator.target_database_tmp}` TO `{self.replicator.target_database}`',
                )
            self.replicator.clickhouse_api.database = self.replicator.target_database
        logger.info('initial replication - done')

    def perform_initial_replication_table(self, table_name):
        """Bulk-copy a single table in primary-key-ordered batches.

        Resumes from the persisted max primary key when re-running the same
        table. Delegates to the parallel variant when the config requests
        multiple initial-replication threads and we are the coordinator.
        """
        logger.info(f'running initial replication for table {table_name}')

        if not self.replicator.config.is_table_matches(table_name):
            logger.info(f'skip table {table_name} - not matching any allowed table')
            return

        if not self.replicator.is_parallel_worker and self.replicator.config.initial_replication_threads > 1:
            self.replicator.state.initial_replication_table = table_name
            self.replicator.state.initial_replication_max_primary_key = None
            self.replicator.state.save()
            self.perform_initial_replication_table_parallel(table_name)
            return

        max_primary_key = None
        if self.replicator.state.initial_replication_table == table_name:
            # continue replication from saved position
            max_primary_key = self.replicator.state.initial_replication_max_primary_key
            logger.info(f'continue from primary key {max_primary_key}')
        else:
            # starting replication from zero
            logger.info('replicating from scratch')
            self.replicator.state.initial_replication_table = table_name
            self.replicator.state.initial_replication_max_primary_key = None
            self.replicator.state.save()

        mysql_table_structure, clickhouse_table_structure = self.replicator.state.tables_structure[table_name]

        logger.debug(f'mysql table structure: {mysql_table_structure}')
        logger.debug(f'clickhouse table structure: {clickhouse_table_structure}')

        field_types = [field.field_type for field in clickhouse_table_structure.fields]

        primary_keys = clickhouse_table_structure.primary_keys
        primary_key_ids = clickhouse_table_structure.primary_key_ids
        primary_key_types = [field_types[key_idx] for key_idx in primary_key_ids]

        stats_number_of_records = 0
        last_stats_dump_time = time.time()

        while True:
            # Build the resume key for the query as a NEW list. The original
            # code aliased `max_primary_key` and quoted non-integer values in
            # place, which corrupted the in-memory/persisted resume key and
            # the max() comparisons below for string primary keys.
            query_start_values = None
            if max_primary_key is not None:
                query_start_values = []
                for key_type, value in zip(primary_key_types, max_primary_key):
                    if 'int' not in key_type.lower():
                        # Non-integer keys must be quoted in the SQL comparison.
                        value = f"'{value}'"
                    query_start_values.append(value)

            records = self.replicator.mysql_api.get_records(
                table_name=table_name,
                order_by=primary_keys,
                limit=self.INITIAL_REPLICATION_BATCH_SIZE,
                start_value=query_start_values,
                worker_id=self.replicator.worker_id,
                total_workers=self.replicator.total_workers,
            )
            logger.debug(f'extracted {len(records)} records from mysql')

            records = self.replicator.converter.convert_records(records, mysql_table_structure, clickhouse_table_structure)

            if self.replicator.config.debug_log_level:
                logger.debug(f'records: {records}')

            if not records:
                break
            self.replicator.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure)
            # Track the largest primary key seen so far (lists compare
            # lexicographically, matching the ORDER BY above).
            for record in records:
                record_primary_key = [record[key_idx] for key_idx in primary_key_ids]
                if max_primary_key is None:
                    max_primary_key = record_primary_key
                else:
                    max_primary_key = max(max_primary_key, record_primary_key)

            self.replicator.state.initial_replication_max_primary_key = max_primary_key
            self.save_state_if_required()
            self.prevent_binlog_removal()

            stats_number_of_records += len(records)
            curr_time = time.time()
            if curr_time - last_stats_dump_time >= 60.0:
                last_stats_dump_time = curr_time
                logger.info(
                    f'replicating {table_name}, '
                    f'replicated {stats_number_of_records} records, '
                    f'primary key: {max_primary_key}',
                )

        logger.info(
            f'finish replicating {table_name}, '
            f'replicated {stats_number_of_records} records, '
            f'primary key: {max_primary_key}',
        )

    def perform_initial_replication_table_parallel(self, table_name):
        """
        Execute initial replication for a table using multiple parallel worker processes.
        Each worker will handle a portion of the table based on its worker_id and total_workers.
        """
        logger.info(f"Starting parallel replication for table {table_name} with {self.replicator.config.initial_replication_threads} workers")

        # Create and launch worker processes
        processes = []
        for worker_id in range(self.replicator.config.initial_replication_threads):
            # Each worker re-runs main.py restricted to its shard of the table
            # (selected via worker_id / total_workers) against the tmp database.
            cmd = [
                sys.executable, "-m", "mysql_ch_replicator.main",
                "db_replicator",  # Required positional mode argument
                "--config", self.replicator.settings_file,
                "--db", self.replicator.database,
                "--worker_id", str(worker_id),
                "--total_workers", str(self.replicator.config.initial_replication_threads),
                "--table", table_name,
                "--target_db", self.replicator.target_database_tmp,
                "--initial_only=True",
            ]

            logger.info(f"Launching worker {worker_id}: {' '.join(cmd)}")
            process = subprocess.Popen(cmd)
            processes.append(process)

        # Wait for all worker processes to complete
        logger.info(f"Waiting for {len(processes)} workers to complete replication of {table_name}")

        last_progress_log_time = time.time()
        try:
            while processes:
                for i, process in enumerate(processes[:]):
                    # Check if process is still running
                    if process.poll() is not None:
                        exit_code = process.returncode
                        if exit_code == 0:
                            logger.info(f"Worker process {i} completed successfully")
                        else:
                            logger.error(f"Worker process {i} failed with exit code {exit_code}")
                            # Terminate surviving workers before aborting so a
                            # failed run does not leave orphan processes behind.
                            for other in processes:
                                if other is not process and other.poll() is None:
                                    other.terminate()
                            raise Exception(f"Worker process failed with exit code {exit_code}")

                        processes.remove(process)

                if processes:
                    # Wait a bit before checking again
                    time.sleep(0.1)

                    # Log progress at most once every 30 seconds. (The previous
                    # `int(time.time()) % 30 == 0` guard fired on every 0.1s
                    # poll within the matching second, spamming the log.)
                    curr_time = time.time()
                    if curr_time - last_progress_log_time >= 30.0:
                        last_progress_log_time = curr_time
                        logger.info(f"Still waiting for {len(processes)} workers to complete")
        except KeyboardInterrupt:
            logger.warning("Received interrupt, terminating worker processes")
            for process in processes:
                process.terminate()
            raise

        logger.info(f"All workers completed replication of table {table_name}")

0 commit comments

Comments
 (0)