@@ -204,6 +204,13 @@ def main():
204
204
redis_proc_start_port = args .redis_proc_start_port
205
205
logging .info ("Redis Processes start port: {}" .format (redis_proc_start_port ))
206
206
207
+ priority_lower_limit = args .tests_priority_lower_limit
208
+ priority_upper_limit = args .tests_priority_upper_limit
209
+
210
+ logging .info (
211
+ f"Using priority for test filters [{ priority_lower_limit } ,{ priority_upper_limit } ]"
212
+ )
213
+
207
214
# TODO: confirm we do have enough cores to run the spec
208
215
# availabe_cpus = args.cpu_count
209
216
datasink_push_results_redistimeseries = args .datasink_push_results_redistimeseries
@@ -283,6 +290,8 @@ def main():
283
290
default_metrics ,
284
291
arch ,
285
292
github_token ,
293
+ priority_lower_limit ,
294
+ priority_upper_limit ,
286
295
)
287
296
288
297
@@ -307,6 +316,8 @@ def self_contained_coordinator_blocking_read(
307
316
default_metrics = None ,
308
317
arch = "amd64" ,
309
318
github_token = None ,
319
+ priority_lower_limit = 0 ,
320
+ priority_upper_limit = 10000 ,
310
321
):
311
322
num_process_streams = 0
312
323
num_process_test_suites = 0
@@ -354,6 +365,8 @@ def self_contained_coordinator_blocking_read(
354
365
default_metrics ,
355
366
arch ,
356
367
github_token ,
368
+ priority_lower_limit ,
369
+ priority_upper_limit ,
357
370
)
358
371
num_process_streams = num_process_streams + 1
359
372
num_process_test_suites = num_process_test_suites + total_test_suite_runs
@@ -427,6 +440,8 @@ def process_self_contained_coordinator_stream(
427
440
default_metrics = [],
428
441
arch = "amd64" ,
429
442
github_token = None ,
443
+ priority_lower_limit = 0 ,
444
+ priority_upper_limit = 10000 ,
430
445
):
431
446
stream_id = "n/a"
432
447
overall_result = False
@@ -459,6 +474,24 @@ def process_self_contained_coordinator_stream(
459
474
run_arch ,
460
475
) = extract_build_info_from_streamdata (testDetails )
461
476
477
+ if b"priority_upper_limit" in testDetails :
478
+ stream_priority_upper_limit = int (
479
+ testDetails [b"priority_upper_limit" ].decode ()
480
+ )
481
+ logging .info (
482
+ f"detected a priority_upper_limit definition on the streamdata { stream_priority_upper_limit } . will replace the default upper limit of { priority_upper_limit } "
483
+ )
484
+ priority_upper_limit = stream_priority_upper_limit
485
+
486
+ if b"priority_lower_limit" in testDetails :
487
+ stream_priority_lower_limit = int (
488
+ testDetails [b"priority_lower_limit" ].decode ()
489
+ )
490
+ logging .info (
491
+ f"detected a priority_lower_limit definition on the streamdata { stream_priority_lower_limit } . will replace the default lower limit of { priority_lower_limit } "
492
+ )
493
+ priority_lower_limit = stream_priority_lower_limit
494
+
462
495
if b"pull_request" in testDetails :
463
496
pull_request = testDetails [b"pull_request" ].decode ()
464
497
logging .info (
@@ -517,8 +550,6 @@ def process_self_contained_coordinator_stream(
517
550
images_loaded = docker_client .images .load (airgap_docker_image_bin )
518
551
logging .info ("Successfully loaded images {}" .format (images_loaded ))
519
552
520
- filtered_test_files = []
521
-
522
553
stream_time_ms = stream_id .split ("-" )[0 ]
523
554
zset_running_platform_benchmarks = f"ci.benchmarks.redis/ci/redis/redis:benchmarks:{ running_platform } :zset"
524
555
res = conn .zadd (
@@ -531,50 +562,40 @@ def process_self_contained_coordinator_stream(
531
562
532
563
stream_test_list_pending = f"ci.benchmarks.redis/ci/redis/redis:benchmarks:{ stream_id } :{ running_platform } :tests_pending"
533
564
stream_test_list_running = f"ci.benchmarks.redis/ci/redis/redis:benchmarks:{ stream_id } :{ running_platform } :tests_running"
565
+ stream_test_list_failed = f"ci.benchmarks.redis/ci/redis/redis:benchmarks:{ stream_id } :{ running_platform } :tests_failed"
534
566
stream_test_list_completed = f"ci.benchmarks.redis/ci/redis/redis:benchmarks:{ stream_id } :{ running_platform } :tests_completed"
535
- for test_file in testsuite_spec_files :
536
- if defaults_filename in test_file :
537
- continue
538
-
539
- if test_regexp != ".*" :
540
- logging .info (
541
- "Filtering all tests via a regular expression: {}" .format (
542
- test_regexp
543
- )
544
- )
545
- tags_regex_string = re .compile (test_regexp )
546
567
547
- match_obj = re .search (tags_regex_string , test_file )
548
- if match_obj is None :
549
- logging .info (
550
- "Skipping {} given it does not match regex {}" .format (
551
- test_file , test_regexp
552
- )
553
- )
554
- continue
568
+ filtered_test_files = filter_test_files (
569
+ defaults_filename ,
570
+ priority_lower_limit ,
571
+ priority_upper_limit ,
572
+ test_regexp ,
573
+ testsuite_spec_files ,
574
+ )
555
575
576
+ for test_file in filtered_test_files :
556
577
with open (test_file , "r" ) as stream :
557
578
(
558
- result ,
579
+ _ ,
559
580
benchmark_config ,
560
581
test_name ,
561
582
) = get_final_benchmark_config (None , stream , "" )
562
- if result is False :
563
- logging .error (
564
- "Skipping {} given there were errors while calling get_final_benchmark_config()" .format (
565
- test_file
566
- )
567
- )
568
- continue
569
- conn .lpush (stream_test_list_pending , test_name )
570
- conn .expire (stream_test_list_pending , REDIS_BINS_EXPIRE_SECS )
571
- logging .info (
572
- f"Added test named { test_name } to the pending test list in key { stream_test_list_pending } "
573
- )
574
- filtered_test_files .append (test_file )
583
+ conn .lpush (stream_test_list_pending , test_name )
584
+ conn .expire (stream_test_list_pending , REDIS_BINS_EXPIRE_SECS )
585
+ logging .info (
586
+ f"Added test named { test_name } to the pending test list in key { stream_test_list_pending } "
587
+ )
588
+
575
589
pending_tests = len (filtered_test_files )
590
+ failed_tests = 0
591
+ benchmark_suite_start_datetime = datetime .datetime .utcnow ()
576
592
comment_body = generate_benchmark_started_pr_comment (
577
- stream_id , pending_tests , len (filtered_test_files )
593
+ stream_id ,
594
+ pending_tests ,
595
+ len (filtered_test_files ),
596
+ failed_tests ,
597
+ benchmark_suite_start_datetime ,
598
+ 0 ,
578
599
)
579
600
# update on github if needed
580
601
if is_actionable_pr :
@@ -1125,12 +1146,31 @@ def process_self_contained_coordinator_stream(
1125
1146
conn .lrem (stream_test_list_running , 1 , test_name )
1126
1147
conn .lpush (stream_test_list_completed , test_name )
1127
1148
conn .expire (stream_test_list_completed , REDIS_BINS_EXPIRE_SECS )
1149
+ if test_result is False :
1150
+ conn .lpush (stream_test_list_failed , test_name )
1151
+ failed_tests = failed_tests + 1
1152
+ logging .warning (
1153
+ f"updating key { stream_test_list_failed } with the failed test: { test_name } . Total failed tests { failed_tests } ."
1154
+ )
1128
1155
pending_tests = pending_tests - 1
1129
1156
1157
+ benchmark_suite_end_datetime = datetime .datetime .utcnow ()
1158
+ benchmark_suite_duration = (
1159
+ benchmark_suite_end_datetime - benchmark_suite_start_datetime
1160
+ )
1161
+ benchmark_suite_duration_secs = (
1162
+ benchmark_suite_duration .total_seconds ()
1163
+ )
1164
+
1130
1165
# update on github if needed
1131
1166
if is_actionable_pr :
1132
1167
comment_body = generate_benchmark_started_pr_comment (
1133
- stream_id , pending_tests , len (filtered_test_files )
1168
+ stream_id ,
1169
+ pending_tests ,
1170
+ len (filtered_test_files ),
1171
+ failed_tests ,
1172
+ benchmark_suite_start_datetime ,
1173
+ benchmark_suite_duration_secs ,
1134
1174
)
1135
1175
update_comment_if_needed (
1136
1176
auto_approve_github ,
@@ -1164,6 +1204,79 @@ def process_self_contained_coordinator_stream(
1164
1204
return stream_id , overall_result , total_test_suite_runs
1165
1205
1166
1206
1207
def filter_test_files(
    defaults_filename,
    priority_lower_limit,
    priority_upper_limit,
    test_regexp,
    testsuite_spec_files,
):
    """Return the subset of testsuite spec files that should be run.

    A spec file is kept when it:
      * is not the defaults file (substring match on ``defaults_filename``),
      * matches ``test_regexp`` (when the regexp is not the catch-all ``".*"``),
      * parses successfully via ``get_final_benchmark_config()``, and
      * either declares no ``priority``, or declares one inside the inclusive
        range ``[priority_lower_limit, priority_upper_limit]``.

    Args:
        defaults_filename: name of the defaults spec file to exclude.
        priority_lower_limit: lowest test priority (inclusive) to keep.
        priority_upper_limit: highest test priority (inclusive) to keep.
        test_regexp: regular expression used to filter spec file names;
            ``".*"`` disables regexp filtering entirely.
        testsuite_spec_files: iterable of candidate spec file paths.

    Returns:
        list: the file paths that passed every filter, in input order.
    """
    filtered_test_files = []
    for test_file in testsuite_spec_files:
        # The defaults file holds shared configuration, not a test suite.
        if defaults_filename in test_file:
            continue

        if test_regexp != ".*":
            logging.info(
                "Filtering all tests via a regular expression: {}".format(test_regexp)
            )
            tags_regex_string = re.compile(test_regexp)

            match_obj = re.search(tags_regex_string, test_file)
            if match_obj is None:
                logging.info(
                    "Skipping {} given it does not match regex {}".format(
                        test_file, test_regexp
                    )
                )
                continue

        with open(test_file, "r") as stream:
            (
                result,
                benchmark_config,
                test_name,
            ) = get_final_benchmark_config(None, stream, "")
            if result is False:
                logging.error(
                    "Skipping {} given there were errors while calling get_final_benchmark_config()".format(
                        test_file
                    )
                )
                continue

            if "priority" in benchmark_config:
                priority = benchmark_config["priority"]

                if priority is not None:
                    # BUGFIX: the original messages inverted the relation
                    # ("the priority limit (X) is above the priority value (Y)")
                    # and ordered the format args to match the wrong sentence;
                    # the tested condition is priority > upper limit (resp.
                    # priority < lower limit).
                    if priority > priority_upper_limit:
                        logging.warning(
                            "Skipping test {} given its priority ({}) is above the upper priority limit ({})".format(
                                test_name, priority, priority_upper_limit
                            )
                        )

                        continue
                    if priority < priority_lower_limit:
                        logging.warning(
                            "Skipping test {} given its priority ({}) is below the lower priority limit ({})".format(
                                test_name, priority, priority_lower_limit
                            )
                        )

                        continue
                    logging.info(
                        "Test {} priority ({}) is within the priority limit [{},{}]".format(
                            test_name,
                            priority,
                            priority_lower_limit,
                            priority_upper_limit,
                        )
                    )
            filtered_test_files.append(test_file)
    return filtered_test_files
1278
+
1279
+
1167
1280
def data_prepopulation_step (
1168
1281
benchmark_config ,
1169
1282
benchmark_tool_workdir ,
0 commit comments