Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 144 additions & 194 deletions tests/test_data_types.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import datetime
import json
import os
import tempfile
import uuid
import zoneinfo

import yaml

from common import *
from mysql_ch_replicator import clickhouse_api
from mysql_ch_replicator import config
Expand Down Expand Up @@ -390,108 +387,77 @@ def test_timezone_conversion():
Test that MySQL timestamp fields are converted to ClickHouse DateTime64 with custom timezone.
This test reproduces the issue from GitHub issue #170.
"""
# Create a temporary config file with custom timezone
config_content = """
mysql:
host: 'localhost'
port: 9306
user: 'root'
password: 'admin'

clickhouse:
host: 'localhost'
port: 9123
user: 'default'
password: 'admin'

binlog_replicator:
data_dir: '/app/binlog/'
records_per_file: 100000

databases: '*test*'
log_level: 'debug'
mysql_timezone: 'America/New_York'
"""

# Create temporary config file
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
f.write(config_content)
temp_config_file = f.name
config_file = 'tests/tests_config_timezone.yaml'

try:
cfg = config.Settings()
cfg.load(temp_config_file)

# Verify timezone is loaded correctly
assert cfg.mysql_timezone == 'America/New_York'

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)
cfg = config.Settings()
cfg.load(config_file)

# Verify timezone is loaded correctly
assert cfg.mysql_timezone == 'America/New_York'

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)
ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

# Create table with timestamp fields
mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
id int NOT NULL AUTO_INCREMENT,
name varchar(255),
created_at timestamp NULL,
updated_at timestamp(3) NULL,
PRIMARY KEY (id)
);
''')
prepare_env(cfg, mysql, ch)

# Insert test data with specific timestamp
mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) "
f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');",
commit=True,
)
# Create table with timestamp fields
mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
id int NOT NULL AUTO_INCREMENT,
name varchar(255),
created_at timestamp NULL,
updated_at timestamp(3) NULL,
PRIMARY KEY (id)
);
''')

# Run replication
run_all_runner = RunAllRunner(cfg_file=temp_config_file)
run_all_runner.run()
# Insert test data with specific timestamp
mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) "
f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');",
commit=True,
)

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
# Run replication
run_all_runner = RunAllRunner(cfg_file=config_file)
run_all_runner.run()

# Get the table structure from ClickHouse
table_info = ch.query(f'DESCRIBE `{TEST_TABLE_NAME}`')

# Check that timestamp fields are converted to DateTime64 with timezone
created_at_type = None
updated_at_type = None
for row in table_info.result_rows:
if row[0] == 'created_at':
created_at_type = row[1]
elif row[0] == 'updated_at':
updated_at_type = row[1]

# Verify the types include the timezone
assert created_at_type is not None
assert updated_at_type is not None
assert 'America/New_York' in created_at_type
assert 'America/New_York' in updated_at_type

# Verify data was inserted correctly
results = ch.select(TEST_TABLE_NAME)
assert len(results) == 1
assert results[0]['name'] == 'test_timezone'

run_all_runner.stop()

finally:
# Clean up temporary config file
os.unlink(temp_config_file)
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

# Get the table structure from ClickHouse
table_info = ch.query(f'DESCRIBE `{TEST_TABLE_NAME}`')

# Check that timestamp fields are converted to DateTime64 with timezone
created_at_type = None
updated_at_type = None
for row in table_info.result_rows:
if row[0] == 'created_at':
created_at_type = row[1]
elif row[0] == 'updated_at':
updated_at_type = row[1]

# Verify the types include the timezone
assert created_at_type is not None
assert updated_at_type is not None
assert 'America/New_York' in created_at_type
assert 'America/New_York' in updated_at_type

# Verify data was inserted correctly
results = ch.select(TEST_TABLE_NAME)
assert len(results) == 1
assert results[0]['name'] == 'test_timezone'

run_all_runner.stop()


def test_timezone_conversion_values():
Expand Down Expand Up @@ -908,115 +874,99 @@ def test_charset_configuration():
This test verifies that utf8mb4 charset can be configured to properly handle
4-byte Unicode characters in JSON fields.
"""
# Create a temporary config file with explicit charset configuration
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file:
config_file = temp_config_file.name

# Load base config and add charset setting
with open(CONFIG_FILE, 'r') as f:
base_config = yaml.safe_load(f)

# Ensure charset is set to utf8mb4
base_config['mysql']['charset'] = 'utf8mb4'

yaml.dump(base_config, temp_config_file)
config_file = 'tests/tests_config_charset.yaml'

cfg = config.Settings()
cfg.load(config_file)

# Verify charset is loaded correctly
assert hasattr(cfg.mysql, 'charset'), "MysqlSettings should have charset attribute"
assert cfg.mysql.charset == 'utf8mb4', f"Expected charset utf8mb4, got {cfg.mysql.charset}"

mysql = mysql_api.MySQLApi(None, cfg.mysql)
ch = clickhouse_api.ClickhouseApi(None, cfg.clickhouse)

prepare_env(cfg, mysql, ch)

mysql.database = TEST_DB_NAME
ch.database = TEST_DB_NAME

# Create table with JSON field
mysql.execute(f"""
CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} (
id INT AUTO_INCREMENT PRIMARY KEY,
json_data JSON
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
""", commit=True)

# Insert data with 4-byte Unicode characters (emoji and Arabic text)
test_data = {
"ar": "مرحباً بالعالم", # Arabic: Hello World
"emoji": "🌍🎉✨",
"cn": "你好世界", # Chinese: Hello World
"en": "Hello World"
}

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)",
args=(json.dumps(test_data, ensure_ascii=False),),
commit=True
)

# Verify the data can be read back correctly
mysql.cursor.execute(f"SELECT json_data FROM {TEST_TABLE_NAME}")
result = mysql.cursor.fetchone()
assert result is not None, "Should have retrieved a record"

retrieved_data = json.loads(result[0]) if isinstance(result[0], str) else result[0]
assert retrieved_data['ar'] == test_data['ar'], f"Arabic text mismatch: {retrieved_data['ar']} != {test_data['ar']}"
assert retrieved_data['emoji'] == test_data['emoji'], f"Emoji mismatch: {retrieved_data['emoji']} != {test_data['emoji']}"
assert retrieved_data['cn'] == test_data['cn'], f"Chinese text mismatch: {retrieved_data['cn']} != {test_data['cn']}"

# Test binlog replication with charset
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()

try:
cfg = config.Settings()
cfg.load(config_file)

# Verify charset is loaded correctly
assert hasattr(cfg.mysql, 'charset'), "MysqlSettings should have charset attribute"
assert cfg.mysql.charset == 'utf8mb4', f"Expected charset utf8mb4, got {cfg.mysql.charset}"
# Start db replicator
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
db_replicator_runner.run()

mysql = mysql_api.MySQLApi(None, cfg.mysql)
ch = clickhouse_api.ClickhouseApi(None, cfg.clickhouse)
# Wait for database and table to be created in ClickHouse
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=20)
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=20)

prepare_env(cfg, mysql, ch)
# Wait for replication
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, max_wait_time=20)

mysql.database = TEST_DB_NAME
ch.database = TEST_DB_NAME
# Verify data in ClickHouse
ch_records = ch.select(TEST_TABLE_NAME)
assert len(ch_records) == 1, f"Expected 1 record in ClickHouse, got {len(ch_records)}"

# Create table with JSON field
mysql.execute(f"""
CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} (
id INT AUTO_INCREMENT PRIMARY KEY,
json_data JSON
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
""", commit=True)
# Access the json_data column using dictionary access
ch_record = ch_records[0]
ch_json_data = json.loads(ch_record['json_data']) if isinstance(ch_record['json_data'], str) else ch_record['json_data']

# Insert data with 4-byte Unicode characters (emoji and Arabic text)
test_data = {
"ar": "مرحباً بالعالم", # Arabic: Hello World
"emoji": "🌍🎉✨",
"cn": "你好世界", # Chinese: Hello World
"en": "Hello World"
}
# Verify Unicode characters are preserved correctly
assert ch_json_data['ar'] == test_data['ar'], f"Arabic text not preserved in CH: {ch_json_data.get('ar')}"
assert ch_json_data['emoji'] == test_data['emoji'], f"Emoji not preserved in CH: {ch_json_data.get('emoji')}"
assert ch_json_data['cn'] == test_data['cn'], f"Chinese text not preserved in CH: {ch_json_data.get('cn')}"

# Test realtime replication with more Unicode data
more_data = {"test": "🔥 Real-time 测试 اختبار"}
mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)",
args=(json.dumps(test_data, ensure_ascii=False),),
args=(json.dumps(more_data, ensure_ascii=False),),
commit=True
)

# Verify the data can be read back correctly
mysql.cursor.execute(f"SELECT json_data FROM {TEST_TABLE_NAME}")
result = mysql.cursor.fetchone()
assert result is not None, "Should have retrieved a record"

retrieved_data = json.loads(result[0]) if isinstance(result[0], str) else result[0]
assert retrieved_data['ar'] == test_data['ar'], f"Arabic text mismatch: {retrieved_data['ar']} != {test_data['ar']}"
assert retrieved_data['emoji'] == test_data['emoji'], f"Emoji mismatch: {retrieved_data['emoji']} != {test_data['emoji']}"
assert retrieved_data['cn'] == test_data['cn'], f"Chinese text mismatch: {retrieved_data['cn']} != {test_data['cn']}"
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2, max_wait_time=20)

# Test binlog replication with charset
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()
# Verify the second record
ch_records = ch.select(TEST_TABLE_NAME)
assert len(ch_records) == 2, f"Expected 2 records in ClickHouse, got {len(ch_records)}"

try:
# Start db replicator
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
db_replicator_runner.run()

# Wait for database and table to be created in ClickHouse
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=20)
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=20)

# Wait for replication
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, max_wait_time=20)

# Verify data in ClickHouse
ch_records = ch.select(TEST_TABLE_NAME)
assert len(ch_records) == 1, f"Expected 1 record in ClickHouse, got {len(ch_records)}"

# Access the json_data column using dictionary access
ch_record = ch_records[0]
ch_json_data = json.loads(ch_record['json_data']) if isinstance(ch_record['json_data'], str) else ch_record['json_data']

# Verify Unicode characters are preserved correctly
assert ch_json_data['ar'] == test_data['ar'], f"Arabic text not preserved in CH: {ch_json_data.get('ar')}"
assert ch_json_data['emoji'] == test_data['emoji'], f"Emoji not preserved in CH: {ch_json_data.get('emoji')}"
assert ch_json_data['cn'] == test_data['cn'], f"Chinese text not preserved in CH: {ch_json_data.get('cn')}"

# Test realtime replication with more Unicode data
more_data = {"test": "🔥 Real-time 测试 اختبار"}
mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)",
args=(json.dumps(more_data, ensure_ascii=False),),
commit=True
)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2, max_wait_time=20)

# Verify the second record
ch_records = ch.select(TEST_TABLE_NAME)
assert len(ch_records) == 2, f"Expected 2 records in ClickHouse, got {len(ch_records)}"

db_replicator_runner.stop()
finally:
binlog_replicator_runner.stop()

db_replicator_runner.stop()
finally:
# Clean up temp config file
os.unlink(config_file)
binlog_replicator_runner.stop()
Loading