Commit 2952e64

Parallel initial replication
1 parent 9afe62e commit 2952e64

7 files changed: +293 -32 lines changed

mysql_ch_replicator/config.py

Lines changed: 6 additions & 0 deletions

@@ -119,6 +119,7 @@ def __init__(self):
         self.http_port = 0
         self.types_mapping = {}
         self.target_databases = {}
+        self.initial_replication_threads = 0

     def load(self, settings_file):
         data = open(settings_file, 'r').read()
@@ -143,6 +144,7 @@ def load(self, settings_file):
         self.http_host = data.pop('http_host', '')
         self.http_port = data.pop('http_port', 0)
         self.target_databases = data.pop('target_databases', {})
+        self.initial_replication_threads = data.pop('initial_replication_threads', 0)

         indexes = data.pop('indexes', [])
         for index in indexes:
@@ -202,3 +204,7 @@ def validate(self):
         self.validate_log_level()
         if not isinstance(self.target_databases, dict):
             raise ValueError(f'wrong target databases {self.target_databases}')
+        if not isinstance(self.initial_replication_threads, int):
+            raise ValueError(f'initial_replication_threads should be an integer, not {type(self.initial_replication_threads)}')
+        if self.initial_replication_threads < 0:
+            raise ValueError(f'initial_replication_threads should be non-negative')
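With this change, a settings file can opt in to parallel initial replication via the new key, e.g. initial_replication_threads: 4 (illustrative value; assumes the usual YAML settings format). A value of 0 (the default) or 1 keeps the previous single-process path, since the replicator only fans out when the setting is greater than 1, and validate() rejects non-integer or negative values.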

mysql_ch_replicator/db_replicator.py

Lines changed: 130 additions & 20 deletions

@@ -1,11 +1,15 @@
 import json
 import os.path
+import random
 import time
 import pickle
 from logging import getLogger
 from enum import Enum
 from dataclasses import dataclass
 from collections import defaultdict
+import sys
+import subprocess
+import select

 from .config import Settings, MysqlSettings, ClickhouseSettings
 from .mysql_api import MySQLApi
@@ -106,10 +110,15 @@ class DbReplicator:

     READ_LOG_INTERVAL = 0.3

-    def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False):
+    def __init__(self, config: Settings, database: str, target_database: str = None, initial_only: bool = False,
+                 worker_id: int = None, total_workers: int = None, table: str = None):
         self.config = config
         self.database = database
-
+        self.worker_id = worker_id
+        self.total_workers = total_workers
+        self.settings_file = config.settings_file
+        self.single_table = table  # Store the single table to process
+
         # use same as source database by default
         self.target_database = database

@@ -122,9 +131,29 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
         if target_database:
             self.target_database = target_database

-        self.target_database_tmp = self.target_database + '_tmp'
         self.initial_only = initial_only

+        # Handle state file differently for parallel workers
+        if self.worker_id is not None and self.total_workers is not None:
+            # For worker processes in parallel mode, use a different state file
+            self.is_parallel_worker = True
+            self.state_path = os.path.join(
+                self.config.binlog_replicator.data_dir,
+                self.database,
+                f'state_worker_{self.worker_id}_{random.randint(0,9999999999)}.pckl'
+            )
+            logger.info(f"Worker {self.worker_id}/{self.total_workers} using state file: {self.state_path}")
+
+            if self.single_table:
+                logger.info(f"Worker {self.worker_id} focusing only on table: {self.single_table}")
+        else:
+            self.state_path = os.path.join(self.config.binlog_replicator.data_dir, self.database, 'state.pckl')
+            self.is_parallel_worker = False
+
+        self.target_database_tmp = self.target_database + '_tmp'
+        if self.is_parallel_worker:
+            self.target_database_tmp = self.target_database
+
         self.mysql_api = MySQLApi(
             database=self.database,
             mysql_settings=config.mysql,
@@ -148,7 +177,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
         self.start_time = time.time()

     def create_state(self):
-        return State(os.path.join(self.config.binlog_replicator.data_dir, self.database, 'state.pckl'))
+        return State(self.state_path)

     def validate_database_settings(self):
         if not self.initial_only:
@@ -196,7 +225,9 @@ def run(self):

         logger.info('recreating database')
         self.clickhouse_api.database = self.target_database_tmp
-        self.clickhouse_api.recreate_database()
+        if not self.is_parallel_worker:
+            self.clickhouse_api.recreate_database()
+
         self.state.tables = self.mysql_api.get_tables()
         self.state.tables = [
             table for table in self.state.tables if self.config.is_table_matches(table)
@@ -220,6 +251,10 @@ def create_initial_structure(self):
     def create_initial_structure_table(self, table_name):
         if not self.config.is_table_matches(table_name):
             return
+
+        if self.single_table and self.single_table != table_name:
+            return
+
         mysql_create_statement = self.mysql_api.get_table_create_statement(table_name)
         mysql_structure = self.converter.parse_mysql_table_structure(
             mysql_create_statement, required_table_name=table_name,
@@ -232,7 +267,9 @@ def create_initial_structure_table(self, table_name):

         self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
         indexes = self.config.get_indexes(self.database, table_name)
-        self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)
+
+        if not self.is_parallel_worker:
+            self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)

     def prevent_binlog_removal(self):
         if time.time() - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
@@ -253,22 +290,26 @@ def perform_initial_replication(self):
         for table in self.state.tables:
             if start_table and table != start_table:
                 continue
+            if self.single_table and self.single_table != table:
+                continue
             self.perform_initial_replication_table(table)
             start_table = None
-        logger.info(f'initial replication - swapping database')
-        if self.target_database in self.clickhouse_api.get_databases():
-            self.clickhouse_api.execute_command(
-                f'RENAME DATABASE `{self.target_database}` TO `{self.target_database}_old`',
-            )
-            self.clickhouse_api.execute_command(
-                f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
-            )
-            self.clickhouse_api.drop_database(f'{self.target_database}_old')
-        else:
-            self.clickhouse_api.execute_command(
-                f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
-            )
-        self.clickhouse_api.database = self.target_database
+
+        if not self.is_parallel_worker:
+            logger.info(f'initial replication - swapping database')
+            if self.target_database in self.clickhouse_api.get_databases():
+                self.clickhouse_api.execute_command(
+                    f'RENAME DATABASE `{self.target_database}` TO `{self.target_database}_old`',
+                )
+                self.clickhouse_api.execute_command(
+                    f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
+                )
+                self.clickhouse_api.drop_database(f'{self.target_database}_old')
+            else:
+                self.clickhouse_api.execute_command(
+                    f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
+                )
+            self.clickhouse_api.database = self.target_database
         logger.info(f'initial replication - done')

     def perform_initial_replication_table(self, table_name):
@@ -278,6 +319,13 @@ def perform_initial_replication_table(self, table_name):
             logger.info(f'skip table {table_name} - not matching any allowed table')
             return

+        if not self.is_parallel_worker and self.config.initial_replication_threads > 1:
+            self.state.initial_replication_table = table_name
+            self.state.initial_replication_max_primary_key = None
+            self.state.save()
+            self.perform_initial_replication_table_parallel(table_name)
+            return
+
         max_primary_key = None
         if self.state.initial_replication_table == table_name:
             # continue replication from saved position
@@ -322,6 +370,8 @@ def perform_initial_replication_table(self, table_name):
             order_by=primary_keys,
             limit=DbReplicator.INITIAL_REPLICATION_BATCH_SIZE,
             start_value=query_start_values,
+            worker_id=self.worker_id,
+            total_workers=self.total_workers,
         )
         logger.debug(f'extracted {len(records)} records from mysql')

@@ -360,6 +410,66 @@ def perform_initial_replication_table(self, table_name):
             f'primary key: {max_primary_key}',
         )

+    def perform_initial_replication_table_parallel(self, table_name):
+        """
+        Execute initial replication for a table using multiple parallel worker processes.
+        Each worker will handle a portion of the table based on its worker_id and total_workers.
+        """
+        logger.info(f"Starting parallel replication for table {table_name} with {self.config.initial_replication_threads} workers")
+
+        # Create and launch worker processes
+        processes = []
+        for worker_id in range(self.config.initial_replication_threads):
+            # Prepare command to launch a worker process
+            cmd = [
+                sys.executable, "-m", "mysql_ch_replicator.main",
+                "db_replicator",  # Required positional mode argument
+                "--config", self.settings_file,
+                "--db", self.database,
+                "--worker_id", str(worker_id),
+                "--total_workers", str(self.config.initial_replication_threads),
+                "--table", table_name,
+                "--target_db", self.target_database_tmp,
+                "--initial_only=True",
+            ]
+
+            logger.info(f"Launching worker {worker_id}: {' '.join(cmd)}")
+            process = subprocess.Popen(cmd)
+            processes.append(process)
+
+        # Wait for all worker processes to complete
+        logger.info(f"Waiting for {len(processes)} workers to complete replication of {table_name}")
+
+        try:
+            while processes:
+                for i, process in enumerate(processes[:]):
+                    # Check if process is still running
+                    if process.poll() is not None:
+                        exit_code = process.returncode
+                        if exit_code == 0:
+                            logger.info(f"Worker process {i} completed successfully")
+                        else:
+                            logger.error(f"Worker process {i} failed with exit code {exit_code}")
+                            # Optional: can raise an exception here to abort the entire operation
+                            raise Exception(f"Worker process failed with exit code {exit_code}")

+                        processes.remove(process)
+
+                if processes:
+                    # Wait a bit before checking again
+                    time.sleep(0.1)
+
+                    # Every 30 seconds, log progress
+                    if int(time.time()) % 30 == 0:
+                        logger.info(f"Still waiting for {len(processes)} workers to complete")
+        except KeyboardInterrupt:
+            logger.warning("Received interrupt, terminating worker processes")
+            for process in processes:
+                process.terminate()
+            raise
+
+        logger.info(f"All workers completed replication of table {table_name}")

     def run_realtime_replication(self):
         if self.initial_only:
             logger.info('skip running realtime replication, only initial replication was requested')
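Taken together: the coordinator (a DbReplicator constructed without worker_id) keeps the schema creation and the final _tmp-to-target database swap for itself, while each worker writes straight into the _tmp database and tracks progress in a throwaway per-worker state file, so workers never contend over state.pckl. A minimal sketch of entering this path from Python (file and database names are illustrative):

    from mysql_ch_replicator.config import Settings
    from mysql_ch_replicator.db_replicator import DbReplicator

    config = Settings()
    config.load('config.yaml')  # hypothetical settings file with initial_replication_threads: 4

    # No worker_id/total_workers: this process acts as the coordinator and
    # spawns one subprocess per worker for each table during initial replication.
    replicator = DbReplicator(config=config, database='mydb', initial_only=True)
    replicator.run()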

mysql_ch_replicator/main.py

Lines changed: 28 additions & 1 deletion

@@ -87,13 +87,28 @@ def run_db_replicator(args, config: Settings):
         'db_replicator.log',
     )

-    set_logging_config(f'dbrepl {args.db}', log_file=log_file, log_level_str=config.log_level)
+    # Set log tag according to whether this is a worker or main process
+    if args.worker_id is not None:
+        if args.table:
+            log_tag = f'dbrepl {db_name} worker_{args.worker_id} table_{args.table}'
+        else:
+            log_tag = f'dbrepl {db_name} worker_{args.worker_id}'
+    else:
+        log_tag = f'dbrepl {db_name}'
+
+    set_logging_config(log_tag, log_file=log_file, log_level_str=config.log_level)
+
+    if args.table:
+        logging.info(f"Processing specific table: {args.table}")

     db_replicator = DbReplicator(
         config=config,
         database=db_name,
         target_database=getattr(args, 'target_db', None),
         initial_only=args.initial_only,
+        worker_id=args.worker_id,
+        total_workers=args.total_workers,
+        table=args.table,
     )
     db_replicator.run()

@@ -142,6 +157,18 @@ def main():
         "--initial_only", type=bool, default=False,
         help="don't run realtime replication, run initial replication only",
     )
+    parser.add_argument(
+        "--worker_id", type=int, default=None,
+        help="Worker ID for parallel initial replication (0-based)",
+    )
+    parser.add_argument(
+        "--total_workers", type=int, default=None,
+        help="Total number of workers for parallel initial replication",
+    )
+    parser.add_argument(
+        "--table", type=str, default=None,
+        help="Specific table to process (used with --worker_id for parallel processing of a single table)",
+    )
     args = parser.parse_args()

     config = Settings()
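For reference, the cmd list assembled by the coordinator in db_replicator.py expands to an invocation like the following for worker 0 of 4 (config path, database, and table names are illustrative):

    python -m mysql_ch_replicator.main db_replicator \
        --config config.yaml --db mydb \
        --worker_id 0 --total_workers 4 \
        --table users --target_db mydb_tmp --initial_only=True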

mysql_ch_replicator/mysql_api.py

Lines changed: 17 additions & 4 deletions

@@ -93,14 +93,27 @@ def get_table_create_statement(self, table_name) -> str:
         create_statement = res[0][1].strip()
         return create_statement

-    def get_records(self, table_name, order_by, limit, start_value=None):
+    def get_records(self, table_name, order_by, limit, start_value=None, worker_id=None, total_workers=None):
         self.reconnect_if_required()
-        order_by = ','.join(order_by)
+        order_by_str = ','.join(order_by)
         where = ''
         if start_value is not None:
             start_value = ','.join(map(str, start_value))
-            where = f'WHERE ({order_by}) > ({start_value}) '
-        query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by} LIMIT {limit}'
+            where = f'WHERE ({order_by_str}) > ({start_value}) '
+
+        # Add partitioning filter for parallel processing if needed
+        if worker_id is not None and total_workers is not None and total_workers > 1:
+            concat_keys = f"CONCAT_WS('|', {', '.join([f'COALESCE({key}, \"\")' for key in order_by])})"
+            hash_condition = f"CRC32({concat_keys}) % {total_workers} = {worker_id}"
+            if where:
+                where += f'AND {hash_condition} '
+            else:
+                where = f'WHERE {hash_condition} '
+
+        query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}'
+        print("query:", query)
+
+        # Execute the actual query
         self.cursor.execute(query)
         res = self.cursor.fetchall()
         records = [x for x in res]
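The hash filter is what keeps the workers' scans disjoint: each row's primary key is hashed into one of total_workers buckets, so every row is read by exactly one worker and the buckets together cover the whole table. A standalone sketch of the predicate built above (table and column names are illustrative):

    def partition_condition(order_by, worker_id, total_workers):
        # COALESCE guards NULL-able key columns; CONCAT_WS joins composite keys
        # with '|' before hashing, mirroring what get_records() does above.
        cols = ', '.join('COALESCE({}, "")'.format(key) for key in order_by)
        return "CRC32(CONCAT_WS('|', {})) % {} = {}".format(cols, total_workers, worker_id)

    print(partition_condition(['id'], worker_id=2, total_workers=4))
    # CRC32(CONCAT_WS('|', COALESCE(id, ""))) % 4 = 2

Combined with the existing keyset pagination, worker 2 of 4 on such a table would issue queries of the form SELECT * FROM `users` WHERE (id) > (1000) AND CRC32(CONCAT_WS('|', COALESCE(id, ""))) % 4 = 2 ORDER BY id LIMIT <batch size>.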

mysql_ch_replicator/runner.py

Lines changed: 13 additions & 2 deletions

@@ -24,8 +24,19 @@ def __init__(self, config_file):


 class DbReplicatorRunner(ProcessRunner):
-    def __init__(self, db_name, config_file):
-        super().__init__(f'{sys.argv[0]} --config {config_file} --db {db_name} db_replicator')
+    def __init__(self, db_name, config_file, worker_id=None, total_workers=None, initial_only=False):
+        cmd = f'{sys.argv[0]} --config {config_file} --db {db_name} db_replicator'
+
+        if worker_id is not None:
+            cmd += f' --worker_id={worker_id}'
+
+        if total_workers is not None:
+            cmd += f' --total_workers={total_workers}'
+
+        if initial_only:
+            cmd += ' --initial_only=True'
+
+        super().__init__(cmd)


 class DbOptimizerRunner(ProcessRunner):
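A hypothetical use of the extended runner to spawn one initial-only worker (argument values are illustrative; assumes ProcessRunner's usual start semantics):

    from mysql_ch_replicator.runner import DbReplicatorRunner

    worker = DbReplicatorRunner(
        'mydb', 'config.yaml',
        worker_id=0, total_workers=4, initial_only=True,
    )
    worker.run()  # assumed ProcessRunner entry point; executes the command string built above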
