diff --git a/mysql_ch_replicator/db_replicator_initial.py b/mysql_ch_replicator/db_replicator_initial.py
index bc56e2d..3c83d1f 100644
--- a/mysql_ch_replicator/db_replicator_initial.py
+++ b/mysql_ch_replicator/db_replicator_initial.py
@@ -103,6 +103,9 @@ def perform_initial_replication(self):
             start_table = None
 
         if not self.replicator.is_parallel_worker:
+            # Verify table structures after replication but before swapping databases
+            self.verify_table_structures_after_replication()
+
             logger.info(f'initial replication - swapping database')
             if self.replicator.target_database in self.replicator.clickhouse_api.get_databases():
                 self.replicator.clickhouse_api.execute_command(
@@ -216,6 +219,122 @@ def perform_initial_replication_table(self, table_name):
         )
         self.save_state_if_required(force=True)
 
+    def verify_table_structures_after_replication(self):
+        """
+        Verify that MySQL table structures haven't changed during the initial replication process.
+        This helps ensure data integrity by confirming the source tables are the same as when
+        replication started.
+
+        Tables that do not match the replicator config, or that differ from the configured
+        single table, are skipped. A table without a saved original structure is skipped
+        with a warning.
+
+        Raises an exception if any table structure has changed, preventing the completion
+        of the initial replication process.
+        """
+        logger.info('Verifying table structures after initial replication')
+
+        changed_tables = []
+
+        for table_name in self.replicator.state.tables:
+            if not self.replicator.config.is_table_matches(table_name):
+                continue
+
+            if self.replicator.single_table and self.replicator.single_table != table_name:
+                continue
+
+            # Get the current MySQL table structure
+            current_mysql_create_statement = self.replicator.mysql_api.get_table_create_statement(table_name)
+            current_mysql_structure = self.replicator.converter.parse_mysql_table_structure(
+                current_mysql_create_statement, required_table_name=table_name,
+            )
+
+            # Get the original structure used at the start of replication
+            original_mysql_structure, _ = self.replicator.state.tables_structure.get(table_name, (None, None))
+
+            if not original_mysql_structure:
+                logger.warning(f'Could not find original structure for table {table_name}')
+                continue
+
+            # Compare the structures in a deterministic way
+            structures_match = self._compare_table_structures(original_mysql_structure, current_mysql_structure)
+
+            if not structures_match:
+                logger.warning(
+                    f'\n\n\n !!! WARNING - TABLE STRUCTURE CHANGED DURING REPLICATION (table "{table_name}") !!!\n\n'
+                    'The MySQL table structure has changed since the initial replication started.\n'
+                    'This may cause data inconsistency and replication issues.\n'
+                )
+                logger.error(f'Original structure: {original_mysql_structure}')
+                logger.error(f'Current structure: {current_mysql_structure}')
+                changed_tables.append(table_name)
+            else:
+                logger.info(f'Table structure verification passed for {table_name}')
+
+        # If any tables have changed, raise an exception to abort the replication process
+        if changed_tables:
+            error_message = (
+                f"Table structure changes detected in: {', '.join(changed_tables)}. "
+                "Initial replication aborted to prevent data inconsistency. "
+                "Please restart replication after reviewing the changes."
+            )
+            logger.error(error_message)
+            raise Exception(error_message)
+
+        logger.info('Table structure verification completed')
+
+    def _compare_table_structures(self, struct1, struct2):
+        """
+        Compare two TableStructure objects in a deterministic way.
+
+        Checks, in order: table name, charset, primary keys (order-sensitive) and fields
+        (name, type and whitespace/case-normalized parameters). The first mismatch is
+        logged at error level.
+
+        Returns True if the structures are equivalent, False otherwise.
+        """
+        # Compare basic attributes
+        if struct1.table_name != struct2.table_name:
+            logger.error(f"Table name mismatch: {struct1.table_name} vs {struct2.table_name}")
+            return False
+
+        if struct1.charset != struct2.charset:
+            logger.error(f"Charset mismatch: {struct1.charset} vs {struct2.charset}")
+            return False
+
+        # Compare primary keys (order matters)
+        if len(struct1.primary_keys) != len(struct2.primary_keys):
+            logger.error(f"Primary key count mismatch: {len(struct1.primary_keys)} vs {len(struct2.primary_keys)}")
+            return False
+
+        for i, (key1, key2) in enumerate(zip(struct1.primary_keys, struct2.primary_keys)):
+            if key1 != key2:
+                logger.error(f"Primary key mismatch at position {i}: {key1} vs {key2}")
+                return False
+
+        # Compare fields (count and attributes)
+        if len(struct1.fields) != len(struct2.fields):
+            logger.error(f"Field count mismatch: {len(struct1.fields)} vs {len(struct2.fields)}")
+            return False
+
+        for i, (field1, field2) in enumerate(zip(struct1.fields, struct2.fields)):
+            if field1.name != field2.name:
+                logger.error(f"Field name mismatch at position {i}: {field1.name} vs {field2.name}")
+                return False
+
+            if field1.field_type != field2.field_type:
+                logger.error(f"Field type mismatch for {field1.name}: {field1.field_type} vs {field2.field_type}")
+                return False
+
+            # Compare parameters - normalize whitespace to avoid false positives
+            params1 = ' '.join(field1.parameters.lower().split())
+            params2 = ' '.join(field2.parameters.lower().split())
+            if params1 != params2:
+                logger.error(f"Field parameters mismatch for {field1.name}: {params1} vs {params2}")
+                return False
+
+        return True
+
     def perform_initial_replication_table_parallel(self, table_name):
         """
         Execute initial replication for a table using multiple parallel worker processes.
diff --git a/mysql_ch_replicator/mysql_api.py b/mysql_ch_replicator/mysql_api.py
index 082eb78..2b1cc80 100644
--- a/mysql_ch_replicator/mysql_api.py
+++ b/mysql_ch_replicator/mysql_api.py
@@ -113,7 +113,6 @@ def get_records(self, table_name, order_by, limit, start_value=None, worker_id=N
                 where = f'WHERE {hash_condition} '
 
         query = f'SELECT * FROM `{table_name}` {where}ORDER BY {order_by_str} LIMIT {limit}'
-        print("query:", query)
 
         # Execute the actual query
         self.cursor.execute(query)