@@ -1997,3 +1997,82 @@ def test_year_type():
19971997 run_all_runner .stop ()
19981998 assert_wait (lambda : 'stopping db_replicator' in read_logs (TEST_DB_NAME ))
19991999 assert ('Traceback' not in read_logs (TEST_DB_NAME ))
2000+
2001+ @pytest .mark .optional
2002+ def test_performance_initial_only_replication ():
2003+ config_file = 'tests_config_perf.yaml'
2004+ num_records = 1000000
2005+
2006+ cfg = config .Settings ()
2007+ cfg .load (config_file )
2008+
2009+ mysql = mysql_api .MySQLApi (
2010+ database = None ,
2011+ mysql_settings = cfg .mysql ,
2012+ )
2013+
2014+ ch = clickhouse_api .ClickhouseApi (
2015+ database = TEST_DB_NAME ,
2016+ clickhouse_settings = cfg .clickhouse ,
2017+ )
2018+
2019+ prepare_env (cfg , mysql , ch )
2020+
2021+ mysql .execute (f'''
2022+ CREATE TABLE `{ TEST_TABLE_NAME } ` (
2023+ id int NOT NULL AUTO_INCREMENT,
2024+ name varchar(2048),
2025+ age int,
2026+ PRIMARY KEY (id)
2027+ );
2028+ ''' )
2029+
2030+ print ("populating mysql data" )
2031+
2032+ base_value = 'a' * 2000
2033+
2034+ for i in range (num_records ):
2035+ if i % 2000 == 0 :
2036+ print (f'populated { i } elements' )
2037+ mysql .execute (
2038+ f"INSERT INTO `{ TEST_TABLE_NAME } ` (name, age) "
2039+ f"VALUES ('TEST_VALUE_{ i } _{ base_value } ', { i } );" , commit = i % 20 == 0 ,
2040+ )
2041+
2042+ mysql .execute (f"INSERT INTO `{ TEST_TABLE_NAME } ` (name, age) VALUES ('TEST_VALUE_FINAL', 0);" , commit = True )
2043+ print (f"finished populating { num_records } records" )
2044+
2045+ # Now test db_replicator performance in initial_only mode
2046+ print ("running db_replicator in initial_only mode" )
2047+ t1 = time .time ()
2048+
2049+ db_replicator_runner = DbReplicatorRunner (
2050+ TEST_DB_NAME ,
2051+ additional_arguments = '--initial_only=True' ,
2052+ cfg_file = config_file
2053+ )
2054+ db_replicator_runner .run ()
2055+ db_replicator_runner .wait_complete () # Wait for the process to complete
2056+
2057+ # Make sure the database and table exist
2058+ assert_wait (lambda : TEST_DB_NAME in ch .get_databases (), retry_interval = 0.5 )
2059+ ch .execute_command (f'USE `{ TEST_DB_NAME } `' )
2060+ assert_wait (lambda : TEST_TABLE_NAME in ch .get_tables (), retry_interval = 0.5 )
2061+
2062+ # Check that all records were replicated
2063+ assert_wait (lambda : len (ch .select (TEST_TABLE_NAME )) == num_records + 1 , retry_interval = 0.5 , max_wait_time = 300 )
2064+
2065+ t2 = time .time ()
2066+
2067+ time_delta = t2 - t1
2068+ rps = num_records / time_delta
2069+
2070+ print ('\n \n ' )
2071+ print ("*****************************" )
2072+ print ("DB Replicator Initial Only Mode Performance:" )
2073+ print ("records per second:" , int (rps ))
2074+ print ("total time (seconds):" , round (time_delta , 2 ))
2075+ print ("*****************************" )
2076+ print ('\n \n ' )
2077+
2078+ db_replicator_runner .stop ()
0 commit comments