|
10 | 10 | from mysql_ch_replicator import config |
11 | 11 | from mysql_ch_replicator import mysql_api |
12 | 12 | from mysql_ch_replicator import clickhouse_api |
13 | | -from mysql_ch_replicator.binlog_replicator import State as BinlogState |
| 13 | +from mysql_ch_replicator.binlog_replicator import State as BinlogState, FileReader, EventType |
14 | 14 | from mysql_ch_replicator.db_replicator import State as DbReplicatorState, DbReplicator |
15 | 15 | from mysql_ch_replicator.converter import MysqlToClickhouseConverter |
16 | 16 |
|
@@ -1269,3 +1269,127 @@ def test_parse_mysql_table_structure(): |
1269 | 1269 |
|
1270 | 1270 | assert structure.table_name == 'user_preferences_portal' |
1271 | 1271 |
|
| 1272 | + |
| 1273 | +def get_last_file(directory, extension='.bin'): |
| 1274 | + max_num = -1 |
| 1275 | + last_file = None |
| 1276 | + ext_len = len(extension) |
| 1277 | + |
| 1278 | + with os.scandir(directory) as it: |
| 1279 | + for entry in it: |
| 1280 | + if entry.is_file() and entry.name.endswith(extension): |
| 1281 | + # Extract the numerical part by removing the extension |
| 1282 | + num_part = entry.name[:-ext_len] |
| 1283 | + try: |
| 1284 | + num = int(num_part) |
| 1285 | + if num > max_num: |
| 1286 | + max_num = num |
| 1287 | + last_file = entry.name |
| 1288 | + except ValueError: |
| 1289 | + # Skip files where the name before extension is not an integer |
| 1290 | + continue |
| 1291 | + return last_file |
| 1292 | + |
| 1293 | + |
| 1294 | +def get_last_insert_from_binlog(cfg: config.Settings, db_name: str): |
| 1295 | + binlog_dir_path = os.path.join(cfg.binlog_replicator.data_dir, db_name) |
| 1296 | + if not os.path.exists(binlog_dir_path): |
| 1297 | + return None |
| 1298 | + last_file = get_last_file(binlog_dir_path) |
| 1299 | + if last_file is None: |
| 1300 | + return None |
| 1301 | + reader = FileReader(os.path.join(binlog_dir_path, last_file)) |
| 1302 | + last_insert = None |
| 1303 | + while True: |
| 1304 | + event = reader.read_next_event() |
| 1305 | + if event is None: |
| 1306 | + break |
| 1307 | + if event.event_type != EventType.ADD_EVENT.value: |
| 1308 | + continue |
| 1309 | + for record in event.records: |
| 1310 | + last_insert = record |
| 1311 | + return last_insert |
| 1312 | + |
| 1313 | + |
| 1314 | +@pytest.mark.optional |
| 1315 | +def test_performance_dbreplicator(): |
| 1316 | + config_file = 'tests_config_perf.yaml' |
| 1317 | + num_records = 100000 |
| 1318 | + |
| 1319 | + cfg = config.Settings() |
| 1320 | + cfg.load(config_file) |
| 1321 | + |
| 1322 | + mysql = mysql_api.MySQLApi( |
| 1323 | + database=None, |
| 1324 | + mysql_settings=cfg.mysql, |
| 1325 | + ) |
| 1326 | + |
| 1327 | + ch = clickhouse_api.ClickhouseApi( |
| 1328 | + database=TEST_DB_NAME, |
| 1329 | + clickhouse_settings=cfg.clickhouse, |
| 1330 | + ) |
| 1331 | + |
| 1332 | + prepare_env(cfg, mysql, ch) |
| 1333 | + |
| 1334 | + mysql.execute(f''' |
| 1335 | + CREATE TABLE {TEST_TABLE_NAME} ( |
| 1336 | + id int NOT NULL AUTO_INCREMENT, |
| 1337 | + name varchar(2048), |
| 1338 | + age int, |
| 1339 | + PRIMARY KEY (id) |
| 1340 | + ); |
| 1341 | + ''') |
| 1342 | + |
| 1343 | + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) |
| 1344 | + binlog_replicator_runner.run() |
| 1345 | + |
| 1346 | + time.sleep(1) |
| 1347 | + |
| 1348 | + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('TEST_VALUE_1', 33);", commit=True) |
| 1349 | + |
| 1350 | + def _get_last_insert_name(): |
| 1351 | + record = get_last_insert_from_binlog(cfg=cfg, db_name=TEST_DB_NAME) |
| 1352 | + if record is None: |
| 1353 | + return None |
| 1354 | + return record[1].decode('utf-8') |
| 1355 | + |
| 1356 | + assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_1', retry_interval=0.5) |
| 1357 | + |
| 1358 | + binlog_replicator_runner.stop() |
| 1359 | + |
| 1360 | + time.sleep(1) |
| 1361 | + |
| 1362 | + print("populating mysql data") |
| 1363 | + |
| 1364 | + base_value = 'a' * 2000 |
| 1365 | + |
| 1366 | + for i in range(num_records): |
| 1367 | + if i % 2000 == 0: |
| 1368 | + print(f'populated {i} elements') |
| 1369 | + mysql.execute( |
| 1370 | + f"INSERT INTO {TEST_TABLE_NAME} (name, age) " |
| 1371 | + f"VALUES ('TEST_VALUE_{i}_{base_value}', {i});", commit=i % 20 == 0, |
| 1372 | + ) |
| 1373 | + |
| 1374 | + mysql.execute(f"INSERT INTO {TEST_TABLE_NAME} (name, age) VALUES ('TEST_VALUE_FINAL', 0);", commit=True) |
| 1375 | + |
| 1376 | + print("running db_replicator") |
| 1377 | + t1 = time.time() |
| 1378 | + binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file) |
| 1379 | + binlog_replicator_runner.run() |
| 1380 | + |
| 1381 | + assert_wait(lambda: _get_last_insert_name() == 'TEST_VALUE_FINAL', retry_interval=0.5, max_wait_time=1000) |
| 1382 | + t2 = time.time() |
| 1383 | + |
| 1384 | + binlog_replicator_runner.stop() |
| 1385 | + |
| 1386 | + time_delta = t2 - t1 |
| 1387 | + rps = num_records / time_delta |
| 1388 | + |
| 1389 | + print('\n\n') |
| 1390 | + print("*****************************") |
| 1391 | + print("records per second:", int(rps)) |
| 1392 | + print("total time (seconds):", round(time_delta, 2)) |
| 1393 | + print("*****************************") |
| 1394 | + print('\n\n') |
| 1395 | + |
0 commit comments