Skip to content

Commit 9e8e4f4

Browse files
committed
Added test reproducing crash
1 parent 856256a commit 9e8e4f4

File tree

2 files changed

+124
-0
lines changed

2 files changed

+124
-0
lines changed

test_mysql_ch_replicator.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2257,3 +2257,99 @@ def test_performance_initial_only_replication():
22572257

22582258
# Clean up the temporary config file
22592259
os.remove(parallel_config_file)
2260+
2261+
2262+
def test_schema_evolution_with_db_mapping():
2263+
"""Test case to reproduce issue where schema evolution doesn't work with database mapping."""
2264+
# Use the predefined config file with database mapping
2265+
config_file = "tests_config_db_mapping.yaml"
2266+
2267+
cfg = config.Settings()
2268+
cfg.load(config_file)
2269+
2270+
# Note: Not setting a specific database in MySQL API
2271+
mysql = mysql_api.MySQLApi(
2272+
database=None,
2273+
mysql_settings=cfg.mysql,
2274+
)
2275+
2276+
ch = clickhouse_api.ClickhouseApi(
2277+
database="mapped_target_db",
2278+
clickhouse_settings=cfg.clickhouse,
2279+
)
2280+
2281+
prepare_env(cfg, mysql, ch, db_name=TEST_DB_NAME)
2282+
2283+
# Create a test table with some columns using fully qualified name
2284+
mysql.execute(f'''
2285+
CREATE TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (
2286+
`id` int NOT NULL,
2287+
`name` varchar(255) NOT NULL,
2288+
PRIMARY KEY (`id`));
2289+
''')
2290+
2291+
mysql.execute(
2292+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name) VALUES (1, 'Original')",
2293+
commit=True,
2294+
)
2295+
2296+
# Start the replication
2297+
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
2298+
binlog_replicator_runner.run()
2299+
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
2300+
db_replicator_runner.run()
2301+
2302+
# Make sure initial replication works with the database mapping
2303+
assert_wait(lambda: "mapped_target_db" in ch.get_databases())
2304+
ch.execute_command(f'USE `mapped_target_db`')
2305+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
2306+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
2307+
2308+
# Now follow user's exact sequence of operations with fully qualified names
2309+
# 1. Add new column
2310+
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` ADD COLUMN added_new_column char(1)", commit=True)
2311+
2312+
# 2. Rename the column
2313+
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` RENAME COLUMN added_new_column TO rename_column_name", commit=True)
2314+
2315+
# 3. Modify column type
2316+
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` MODIFY rename_column_name varchar(5)", commit=True)
2317+
2318+
# 4. Insert data using the modified schema
2319+
mysql.execute(
2320+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name, rename_column_name) VALUES (2, 'Second', 'ABCDE')",
2321+
commit=True,
2322+
)
2323+
2324+
# 5. Drop the column - this is where the error was reported
2325+
mysql.execute(f"ALTER TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` DROP COLUMN rename_column_name", commit=True)
2326+
2327+
# 6. Add more inserts after schema changes to verify ongoing replication
2328+
mysql.execute(
2329+
f"INSERT INTO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` (id, name) VALUES (3, 'Third record after drop column')",
2330+
commit=True,
2331+
)
2332+
2333+
# Check if all changes were replicated correctly
2334+
time.sleep(5) # Allow time for processing the changes
2335+
result = ch.select(TEST_TABLE_NAME)
2336+
print(f"ClickHouse table contents: {result}")
2337+
2338+
# Verify all records are present
2339+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)
2340+
2341+
# Verify specific records exist
2342+
records = ch.select(TEST_TABLE_NAME)
2343+
print(f"Record type: {type(records[0])}") # Debug the record type
2344+
2345+
# Access by field name 'id' instead of by position
2346+
record_ids = [record['id'] for record in records]
2347+
assert 1 in record_ids, "Original record (id=1) not found"
2348+
assert 3 in record_ids, "New record (id=3) after schema changes not found"
2349+
2350+
# Note: This test will likely fail with "IndexError: list index out of range"
2351+
# as reported by the user when using database mapping with schema evolution
2352+
2353+
# Clean up
2354+
db_replicator_runner.stop()
2355+
binlog_replicator_runner.stop()

tests_config_db_mapping.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
mysql:
2+
host: 'localhost'
3+
port: 9306
4+
user: 'root'
5+
password: 'admin'
6+
7+
clickhouse:
8+
host: 'localhost'
9+
port: 9123
10+
user: 'default'
11+
password: 'admin'
12+
13+
binlog_replicator:
14+
data_dir: '/app/binlog/'
15+
records_per_file: 100000
16+
binlog_retention_period: 43200 # 12 hours in seconds
17+
18+
databases: '*test*'
19+
log_level: 'debug'
20+
optimize_interval: 3
21+
check_db_updated_interval: 3
22+
23+
# This mapping is the key part that causes issues with schema evolution
24+
target_databases:
25+
replication-test_db: mapped_target_db
26+
27+
http_host: 'localhost'
28+
http_port: 9128

0 commit comments

Comments
 (0)