Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions mysql_ch_replicator/db_replicator_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ def perform_initial_replication(self):
start_table = None

if not self.replicator.is_parallel_worker:
# Verify table structures after replication but before swapping databases
self.verify_table_structures_after_replication()

logger.info(f'initial replication - swapping database')
if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
self.replicator.clickhouse_api.execute_command(
Expand Down Expand Up @@ -216,6 +219,115 @@ def perform_initial_replication_table(self, table_name):
)
self.save_state_if_required(force=True)

def verify_table_structures_after_replication(self):
    """
    Verify that MySQL table structures haven't changed during initial replication.

    For every table listed in the replicator state that matches the configured
    table filter (and the single-table restriction, when one is set), the
    structure recorded in ``state.tables_structure`` is compared against the
    structure parsed from the table's current MySQL CREATE statement.

    Tables without a recorded structure are skipped with a warning, because
    there is no baseline to compare against.

    Raises:
        Exception: if at least one table's structure differs from the recorded
            one. The message lists every changed table, so the caller can
            abort the initial replication before the databases are swapped.
    """
    logger.info('Verifying table structures after initial replication')

    replicator = self.replicator
    changed_tables = []

    for table_name in replicator.state.tables:
        if not replicator.config.is_table_matches(table_name):
            continue

        if replicator.single_table and replicator.single_table != table_name:
            continue

        # Look up the recorded baseline first: without it nothing can be
        # compared, so there is no point in querying MySQL and parsing the
        # CREATE statement for this table.
        original_mysql_structure, _ = replicator.state.tables_structure.get(
            table_name, (None, None),
        )
        if not original_mysql_structure:
            logger.warning(f'Could not find original structure for table {table_name}')
            continue

        # Fetch and parse the structure as it exists in MySQL right now.
        current_mysql_create_statement = replicator.mysql_api.get_table_create_statement(table_name)
        current_mysql_structure = replicator.converter.parse_mysql_table_structure(
            current_mysql_create_statement, required_table_name=table_name,
        )

        if self._compare_table_structures(original_mysql_structure, current_mysql_structure):
            logger.info(f'Table structure verification passed for {table_name}')
            continue

        logger.warning(
            f'\n\n\n !!! WARNING - TABLE STRUCTURE CHANGED DURING REPLICATION (table "{table_name}") !!!\n\n'
            'The MySQL table structure has changed since the initial replication started.\n'
            'This may cause data inconsistency and replication issues.\n'
        )
        logger.error(f'Original structure: {original_mysql_structure}')
        logger.error(f'Current structure: {current_mysql_structure}')
        # Keep going so that every changed table is reported in one pass.
        changed_tables.append(table_name)

    # Any change aborts the process; the error names all affected tables.
    if changed_tables:
        error_message = (
            f"Table structure changes detected in: {', '.join(changed_tables)}. "
            "Initial replication aborted to prevent data inconsistency. "
            "Please restart replication after reviewing the changes."
        )
        logger.error(error_message)
        raise Exception(error_message)

    logger.info('Table structure verification completed')

def _compare_table_structures(self, struct1, struct2):
    """
    Compare two table structure objects in a deterministic way.

    The comparison covers, in this order: table name, charset, primary keys
    (order matters), and then the fields position by position (name, type
    and parameters). Field parameters are compared case-insensitively with
    runs of whitespace collapsed, so purely cosmetic differences do not count
    as a change. A field whose parameters are missing (``None``) is treated
    as having empty parameters.

    The first difference found is logged as an error and ends the comparison.

    Args:
        struct1: structure providing ``table_name``, ``charset``,
            ``primary_keys`` and ``fields`` (each field providing ``name``,
            ``field_type`` and ``parameters``).
        struct2: structure of the same shape to compare against ``struct1``.

    Returns:
        True if the structures are equivalent, False otherwise.
    """
    # Compare basic attributes
    if struct1.table_name != struct2.table_name:
        logger.error(f"Table name mismatch: {struct1.table_name} vs {struct2.table_name}")
        return False

    if struct1.charset != struct2.charset:
        logger.error(f"Charset mismatch: {struct1.charset} vs {struct2.charset}")
        return False

    # Compare primary keys (order matters). The length check comes first so
    # that the pairwise zip below never silently drops trailing keys.
    if len(struct1.primary_keys) != len(struct2.primary_keys):
        logger.error(f"Primary key count mismatch: {len(struct1.primary_keys)} vs {len(struct2.primary_keys)}")
        return False

    for i, (key1, key2) in enumerate(zip(struct1.primary_keys, struct2.primary_keys)):
        if key1 != key2:
            logger.error(f"Primary key mismatch at position {i}: {key1} vs {key2}")
            return False

    # Compare fields (count and attributes)
    if len(struct1.fields) != len(struct2.fields):
        logger.error(f"Field count mismatch: {len(struct1.fields)} vs {len(struct2.fields)}")
        return False

    for i, (field1, field2) in enumerate(zip(struct1.fields, struct2.fields)):
        if field1.name != field2.name:
            logger.error(f"Field name mismatch at position {i}: {field1.name} vs {field2.name}")
            return False

        if field1.field_type != field2.field_type:
            logger.error(f"Field type mismatch for {field1.name}: {field1.field_type} vs {field2.field_type}")
            return False

        # Normalize case and whitespace to avoid false positives; fall back to
        # an empty string so a field without parameters cannot crash the check.
        params1 = ' '.join((field1.parameters or '').lower().split())
        params2 = ' '.join((field2.parameters or '').lower().split())
        if params1 != params2:
            logger.error(f"Field parameters mismatch for {field1.name}: {params1} vs {params2}")
            return False

    return True

def perform_initial_replication_table_parallel(self, table_name):
"""
Execute initial replication for a table using multiple parallel worker processes.
Expand Down
1 change: 0 additions & 1 deletion mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ def get_records(self, table_name, order_by, limit, start_value=None, worker_id=N
where = f'WHERE {hash_condition} '

query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}'
print("query:", query)

# Execute the actual query
self.cursor.execute(query)
Expand Down