Skip to content

Commit 958080f

Browse files
authored
Moved performance tests to separate file (#202)
1 parent 2d6bf2a commit 958080f

File tree

2 files changed

+302
-292
lines changed

2 files changed

+302
-292
lines changed

tests/test_mysql_ch_replicator.py

Lines changed: 0 additions & 292 deletions
Original file line numberDiff line numberDiff line change
@@ -420,166 +420,6 @@ def test_parse_mysql_table_structure():
420420
assert structure.table_name == 'user_preferences_portal'
421421

422422

423-
def get_last_file(directory, extension='.bin'):
424-
max_num = -1
425-
last_file = None
426-
ext_len = len(extension)
427-
428-
with os.scandir(directory) as it:
429-
for entry in it:
430-
if entry.is_file() and entry.name.endswith(extension):
431-
# Extract the numerical part by removing the extension
432-
num_part = entry.name[:-ext_len]
433-
try:
434-
num = int(num_part)
435-
if num > max_num:
436-
max_num = num
437-
last_file = entry.name
438-
except ValueError:
439-
# Skip files where the name before extension is not an integer
440-
continue
441-
return last_file
442-
443-
444-
def get_last_insert_from_binlog(cfg: config.Settings, db_name: str):
445-
binlog_dir_path = os.path.join(cfg.binlog_replicator.data_dir, db_name)
446-
if not os.path.exists(binlog_dir_path):
447-
return None
448-
last_file = get_last_file(binlog_dir_path)
449-
if last_file is None:
450-
return None
451-
reader = FileReader(os.path.join(binlog_dir_path, last_file))
452-
last_insert = None
453-
while True:
454-
event = reader.read_next_event()
455-
if event is None:
456-
break
457-
if event.event_type != EventType.ADD_EVENT.value:
458-
continue
459-
for record in event.records:
460-
last_insert = record
461-
return last_insert
462-
463-
464-
@pytest.mark.optional
465-
def test_performance_realtime_replication():
466-
config_file = 'tests/tests_config_perf.yaml'
467-
num_records = 100000
468-
469-
cfg = config.Settings()
470-
cfg.load(config_file)
471-
472-
mysql = mysql_api.MySQLApi(
473-
database=None,
474-
mysql_settings=cfg.mysql,
475-
)
476-
477-
ch = clickhouse_api.ClickhouseApi(
478-
database=TEST_DB_NAME,
479-
clickhouse_settings=cfg.clickhouse,
480-
)
481-
482-
prepare_env(cfg, mysql, ch)
483-
484-
mysql.execute(f'''
485-
CREATE TABLE `{TEST_TABLE_NAME}` (
486-
id int NOT NULL AUTO_INCREMENT,
487-
name varchar(2048),
488-
age int,
489-
PRIMARY KEY (id)
490-
);
491-
''')
492-
493-
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
494-
binlog_replicator_runner.run()
495-
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
496-
db_replicator_runner.run()
497-
498-
time.sleep(1)
499-
500-
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_1', 33);", commit=True)
501-
502-
def _get_last_insert_name():
503-
record = get_last_insert_from_binlog(cfg=cfg, db_name=TEST_DB_NAME)
504-
if record is None:
505-
return None
506-
return record[1].decode('utf-8')
507-
508-
assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5)
509-
510-
# Wait for the database and table to be created in ClickHouse
511-
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
512-
ch.execute_command(f'USE `{TEST_DB_NAME}`')
513-
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
514-
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, retry_interval=0.5)
515-
516-
binlog_replicator_runner.stop()
517-
db_replicator_runner.stop()
518-
519-
time.sleep(1)
520-
521-
print("populating mysql data")
522-
523-
base_value = 'a' * 2000
524-
525-
for i in range(num_records):
526-
if i % 2000 == 0:
527-
print(f'populated {i} elements')
528-
mysql.execute(
529-
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
530-
f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0,
531-
)
532-
533-
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)
534-
535-
print("running binlog_replicator")
536-
t1 = time.time()
537-
binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
538-
binlog_replicator_runner.run()
539-
540-
assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_FINAL', retry_interval=0.5, max_wait_time=1000)
541-
t2 = time.time()
542-
543-
binlog_replicator_runner.stop()
544-
545-
time_delta = t2 - t1
546-
rps = num_records / time_delta
547-
548-
print('\n\n')
549-
print("*****************************")
550-
print("Binlog Replicator Performance:")
551-
print("records per second:", int(rps))
552-
print("total time (seconds):", round(time_delta, 2))
553-
print("*****************************")
554-
print('\n\n')
555-
556-
# Now test db_replicator performance
557-
print("running db_replicator")
558-
t1 = time.time()
559-
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
560-
db_replicator_runner.run()
561-
562-
# Make sure the database and table exist before querying
563-
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
564-
ch.execute_command(f'USE `{TEST_DB_NAME}`')
565-
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
566-
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 2, retry_interval=0.5, max_wait_time=1000)
567-
t2 = time.time()
568-
569-
db_replicator_runner.stop()
570-
571-
time_delta = t2 - t1
572-
rps = num_records / time_delta
573-
574-
print('\n\n')
575-
print("*****************************")
576-
print("DB Replicator Performance:")
577-
print("records per second:", int(rps))
578-
print("total time (seconds):", round(time_delta, 2))
579-
print("*****************************")
580-
print('\n\n')
581-
582-
583423
def test_alter_tokens_split():
584424
examples = [
585425
# basic examples from the prompt:
@@ -719,138 +559,6 @@ def test_parse_db_name_from_query(query, expected):
719559
assert BinlogReplicator._try_parse_db_name_from_query(query) == expected
720560

721561

722-
@pytest.mark.optional
723-
def test_performance_initial_only_replication():
724-
config_file = 'tests/tests_config_perf.yaml'
725-
num_records = 300000
726-
727-
cfg = config.Settings()
728-
cfg.load(config_file)
729-
730-
mysql = mysql_api.MySQLApi(
731-
database=None,
732-
mysql_settings=cfg.mysql,
733-
)
734-
735-
ch = clickhouse_api.ClickhouseApi(
736-
database=TEST_DB_NAME,
737-
clickhouse_settings=cfg.clickhouse,
738-
)
739-
740-
prepare_env(cfg, mysql, ch)
741-
742-
mysql.execute(f'''
743-
CREATE TABLE `{TEST_TABLE_NAME}` (
744-
id int NOT NULL AUTO_INCREMENT,
745-
name varchar(2048),
746-
age int,
747-
PRIMARY KEY (id)
748-
);
749-
''')
750-
751-
print("populating mysql data")
752-
753-
base_value = 'a' * 2000
754-
755-
for i in range(num_records):
756-
if i % 2000 == 0:
757-
print(f'populated {i} elements')
758-
mysql.execute(
759-
f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) "
760-
f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0,
761-
)
762-
763-
mysql.execute(f"INSERT INTO `{TEST_TABLE_NAME}` (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True)
764-
print(f"finished populating {num_records} records")
765-
766-
# Now test db_replicator performance in initial_only mode
767-
print("running db_replicator in initial_only mode")
768-
t1 = time.time()
769-
770-
db_replicator_runner = DbReplicatorRunner(
771-
TEST_DB_NAME,
772-
additional_arguments='--initial_only=True',
773-
cfg_file=config_file
774-
)
775-
db_replicator_runner.run()
776-
db_replicator_runner.wait_complete() # Wait for the process to complete
777-
778-
# Make sure the database and table exist
779-
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
780-
ch.execute_command(f'USE `{TEST_DB_NAME}`')
781-
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
782-
783-
# Check that all records were replicated
784-
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 1, retry_interval=0.5, max_wait_time=300)
785-
786-
t2 = time.time()
787-
788-
time_delta = t2 - t1
789-
rps = num_records / time_delta
790-
791-
print('\n\n')
792-
print("*****************************")
793-
print("DB Replicator Initial Only Mode Performance:")
794-
print("records per second:", int(rps))
795-
print("total time (seconds):", round(time_delta, 2))
796-
print("*****************************")
797-
print('\n\n')
798-
799-
# Clean up
800-
ch.drop_database(TEST_DB_NAME)
801-
802-
# Now test with parallel replication
803-
# Set initial_replication_threads in the config
804-
print("running db_replicator with parallel initial replication")
805-
806-
t1 = time.time()
807-
808-
# Create a custom config file for testing with parallel replication
809-
parallel_config_file = 'tests/tests_config_perf_parallel.yaml'
810-
if os.path.exists(parallel_config_file):
811-
os.remove(parallel_config_file)
812-
813-
with open(config_file, 'r') as src_file:
814-
config_content = src_file.read()
815-
config_content += f"\ninitial_replication_threads: 8\n"
816-
with open(parallel_config_file, 'w') as dest_file:
817-
dest_file.write(config_content)
818-
819-
# Use the DbReplicator directly to test the new parallel implementation
820-
db_replicator_runner = DbReplicatorRunner(
821-
TEST_DB_NAME,
822-
cfg_file=parallel_config_file
823-
)
824-
db_replicator_runner.run()
825-
826-
# Make sure the database and table exist
827-
assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), retry_interval=0.5)
828-
ch.execute_command(f'USE `{TEST_DB_NAME}`')
829-
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), retry_interval=0.5)
830-
831-
# Check that all records were replicated
832-
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == num_records + 1, retry_interval=0.5, max_wait_time=300)
833-
834-
t2 = time.time()
835-
836-
time_delta = t2 - t1
837-
rps = num_records / time_delta
838-
839-
print('\n\n')
840-
print("*****************************")
841-
print("DB Replicator Parallel Mode Performance:")
842-
print("workers:", cfg.initial_replication_threads)
843-
print("records per second:", int(rps))
844-
print("total time (seconds):", round(time_delta, 2))
845-
print("*****************************")
846-
print('\n\n')
847-
848-
db_replicator_runner.stop()
849-
850-
# Clean up the temporary config file
851-
os.remove(parallel_config_file)
852-
853-
854562
def test_ignore_deletes():
855563
# Create a temporary config file with ignore_deletes=True
856564
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file:

0 commit comments

Comments
 (0)