@@ -5,6 +5,7 @@
 import yaml
 from pathlib import Path
 import logging
+import datetime
 
 from redisbench_admin.utils.benchmark_config import get_defaults
 from redisbench_admin.utils.remote import get_overall_dashboard_keynames
@@ -1283,3 +1284,129 @@ def test_self_contained_coordinator_blocking_read_valkey():
 
     except redis.exceptions.ConnectionError:
         pass
+
+
+def test_self_contained_coordinator_duplicated_ts():
+    try:
+        if run_coordinator_tests_dockerhub():
+            db_port = int(os.getenv("DATASINK_PORT", "6379"))
+            conn = redis.StrictRedis(port=db_port)
+            conn.ping()
+            conn.flushall()
+
+            id = "dockerhub"
+            redis_version = "7.4.0"
+            run_image = f"redis:{redis_version}"
+            build_arch = "amd64"
+            testDetails = {}
+            build_os = "test_build_os"
+
+            # generate 2 stream requests with the same timestamp
+            timestamp = int(datetime.datetime.now().timestamp())
+            for _ in range(0, 2):
+                build_stream_fields, result = generate_benchmark_stream_request(
+                    id,
+                    conn,
+                    run_image,
+                    build_arch,
+                    testDetails,
+                    build_os,
+                    git_timestamp_ms=timestamp,
+                    use_git_timestamp=True,
+                )
+                build_stream_fields["mnt_point"] = ""
+                if result is True:
+                    benchmark_stream_id = conn.xadd(
+                        STREAM_KEYNAME_NEW_BUILD_EVENTS, build_stream_fields
+                    )
+                    logging.info(
+                        "successfully requested a new run {}. Stream id: {}".format(
+                            build_stream_fields, benchmark_stream_id
+                        )
+                    )
+
+            assert conn.exists(STREAM_KEYNAME_NEW_BUILD_EVENTS)
+            assert conn.xlen(STREAM_KEYNAME_NEW_BUILD_EVENTS) == 2
+
+            running_platform = "fco-ThinkPad-T490"
+
+            # process the 2 stream requests
+            for _ in range(0, 2):
+
+                build_runners_consumer_group_create(conn, running_platform, "0")
+                datasink_conn = redis.StrictRedis(port=db_port)
+                docker_client = docker.from_env()
+                home = str(Path.home())
+                stream_id = ">"
+                topologies_map = get_topologies(
+                    "./redis_benchmarks_specification/setups/topologies/topologies.yml"
+                )
+                # we use a benchmark spec with a smaller CPU limit for the client,
+                # given that GitHub runners only have 2 cores and we need 1 core
+                # for the DB and another for the client
+                testsuite_spec_files = [
+                    "./utils/tests/test_data/test-suites/test-memtier-dockerhub.yml"
+                ]
+                defaults_filename = "./utils/tests/test_data/test-suites/defaults.yml"
+                (
+                    _,
+                    _,
+                    default_metrics,
+                    _,
+                    _,
+                    _,
+                ) = get_defaults(defaults_filename)
+
+                (
+                    result,
+                    stream_id,
+                    number_processed_streams,
+                    num_process_test_suites,
+                ) = self_contained_coordinator_blocking_read(
+                    conn,
+                    True,
+                    docker_client,
+                    home,
+                    stream_id,
+                    datasink_conn,
+                    testsuite_spec_files,
+                    topologies_map,
+                    running_platform,
+                    False,
+                    [],
+                    "",
+                    0,
+                    6399,
+                    1,
+                    False,
+                    5,
+                    default_metrics,
+                    "amd64",
+                    None,
+                    0,
+                    10000,
+                    "unstable",
+                    "",
+                    True,
+                    False,
+                )
+                assert result == True
+                assert number_processed_streams == 1
+                assert num_process_test_suites == 1
+
+            stat_key = f"ci.benchmarks.redislabs/by.version/ci/redis/redis/memtier_benchmark-1Mkeys-load-string-with-10B-values/dockerhub/{running_platform}/oss-standalone/{redis_version}/ALL_STATS.Totals.Ops/sec"
+            assert datasink_conn.exists(stat_key)
+            rts = datasink_conn.ts()
+
+            rts_info = rts.info(stat_key)
+
+            # we have two datapoints
+            assert rts_info.total_samples == 2
+
+            # the first was inserted at the original timestamp
+            assert rts_info.first_timestamp == timestamp
+
+            # the second clashed, so it was resolved by adding 1 ms to the timestamp
+            assert rts_info.last_timestamp == timestamp + 1
+
+    except redis.exceptions.ConnectionError:
+        pass
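
Editor's note: below is a minimal sketch, not the coordinator's actual implementation, of the duplicate-timestamp resolution behaviour that the last assertions in this test expect: when a second datapoint targets an already-used timestamp, the write is retried at timestamp + 1 ms instead of being dropped. The key name, port, timestamp, and values are made up for illustration; it relies only on redis-py TimeSeries calls that the test itself also uses (ts().add(), ts().info()), and on the fact that TS.ADD rejects a duplicate timestamp under the default DUPLICATE_POLICY.

import redis


def add_sample_resolving_clash(rts, key, timestamp_ms, value):
    # Try to add the sample at the requested timestamp; if a sample already
    # exists there (default DUPLICATE_POLICY=BLOCK raises a ResponseError),
    # retry at timestamp + 1 ms so the datapoint is not silently lost.
    try:
        return rts.add(key, timestamp_ms, value)
    except redis.exceptions.ResponseError:
        return rts.add(key, timestamp_ms + 1, value)


conn = redis.StrictRedis(port=6379)  # hypothetical datasink with RedisTimeSeries loaded
rts = conn.ts()
ts_ms = 1700000000000  # hypothetical benchmark timestamp, in milliseconds

add_sample_resolving_clash(rts, "example:ops_sec", ts_ms, 125000.0)
add_sample_resolving_clash(rts, "example:ops_sec", ts_ms, 126000.0)

info = rts.info("example:ops_sec")
assert info.total_samples == 2
assert info.first_timestamp == ts_ms
assert info.last_timestamp == ts_ms + 1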