1 | 1 | import datetime |
2 | 2 | import json |
3 | 3 | import os |
4 | | -import tempfile |
5 | 4 | import uuid |
6 | 5 | import zoneinfo |
7 | 6 | |
8 | | -import yaml |
9 | | - |
10 | 7 | from common import * |
11 | 8 | from mysql_ch_replicator import clickhouse_api |
12 | 9 | from mysql_ch_replicator import config |
@@ -390,108 +387,77 @@ def test_timezone_conversion(): |
390 | 387 | Test that MySQL timestamp fields are converted to ClickHouse DateTime64 with a custom timezone. |
391 | 388 | This test reproduces GitHub issue #170. |
392 | 389 | """ |
393 | | - # Create a temporary config file with custom timezone |
394 | | - config_content = """ |
395 | | -mysql: |
396 | | - host: 'localhost' |
397 | | - port: 9306 |
398 | | - user: 'root' |
399 | | - password: 'admin' |
400 | | - |
401 | | -clickhouse: |
402 | | - host: 'localhost' |
403 | | - port: 9123 |
404 | | - user: 'default' |
405 | | - password: 'admin' |
406 | | - |
407 | | -binlog_replicator: |
408 | | - data_dir: '/app/binlog/' |
409 | | - records_per_file: 100000 |
410 | | - |
411 | | -databases: '*test*' |
412 | | -log_level: 'debug' |
413 | | -mysql_timezone: 'America/New_York' |
414 | | -""" |
415 | | - |
416 | | - # Create temporary config file |
417 | | - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: |
418 | | - f.write(config_content) |
419 | | - temp_config_file = f.name |
| 390 | + config_file = 'tests/tests_config_timezone.yaml' |
420 | 391 | |
421 | | - try: |
422 | | - cfg = config.Settings() |
423 | | - cfg.load(temp_config_file) |
424 | | - |
425 | | - # Verify timezone is loaded correctly |
426 | | - assert cfg.mysql_timezone == 'America/New_York' |
427 | | - |
428 | | - mysql = mysql_api.MySQLApi( |
429 | | - database=None, |
430 | | - mysql_settings=cfg.mysql, |
431 | | - ) |
| 392 | + cfg = config.Settings() |
| 393 | + cfg.load(config_file) |
| 394 | + |
| 395 | + # Verify timezone is loaded correctly |
| 396 | + assert cfg.mysql_timezone == 'America/New_York' |
| 397 | + |
| 398 | + mysql = mysql_api.MySQLApi( |
| 399 | + database=None, |
| 400 | + mysql_settings=cfg.mysql, |
| 401 | + ) |
432 | 402 | |
433 | | - ch = clickhouse_api.ClickhouseApi( |
434 | | - database=TEST_DB_NAME, |
435 | | - clickhouse_settings=cfg.clickhouse, |
436 | | - ) |
| 403 | + ch = clickhouse_api.ClickhouseApi( |
| 404 | + database=TEST_DB_NAME, |
| 405 | + clickhouse_settings=cfg.clickhouse, |
| 406 | + ) |
437 | 407 | |
438 | | - prepare_env(cfg, mysql, ch) |
439 | | - |
440 | | - # Create table with timestamp fields |
441 | | - mysql.execute(f''' |
442 | | - CREATE TABLE `{TEST_TABLE_NAME}` ( |
443 | | - id int NOT NULL AUTO_INCREMENT, |
444 | | - name varchar(255), |
445 | | - created_at timestamp NULL, |
446 | | - updated_at timestamp(3) NULL, |
447 | | - PRIMARY KEY (id) |
448 | | - ); |
449 | | - ''') |
| 408 | + prepare_env(cfg, mysql, ch) |
450 | 409 | |
451 | | - # Insert test data with specific timestamp |
452 | | - mysql.execute( |
453 | | - f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) " |
454 | | - f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');", |
455 | | - commit=True, |
456 | | - ) |
| 410 | + # Create table with timestamp fields |
| 411 | + mysql.execute(f''' |
| 412 | + CREATE TABLE `{TEST_TABLE_NAME}` ( |
| 413 | + id int NOT NULL AUTO_INCREMENT, |
| 414 | + name varchar(255), |
| 415 | + created_at timestamp NULL, |
| 416 | + updated_at timestamp(3) NULL, |
| 417 | + PRIMARY KEY (id) |
| 418 | + ); |
| 419 | + ''') |
457 | 420 | |
458 | | - # Run replication |
459 | | - run_all_runner = RunAllRunner(cfg_file=temp_config_file) |
460 | | - run_all_runner.run() |
| 421 | + # Insert test data with specific timestamp |
| 422 | + mysql.execute( |
| 423 | + f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) " |
| 424 | + f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');", |
| 425 | + commit=True, |
| 426 | + ) |
461 | 427 | |
462 | | - assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) |
463 | | - ch.execute_command(f'USE `{TEST_DB_NAME}`') |
464 | | - assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) |
465 | | - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) |
| 428 | + # Run replication |
| 429 | + run_all_runner = RunAllRunner(cfg_file=config_file) |
| 430 | + run_all_runner.run() |
466 | 431 | |
467 | | - # Get the table structure from ClickHouse |
468 | | - table_info = ch.query(f'DESCRIBE `{TEST_TABLE_NAME}`') |
469 | | - |
470 | | - # Check that timestamp fields are converted to DateTime64 with timezone |
471 | | - created_at_type = None |
472 | | - updated_at_type = None |
473 | | - for row in table_info.result_rows: |
474 | | - if row[0] == 'created_at': |
475 | | - created_at_type = row[1] |
476 | | - elif row[0] == 'updated_at': |
477 | | - updated_at_type = row[1] |
478 | | - |
479 | | - # Verify the types include the timezone |
480 | | - assert created_at_type is not None |
481 | | - assert updated_at_type is not None |
482 | | - assert 'America/New_York' in created_at_type |
483 | | - assert 'America/New_York' in updated_at_type |
484 | | - |
485 | | - # Verify data was inserted correctly |
486 | | - results = ch.select(TEST_TABLE_NAME) |
487 | | - assert len(results) == 1 |
488 | | - assert results[0]['name'] == 'test_timezone' |
489 | | - |
490 | | - run_all_runner.stop() |
491 | | - |
492 | | - finally: |
493 | | - # Clean up temporary config file |
494 | | - os.unlink(temp_config_file) |
| 432 | + assert_wait(lambda: TEST_DB_NAME in ch.get_databases()) |
| 433 | + ch.execute_command(f'USE `{TEST_DB_NAME}`') |
| 434 | + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables()) |
| 435 | + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1) |
| 436 | + |
| 437 | + # Get the table structure from ClickHouse |
| 438 | + table_info = ch.query(f'DESCRIBE `{TEST_TABLE_NAME}`') |
| 439 | + |
| 440 | + # Check that timestamp fields are converted to DateTime64 with timezone |
| 441 | + created_at_type = None |
| 442 | + updated_at_type = None |
| 443 | + for row in table_info.result_rows: |
| 444 | + if row[0] == 'created_at': |
| 445 | + created_at_type = row[1] |
| 446 | + elif row[0] == 'updated_at': |
| 447 | + updated_at_type = row[1] |
| 448 | + |
| 449 | + # Verify the types include the timezone |
| 450 | + assert created_at_type is not None |
| 451 | + assert updated_at_type is not None |
| 452 | + assert 'America/New_York' in created_at_type |
| 453 | + assert 'America/New_York' in updated_at_type |
| 454 | + |
| 455 | + # Verify data was inserted correctly |
| 456 | + results = ch.select(TEST_TABLE_NAME) |
| 457 | + assert len(results) == 1 |
| 458 | + assert results[0]['name'] == 'test_timezone' |
| 459 | + |
| 460 | + run_all_runner.stop() |
495 | 461 | |
496 | 462 | |
497 | 463 | def test_timezone_conversion_values(): |
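This hunk swaps the generated temp config for a checked-in file. The committed contents of `tests/tests_config_timezone.yaml` are not shown in this diff, but the inline YAML removed above gives a plausible sketch of what it contains:

```yaml
# Plausible contents of tests/tests_config_timezone.yaml, reconstructed
# from the inline config removed above -- not the committed file itself.
mysql:
  host: 'localhost'
  port: 9306
  user: 'root'
  password: 'admin'

clickhouse:
  host: 'localhost'
  port: 9123
  user: 'default'
  password: 'admin'

binlog_replicator:
  data_dir: '/app/binlog/'
  records_per_file: 100000

databases: '*test*'
log_level: 'debug'
mysql_timezone: 'America/New_York'
```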
@@ -908,115 +874,99 @@ def test_charset_configuration(): |
908 | 874 | This test verifies that the utf8mb4 charset can be configured to properly handle |
909 | 875 | 4-byte Unicode characters in JSON fields. |
910 | 876 | """ |
911 | | - # Create a temporary config file with explicit charset configuration |
912 | | - with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file: |
913 | | - config_file = temp_config_file.name |
914 | | - |
915 | | - # Load base config and add charset setting |
916 | | - with open(CONFIG_FILE, 'r') as f: |
917 | | - base_config = yaml.safe_load(f) |
918 | | - |
919 | | - # Ensure charset is set to utf8mb4 |
920 | | - base_config['mysql']['charset'] = 'utf8mb4' |
921 | | - |
922 | | - yaml.dump(base_config, temp_config_file) |
| 877 | + config_file = 'tests/tests_config_charset.yaml' |
| 878 | + |
| 879 | + cfg = config.Settings() |
| 880 | + cfg.load(config_file) |
| 881 | + |
| 882 | + # Verify charset is loaded correctly |
| 883 | + assert hasattr(cfg.mysql, 'charset'), "MysqlSettings should have charset attribute" |
| 884 | + assert cfg.mysql.charset == 'utf8mb4', f"Expected charset utf8mb4, got {cfg.mysql.charset}" |
| 885 | + |
| 886 | + mysql = mysql_api.MySQLApi(None, cfg.mysql) |
| 887 | + ch = clickhouse_api.ClickhouseApi(None, cfg.clickhouse) |
| 888 | + |
| 889 | + prepare_env(cfg, mysql, ch) |
| 890 | + |
| 891 | + mysql.database = TEST_DB_NAME |
| 892 | + ch.database = TEST_DB_NAME |
| 893 | + |
| 894 | + # Create table with JSON field |
| 895 | + mysql.execute(f""" |
| 896 | + CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} ( |
| 897 | + id INT AUTO_INCREMENT PRIMARY KEY, |
| 898 | + json_data JSON |
| 899 | + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci |
| 900 | + """, commit=True) |
| 901 | + |
| 902 | + # Insert data with 4-byte Unicode characters (emoji and Arabic text) |
| 903 | + test_data = { |
| 904 | + "ar": "مرحباً بالعالم", # Arabic: Hello World |
| 905 | + "emoji": "🌍🎉✨", |
| 906 | + "cn": "你好世界", # Chinese: Hello World |
| 907 | + "en": "Hello World" |
| 908 | + } |
| 909 | + |
| 910 | + mysql.execute( |
| 911 | + f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)", |
| 912 | + args=(json.dumps(test_data, ensure_ascii=False),), |
| 913 | + commit=True |
| 914 | + ) |
| 915 | + |
| 916 | + # Verify the data can be read back correctly |
| 917 | + mysql.cursor.execute(f"SELECT json_data FROM {TEST_TABLE_NAME}") |
| 918 | + result = mysql.cursor.fetchone() |
| 919 | + assert result is not None, "Should have retrieved a record" |
| 920 | + |
| 921 | + retrieved_data = json.loads(result[0]) if isinstance(result[0], str) else result[0] |
| 922 | + assert retrieved_data['ar'] == test_data['ar'], f"Arabic text mismatch: {retrieved_data['ar']} != {test_data['ar']}" |
| 923 | + assert retrieved_data['emoji'] == test_data['emoji'], f"Emoji mismatch: {retrieved_data['emoji']} != {test_data['emoji']}" |
| 924 | + assert retrieved_data['cn'] == test_data['cn'], f"Chinese text mismatch: {retrieved_data['cn']} != {test_data['cn']}" |
| 925 | + |
| 926 | + # Test binlog replication with charset |
| 927 | + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) |
| 928 | + binlog_replicator_runner.run() |
923 | 929 | |
924 | 930 | try: |
925 | | - cfg = config.Settings() |
926 | | - cfg.load(config_file) |
927 | | - |
928 | | - # Verify charset is loaded correctly |
929 | | - assert hasattr(cfg.mysql, 'charset'), "MysqlSettings should have charset attribute" |
930 | | - assert cfg.mysql.charset == 'utf8mb4', f"Expected charset utf8mb4, got {cfg.mysql.charset}" |
| 931 | + # Start db replicator |
| 932 | + db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) |
| 933 | + db_replicator_runner.run() |
931 | 934 | |
932 | | - mysql = mysql_api.MySQLApi(None, cfg.mysql) |
933 | | - ch = clickhouse_api.ClickhouseApi(None, cfg.clickhouse) |
| 935 | + # Wait for database and table to be created in ClickHouse |
| 936 | + assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=20) |
| 937 | + ch.execute_command(f'USE `{TEST_DB_NAME}`') |
| 938 | + assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=20) |
934 | 939 | |
935 | | - prepare_env(cfg, mysql, ch) |
| 940 | + # Wait for replication |
| 941 | + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, max_wait_time=20) |
936 | 942 | |
937 | | - mysql.database = TEST_DB_NAME |
938 | | - ch.database = TEST_DB_NAME |
| 943 | + # Verify data in ClickHouse |
| 944 | + ch_records = ch.select(TEST_TABLE_NAME) |
| 945 | + assert len(ch_records) == 1, f"Expected 1 record in ClickHouse, got {len(ch_records)}" |
939 | 946 | |
940 | | - # Create table with JSON field |
941 | | - mysql.execute(f""" |
942 | | - CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} ( |
943 | | - id INT AUTO_INCREMENT PRIMARY KEY, |
944 | | - json_data JSON |
945 | | - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci |
946 | | - """, commit=True) |
| 947 | + # Access the json_data column using dictionary access |
| 948 | + ch_record = ch_records[0] |
| 949 | + ch_json_data = json.loads(ch_record['json_data']) if isinstance(ch_record['json_data'], str) else ch_record['json_data'] |
947 | 950 | |
948 | | - # Insert data with 4-byte Unicode characters (emoji and Arabic text) |
949 | | - test_data = { |
950 | | - "ar": "مرحباً بالعالم", # Arabic: Hello World |
951 | | - "emoji": "🌍🎉✨", |
952 | | - "cn": "你好世界", # Chinese: Hello World |
953 | | - "en": "Hello World" |
954 | | - } |
| 951 | + # Verify Unicode characters are preserved correctly |
| 952 | + assert ch_json_data['ar'] == test_data['ar'], f"Arabic text not preserved in CH: {ch_json_data.get('ar')}" |
| 953 | + assert ch_json_data['emoji'] == test_data['emoji'], f"Emoji not preserved in CH: {ch_json_data.get('emoji')}" |
| 954 | + assert ch_json_data['cn'] == test_data['cn'], f"Chinese text not preserved in CH: {ch_json_data.get('cn')}" |
955 | 955 | |
| 956 | + # Test realtime replication with more Unicode data |
| 957 | + more_data = {"test": "🔥 Real-time 测试 اختبار"} |
956 | 958 | mysql.execute( |
957 | 959 | f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)", |
958 | | - args=(json.dumps(test_data, ensure_ascii=False),), |
| 960 | + args=(json.dumps(more_data, ensure_ascii=False),), |
959 | 961 | commit=True |
960 | 962 | ) |
961 | 963 | |
962 | | - # Verify the data can be read back correctly |
963 | | - mysql.cursor.execute(f"SELECT json_data FROM {TEST_TABLE_NAME}") |
964 | | - result = mysql.cursor.fetchone() |
965 | | - assert result is not None, "Should have retrieved a record" |
966 | | - |
967 | | - retrieved_data = json.loads(result[0]) if isinstance(result[0], str) else result[0] |
968 | | - assert retrieved_data['ar'] == test_data['ar'], f"Arabic text mismatch: {retrieved_data['ar']} != {test_data['ar']}" |
969 | | - assert retrieved_data['emoji'] == test_data['emoji'], f"Emoji mismatch: {retrieved_data['emoji']} != {test_data['emoji']}" |
970 | | - assert retrieved_data['cn'] == test_data['cn'], f"Chinese text mismatch: {retrieved_data['cn']} != {test_data['cn']}" |
| 964 | + assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2, max_wait_time=20) |
971 | 965 | |
972 | | - # Test binlog replication with charset |
973 | | - binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) |
974 | | - binlog_replicator_runner.run() |
| 966 | + # Verify the second record |
| 967 | + ch_records = ch.select(TEST_TABLE_NAME) |
| 968 | + assert len(ch_records) == 2, f"Expected 2 records in ClickHouse, got {len(ch_records)}" |
975 | 969 | |
976 | | - try: |
977 | | - # Start db replicator |
978 | | - db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file) |
979 | | - db_replicator_runner.run() |
980 | | - |
981 | | - # Wait for database and table to be created in ClickHouse |
982 | | - assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=20) |
983 | | - ch.execute_command(f'USE `{TEST_DB_NAME}`') |
984 | | - assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=20) |
985 | | - |
986 | | - # Wait for replication |
987 | | - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, max_wait_time=20) |
988 | | - |
989 | | - # Verify data in ClickHouse |
990 | | - ch_records = ch.select(TEST_TABLE_NAME) |
991 | | - assert len(ch_records) == 1, f"Expected 1 record in ClickHouse, got {len(ch_records)}" |
992 | | - |
993 | | - # Access the json_data column using dictionary access |
994 | | - ch_record = ch_records[0] |
995 | | - ch_json_data = json.loads(ch_record['json_data']) if isinstance(ch_record['json_data'], str) else ch_record['json_data'] |
996 | | - |
997 | | - # Verify Unicode characters are preserved correctly |
998 | | - assert ch_json_data['ar'] == test_data['ar'], f"Arabic text not preserved in CH: {ch_json_data.get('ar')}" |
999 | | - assert ch_json_data['emoji'] == test_data['emoji'], f"Emoji not preserved in CH: {ch_json_data.get('emoji')}" |
1000 | | - assert ch_json_data['cn'] == test_data['cn'], f"Chinese text not preserved in CH: {ch_json_data.get('cn')}" |
1001 | | - |
1002 | | - # Test realtime replication with more Unicode data |
1003 | | - more_data = {"test": "🔥 Real-time 测试 اختبار"} |
1004 | | - mysql.execute( |
1005 | | - f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)", |
1006 | | - args=(json.dumps(more_data, ensure_ascii=False),), |
1007 | | - commit=True |
1008 | | - ) |
1009 | | - |
1010 | | - assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2, max_wait_time=20) |
1011 | | - |
1012 | | - # Verify the second record |
1013 | | - ch_records = ch.select(TEST_TABLE_NAME) |
1014 | | - assert len(ch_records) == 2, f"Expected 2 records in ClickHouse, got {len(ch_records)}" |
1015 | | - |
1016 | | - db_replicator_runner.stop() |
1017 | | - finally: |
1018 | | - binlog_replicator_runner.stop() |
1019 | | - |
| 970 | + db_replicator_runner.stop() |
1020 | 971 | finally: |
1021 | | - # Clean up temp config file |
1022 | | - os.unlink(config_file) |
| 972 | + binlog_replicator_runner.stop() |
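The removed code here built its config at runtime by loading the base `CONFIG_FILE` and setting `base_config['mysql']['charset'] = 'utf8mb4'`, so `tests/tests_config_charset.yaml` is presumably just the base test config plus that one key. A sketch under that assumption:

```yaml
# Assumed shape of tests/tests_config_charset.yaml: the base test config
# with charset pinned under mysql. The committed file may differ.
mysql:
  host: 'localhost'
  port: 9306
  user: 'root'
  password: 'admin'
  charset: 'utf8mb4'

clickhouse:
  host: 'localhost'
  port: 9123
  user: 'default'
  password: 'admin'

binlog_replicator:
  data_dir: '/app/binlog/'
  records_per_file: 100000

databases: '*test*'
log_level: 'debug'
```

Keeping these configs as static files lets every runner (`RunAllRunner`, `BinlogReplicatorRunner`, `DbReplicatorRunner`) receive the same `cfg_file` path, and removes the temp-file creation and `os.unlink` cleanup that the old `try`/`finally` blocks existed for.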