Skip to content

Commit 6a6f443

Browse files
committed
db_replicator refactor, split into several classes
1 parent 3d74847 commit 6a6f443

File tree

2 files changed

+23
-256
lines changed

2 files changed

+23
-256
lines changed

mysql_ch_replicator/db_replicator.py

Lines changed: 18 additions & 251 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,23 @@
1919
from .table_structure import TableStructure, TableField
2020
from .binlog_replicator import DataReader, LogEvent, EventType
2121
from .utils import GracefulKiller, touch_all_files, format_floats
22+
from .db_replicator_initial import DbReplicatorInitial
23+
from .common import Status
2224

2325

2426
logger = getLogger(__name__)
2527

2628

27-
class Status(Enum):
28-
NONE = 0
29-
CREATING_INITIAL_STRUCTURES = 1
30-
PERFORMING_INITIAL_REPLICATION = 2
31-
RUNNING_REALTIME_REPLICATION = 3
29+
@dataclass
30+
class Statistics:
31+
last_transaction: tuple = None
32+
events_count: int = 0
33+
insert_events_count: int = 0
34+
insert_records_count: int = 0
35+
erase_events_count: int = 0
36+
erase_records_count: int = 0
37+
no_events_count: int = 0
38+
cpu_load: float = 0.0
3239

3340

3441
class State:
@@ -87,21 +94,8 @@ def remove(self):
8794
os.remove(file_name + '.tmp')
8895

8996

90-
@dataclass
91-
class Statistics:
92-
last_transaction: tuple = None
93-
events_count: int = 0
94-
insert_events_count: int = 0
95-
insert_records_count: int = 0
96-
erase_events_count: int = 0
97-
erase_records_count: int = 0
98-
no_events_count: int = 0
99-
cpu_load: float = 0.0
100-
101-
10297
class DbReplicator:
10398

104-
INITIAL_REPLICATION_BATCH_SIZE = 50000
10599
SAVE_STATE_INTERVAL = 10
106100
STATS_DUMP_INTERVAL = 60
107101
BINLOG_TOUCH_INTERVAL = 120
@@ -189,6 +183,9 @@ def __init__(self, config: Settings, database: str, target_database: str = None,
189183
self.last_records_upload_time = 0
190184
self.last_touch_time = 0
191185
self.start_time = time.time()
186+
187+
# Create the initial replicator instance
188+
self.initial_replicator = DbReplicatorInitial(self)
192189

193190
def create_state(self):
194191
return State(self.state_path)
@@ -204,18 +201,6 @@ def validate_database_settings(self):
204201
'Otherwise you will get DUPLICATES in your SELECT queries\n\n\n'
205202
)
206203

207-
def validate_mysql_structure(self, mysql_structure: TableStructure):
208-
for key_idx in mysql_structure.primary_key_ids:
209-
primary_field: TableField = mysql_structure.fields[key_idx]
210-
if 'not null' not in primary_field.parameters.lower():
211-
logger.warning('primary key validation failed')
212-
logger.warning(
213-
f'\n\n\n !!! WARNING - PRIMARY KEY NULLABLE (field "{primary_field.name}", table "{mysql_structure.table_name}") !!!\n\n'
214-
'There could be errors replicating nullable primary key\n'
215-
'Please ensure all tables has NOT NULL parameter for primary key\n'
216-
'Or mark tables as skipped, see "exclude_tables" option\n\n\n'
217-
)
218-
219204
def run(self):
220205
try:
221206
logger.info('launched db_replicator')
@@ -233,7 +218,7 @@ def run(self):
233218
self.run_realtime_replication()
234219
return
235220
if self.state.status == Status.PERFORMING_INITIAL_REPLICATION:
236-
self.perform_initial_replication()
221+
self.initial_replicator.perform_initial_replication()
237222
self.run_realtime_replication()
238223
return
239224

@@ -249,42 +234,13 @@ def run(self):
249234
self.state.last_processed_transaction = self.data_reader.get_last_transaction_id()
250235
self.state.save()
251236
logger.info(f'last known transaction {self.state.last_processed_transaction}')
252-
self.create_initial_structure()
253-
self.perform_initial_replication()
237+
self.initial_replicator.create_initial_structure()
238+
self.initial_replicator.perform_initial_replication()
254239
self.run_realtime_replication()
255240
except Exception:
256241
logger.error(f'unhandled exception', exc_info=True)
257242
raise
258243

259-
def create_initial_structure(self):
260-
self.state.status = Status.CREATING_INITIAL_STRUCTURES
261-
for table in self.state.tables:
262-
self.create_initial_structure_table(table)
263-
self.state.save()
264-
265-
def create_initial_structure_table(self, table_name):
266-
if not self.config.is_table_matches(table_name):
267-
return
268-
269-
if self.single_table and self.single_table != table_name:
270-
return
271-
272-
mysql_create_statement = self.mysql_api.get_table_create_statement(table_name)
273-
mysql_structure = self.converter.parse_mysql_table_structure(
274-
mysql_create_statement, required_table_name=table_name,
275-
)
276-
self.validate_mysql_structure(mysql_structure)
277-
clickhouse_structure = self.converter.convert_table_structure(mysql_structure)
278-
279-
# Always set if_not_exists to True to prevent errors when tables already exist
280-
clickhouse_structure.if_not_exists = True
281-
282-
self.state.tables_structure[table_name] = (mysql_structure, clickhouse_structure)
283-
indexes = self.config.get_indexes(self.database, table_name)
284-
285-
if not self.is_parallel_worker:
286-
self.clickhouse_api.create_table(clickhouse_structure, additional_indexes=indexes)
287-
288244
def prevent_binlog_removal(self):
289245
if time.time() - self.last_touch_time < self.BINLOG_TOUCH_INTERVAL:
290246
return
@@ -295,195 +251,6 @@ def prevent_binlog_removal(self):
295251
self.last_touch_time = time.time()
296252
touch_all_files(binlog_directory)
297253

298-
def perform_initial_replication(self):
299-
self.clickhouse_api.database = self.target_database_tmp
300-
logger.info('running initial replication')
301-
self.state.status = Status.PERFORMING_INITIAL_REPLICATION
302-
self.state.save()
303-
start_table = self.state.initial_replication_table
304-
for table in self.state.tables:
305-
if start_table and table != start_table:
306-
continue
307-
if self.single_table and self.single_table != table:
308-
continue
309-
self.perform_initial_replication_table(table)
310-
start_table = None
311-
312-
if not self.is_parallel_worker:
313-
logger.info(f'initial replication - swapping database')
314-
if self.target_database in self.clickhouse_api.get_databases():
315-
self.clickhouse_api.execute_command(
316-
f'RENAME DATABASE `{self.target_database}` TO `{self.target_database}_old`',
317-
)
318-
self.clickhouse_api.execute_command(
319-
f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
320-
)
321-
self.clickhouse_api.drop_database(f'{self.target_database}_old')
322-
else:
323-
self.clickhouse_api.execute_command(
324-
f'RENAME DATABASE `{self.target_database_tmp}` TO `{self.target_database}`',
325-
)
326-
self.clickhouse_api.database = self.target_database
327-
logger.info(f'initial replication - done')
328-
329-
def perform_initial_replication_table(self, table_name):
330-
logger.info(f'running initial replication for table {table_name}')
331-
332-
if not self.config.is_table_matches(table_name):
333-
logger.info(f'skip table {table_name} - not matching any allowed table')
334-
return
335-
336-
if not self.is_parallel_worker and self.config.initial_replication_threads > 1:
337-
self.state.initial_replication_table = table_name
338-
self.state.initial_replication_max_primary_key = None
339-
self.state.save()
340-
self.perform_initial_replication_table_parallel(table_name)
341-
return
342-
343-
max_primary_key = None
344-
if self.state.initial_replication_table == table_name:
345-
# continue replication from saved position
346-
max_primary_key = self.state.initial_replication_max_primary_key
347-
logger.info(f'continue from primary key {max_primary_key}')
348-
else:
349-
# starting replication from zero
350-
logger.info(f'replicating from scratch')
351-
self.state.initial_replication_table = table_name
352-
self.state.initial_replication_max_primary_key = None
353-
self.state.save()
354-
355-
mysql_table_structure, clickhouse_table_structure = self.state.tables_structure[table_name]
356-
357-
logger.debug(f'mysql table structure: {mysql_table_structure}')
358-
logger.debug(f'clickhouse table structure: {clickhouse_table_structure}')
359-
360-
field_types = [field.field_type for field in clickhouse_table_structure.fields]
361-
362-
primary_keys = clickhouse_table_structure.primary_keys
363-
primary_key_ids = clickhouse_table_structure.primary_key_ids
364-
primary_key_types = [field_types[key_idx] for key_idx in primary_key_ids]
365-
366-
#logger.debug(f'primary key name: {primary_key}, type: {primary_key_type}')
367-
368-
stats_number_of_records = 0
369-
last_stats_dump_time = time.time()
370-
371-
while True:
372-
373-
query_start_values = max_primary_key
374-
if query_start_values is not None:
375-
for i in range(len(query_start_values)):
376-
key_type = primary_key_types[i]
377-
value = query_start_values[i]
378-
if 'int' not in key_type.lower():
379-
value = f"'{value}'"
380-
query_start_values[i] = value
381-
382-
records = self.mysql_api.get_records(
383-
table_name=table_name,
384-
order_by=primary_keys,
385-
limit=DbReplicator.INITIAL_REPLICATION_BATCH_SIZE,
386-
start_value=query_start_values,
387-
worker_id=self.worker_id,
388-
total_workers=self.total_workers,
389-
)
390-
logger.debug(f'extracted {len(records)} records from mysql')
391-
392-
records = self.converter.convert_records(records, mysql_table_structure, clickhouse_table_structure)
393-
394-
if self.config.debug_log_level:
395-
logger.debug(f'records: {records}')
396-
397-
if not records:
398-
break
399-
self.clickhouse_api.insert(table_name, records, table_structure=clickhouse_table_structure)
400-
for record in records:
401-
record_primary_key = [record[key_idx] for key_idx in primary_key_ids]
402-
if max_primary_key is None:
403-
max_primary_key = record_primary_key
404-
else:
405-
max_primary_key = max(max_primary_key, record_primary_key)
406-
407-
self.state.initial_replication_max_primary_key = max_primary_key
408-
self.save_state_if_required()
409-
self.prevent_binlog_removal()
410-
411-
stats_number_of_records += len(records)
412-
curr_time = time.time()
413-
if curr_time - last_stats_dump_time >= 60.0:
414-
last_stats_dump_time = curr_time
415-
logger.info(
416-
f'replicating {table_name}, '
417-
f'replicated {stats_number_of_records} records, '
418-
f'primary key: {max_primary_key}',
419-
)
420-
421-
logger.info(
422-
f'finish replicating {table_name}, '
423-
f'replicated {stats_number_of_records} records, '
424-
f'primary key: {max_primary_key}',
425-
)
426-
427-
def perform_initial_replication_table_parallel(self, table_name):
428-
"""
429-
Execute initial replication for a table using multiple parallel worker processes.
430-
Each worker will handle a portion of the table based on its worker_id and total_workers.
431-
"""
432-
logger.info(f"Starting parallel replication for table {table_name} with {self.config.initial_replication_threads} workers")
433-
434-
# Create and launch worker processes
435-
processes = []
436-
for worker_id in range(self.config.initial_replication_threads):
437-
# Prepare command to launch a worker process
438-
cmd = [
439-
sys.executable, "-m", "mysql_ch_replicator.main",
440-
"db_replicator", # Required positional mode argument
441-
"--config", self.settings_file,
442-
"--db", self.database,
443-
"--worker_id", str(worker_id),
444-
"--total_workers", str(self.config.initial_replication_threads),
445-
"--table", table_name,
446-
"--target_db", self.target_database_tmp,
447-
"--initial_only=True",
448-
]
449-
450-
logger.info(f"Launching worker {worker_id}: {' '.join(cmd)}")
451-
process = subprocess.Popen(cmd)
452-
processes.append(process)
453-
454-
# Wait for all worker processes to complete
455-
logger.info(f"Waiting for {len(processes)} workers to complete replication of {table_name}")
456-
457-
try:
458-
while processes:
459-
for i, process in enumerate(processes[:]):
460-
# Check if process is still running
461-
if process.poll() is not None:
462-
exit_code = process.returncode
463-
if exit_code == 0:
464-
logger.info(f"Worker process {i} completed successfully")
465-
else:
466-
logger.error(f"Worker process {i} failed with exit code {exit_code}")
467-
# Optional: can raise an exception here to abort the entire operation
468-
raise Exception(f"Worker process failed with exit code {exit_code}")
469-
470-
processes.remove(process)
471-
472-
if processes:
473-
# Wait a bit before checking again
474-
time.sleep(0.1)
475-
476-
# Every 30 seconds, log progress
477-
if int(time.time()) % 30 == 0:
478-
logger.info(f"Still waiting for {len(processes)} workers to complete")
479-
except KeyboardInterrupt:
480-
logger.warning("Received interrupt, terminating worker processes")
481-
for process in processes:
482-
process.terminate()
483-
raise
484-
485-
logger.info(f"All workers completed replication of table {table_name}")
486-
487254
def run_realtime_replication(self):
488255
if self.initial_only:
489256
logger.info('skip running realtime replication, only initial replication was requested')

test_mysql_ch_replicator.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from mysql_ch_replicator import mysql_api
1515
from mysql_ch_replicator import clickhouse_api
1616
from mysql_ch_replicator.binlog_replicator import State as BinlogState, FileReader, EventType, BinlogReplicator
17-
from mysql_ch_replicator.db_replicator import State as DbReplicatorState, DbReplicator
17+
from mysql_ch_replicator.db_replicator import State as DbReplicatorState, DbReplicator, DbReplicatorInitial
1818
from mysql_ch_replicator.converter import MysqlToClickhouseConverter
1919

2020
from mysql_ch_replicator.runner import ProcessRunner
@@ -1049,7 +1049,7 @@ def test_json():
10491049

10501050

10511051
def test_string_primary_key(monkeypatch):
1052-
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
1052+
monkeypatch.setattr(DbReplicatorInitial, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
10531053

10541054
cfg = config.Settings()
10551055
cfg.load(CONFIG_FILE)
@@ -1111,7 +1111,7 @@ def test_string_primary_key(monkeypatch):
11111111

11121112

11131113
def test_if_exists_if_not_exists(monkeypatch):
1114-
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
1114+
monkeypatch.setattr(DbReplicatorInitial, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
11151115

11161116
cfg = config.Settings()
11171117
cfg.load(CONFIG_FILE)
@@ -1152,7 +1152,7 @@ def test_if_exists_if_not_exists(monkeypatch):
11521152

11531153

11541154
def test_percona_migration(monkeypatch):
1155-
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
1155+
monkeypatch.setattr(DbReplicatorInitial, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
11561156

11571157
cfg = config.Settings()
11581158
cfg.load(CONFIG_FILE)
@@ -1230,7 +1230,7 @@ def test_percona_migration(monkeypatch):
12301230

12311231

12321232
def test_add_column_first_after_and_drop_column(monkeypatch):
1233-
monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
1233+
monkeypatch.setattr(DbReplicatorInitial, 'INITIAL_REPLICATION_BATCH_SIZE', 1)
12341234

12351235
cfg = config.Settings()
12361236
cfg.load(CONFIG_FILE)

0 commit comments

Comments
 (0)