@@ -1357,7 +1357,7 @@ def get_last_insert_from_binlog(cfg: config.Settings, db_name: str):
13571357
13581358
13591359@pytest .mark .optional
1360- def test_performance_dbreplicator ():
1360+ def test_performance_realtime_replication ():
13611361 config_file = 'tests_config_perf.yaml'
13621362 num_records = 100000
13631363
@@ -1387,6 +1387,8 @@ def test_performance_dbreplicator():
13871387
13881388 binlog_replicator_runner = BinlogReplicatorRunner (cfg_file = config_file )
13891389 binlog_replicator_runner .run ()
1390+ db_replicator_runner = DbReplicatorRunner (TEST_DB_NAME , cfg_file = config_file )
1391+ db_replicator_runner .run ()
13901392
13911393 time .sleep (1 )
13921394
@@ -1399,8 +1401,15 @@ def _get_last_insert_name():
13991401 return record [1 ].decode ('utf-8' )
14001402
14011403 assert_wait (lambda : _get_last_insert_name () == 'TEST_VALUE_1' , retry_interval = 0.5 )
1404+
1405+ # Wait for the database and table to be created in ClickHouse
1406+ assert_wait (lambda : TEST_DB_NAME in ch .get_databases (), retry_interval = 0.5 )
1407+ ch .execute_command (f'USE `{ TEST_DB_NAME } `' )
1408+ assert_wait (lambda : TEST_TABLE_NAME in ch .get_tables (), retry_interval = 0.5 )
1409+ assert_wait (lambda : len (ch .select (TEST_TABLE_NAME )) == 1 , retry_interval = 0.5 )
14021410
14031411 binlog_replicator_runner .stop ()
1412+ db_replicator_runner .stop ()
14041413
14051414 time .sleep (1 )
14061415
@@ -1418,7 +1427,7 @@ def _get_last_insert_name():
14181427
14191428 mysql .execute (f"INSERT INTO `{ TEST_TABLE_NAME } ` (name, age) VALUES ('TEST_VALUE_FINAL', 0);" , commit = True )
14201429
1421- print ("running db_replicator " )
1430+ print ("running binlog_replicator " )
14221431 t1 = time .time ()
14231432 binlog_replicator_runner = BinlogReplicatorRunner (cfg_file = config_file )
14241433 binlog_replicator_runner .run ()
@@ -1433,6 +1442,33 @@ def _get_last_insert_name():
14331442
14341443 print ('\n \n ' )
14351444 print ("*****************************" )
1445+ print ("Binlog Replicator Performance:" )
1446+ print ("records per second:" , int (rps ))
1447+ print ("total time (seconds):" , round (time_delta , 2 ))
1448+ print ("*****************************" )
1449+ print ('\n \n ' )
1450+
1451+ # Now test db_replicator performance
1452+ print ("running db_replicator" )
1453+ t1 = time .time ()
1454+ db_replicator_runner = DbReplicatorRunner (TEST_DB_NAME , cfg_file = config_file )
1455+ db_replicator_runner .run ()
1456+
1457+ # Make sure the database and table exist before querying
1458+ assert_wait (lambda : TEST_DB_NAME in ch .get_databases (), retry_interval = 0.5 )
1459+ ch .execute_command (f'USE `{ TEST_DB_NAME } `' )
1460+ assert_wait (lambda : TEST_TABLE_NAME in ch .get_tables (), retry_interval = 0.5 )
1461+ assert_wait (lambda : len (ch .select (TEST_TABLE_NAME )) == num_records + 2 , retry_interval = 0.5 , max_wait_time = 1000 )
1462+ t2 = time .time ()
1463+
1464+ db_replicator_runner .stop ()
1465+
1466+ time_delta = t2 - t1
1467+ rps = num_records / time_delta
1468+
1469+ print ('\n \n ' )
1470+ print ("*****************************" )
1471+ print ("DB Replicator Performance:" )
14361472 print ("records per second:" , int (rps ))
14371473 print ("total time (seconds):" , round (time_delta , 2 ))
14381474 print ("*****************************" )
@@ -1961,3 +1997,82 @@ def test_year_type():
19611997 run_all_runner .stop ()
19621998 assert_wait (lambda : 'stopping db_replicator' in read_logs (TEST_DB_NAME ))
19631999 assert ('Traceback' not in read_logs (TEST_DB_NAME ))
2000+
2001+ @pytest .mark .optional
2002+ def test_performance_initial_only_replication ():
2003+ config_file = 'tests_config_perf.yaml'
2004+ num_records = 1000000
2005+
2006+ cfg = config .Settings ()
2007+ cfg .load (config_file )
2008+
2009+ mysql = mysql_api .MySQLApi (
2010+ database = None ,
2011+ mysql_settings = cfg .mysql ,
2012+ )
2013+
2014+ ch = clickhouse_api .ClickhouseApi (
2015+ database = TEST_DB_NAME ,
2016+ clickhouse_settings = cfg .clickhouse ,
2017+ )
2018+
2019+ prepare_env (cfg , mysql , ch )
2020+
2021+ mysql .execute (f'''
2022+ CREATE TABLE `{ TEST_TABLE_NAME } ` (
2023+ id int NOT NULL AUTO_INCREMENT,
2024+ name varchar(2048),
2025+ age int,
2026+ PRIMARY KEY (id)
2027+ );
2028+ ''' )
2029+
2030+ print ("populating mysql data" )
2031+
2032+ base_value = 'a' * 2000
2033+
2034+ for i in range (num_records ):
2035+ if i % 2000 == 0 :
2036+ print (f'populated { i } elements' )
2037+ mysql .execute (
2038+ f"INSERT INTO `{ TEST_TABLE_NAME } ` (name, age) "
2039+ f"VALUES ('TEST_VALUE_{ i } _{ base_value } ', { i } );" , commit = i % 20 == 0 ,
2040+ )
2041+
2042+ mysql .execute (f"INSERT INTO `{ TEST_TABLE_NAME } ` (name, age) VALUES ('TEST_VALUE_FINAL', 0);" , commit = True )
2043+ print (f"finished populating { num_records } records" )
2044+
2045+ # Now test db_replicator performance in initial_only mode
2046+ print ("running db_replicator in initial_only mode" )
2047+ t1 = time .time ()
2048+
2049+ db_replicator_runner = DbReplicatorRunner (
2050+ TEST_DB_NAME ,
2051+ additional_arguments = '--initial_only=True' ,
2052+ cfg_file = config_file
2053+ )
2054+ db_replicator_runner .run ()
2055+ db_replicator_runner .wait_complete () # Wait for the process to complete
2056+
2057+ # Make sure the database and table exist
2058+ assert_wait (lambda : TEST_DB_NAME in ch .get_databases (), retry_interval = 0.5 )
2059+ ch .execute_command (f'USE `{ TEST_DB_NAME } `' )
2060+ assert_wait (lambda : TEST_TABLE_NAME in ch .get_tables (), retry_interval = 0.5 )
2061+
2062+ # Check that all records were replicated
2063+ assert_wait (lambda : len (ch .select (TEST_TABLE_NAME )) == num_records + 1 , retry_interval = 0.5 , max_wait_time = 300 )
2064+
2065+ t2 = time .time ()
2066+
2067+ time_delta = t2 - t1
2068+ rps = num_records / time_delta
2069+
2070+ print ('\n \n ' )
2071+ print ("*****************************" )
2072+ print ("DB Replicator Initial Only Mode Performance:" )
2073+ print ("records per second:" , int (rps ))
2074+ print ("total time (seconds):" , round (time_delta , 2 ))
2075+ print ("*****************************" )
2076+ print ('\n \n ' )
2077+
2078+ db_replicator_runner .stop ()
0 commit comments