Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sqlparse
import re
from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList
import copy

from .table_structure import TableStructure, TableField
from .enum import (
Expand Down Expand Up @@ -735,7 +736,88 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN {column_name} TO {new_column_name}'
self.db_replicator.clickhouse_api.execute_command(query)

def _handle_create_table_like(self, create_statement, source_table_name, target_table_name, is_query_api=True):
    """
    Resolve a CREATE TABLE ... LIKE statement by copying the source table's structure.

    Resolution order:
      1. The replicator state's cached table structures (no MySQL round trip).
      2. A live fetch of the source table's CREATE statement via the MySQL API,
         which is then re-parsed.

    Args:
        create_statement: The original CREATE TABLE ... LIKE statement
            (attached to raised exceptions for diagnostics).
        source_table_name: Name of the existing table being copied.
        target_table_name: Name of the new table being created.
        is_query_api: If True, return both MySQL and ClickHouse structures;
            if False, return only the MySQL structure.

    Returns:
        (mysql_structure, ch_structure) when is_query_api is True,
        otherwise just mysql_structure.

    Raises:
        Exception: If the source table structure cannot be determined from
            either the cached state or the MySQL API.
    """
    # Fast path: reuse structures already tracked in the replicator state.
    if (hasattr(self, 'db_replicator') and
        self.db_replicator is not None and
        hasattr(self.db_replicator, 'state') and
        hasattr(self.db_replicator.state, 'tables_structure')):

        # Check if the source table structure is already in our state
        if source_table_name in self.db_replicator.state.tables_structure:
            source_mysql_structure, source_ch_structure = self.db_replicator.state.tables_structure[source_table_name]

            # Deep-copy so renaming does not mutate the cached source structures.
            new_mysql_structure = copy.deepcopy(source_mysql_structure)
            new_mysql_structure.table_name = target_table_name

            new_ch_structure = copy.deepcopy(source_ch_structure)
            new_ch_structure.table_name = target_table_name

            return (new_mysql_structure, new_ch_structure) if is_query_api else new_mysql_structure

    # Fallback: fetch and parse the source table's CREATE statement from MySQL.
    if (hasattr(self, 'db_replicator') and
        self.db_replicator is not None and
        hasattr(self.db_replicator, 'mysql_api') and
        self.db_replicator.mysql_api is not None):

        try:
            # Get the CREATE statement for the source table
            source_create_statement = self.db_replicator.mysql_api.get_table_create_statement(source_table_name)

            # Parse the source table structure
            source_structure = self.parse_mysql_table_structure(source_create_statement)

            # Deep-copy so the parsed structure can be renamed safely.
            mysql_structure = copy.deepcopy(source_structure)
            mysql_structure.table_name = target_table_name

            if is_query_api:
                # Convert to ClickHouse structure
                ch_structure = self.convert_table_structure(mysql_structure)
                return mysql_structure, ch_structure
            else:
                return mysql_structure

        except Exception as e:
            error_msg = f"Could not get source table structure for LIKE statement: {str(e)}"
            print(f"Error: {error_msg}")
            # Chain the original exception so the root cause is preserved
            # instead of being swallowed by the new generic Exception.
            raise Exception(error_msg, create_statement) from e

    # Neither the cached state nor the MySQL API could provide the structure.
    raise Exception(f"Could not determine structure for source table '{source_table_name}' in LIKE statement", create_statement)

def parse_create_table_query(self, mysql_query) -> tuple[TableStructure, TableStructure]:
    """Parse a CREATE TABLE statement into (MySQL, ClickHouse) table structures."""
    # CREATE TABLE ... LIKE needs special handling: the column definitions
    # live in the source table, not in the statement itself.
    if 'LIKE' in mysql_query.upper():
        like_pattern = (
            r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?'
            r'[`"]?([^`"\s]+)[`"]?\s+LIKE\s+[`"]?([^`"\s]+)[`"]?'
        )
        like_match = re.search(like_pattern, mysql_query, re.IGNORECASE)

        if like_match:
            # Confirmed CREATE TABLE ... LIKE: extract target and source names.
            target_table = like_match.group(1).strip('`"')
            source_table = like_match.group(2).strip('`"')

            # Delegate to the shared LIKE handler, requesting both structures.
            return self._handle_create_table_like(mysql_query, source_table, target_table, True)

    # Ordinary CREATE TABLE: parse the MySQL DDL, then derive the ClickHouse schema.
    mysql_structure = self.parse_mysql_table_structure(mysql_query)
    ch_structure = self.convert_table_structure(mysql_structure)
    return mysql_structure, ch_structure
Expand Down Expand Up @@ -779,6 +861,18 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
# style `<dbname>.<tablename>`
structure.table_name = strip_sql_name(tokens[2].get_real_name())

# Handle CREATE TABLE ... LIKE statements
if len(tokens) > 4 and tokens[3].normalized.upper() == 'LIKE':
# Extract the source table name
if not isinstance(tokens[4], sqlparse.sql.Identifier):
raise Exception('wrong create statement', create_statement)

source_table_name = strip_sql_name(tokens[4].get_real_name())
target_table_name = strip_sql_name(tokens[2].get_real_name())

# Use the common helper method to handle the LIKE statement
return self._handle_create_table_like(create_statement, source_table_name, target_table_name, False)

if not isinstance(tokens[3], sqlparse.sql.Parenthesis):
raise Exception('wrong create statement', create_statement)

Expand Down
155 changes: 155 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1657,3 +1657,158 @@ def test_enum_conversion():
])
def test_parse_db_name_from_query(query, expected):
assert BinlogReplicator._try_parse_db_name_from_query(query) == expected


def test_create_table_like():
    """
    Test that CREATE TABLE ... LIKE statements are handled correctly.

    Flow:
      1. Create a source table with a complex structure, then clone it with
         LIKE before replication starts (initial-replication path).
      2. Start replication and verify both tables and their data appear in
         ClickHouse.
      3. After replication is running, create a fresh table and a LIKE clone
         of it to exercise the realtime (binlog) path, and verify both.
    """
    config_file = CONFIG_FILE
    cfg = config.Settings()
    cfg.load(config_file)

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)
    mysql.set_database(TEST_DB_NAME)

    # Create the source table with a complex structure
    mysql.execute('''
        CREATE TABLE `source_table` (
            id INT NOT NULL AUTO_INCREMENT,
            name VARCHAR(255) NOT NULL,
            age INT UNSIGNED,
            email VARCHAR(100) UNIQUE,
            status ENUM('active','inactive','pending') DEFAULT 'active',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            data JSON,
            PRIMARY KEY (id)
        );
    ''')

    # Create a table using a LIKE statement (before replication starts)
    mysql.execute('''
        CREATE TABLE `derived_table` LIKE `source_table`;
    ''')

    # Set up replication
    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
    binlog_replicator_runner.run()
    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
    db_replicator_runner.run()

    # Wait for database to be created and renamed from tmp to final
    assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=10.0)

    # Use the correct database explicitly
    ch.execute_command(f'USE `{TEST_DB_NAME}`')

    # Wait for tables to be created in ClickHouse
    assert_wait(lambda: 'source_table' in ch.get_tables(), max_wait_time=10.0)
    assert_wait(lambda: 'derived_table' in ch.get_tables(), max_wait_time=10.0)

    # Insert data into both tables to verify they work
    mysql.execute("INSERT INTO `source_table` (name, age, email, status) VALUES ('Alice', 30, '[email protected]', 'active');", commit=True)
    mysql.execute("INSERT INTO `derived_table` (name, age, email, status) VALUES ('Bob', 25, '[email protected]', 'pending');", commit=True)

    # Wait for data to be replicated
    assert_wait(lambda: len(ch.select('source_table')) == 1, max_wait_time=10.0)
    assert_wait(lambda: len(ch.select('derived_table')) == 1, max_wait_time=10.0)

    # Compare structures by reading descriptions in ClickHouse.
    # NOTE(review): if execute_command returns None for both calls this
    # comparison is vacuous — confirm it returns the DESCRIBE result set.
    source_desc = ch.execute_command("DESCRIBE TABLE source_table")
    derived_desc = ch.execute_command("DESCRIBE TABLE derived_table")

    # The structures should be identical
    assert source_desc == derived_desc

    # Verify the data in both tables
    source_data = ch.select('source_table')[0]
    derived_data = ch.select('derived_table')[0]

    assert source_data['name'] == 'Alice'
    assert derived_data['name'] == 'Bob'

    # Both tables should have same column types
    assert type(source_data['id']) == type(derived_data['id'])
    assert type(source_data['name']) == type(derived_data['name'])
    assert type(source_data['age']) == type(derived_data['age'])

    # Now test realtime replication by creating a new table after the initial replication
    mysql.execute('''
        CREATE TABLE `realtime_table` (
            id INT NOT NULL AUTO_INCREMENT,
            title VARCHAR(100) NOT NULL,
            description TEXT,
            price DECIMAL(10,2),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (id)
        );
    ''')

    # Wait for the new table to be created in ClickHouse
    assert_wait(lambda: 'realtime_table' in ch.get_tables(), max_wait_time=10.0)

    # Insert data into the new table
    mysql.execute("""
        INSERT INTO `realtime_table` (title, description, price) VALUES
        ('Product 1', 'First product description', 19.99),
        ('Product 2', 'Second product description', 29.99),
        ('Product 3', 'Third product description', 39.99);
    """, commit=True)

    # Wait for data to be replicated
    assert_wait(lambda: len(ch.select('realtime_table')) == 3, max_wait_time=10.0)

    # Verify the data in the realtime table
    realtime_data = ch.select('realtime_table')
    assert len(realtime_data) == 3

    # Verify specific values
    products = sorted([record['title'] for record in realtime_data])
    assert products == ['Product 1', 'Product 2', 'Product 3']

    prices = sorted([float(record['price']) for record in realtime_data])
    assert prices == [19.99, 29.99, 39.99]

    # Now create another table using LIKE after initial replication
    # (exercises the realtime/binlog LIKE handling path)
    mysql.execute('''
        CREATE TABLE `realtime_like_table` LIKE `realtime_table`;
    ''')

    # Wait for the new LIKE table to be created in ClickHouse
    assert_wait(lambda: 'realtime_like_table' in ch.get_tables(), max_wait_time=10.0)

    # Insert data into the new LIKE table
    mysql.execute("""
        INSERT INTO `realtime_like_table` (title, description, price) VALUES
        ('Service A', 'Premium service', 99.99),
        ('Service B', 'Standard service', 49.99);
    """, commit=True)

    # Wait for data to be replicated
    assert_wait(lambda: len(ch.select('realtime_like_table')) == 2, max_wait_time=10.0)

    # Verify the data in the realtime LIKE table
    like_data = ch.select('realtime_like_table')
    assert len(like_data) == 2

    services = sorted([record['title'] for record in like_data])
    assert services == ['Service A', 'Service B']

    # Clean up
    db_replicator_runner.stop()
    binlog_replicator_runner.stop()