11import json
22import os .path
3+ import random
34import time
45import pickle
6+ import hashlib
57from logging import getLogger
68from enum import Enum
79from dataclasses import dataclass
810from collections import defaultdict
11+ import sys
12+ import subprocess
13+ import select
914
1015from .config import Settings , MysqlSettings , ClickhouseSettings
1116from .mysql_api import MySQLApi
@@ -106,10 +111,15 @@ class DbReplicator:
106111
107112 READ_LOG_INTERVAL = 0.3
108113
109- def __init__ (self , config : Settings , database : str , target_database : str = None , initial_only : bool = False ):
114+ def __init__ (self , config : Settings , database : str , target_database : str = None , initial_only : bool = False ,
115+ worker_id : int = None , total_workers : int = None , table : str = None ):
110116 self .config = config
111117 self .database = database
112-
118+ self .worker_id = worker_id
119+ self .total_workers = total_workers
120+ self .settings_file = config .settings_file
121+ self .single_table = table # Store the single table to process
122+
113123 # use same as source database by default
114124 self .target_database = database
115125
@@ -122,9 +132,42 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
122132 if target_database :
123133 self .target_database = target_database
124134
125- self .target_database_tmp = self .target_database + '_tmp'
126135 self .initial_only = initial_only
127136
137+ # Handle state file differently for parallel workers
138+ if self .worker_id is not None and self .total_workers is not None :
139+ # For worker processes in parallel mode, use a different state file with a deterministic name
140+ self .is_parallel_worker = True
141+
142+ # Determine table name for the state file
143+ table_identifier = self .single_table if self .single_table else "all_tables"
144+
145+ # Create a hash of the table name to ensure it's filesystem-safe
146+ if self .single_table :
147+ # Use a hex digest of the table name to ensure it's filesystem-safe
148+ table_identifier = hashlib .sha256 (self .single_table .encode ('utf-8' )).hexdigest ()[:16 ]
149+ else :
150+ table_identifier = "all_tables"
151+
152+ # Create a deterministic state file path that includes worker_id, total_workers, and table hash
153+ self .state_path = os .path .join (
154+ self .config .binlog_replicator .data_dir ,
155+ self .database ,
156+ f'state_worker_{ self .worker_id } _of_{ self .total_workers } _{ table_identifier } .pckl'
157+ )
158+
159+ logger .info (f"Worker { self .worker_id } /{ self .total_workers } using state file: { self .state_path } " )
160+
161+ if self .single_table :
162+ logger .info (f"Worker { self .worker_id } focusing only on table: { self .single_table } " )
163+ else :
164+ self .state_path = os .path .join (self .config .binlog_replicator .data_dir , self .database , 'state.pckl' )
165+ self .is_parallel_worker = False
166+
167+ self .target_database_tmp = self .target_database + '_tmp'
168+ if self .is_parallel_worker :
169+ self .target_database_tmp = self .target_database
170+
128171 self .mysql_api = MySQLApi (
129172 database = self .database ,
130173 mysql_settings = config .mysql ,
@@ -148,7 +191,7 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
148191 self .start_time = time .time ()
149192
150193 def create_state (self ):
151- return State (os . path . join ( self .config . binlog_replicator . data_dir , self . database , 'state.pckl' ) )
194+ return State (self .state_path )
152195
153196 def validate_database_settings (self ):
154197 if not self .initial_only :
@@ -196,7 +239,9 @@ def run(self):
196239
197240 logger .info ('recreating database' )
198241 self .clickhouse_api .database = self .target_database_tmp
199- self .clickhouse_api .recreate_database ()
242+ if not self .is_parallel_worker :
243+ self .clickhouse_api .recreate_database ()
244+
200245 self .state .tables = self .mysql_api .get_tables ()
201246 self .state .tables = [
202247 table for table in self .state .tables if self .config .is_table_matches (table )
@@ -220,6 +265,10 @@ def create_initial_structure(self):
220265 def create_initial_structure_table (self , table_name ):
221266 if not self .config .is_table_matches (table_name ):
222267 return
268+
269+ if self .single_table and self .single_table != table_name :
270+ return
271+
223272 mysql_create_statement = self .mysql_api .get_table_create_statement (table_name )
224273 mysql_structure = self .converter .parse_mysql_table_structure (
225274 mysql_create_statement , required_table_name = table_name ,
@@ -232,7 +281,9 @@ def create_initial_structure_table(self, table_name):
232281
233282 self .state .tables_structure [table_name ] = (mysql_structure , clickhouse_structure )
234283 indexes = self .config .get_indexes (self .database , table_name )
235- self .clickhouse_api .create_table (clickhouse_structure , additional_indexes = indexes )
284+
285+ if not self .is_parallel_worker :
286+ self .clickhouse_api .create_table (clickhouse_structure , additional_indexes = indexes )
236287
237288 def prevent_binlog_removal (self ):
238289 if time .time () - self .last_touch_time < self .BINLOG_TOUCH_INTERVAL :
@@ -253,22 +304,26 @@ def perform_initial_replication(self):
253304 for table in self .state .tables :
254305 if start_table and table != start_table :
255306 continue
307+ if self .single_table and self .single_table != table :
308+ continue
256309 self .perform_initial_replication_table (table )
257310 start_table = None
258- logger .info (f'initial replication - swapping database' )
259- if self .target_database in self .clickhouse_api .get_databases ():
260- self .clickhouse_api .execute_command (
261- f'RENAME DATABASE `{ self .target_database } ` TO `{ self .target_database } _old`' ,
262- )
263- self .clickhouse_api .execute_command (
264- f'RENAME DATABASE `{ self .target_database_tmp } ` TO `{ self .target_database } `' ,
265- )
266- self .clickhouse_api .drop_database (f'{ self .target_database } _old' )
267- else :
268- self .clickhouse_api .execute_command (
269- f'RENAME DATABASE `{ self .target_database_tmp } ` TO `{ self .target_database } `' ,
270- )
271- self .clickhouse_api .database = self .target_database
311+
312+ if not self .is_parallel_worker :
313+ logger .info (f'initial replication - swapping database' )
314+ if self .target_database in self .clickhouse_api .get_databases ():
315+ self .clickhouse_api .execute_command (
316+ f'RENAME DATABASE `{ self .target_database } ` TO `{ self .target_database } _old`' ,
317+ )
318+ self .clickhouse_api .execute_command (
319+ f'RENAME DATABASE `{ self .target_database_tmp } ` TO `{ self .target_database } `' ,
320+ )
321+ self .clickhouse_api .drop_database (f'{ self .target_database } _old' )
322+ else :
323+ self .clickhouse_api .execute_command (
324+ f'RENAME DATABASE `{ self .target_database_tmp } ` TO `{ self .target_database } `' ,
325+ )
326+ self .clickhouse_api .database = self .target_database
272327 logger .info (f'initial replication - done' )
273328
274329 def perform_initial_replication_table (self , table_name ):
@@ -278,6 +333,13 @@ def perform_initial_replication_table(self, table_name):
278333 logger .info (f'skip table { table_name } - not matching any allowed table' )
279334 return
280335
336+ if not self .is_parallel_worker and self .config .initial_replication_threads > 1 :
337+ self .state .initial_replication_table = table_name
338+ self .state .initial_replication_max_primary_key = None
339+ self .state .save ()
340+ self .perform_initial_replication_table_parallel (table_name )
341+ return
342+
281343 max_primary_key = None
282344 if self .state .initial_replication_table == table_name :
283345 # continue replication from saved position
@@ -322,6 +384,8 @@ def perform_initial_replication_table(self, table_name):
322384 order_by = primary_keys ,
323385 limit = DbReplicator .INITIAL_REPLICATION_BATCH_SIZE ,
324386 start_value = query_start_values ,
387+ worker_id = self .worker_id ,
388+ total_workers = self .total_workers ,
325389 )
326390 logger .debug (f'extracted { len (records )} records from mysql' )
327391
@@ -360,6 +424,66 @@ def perform_initial_replication_table(self, table_name):
360424 f'primary key: { max_primary_key } ' ,
361425 )
362426
427+ def perform_initial_replication_table_parallel (self , table_name ):
428+ """
429+ Execute initial replication for a table using multiple parallel worker processes.
430+ Each worker will handle a portion of the table based on its worker_id and total_workers.
431+ """
432+ logger .info (f"Starting parallel replication for table { table_name } with { self .config .initial_replication_threads } workers" )
433+
434+ # Create and launch worker processes
435+ processes = []
436+ for worker_id in range (self .config .initial_replication_threads ):
437+ # Prepare command to launch a worker process
438+ cmd = [
439+ sys .executable , "-m" , "mysql_ch_replicator.main" ,
440+ "db_replicator" , # Required positional mode argument
441+ "--config" , self .settings_file ,
442+ "--db" , self .database ,
443+ "--worker_id" , str (worker_id ),
444+ "--total_workers" , str (self .config .initial_replication_threads ),
445+ "--table" , table_name ,
446+ "--target_db" , self .target_database_tmp ,
447+ "--initial_only=True" ,
448+ ]
449+
450+ logger .info (f"Launching worker { worker_id } : { ' ' .join (cmd )} " )
451+ process = subprocess .Popen (cmd )
452+ processes .append (process )
453+
454+ # Wait for all worker processes to complete
455+ logger .info (f"Waiting for { len (processes )} workers to complete replication of { table_name } " )
456+
457+ try :
458+ while processes :
459+ for i , process in enumerate (processes [:]):
460+ # Check if process is still running
461+ if process .poll () is not None :
462+ exit_code = process .returncode
463+ if exit_code == 0 :
464+ logger .info (f"Worker process { i } completed successfully" )
465+ else :
466+ logger .error (f"Worker process { i } failed with exit code { exit_code } " )
467+ # Optional: can raise an exception here to abort the entire operation
468+ raise Exception (f"Worker process failed with exit code { exit_code } " )
469+
470+ processes .remove (process )
471+
472+ if processes :
473+ # Wait a bit before checking again
474+ time .sleep (0.1 )
475+
476+ # Every 30 seconds, log progress
477+ if int (time .time ()) % 30 == 0 :
478+ logger .info (f"Still waiting for { len (processes )} workers to complete" )
479+ except KeyboardInterrupt :
480+ logger .warning ("Received interrupt, terminating worker processes" )
481+ for process in processes :
482+ process .terminate ()
483+ raise
484+
485+ logger .info (f"All workers completed replication of table { table_name } " )
486+
363487 def run_realtime_replication (self ):
364488 if self .initial_only :
365489 logger .info ('skip running realtime replication, only initial replication was requested' )
0 commit comments