diff --git a/mysql_ch_replicator/converter.py b/mysql_ch_replicator/converter.py
index ea1ee29..48d710e 100644
--- a/mysql_ch_replicator/converter.py
+++ b/mysql_ch_replicator/converter.py
@@ -530,11 +530,20 @@ def get_db_and_table_name(self, token, db_name):
             table_name = token
         db_name = strip_sql_name(db_name)
         table_name = strip_sql_name(table_name)
+
         if self.db_replicator:
-            # Check if database and table match config BEFORE applying mapping
-            matches_config = (
-                self.db_replicator.config.is_database_matches(db_name)
-                and self.db_replicator.config.is_table_matches(table_name))
+            # If we're dealing with a relative table name (no DB prefix), we need to check
+            # if the current db_name is already a target database name
+            if '.' not in token and self.db_replicator.target_database == db_name:
+                # This is a target database name, so for config matching we need to use the source database
+                matches_config = (
+                    self.db_replicator.config.is_database_matches(self.db_replicator.database)
+                    and self.db_replicator.config.is_table_matches(table_name))
+            else:
+                # Normal case: check if source database and table match config
+                matches_config = (
+                    self.db_replicator.config.is_database_matches(db_name)
+                    and self.db_replicator.config.is_table_matches(table_name))
 
             # Apply database mapping AFTER checking matches_config
             if db_name == self.db_replicator.database:
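
Review note: the hunk above changes which database name gets validated against the replication config. A minimal, self-contained restatement of that decision follows; the function and parameter names are hypothetical stand-ins for illustration, not the repository's API.

    # Sketch only: mirrors the branch added in get_db_and_table_name above.
    def pick_config_database(token: str, db_name: str, source_db: str, target_db: str) -> str:
        """Return the database name that should be checked against the config."""
        if '.' not in token and db_name == target_db:
            # Relative table name and db_name already holds the *target* database:
            # the config only knows source names, so validate the source database.
            return source_db
        # Fully qualified name, or db_name still refers to the source database.
        return db_name

    # With the names used by the new test and config below:
    assert pick_config_database('replication_data', 'test_replication_ch',
                                'test_replication', 'test_replication_ch') == 'test_replication'
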
+ """ + config_path = 'tests_config_dynamic_column.yaml' + + cfg = config.Settings() + cfg.load(config_path) + + mysql = mysql_api.MySQLApi( + database=None, + mysql_settings=cfg.mysql, + ) + + ch = clickhouse_api.ClickhouseApi( + database=None, + clickhouse_settings=cfg.clickhouse, + ) + + prepare_env(cfg, mysql, ch, db_name='test_replication') + + # Prepare environment - drop and recreate databases + mysql.drop_database("test_replication") + mysql.create_database("test_replication") + mysql.set_database("test_replication") + ch.drop_database("test_replication_ch") + assert_wait(lambda: "test_replication_ch" not in ch.get_databases()) + + # Create the exact table structure from the user's example + mysql.execute(''' + CREATE TABLE test_replication.replication_data ( + code VARCHAR(255) NOT NULL PRIMARY KEY, + val_1 VARCHAR(255) NOT NULL + ); + ''') + + # Insert initial data + mysql.execute( + "INSERT INTO test_replication.replication_data(code, val_1) VALUE ('test-1', '1');", + commit=True, + ) + + # Start the replication processes + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_path) + binlog_replicator_runner.run() + db_replicator_runner = DbReplicatorRunner("test_replication", cfg_file=config_path) + db_replicator_runner.run() + + # Wait for initial replication to complete + assert_wait(lambda: "test_replication_ch" in ch.get_databases()) + + # Set the database before checking tables + ch.execute_command("USE test_replication_ch") + assert_wait(lambda: "replication_data" in ch.get_tables()) + assert_wait(lambda: len(ch.select("replication_data")) == 1) + + # Verify initial data was replicated correctly + assert_wait(lambda: ch.select("replication_data", where="code='test-1'")[0]['val_1'] == '1') + + # Update an existing field - this should work fine + mysql.execute("UPDATE test_replication.replication_data SET val_1 = '1200' WHERE code = 'test-1';", commit=True) + assert_wait(lambda: ch.select("replication_data", where="code='test-1'")[0]['val_1'] == '1200') + + mysql.execute("USE test_replication"); + + # Add val_2 column + mysql.execute("ALTER TABLE replication_data ADD COLUMN val_2 VARCHAR(255);", commit=True) + + # Now try to update with a field that doesn't exist + # This would have caused an error before our fix + mysql.execute("UPDATE test_replication.replication_data SET val_2 = '100' WHERE code = 'test-1';", commit=True) + + # Verify replication processes are still running + binlog_pid = get_binlog_replicator_pid(cfg) + db_pid = get_db_replicator_pid(cfg, "test_replication") + + assert binlog_pid is not None, "Binlog replicator process died" + assert db_pid is not None, "DB replicator process died" + + # Verify the replication is still working after the dynamic column update + mysql.execute("UPDATE test_replication.replication_data SET val_1 = '1500' WHERE code = 'test-1';", commit=True) + assert_wait(lambda: ch.select("replication_data", where="code='test-1'")[0]['val_1'] == '1500') + + print("Test passed - dynamic column was skipped without breaking replication") + + # Cleanup + binlog_pid = get_binlog_replicator_pid(cfg) + if binlog_pid: + kill_process(binlog_pid) + + db_pid = get_db_replicator_pid(cfg, "test_replication") + if db_pid: + kill_process(db_pid) diff --git a/tests_config_dynamic_column.yaml b/tests_config_dynamic_column.yaml new file mode 100644 index 0000000..4ba381d --- /dev/null +++ b/tests_config_dynamic_column.yaml @@ -0,0 +1,20 @@ +mysql: + host: 'localhost' + port: 9306 + user: 'root' + password: 'admin' + +clickhouse: + host: 
diff --git a/tests_config_dynamic_column.yaml b/tests_config_dynamic_column.yaml
new file mode 100644
index 0000000..4ba381d
--- /dev/null
+++ b/tests_config_dynamic_column.yaml
@@ -0,0 +1,20 @@
+mysql:
+  host: 'localhost'
+  port: 9306
+  user: 'root'
+  password: 'admin'
+
+clickhouse:
+  host: 'localhost'
+  port: 9123
+  user: 'default'
+  password: 'admin'
+
+binlog_replicator:
+  data_dir: '/app/binlog/'
+  records_per_file: 100000
+
+databases: 'test_replication'
+
+target_databases:
+  test_replication: test_replication_ch
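
For clarity on what the target_databases section above is expected to do in this test, a small illustration follows; a plain dict stands in for the repository's config object, and the fallback to the source name for unmapped databases is an assumption here, not something this config exercises.

    # Illustration only - not the repo's config loader.
    TARGET_DATABASES = {'test_replication': 'test_replication_ch'}

    def resolve_target(source_db: str) -> str:
        # Assumed default: databases without a mapping keep their original name.
        return TARGET_DATABASES.get(source_db, source_db)

    # The MySQL database 'test_replication' should be replicated into
    # the ClickHouse database 'test_replication_ch', matching the test's assertions.
    assert resolve_target('test_replication') == 'test_replication_ch'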