@@ -32,7 +32,7 @@ pub fn should_use_new_system(pr: u32) -> bool {
32
32
/// Returns `true` if at least one benchmark request was inserted.
33
33
async fn create_benchmark_request_master_commits (
34
34
ctxt : & SiteCtxt ,
35
- conn : & dyn database:: pool:: Connection ,
35
+ _conn : & dyn database:: pool:: Connection ,
36
36
index : & BenchmarkRequestIndex ,
37
37
) -> anyhow:: Result < bool > {
38
38
let now = Utc :: now ( ) ;
@@ -47,7 +47,7 @@ async fn create_benchmark_request_master_commits(
47
47
// TODO; delete at some point in the future
48
48
let cutoff: chrono:: DateTime < Utc > = chrono:: DateTime :: from_str ( "2025-08-27T00:00:00.000Z" ) ?;
49
49
50
- let mut inserted = false ;
50
+ let inserted = false ;
51
51
for master_commit in master_commits {
52
52
// We don't want to add masses of obsolete data
53
53
if master_commit. time >= cutoff && !index. contains_tag ( & master_commit. sha ) {
@@ -59,11 +59,14 @@ async fn create_benchmark_request_master_commits(
59
59
master_commit. time ,
60
60
) ;
61
61
log:: info!( "Inserting master benchmark request {benchmark:?}" ) ;
62
- if let Err ( error) = conn. insert_benchmark_request ( & benchmark) . await {
63
- log:: error!( "Failed to insert master benchmark request: {error:?}" ) ;
64
- } else {
65
- inserted = true ;
66
- }
62
+
63
+ // Do not create benchmark requests on production, to allow running in parallel with
64
+ // the old system.
65
+ // if let Err(error) = conn.insert_benchmark_request(&benchmark).await {
66
+ // log::error!("Failed to insert master benchmark request: {error:?}");
67
+ // } else {
68
+ // inserted = true;
69
+ // }
67
70
}
68
71
}
69
72
Ok ( inserted)
@@ -73,7 +76,7 @@ async fn create_benchmark_request_master_commits(
73
76
/// already in the database
74
77
/// Returns `true` if at least one benchmark request was inserted.
75
78
async fn create_benchmark_request_releases (
76
- conn : & dyn database:: pool:: Connection ,
79
+ _conn : & dyn database:: pool:: Connection ,
77
80
index : & BenchmarkRequestIndex ,
78
81
) -> anyhow:: Result < bool > {
79
82
let releases: String = reqwest:: get ( "https://static.rust-lang.org/manifests.txt" )
@@ -89,16 +92,19 @@ async fn create_benchmark_request_releases(
89
92
. filter_map ( parse_release_string)
90
93
. take ( 20 ) ;
91
94
92
- let mut inserted = false ;
95
+ let inserted = false ;
93
96
for ( name, commit_date) in releases {
94
97
if commit_date >= cutoff && !index. contains_tag ( & name) {
95
98
let release_request = BenchmarkRequest :: create_release ( & name, commit_date) ;
96
99
log:: info!( "Inserting release benchmark request {release_request:?}" ) ;
97
- if let Err ( error) = conn. insert_benchmark_request ( & release_request) . await {
98
- log:: error!( "Failed to insert release benchmark request: {error}" ) ;
99
- } else {
100
- inserted = true ;
101
- }
100
+
101
+ // Do not create benchmark requests on production, to allow running in parallel with
102
+ // the old system.
103
+ // if let Err(error) = conn.insert_benchmark_request(&release_request).await {
104
+ // log::error!("Failed to insert release benchmark request: {error}");
105
+ // } else {
106
+ // inserted = true;
107
+ // }
102
108
}
103
109
}
104
110
Ok ( inserted)
@@ -211,18 +217,18 @@ pub async fn build_queue(
211
217
/// This is performed atomically, in a transaction.
212
218
pub async fn enqueue_benchmark_request (
213
219
conn : & mut dyn database:: pool:: Connection ,
214
- benchmark_request : & BenchmarkRequest ,
220
+ request : & BenchmarkRequest ,
215
221
) -> anyhow:: Result < ( ) > {
216
222
let mut tx = conn. transaction ( ) . await ;
217
223
218
- let Some ( request_tag) = benchmark_request . tag ( ) else {
219
- panic ! ( "Benchmark request {benchmark_request :?} has no tag" ) ;
224
+ let Some ( request_tag) = request . tag ( ) else {
225
+ panic ! ( "Benchmark request {request :?} has no tag" ) ;
220
226
} ;
221
227
222
- log:: info!( "Enqueuing jobs for request {benchmark_request :?}" ) ;
228
+ log:: info!( "Enqueuing jobs for request {request :?}" ) ;
223
229
224
- let backends = benchmark_request . backends ( ) ?;
225
- let profiles = benchmark_request . profiles ( ) ?;
230
+ let backends = request . backends ( ) ?;
231
+ let profiles = request . profiles ( ) ?;
226
232
// Prevent the error from spamming the logs
227
233
let mut has_emitted_parent_sha_error = false ;
228
234
@@ -245,32 +251,36 @@ pub async fn enqueue_benchmark_request(
245
251
// If the parent job has been deleted from the database
246
252
// but was already benchmarked then the collector will ignore
247
253
// it as it will see it already has results.
248
- if let Some ( parent_sha) = benchmark_request. parent_sha ( ) {
249
- let ( is_foreign_key_violation, result) = tx
250
- . conn ( )
251
- . enqueue_parent_benchmark_job (
252
- parent_sha,
253
- target,
254
- backend,
255
- profile,
256
- benchmark_set as u32 ,
257
- )
258
- . await ;
259
-
260
- // At some point in time the parent_sha may not refer
261
- // to a `benchmark_request` and we want to be able to
262
- // see that error.
263
- if let Err ( e) = result {
264
- if is_foreign_key_violation && !has_emitted_parent_sha_error {
265
- log:: error!( "Failed to create job for parent sha {e:?}" ) ;
266
- has_emitted_parent_sha_error = true ;
267
- } else if has_emitted_parent_sha_error && is_foreign_key_violation {
268
- continue ;
269
- } else {
270
- return Err ( e) ;
271
- }
272
- }
273
- }
254
+
255
+ // Do not enqueue parent jobs to allow parallel execution with the old system
256
+ // If the parent artifact wouldn't be benchmarked yet, we would benchmark the
257
+ // parent with the new system.
258
+ // if let Some(parent_sha) = request.parent_sha() {
259
+ // let (is_foreign_key_violation, result) = tx
260
+ // .conn()
261
+ // .enqueue_parent_benchmark_job(
262
+ // parent_sha,
263
+ // target,
264
+ // backend,
265
+ // profile,
266
+ // benchmark_set as u32,
267
+ // )
268
+ // .await;
269
+ //
270
+ // // At some point in time the parent_sha may not refer
271
+ // // to a `benchmark_request` and we want to be able to
272
+ // // see that error.
273
+ // if let Err(e) = result {
274
+ // if is_foreign_key_violation && !has_emitted_parent_sha_error {
275
+ // log::error!("Failed to create job for parent sha {e:?}");
276
+ // has_emitted_parent_sha_error = true;
277
+ // } else if has_emitted_parent_sha_error && is_foreign_key_violation {
278
+ // continue;
279
+ // } else {
280
+ // return Err(e);
281
+ // }
282
+ // }
283
+ // }
274
284
}
275
285
}
276
286
}
@@ -294,12 +304,15 @@ async fn process_benchmark_requests(
294
304
) -> anyhow:: Result < Vec < BenchmarkRequest > > {
295
305
let queue = build_queue ( conn) . await ?;
296
306
307
+ log:: debug!( "Current queue: {queue:?}" ) ;
308
+
297
309
let mut completed = vec ! [ ] ;
298
310
for request in queue {
299
311
match request. status ( ) {
300
312
BenchmarkRequestStatus :: InProgress => {
301
313
let tag = request. tag ( ) . expect ( "In progress request without a tag" ) ;
302
314
if conn. maybe_mark_benchmark_request_as_completed ( tag) . await ? {
315
+ log:: info!( "Request {tag} marked as completed" ) ;
303
316
completed. push ( request) ;
304
317
continue ;
305
318
}
@@ -318,8 +331,9 @@ async fn process_benchmark_requests(
318
331
Ok ( completed)
319
332
}
320
333
321
- /// For queueing jobs, add the jobs you want to queue to this function
322
- async fn cron_enqueue_jobs ( ctxt : & SiteCtxt ) -> anyhow:: Result < ( ) > {
334
+ /// Creates new benchmark requests, enqueues jobs for ready benchmark requests and
335
+ /// finishes completed benchmark requests.
336
+ async fn perform_queue_tick ( ctxt : & SiteCtxt ) -> anyhow:: Result < ( ) > {
323
337
let mut conn = ctxt. conn ( ) . await ;
324
338
325
339
let index = ctxt. known_benchmark_requests . load ( ) ;
@@ -394,7 +408,10 @@ async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> {
394
408
}
395
409
396
410
/// Entry point for the cron job that manages the benchmark request and job queue.
397
- pub async fn cron_main ( site_ctxt : Arc < RwLock < Option < Arc < SiteCtxt > > > > , run_interval : Duration ) {
411
+ pub async fn create_queue_process (
412
+ site_ctxt : Arc < RwLock < Option < Arc < SiteCtxt > > > > ,
413
+ run_interval : Duration ,
414
+ ) {
398
415
let mut interval = time:: interval ( run_interval) ;
399
416
interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
400
417
@@ -405,7 +422,7 @@ pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, run_interv
405
422
let guard = ctxt. read ( ) ;
406
423
guard. as_ref ( ) . cloned ( )
407
424
} {
408
- match cron_enqueue_jobs ( & ctxt_clone) . await {
425
+ match perform_queue_tick ( & ctxt_clone) . await {
409
426
Ok ( _) => log:: info!( "Cron job finished" ) ,
410
427
Err ( e) => log:: error!( "Cron job failed to execute: {e:?}" ) ,
411
428
}
0 commit comments