Skip to content

Commit ee7c5fa

Browse files
authored
Fix version for parallel replication (#139)
1 parent 1a5e268 commit ee7c5fa

File tree

3 files changed

+174
-2
lines changed

3 files changed

+174
-2
lines changed

mysql_ch_replicator/clickhouse_api.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,25 @@ def get_system_setting(self, name):
292292
if not results:
293293
return None
294294
return results[0].get('value', None)
295+
296+
def get_max_record_version(self, table_name):
297+
"""
298+
Query the maximum _version value for a given table directly from ClickHouse.
299+
300+
Args:
301+
table_name: The name of the table to query
302+
303+
Returns:
304+
The maximum _version value as an integer, or None if the table doesn't exist
305+
or has no records
306+
"""
307+
try:
308+
query = f"SELECT MAX(_version) FROM `{self.database}`.`{table_name}`"
309+
result = self.client.query(query)
310+
if not result.result_rows or result.result_rows[0][0] is None:
311+
logger.warning(f"No records with _version found in table {table_name}")
312+
return None
313+
return result.result_rows[0][0]
314+
except Exception as e:
315+
logger.error(f"Error querying max _version for table {table_name}: {e}")
316+
return None

mysql_ch_replicator/db_replicator_initial.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import time
55
import sys
66
import subprocess
7+
import pickle
78
from logging import getLogger
89
from enum import Enum
910

@@ -213,6 +214,7 @@ def perform_initial_replication_table(self, table_name):
213214
f'replicated {stats_number_of_records} records, '
214215
f'primary key: {max_primary_key}',
215216
)
217+
self.save_state_if_required(force=True)
216218

217219
def perform_initial_replication_table_parallel(self, table_name):
218220
"""
@@ -273,3 +275,28 @@ def perform_initial_replication_table_parallel(self, table_name):
273275
raise
274276

275277
logger.info(f"All workers completed replication of table {table_name}")
278+
279+
# Consolidate record versions from all worker states
280+
logger.info(f"Consolidating record versions from worker states for table {table_name}")
281+
self.consolidate_worker_record_versions(table_name)
282+
283+
def consolidate_worker_record_versions(self, table_name):
284+
"""
285+
Query ClickHouse directly to get the maximum record version for the specified table
286+
and update the main state with this version.
287+
"""
288+
logger.info(f"Getting maximum record version from ClickHouse for table {table_name}")
289+
290+
# Query ClickHouse for the maximum record version
291+
max_version = self.replicator.clickhouse_api.get_max_record_version(table_name)
292+
293+
if max_version is not None and max_version > 0:
294+
current_version = self.replicator.state.tables_last_record_version.get(table_name, 0)
295+
if max_version > current_version:
296+
logger.info(f"Updating record version for table {table_name} from {current_version} to {max_version}")
297+
self.replicator.state.tables_last_record_version[table_name] = max_version
298+
self.replicator.state.save()
299+
else:
300+
logger.info(f"Current version {current_version} is already up-to-date with ClickHouse version {max_version}")
301+
else:
302+
logger.warning(f"No record version found in ClickHouse for table {table_name}")

test_mysql_ch_replicator.py

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,129 @@ def test_initial_only():
574574
db_replicator_runner.stop()
575575

576576

577+
def test_parallel_initial_replication_record_versions():
578+
"""
579+
Test that record versions are properly consolidated from worker states
580+
after parallel initial replication.
581+
"""
582+
# Only run this test with parallel configuration
583+
cfg_file = 'tests_config_parallel.yaml'
584+
cfg = config.Settings()
585+
cfg.load(cfg_file)
586+
587+
# Ensure we have parallel replication configured
588+
assert cfg.initial_replication_threads > 1, "This test requires initial_replication_threads > 1"
589+
590+
mysql = mysql_api.MySQLApi(
591+
database=None,
592+
mysql_settings=cfg.mysql,
593+
)
594+
595+
ch = clickhouse_api.ClickhouseApi(
596+
database=TEST_DB_NAME,
597+
clickhouse_settings=cfg.clickhouse,
598+
)
599+
600+
prepare_env(cfg, mysql, ch)
601+
602+
# Create a table with sufficient records for parallel processing
603+
mysql.execute(f'''
604+
CREATE TABLE `{TEST_TABLE_NAME}` (
605+
id int NOT NULL AUTO_INCREMENT,
606+
name varchar(255),
607+
age int,
608+
version int NOT NULL DEFAULT 1,
609+
PRIMARY KEY (id)
610+
);
611+
''')
612+
613+
# Insert a large number of records to ensure parallel processing
614+
for i in range(1, 1001):
615+
mysql.execute(
616+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, version) VALUES ('User{i}', {20+i%50}, {i});",
617+
commit=(i % 100 == 0) # Commit every 100 records
618+
)
619+
620+
# Run initial replication only with parallel workers
621+
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=cfg_file)
622+
db_replicator_runner.run()
623+
624+
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=10.0)
625+
626+
ch.execute_command(f'USE `{TEST_DB_NAME}`')
627+
628+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=10.0)
629+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1000, max_wait_time=10.0)
630+
631+
db_replicator_runner.stop()
632+
633+
# Verify database and table were created
634+
assert TEST_DB_NAME in ch.get_databases()
635+
ch.execute_command(f'USE `{TEST_DB_NAME}`')
636+
assert TEST_TABLE_NAME in ch.get_tables()
637+
638+
# Verify all records were replicated
639+
records = ch.select(TEST_TABLE_NAME)
640+
assert len(records) == 1000
641+
642+
# Instead of reading the state file directly, verify the record versions are correctly handled
643+
# by checking the max _version in the ClickHouse table
644+
versions_query = ch.query(f"SELECT MAX(_version) FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}`")
645+
max_version_in_ch = versions_query.result_rows[0][0]
646+
assert max_version_in_ch >= 200, f"Expected max _version to be at least 200, got {max_version_in_ch}"
647+
648+
649+
# Now test realtime replication to verify versions continue correctly
650+
# Start binlog replication
651+
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=cfg_file)
652+
binlog_replicator_runner.run()
653+
654+
time.sleep(3.0)
655+
656+
# Start DB replicator in realtime mode
657+
realtime_db_replicator = DbReplicatorRunner(TEST_DB_NAME, cfg_file=cfg_file)
658+
realtime_db_replicator.run()
659+
660+
# Insert a new record with version 1001
661+
mysql.execute(
662+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age, version) VALUES ('UserRealtime', 99, 1001);",
663+
commit=True
664+
)
665+
666+
# Wait for the record to be replicated
667+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1001)
668+
669+
# Verify the new record was replicated correctly
670+
realtime_record = ch.select(TEST_TABLE_NAME, where="name='UserRealtime'")[0]
671+
assert realtime_record['age'] == 99
672+
assert realtime_record['version'] == 1001
673+
674+
# Check that the _version column in CH is a reasonable value
675+
# With parallel workers, the _version won't be > 1000 because each worker
676+
# has its own independent version counter and they never intersect
677+
versions_query = ch.query(f"SELECT _version FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` WHERE name='UserRealtime'")
678+
ch_version = versions_query.result_rows[0][0]
679+
680+
681+
# With parallel workers (default is 4), each worker would process ~250 records
682+
# So the version for the new record should be slightly higher than 250
683+
# but definitely lower than 1000
684+
assert ch_version > 0, f"ClickHouse _version should be > 0, but got {ch_version}"
685+
686+
# We expect version to be roughly: (total_records / num_workers) + 1
687+
# For 1000 records and 4 workers, expect around 251
688+
expected_version_approx = 1000 // cfg.initial_replication_threads + 1
689+
# Allow some flexibility in the exact expected value
690+
assert abs(ch_version - expected_version_approx) < 50, (
691+
f"ClickHouse _version should be close to {expected_version_approx}, but got {ch_version}"
692+
)
693+
694+
# Clean up
695+
binlog_replicator_runner.stop()
696+
realtime_db_replicator.stop()
697+
db_replicator_runner.stop()
698+
699+
577700
def test_database_tables_filtering():
578701
cfg = config.Settings()
579702
cfg.load('tests_config_databases_tables.yaml')
@@ -693,8 +816,8 @@ def test_datetime_exception():
693816
name varchar(255),
694817
modified_date DateTime(3) NOT NULL,
695818
test_date date NOT NULL,
696-
PRIMARY KEY (id)
697-
);
819+
PRIMARY KEY (id)
820+
);
698821
''')
699822

700823
mysql.execute(

0 commit comments

Comments
 (0)