Skip to content

Commit ebc9eec

Browse files
authored
Handling create table LIKE another_table (#127)
1 parent 3021dd6 commit ebc9eec

File tree

2 files changed

+249
-0
lines changed

2 files changed

+249
-0
lines changed

mysql_ch_replicator/converter.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sqlparse
55
import re
66
from pyparsing import Suppress, CaselessKeyword, Word, alphas, alphanums, delimitedList
7+
import copy
78

89
from .table_structure import TableStructure, TableField
910
from .enum import (
@@ -735,7 +736,88 @@ def __convert_alter_table_change_column(self, db_name, table_name, tokens):
735736
query = f'ALTER TABLE `{db_name}`.`{table_name}` RENAME COLUMN {column_name} TO {new_column_name}'
736737
self.db_replicator.clickhouse_api.execute_command(query)
737738

739+
def _handle_create_table_like(self, create_statement, source_table_name, target_table_name, is_query_api=True):
    """
    Resolve the structure of a table created with CREATE TABLE ... LIKE.

    The source table's structure is looked up first in
    ``self.db_replicator.state.tables_structure`` and, if it is not there,
    rebuilt from the statement returned by
    ``self.db_replicator.mysql_api.get_table_create_statement``.

    Args:
        create_statement: The original CREATE TABLE LIKE statement (only used in error reports)
        source_table_name: Name of the source table being copied
        target_table_name: Name of the new table being created
        is_query_api: If True, returns both MySQL and CH structures; if False, returns only MySQL structure

    Returns:
        Either (mysql_structure, ch_structure) if is_query_api=True, or just mysql_structure otherwise.
        Returned structures are copies carrying ``target_table_name``; the source structures are not modified.

    Raises:
        Exception: with args ``(message, create_statement)`` when the MySQL lookup or parsing fails
            (chained to the underlying error), or when no source of structure information is available.
    """
    db_replicator = getattr(self, 'db_replicator', None)

    # Try to get the actual structure from the existing table structures first.
    # Every link of the chain may be missing or None, hence the defaulted getattr calls.
    state = getattr(db_replicator, 'state', None)
    tables_structure = getattr(state, 'tables_structure', None)
    if tables_structure is not None and source_table_name in tables_structure:
        source_mysql_structure, source_ch_structure = tables_structure[source_table_name]

        # Deep copies keep the tracked source structures untouched by the rename
        new_mysql_structure = copy.deepcopy(source_mysql_structure)
        new_mysql_structure.table_name = target_table_name
        if not is_query_api:
            return new_mysql_structure

        new_ch_structure = copy.deepcopy(source_ch_structure)
        new_ch_structure.table_name = target_table_name
        return new_mysql_structure, new_ch_structure

    # If we couldn't get it from state, try with MySQL API
    mysql_api = getattr(db_replicator, 'mysql_api', None)
    if mysql_api is not None:
        try:
            # Get the CREATE statement for the source table and parse it
            source_create_statement = mysql_api.get_table_create_statement(source_table_name)
            source_structure = self.parse_mysql_table_structure(source_create_statement)

            # Copy the structure but keep the new table name
            mysql_structure = copy.deepcopy(source_structure)
            mysql_structure.table_name = target_table_name

            if is_query_api:
                return mysql_structure, self.convert_table_structure(mysql_structure)
            return mysql_structure

        except Exception as e:
            error_msg = f"Could not get source table structure for LIKE statement: {str(e)}"
            print(f"Error: {error_msg}")
            # Explicit chaining keeps the root cause attached to the reported error
            raise Exception(error_msg, create_statement) from e

    # If we got here, we couldn't determine the structure
    raise Exception(f"Could not determine structure for source table '{source_table_name}' in LIKE statement", create_statement)
804+
738805
def parse_create_table_query(self, mysql_query) -> "tuple[TableStructure, TableStructure]":
    """
    Parse a MySQL CREATE TABLE query into (mysql_structure, clickhouse_structure).

    ``CREATE TABLE [IF NOT EXISTS] target LIKE source`` statements are routed to
    ``_handle_create_table_like``; every other statement is parsed with
    ``parse_mysql_table_structure`` and converted with ``convert_table_structure``.

    Args:
        mysql_query: The CREATE TABLE statement text.

    Returns:
        A pair of the MySQL table structure and the matching ClickHouse table structure.
    """
    # One identifier: backtick-quoted, double-quoted, or a bare word. The bare form
    # stops at '.', ';' and parentheses so that `db.tbl` splits into qualifier and
    # name and a trailing ';' never ends up inside the table name.
    identifier = r'(?:`[^`]+`|"[^"]+"|[^\s`".;()]+)'

    # Anchored at the start of the statement (leading whitespace and /* */ comments
    # allowed) so that text resembling "create table a like b" inside a column
    # COMMENT or DEFAULT of an ordinary CREATE TABLE is not mistaken for LIKE syntax.
    create_like_pattern = (
        r'^\s*(?:/\*.*?\*/\s*)*'
        r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?'
        rf'(?:{identifier}\s*\.\s*)?(?P<target>{identifier})'
        r'\s+LIKE\s+'
        rf'(?:{identifier}\s*\.\s*)?(?P<source>{identifier})'
        r'(?=\s|;|$)'
    )
    match = re.match(create_like_pattern, mysql_query, re.IGNORECASE | re.DOTALL)

    if match:
        # Only the table part is kept; an optional `<dbname>.` qualifier is dropped
        new_table_name = match.group('target').strip('`"')
        source_table_name = match.group('source').strip('`"')

        # Use the common helper method to handle the LIKE statement
        return self._handle_create_table_like(mysql_query, source_table_name, new_table_name, True)

    # Regular parsing for non-LIKE statements
    mysql_table_structure = self.parse_mysql_table_structure(mysql_query)
    ch_table_structure = self.convert_table_structure(mysql_table_structure)
    return mysql_table_structure, ch_table_structure
@@ -779,6 +861,18 @@ def parse_mysql_table_structure(self, create_statement, required_table_name=None
779861
# style `<dbname>.<tablename>`
780862
structure.table_name = strip_sql_name(tokens[2].get_real_name())
781863

864+
# Handle CREATE TABLE ... LIKE statements
865+
if len(tokens) > 4 and tokens[3].normalized.upper() == 'LIKE':
866+
# Extract the source table name
867+
if not isinstance(tokens[4], sqlparse.sql.Identifier):
868+
raise Exception('wrong create statement', create_statement)
869+
870+
source_table_name = strip_sql_name(tokens[4].get_real_name())
871+
target_table_name = strip_sql_name(tokens[2].get_real_name())
872+
873+
# Use the common helper method to handle the LIKE statement
874+
return self._handle_create_table_like(create_statement, source_table_name, target_table_name, False)
875+
782876
if not isinstance(tokens[3], sqlparse.sql.Parenthesis):
783877
raise Exception('wrong create statement', create_statement)
784878

test_mysql_ch_replicator.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,3 +1657,158 @@ def test_enum_conversion():
16571657
])
16581658
def test_parse_db_name_from_query(query, expected):
16591659
assert BinlogReplicator._try_parse_db_name_from_query(query) == expected
1660+
1661+
1662+
def test_create_table_like():
    """
    Test that CREATE TABLE ... LIKE statements are handled correctly.

    Covers both phases of replication: a LIKE table that already exists when
    replication starts, and a LIKE table created while replication is running.
    In each case the derived table must appear in ClickHouse and receive data.
    """
    config_file = CONFIG_FILE
    cfg = config.Settings()
    cfg.load(config_file)

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)
    mysql.set_database(TEST_DB_NAME)

    # Create the source table with a complex structure
    mysql.execute('''
    CREATE TABLE `source_table` (
        id INT NOT NULL AUTO_INCREMENT,
        name VARCHAR(255) NOT NULL,
        age INT UNSIGNED,
        email VARCHAR(100) UNIQUE,
        status ENUM('active','inactive','pending') DEFAULT 'active',
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        data JSON,
        PRIMARY KEY (id)
    );
    ''')

    # Create a table using LIKE statement
    mysql.execute('''
    CREATE TABLE `derived_table` LIKE `source_table`;
    ''')

    # Set up replication. Each runner is stopped in a ``finally`` so a failing
    # assertion does not leave replicator processes running behind the test.
    binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
    binlog_replicator_runner.run()
    try:
        db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
        db_replicator_runner.run()
        try:
            _assert_create_table_like_replication(mysql, ch)
        finally:
            db_replicator_runner.stop()
    finally:
        binlog_replicator_runner.stop()


def _assert_create_table_like_replication(mysql, ch):
    """Run the checks of test_create_table_like while both replicators are running."""
    # Wait for database to be created and renamed from tmp to final
    assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=10.0)

    # Use the correct database explicitly
    ch.execute_command(f'USE `{TEST_DB_NAME}`')

    # Wait for tables to be created in ClickHouse with a longer timeout
    assert_wait(lambda: 'source_table' in ch.get_tables(), max_wait_time=10.0)
    assert_wait(lambda: 'derived_table' in ch.get_tables(), max_wait_time=10.0)

    # Insert data into both tables to verify they work
    mysql.execute("INSERT INTO `source_table` (name, age, email, status) VALUES ('Alice', 30, 'alice@example.com', 'active');", commit=True)
    mysql.execute("INSERT INTO `derived_table` (name, age, email, status) VALUES ('Bob', 25, 'bob@example.com', 'pending');", commit=True)

    # Wait for data to be replicated
    assert_wait(lambda: len(ch.select('source_table')) == 1, max_wait_time=10.0)
    assert_wait(lambda: len(ch.select('derived_table')) == 1, max_wait_time=10.0)

    # Compare structures by reading descriptions in ClickHouse
    source_desc = ch.execute_command("DESCRIBE TABLE source_table")
    derived_desc = ch.execute_command("DESCRIBE TABLE derived_table")

    # The structures should be identical
    assert source_desc == derived_desc

    # Verify the data in both tables
    source_data = ch.select('source_table')[0]
    derived_data = ch.select('derived_table')[0]

    assert source_data['name'] == 'Alice'
    assert derived_data['name'] == 'Bob'

    # Both tables should have same column types
    for column in ('id', 'name', 'age'):
        assert type(source_data[column]) is type(derived_data[column])

    # Now test realtime replication by creating a new table after the initial replication
    mysql.execute('''
    CREATE TABLE `realtime_table` (
        id INT NOT NULL AUTO_INCREMENT,
        title VARCHAR(100) NOT NULL,
        description TEXT,
        price DECIMAL(10,2),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (id)
    );
    ''')

    # Wait for the new table to be created in ClickHouse
    assert_wait(lambda: 'realtime_table' in ch.get_tables(), max_wait_time=10.0)

    # Insert data into the new table
    mysql.execute("""
    INSERT INTO `realtime_table` (title, description, price) VALUES
    ('Product 1', 'First product description', 19.99),
    ('Product 2', 'Second product description', 29.99),
    ('Product 3', 'Third product description', 39.99);
    """, commit=True)

    # Wait for data to be replicated
    assert_wait(lambda: len(ch.select('realtime_table')) == 3, max_wait_time=10.0)

    # Verify the data in the realtime table
    realtime_data = ch.select('realtime_table')
    assert len(realtime_data) == 3

    # Verify specific values
    products = sorted(record['title'] for record in realtime_data)
    assert products == ['Product 1', 'Product 2', 'Product 3']

    prices = sorted(float(record['price']) for record in realtime_data)
    assert prices == [19.99, 29.99, 39.99]

    # Now create another table using LIKE after initial replication
    mysql.execute('''
    CREATE TABLE `realtime_like_table` LIKE `realtime_table`;
    ''')

    # Wait for the new LIKE table to be created in ClickHouse
    assert_wait(lambda: 'realtime_like_table' in ch.get_tables(), max_wait_time=10.0)

    # Insert data into the new LIKE table
    mysql.execute("""
    INSERT INTO `realtime_like_table` (title, description, price) VALUES
    ('Service A', 'Premium service', 99.99),
    ('Service B', 'Standard service', 49.99);
    """, commit=True)

    # Wait for data to be replicated
    assert_wait(lambda: len(ch.select('realtime_like_table')) == 2, max_wait_time=10.0)

    # Verify the data in the realtime LIKE table
    like_data = ch.select('realtime_like_table')
    assert len(like_data) == 2

    services = sorted(record['title'] for record in like_data)
    assert services == ['Service A', 'Service B']

0 commit comments

Comments
 (0)