@@ -3084,6 +3084,126 @@ def test_resume_initial_replication_with_ignore_deletes():
30843084 os .unlink (config_file )
30853085
30863086
3087+ def test_charset_configuration ():
3088+ """
3089+ Test that charset configuration is properly loaded and used for MySQL connections.
3090+ This test verifies that utf8mb4 charset can be configured to properly handle
3091+ 4-byte Unicode characters in JSON fields.
3092+ """
3093+ # Create a temporary config file with explicit charset configuration
3094+ with tempfile .NamedTemporaryFile (mode = 'w' , suffix = '.yaml' , delete = False ) as temp_config_file :
3095+ config_file = temp_config_file .name
3096+
3097+ # Load base config and add charset setting
3098+ with open (CONFIG_FILE , 'r' ) as f :
3099+ base_config = yaml .safe_load (f )
3100+
3101+ # Ensure charset is set to utf8mb4
3102+ base_config ['mysql' ]['charset' ] = 'utf8mb4'
3103+
3104+ yaml .dump (base_config , temp_config_file )
3105+
3106+ try :
3107+ cfg = config .Settings ()
3108+ cfg .load (config_file )
3109+
3110+ # Verify charset is loaded correctly
3111+ assert hasattr (cfg .mysql , 'charset' ), "MysqlSettings should have charset attribute"
3112+ assert cfg .mysql .charset == 'utf8mb4' , f"Expected charset utf8mb4, got { cfg .mysql .charset } "
3113+
3114+ mysql = mysql_api .MySQLApi (None , cfg .mysql )
3115+ ch = clickhouse_api .ClickhouseApi (None , cfg .clickhouse )
3116+
3117+ prepare_env (cfg , mysql , ch )
3118+
3119+ mysql .database = TEST_DB_NAME
3120+ ch .database = TEST_DB_NAME
3121+
3122+ # Create table with JSON field
3123+ mysql .execute (f"""
3124+ CREATE TABLE IF NOT EXISTS { TEST_TABLE_NAME } (
3125+ id INT AUTO_INCREMENT PRIMARY KEY,
3126+ json_data JSON
3127+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
3128+ """ , commit = True )
3129+
3130+ # Insert data with 4-byte Unicode characters (emoji and Arabic text)
3131+ test_data = {
3132+ "ar" : "مرحباً بالعالم" , # Arabic: Hello World
3133+ "emoji" : "🌍🎉✨" ,
3134+ "cn" : "你好世界" , # Chinese: Hello World
3135+ "en" : "Hello World"
3136+ }
3137+
3138+ mysql .execute (
3139+ f"INSERT INTO { TEST_TABLE_NAME } (json_data) VALUES (%s)" ,
3140+ args = (json .dumps (test_data , ensure_ascii = False ),),
3141+ commit = True
3142+ )
3143+
3144+ # Verify the data can be read back correctly
3145+ mysql .cursor .execute (f"SELECT json_data FROM { TEST_TABLE_NAME } " )
3146+ result = mysql .cursor .fetchone ()
3147+ assert result is not None , "Should have retrieved a record"
3148+
3149+ retrieved_data = json .loads (result [0 ]) if isinstance (result [0 ], str ) else result [0 ]
3150+ assert retrieved_data ['ar' ] == test_data ['ar' ], f"Arabic text mismatch: { retrieved_data ['ar' ]} != { test_data ['ar' ]} "
3151+ assert retrieved_data ['emoji' ] == test_data ['emoji' ], f"Emoji mismatch: { retrieved_data ['emoji' ]} != { test_data ['emoji' ]} "
3152+ assert retrieved_data ['cn' ] == test_data ['cn' ], f"Chinese text mismatch: { retrieved_data ['cn' ]} != { test_data ['cn' ]} "
3153+
3154+ # Test binlog replication with charset
3155+ binlog_replicator_runner = BinlogReplicatorRunner (cfg_file = config_file )
3156+ binlog_replicator_runner .run ()
3157+
3158+ try :
3159+ # Start db replicator
3160+ db_replicator_runner = DbReplicatorRunner (TEST_DB_NAME , cfg_file = config_file )
3161+ db_replicator_runner .run ()
3162+
3163+ # Wait for database and table to be created in ClickHouse
3164+ assert_wait (lambda : TEST_DB_NAME in ch .get_databases (), max_wait_time = 20 )
3165+ ch .execute_command (f'USE `{ TEST_DB_NAME } `' )
3166+ assert_wait (lambda : TEST_TABLE_NAME in ch .get_tables (), max_wait_time = 20 )
3167+
3168+ # Wait for replication
3169+ assert_wait (lambda : len (ch .select (TEST_TABLE_NAME )) == 1 , max_wait_time = 20 )
3170+
3171+ # Verify data in ClickHouse
3172+ ch_records = ch .select (TEST_TABLE_NAME )
3173+ assert len (ch_records ) == 1 , f"Expected 1 record in ClickHouse, got { len (ch_records )} "
3174+
3175+ # Access the json_data column using dictionary access
3176+ ch_record = ch_records [0 ]
3177+ ch_json_data = json .loads (ch_record ['json_data' ]) if isinstance (ch_record ['json_data' ], str ) else ch_record ['json_data' ]
3178+
3179+ # Verify Unicode characters are preserved correctly
3180+ assert ch_json_data ['ar' ] == test_data ['ar' ], f"Arabic text not preserved in CH: { ch_json_data .get ('ar' )} "
3181+ assert ch_json_data ['emoji' ] == test_data ['emoji' ], f"Emoji not preserved in CH: { ch_json_data .get ('emoji' )} "
3182+ assert ch_json_data ['cn' ] == test_data ['cn' ], f"Chinese text not preserved in CH: { ch_json_data .get ('cn' )} "
3183+
3184+ # Test realtime replication with more Unicode data
3185+ more_data = {"test" : "🔥 Real-time 测试 اختبار" }
3186+ mysql .execute (
3187+ f"INSERT INTO { TEST_TABLE_NAME } (json_data) VALUES (%s)" ,
3188+ args = (json .dumps (more_data , ensure_ascii = False ),),
3189+ commit = True
3190+ )
3191+
3192+ assert_wait (lambda : len (ch .select (TEST_TABLE_NAME )) == 2 , max_wait_time = 20 )
3193+
3194+ # Verify the second record
3195+ ch_records = ch .select (TEST_TABLE_NAME )
3196+ assert len (ch_records ) == 2 , f"Expected 2 records in ClickHouse, got { len (ch_records )} "
3197+
3198+ db_replicator_runner .stop ()
3199+ finally :
3200+ binlog_replicator_runner .stop ()
3201+
3202+ finally :
3203+ # Clean up temp config file
3204+ os .unlink (config_file )
3205+
3206+
30873207@pytest .mark .parametrize ("input_sql,expected_output" , [
30883208 # Basic single quote comment
30893209 (
@@ -3222,3 +3342,4 @@ def normalize_whitespace(text):
32223342 return re .sub (r'[ \t]+' , ' ' , text ).strip ()
32233343
32243344 assert normalize_whitespace (result ) == normalize_whitespace (expected_output ), f"Failed for input: { input_sql } "
3345+
0 commit comments