1
1
mod utils;
2
2
3
- use std:: { str:: FromStr , sync:: Arc } ;
4
-
5
3
use crate :: job_queue:: utils:: { parse_release_string, ExtractIf } ;
6
4
use crate :: load:: { partition_in_place, SiteCtxt } ;
7
5
use chrono:: Utc ;
8
6
use collector:: benchmark_set:: benchmark_set_count;
9
7
use database:: { BenchmarkRequest , BenchmarkRequestIndex , BenchmarkRequestStatus , Target } ;
10
8
use hashbrown:: HashSet ;
11
9
use parking_lot:: RwLock ;
10
+ use std:: { str:: FromStr , sync:: Arc } ;
12
11
use tokio:: time:: { self , Duration } ;
13
12
14
13
pub fn run_new_queue ( ) -> bool {
@@ -237,31 +236,30 @@ pub async fn enqueue_benchmark_request(
237
236
Ok ( ( ) )
238
237
}
239
238
240
- /// Try to find a benchmark request that should be enqueue next, and if such request is found,
241
- /// enqueue it.
242
- async fn try_enqueue_next_benchmark_request (
239
+ /// Update the state of benchmark requests.
240
+ /// If there is a request that has artifacts ready, and nothing is currently in-progress,
241
+ /// it will be enqueued.
242
+ /// If there is a request whose jobs have all completed, it will be marked as completed.
243
+ async fn process_benchmark_requests (
243
244
conn : & mut dyn database:: pool:: Connection ,
244
245
index : & mut BenchmarkRequestIndex ,
245
246
) -> anyhow:: Result < ( ) > {
246
247
let queue = build_queue ( conn, index) . await ?;
247
248
248
- #[ allow( clippy:: never_loop) ]
249
249
for request in queue {
250
250
match request. status ( ) {
251
- BenchmarkRequestStatus :: ArtifactsReady => {
252
- enqueue_benchmark_request ( conn, & request) . await ?;
253
- break ;
254
- }
255
251
BenchmarkRequestStatus :: InProgress => {
256
- if conn
257
- . maybe_mark_benchmark_request_as_completed ( request. tag ( ) . unwrap ( ) )
258
- . await ?
259
- {
260
- index. add_tag ( request. tag ( ) . unwrap ( ) ) ;
252
+ let tag = request. tag ( ) . expect ( "In progress request without a tag" ) ;
253
+ if conn. maybe_mark_benchmark_request_as_completed ( tag) . await ? {
254
+ index. add_tag ( tag) ;
261
255
continue ;
262
256
}
263
257
break ;
264
258
}
259
+ BenchmarkRequestStatus :: ArtifactsReady => {
260
+ enqueue_benchmark_request ( conn, & request) . await ?;
261
+ break ;
262
+ }
265
263
BenchmarkRequestStatus :: WaitingForArtifacts
266
264
| BenchmarkRequestStatus :: Completed { .. } => {
267
265
unreachable ! ( "Unexpected request {request:?} found in request queue" ) ;
@@ -274,12 +272,16 @@ async fn try_enqueue_next_benchmark_request(
274
272
/// For queueing jobs, add the jobs you want to queue to this function
275
273
async fn cron_enqueue_jobs ( site_ctxt : & Arc < SiteCtxt > ) -> anyhow:: Result < ( ) > {
276
274
let mut conn = site_ctxt. conn ( ) . await ;
275
+
277
276
let mut index = conn. load_benchmark_request_index ( ) . await ?;
277
+
278
278
// Put the master commits into the `benchmark_requests` queue
279
279
create_benchmark_request_master_commits ( site_ctxt, & * conn, & index) . await ?;
280
280
// Put the releases into the `benchmark_requests` queue
281
281
create_benchmark_request_releases ( & * conn, & index) . await ?;
282
- try_enqueue_next_benchmark_request ( & mut * conn, & mut index) . await ?;
282
+ // Enqueue waiting requests and try to complete in-progress ones
283
+ process_benchmark_requests ( & mut * conn, & mut index) . await ?;
284
+
283
285
Ok ( ( ) )
284
286
}
285
287
0 commit comments