Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,11 +530,20 @@ def get_db_and_table_name(self, token, db_name):
table_name = token
db_name = strip_sql_name(db_name)
table_name = strip_sql_name(table_name)

if self.db_replicator:
# Check if database and table match config BEFORE applying mapping
matches_config = (
self.db_replicator.config.is_database_matches(db_name)
and self.db_replicator.config.is_table_matches(table_name))
# If we're dealing with a relative table name (no DB prefix), we need to check
# if the current db_name is already a target database name
if '.' not in token and self.db_replicator.target_database == db_name:
# This is a target database name, so for config matching we need to use the source database
matches_config = (
self.db_replicator.config.is_database_matches(self.db_replicator.database)
and self.db_replicator.config.is_table_matches(table_name))
else:
# Normal case: check if source database and table match config
matches_config = (
self.db_replicator.config.is_database_matches(db_name)
and self.db_replicator.config.is_table_matches(table_name))

# Apply database mapping AFTER checking matches_config
if db_name == self.db_replicator.database:
Expand Down
102 changes: 102 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import json
import uuid
import decimal
import tempfile
import yaml

import pytest
import requests
Expand Down Expand Up @@ -2278,6 +2280,9 @@ def test_schema_evolution_with_db_mapping():
clickhouse_settings=cfg.clickhouse,
)

ch.drop_database("mapped_target_db")
assert_wait(lambda: "mapped_target_db" not in ch.get_databases())

prepare_env(cfg, mysql, ch, db_name=TEST_DB_NAME)

# Create a test table with some columns using fully qualified name
Expand Down Expand Up @@ -2352,3 +2357,100 @@ def test_schema_evolution_with_db_mapping():
# Clean up
db_replicator_runner.stop()
binlog_replicator_runner.stop()


def test_dynamic_column_addition_user_config():
"""Test to verify handling of dynamically added columns using user's exact configuration.

This test reproduces the issue where columns are added on-the-fly via UPDATE
rather than through ALTER TABLE statements, leading to an index error in the converter.
"""
config_path = 'tests_config_dynamic_column.yaml'

cfg = config.Settings()
cfg.load(config_path)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=None,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch, db_name='test_replication')

# Prepare environment - drop and recreate databases
mysql.drop_database("test_replication")
mysql.create_database("test_replication")
mysql.set_database("test_replication")
ch.drop_database("test_replication_ch")
assert_wait(lambda: "test_replication_ch" not in ch.get_databases())

# Create the exact table structure from the user's example
mysql.execute('''
CREATE TABLE test_replication.replication_data (
code VARCHAR(255) NOT NULL PRIMARY KEY,
val_1 VARCHAR(255) NOT NULL
);
''')

# Insert initial data
mysql.execute(
"INSERT INTO test_replication.replication_data(code, val_1) VALUE ('test-1', '1');",
commit=True,
)

# Start the replication processes
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_path)
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner("test_replication", cfg_file=config_path)
db_replicator_runner.run()

# Wait for initial replication to complete
assert_wait(lambda: "test_replication_ch" in ch.get_databases())

# Set the database before checking tables
ch.execute_command("USE test_replication_ch")
assert_wait(lambda: "replication_data" in ch.get_tables())
assert_wait(lambda: len(ch.select("replication_data")) == 1)

# Verify initial data was replicated correctly
assert_wait(lambda: ch.select("replication_data", where="code='test-1'")[0]['val_1'] == '1')

# Update an existing field - this should work fine
mysql.execute("UPDATE test_replication.replication_data SET val_1 = '1200' WHERE code = 'test-1';", commit=True)
assert_wait(lambda: ch.select("replication_data", where="code='test-1'")[0]['val_1'] == '1200')

mysql.execute("USE test_replication");

# Add val_2 column
mysql.execute("ALTER TABLE replication_data ADD COLUMN val_2 VARCHAR(255);", commit=True)

# Now try to update with a field that doesn't exist
# This would have caused an error before our fix
mysql.execute("UPDATE test_replication.replication_data SET val_2 = '100' WHERE code = 'test-1';", commit=True)

# Verify replication processes are still running
binlog_pid = get_binlog_replicator_pid(cfg)
db_pid = get_db_replicator_pid(cfg, "test_replication")

assert binlog_pid is not None, "Binlog replicator process died"
assert db_pid is not None, "DB replicator process died"

# Verify the replication is still working after the dynamic column update
mysql.execute("UPDATE test_replication.replication_data SET val_1 = '1500' WHERE code = 'test-1';", commit=True)
assert_wait(lambda: ch.select("replication_data", where="code='test-1'")[0]['val_1'] == '1500')

print("Test passed - dynamic column was skipped without breaking replication")

# Cleanup
binlog_pid = get_binlog_replicator_pid(cfg)
if binlog_pid:
kill_process(binlog_pid)

db_pid = get_db_replicator_pid(cfg, "test_replication")
if db_pid:
kill_process(db_pid)
20 changes: 20 additions & 0 deletions tests_config_dynamic_column.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
mysql:
host: 'localhost'
port: 9306
user: 'root'
password: 'admin'

clickhouse:
host: 'localhost'
port: 9123
user: 'default'
password: 'admin'

binlog_replicator:
data_dir: '/app/binlog/'
records_per_file: 100000

databases: 'test_replication'

target_databases:
test_replication: test_replication_ch