@@ -363,6 +363,11 @@ impl Layout {
     /// also block queries to the deployment, often for extended periods of
     /// time. The rebuild strategy never blocks reads, it only ever blocks
     /// writes.
+    ///
+    /// This method will only return an `Err` if storing pruning status
+    /// fails, e.g. because the database is not available. All errors that
+    /// happen during pruning itself will be stored in the `prune_state`
+    /// table and this method will return `Ok`.
     pub fn prune(
         self: Arc<Self>,
         logger: &Logger,
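
The new doc paragraph changes the calling contract: `Ok` now only means that the outcome of pruning was recorded, not that pruning succeeded. A sketch of a caller honoring that contract, using the types already in scope in this file; `read_last_prune_error` is a hypothetical helper standing in for however the `prune_state` table is read back:

```rust
// Hypothetical caller; `read_last_prune_error` is a stand-in for
// whatever queries the `prune_state` table and is not part of this diff.
fn prune_and_log(
    layout: Arc<Layout>,
    logger: &Logger,
    reporter: &mut dyn PruneReporter,
    conn: &mut PgConnection,
    req: &PruneRequest,
    cancel: &CancelHandle,
) -> Result<(), CancelableError<StoreError>> {
    // An `Err` here means status could not be stored (e.g. the database
    // was unreachable), not that pruning itself failed.
    layout.prune(logger, reporter, conn, req, cancel)?;

    // Failures during pruning land in `prune_state` instead, so a caller
    // that cares about the outcome has to read them back.
    if let Some(err) = read_last_prune_error(conn)? {
        warn!(logger, "pruning failed"; "error" => err);
    }
    Ok(())
}
```
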
@@ -373,28 +378,38 @@ impl Layout {
     ) -> Result<(), CancelableError<StoreError>> {
         let tracker = status::Tracker::new(conn, self.clone())?;
 
-        reporter.start(req);
+        let res = self.prune_inner(logger, reporter, conn, req, cancel, &tracker);
 
-        let stats = self.version_stats(conn, reporter, true, cancel)?;
+        match res {
+            Ok(_) => {
+                tracker.finish(conn)?;
+            }
+            Err(e) => {
+                // If we get an error, we need to set the error in the
+                // database and finish the tracker
+                let err = e.to_string();
+                tracker.error(conn, &err)?;
+            }
+        }
+
+        Ok(())
+    }
 
+    fn prune_inner(
+        self: Arc<Self>,
+        logger: &Logger,
+        reporter: &mut dyn PruneReporter,
+        conn: &mut PgConnection,
+        req: &PruneRequest,
+        cancel: &CancelHandle,
+        tracker: &status::Tracker,
+    ) -> Result<(), CancelableError<StoreError>> {
+        reporter.start(req);
+        let stats = self.version_stats(conn, reporter, true, cancel)?;
         let prunable_tables: Vec<_> = self.prunable_tables(&stats, req).into_iter().collect();
         tracker.start(conn, req, &prunable_tables)?;
-
-        // create a shadow namespace where we will put the copies of our
-        // tables, but only create it in the database if we really need it
         let dst_nsp = Namespace::prune(self.site.id);
         let mut recreate_dst_nsp = true;
-
-        // Go table by table; note that the subgraph writer can write in
-        // between the execution of the `with_lock` block below, and might
-        // therefore work with tables where some are pruned and some are not
-        // pruned yet. That does not affect correctness since we make no
-        // assumption about where the subgraph head is. If the subgraph
-        // advances during this loop, we might have an unnecessarily
-        // pessimistic but still safe value for `final_block`. We do assume
-        // that `final_block` is far enough from the subgraph head that it
-        // stays final even if a revert happens during this loop, but that
-        // is the definition of 'final'
         for (table, strat) in &prunable_tables {
             reporter.start_table(table.name.as_str());
             tracker.start_table(conn, table)?;
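
The restructuring above is a small, reusable pattern: move the fallible body into an inner function so every step can use `?`, and let a thin wrapper translate the inner `Result` into persisted status. A self-contained sketch with illustrative names (not the ones from this file):

```rust
// Minimal version of the wrapper/inner split used in `prune` above.
struct Tracker;

impl Tracker {
    fn finish(&self) {
        // record success, e.g. set `finished_at`
    }
    fn error(&self, _msg: &str) {
        // record failure, e.g. set `errored_at` and `error`
    }
}

fn run(tracker: &Tracker) -> Result<(), String> {
    // Only a failure to *record* status would surface from the wrapper;
    // errors from the work itself are persisted and then swallowed.
    match run_inner() {
        Ok(()) => tracker.finish(),
        Err(e) => tracker.error(&e),
    }
    Ok(())
}

fn run_inner() -> Result<(), String> {
    // the actual work, free to use `?` on every fallible step
    Ok(())
}
```
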
@@ -417,7 +432,7 @@ impl Layout {
                     pair.copy_final_entities(
                         conn,
                         reporter,
-                        &tracker,
+                        tracker,
                         req.earliest_block,
                         req.final_block,
                         cancel,
@@ -427,7 +442,7 @@ impl Layout {
                     // see also: deployment-lock-for-update
                     reporter.start_switch();
                     deployment::with_lock(conn, &self.site, |conn| -> Result<_, StoreError> {
-                        pair.copy_nonfinal_entities(conn, reporter, &tracker, req.final_block)?;
+                        pair.copy_nonfinal_entities(conn, reporter, tracker, req.final_block)?;
                         cancel.check_cancel().map_err(CancelableError::from)?;
 
                         conn.transaction(|conn| pair.switch(logger, conn))?;
@@ -473,22 +488,15 @@ impl Layout {
             reporter.finish_table(table.name.as_str());
             tracker.finish_table(conn, table)?;
         }
-        // Get rid of the temporary prune schema if we actually created it
         if !recreate_dst_nsp {
             catalog::drop_schema(conn, dst_nsp.as_str())?;
         }
-
         for (table, _) in &prunable_tables {
             catalog::set_last_pruned_block(conn, &self.site, &table.name, req.earliest_block)?;
         }
-
-        // Analyze the new tables
         let tables = prunable_tables.iter().map(|(table, _)| *table).collect();
         self.analyze_tables(conn, reporter, tables, cancel)?;
-
         reporter.finish();
-        tracker.finish(conn)?;
-
         Ok(())
     }
 }
@@ -875,6 +883,21 @@ mod status {
                 .execute(conn)?;
             Ok(())
         }
+
+        pub(crate) fn error(&self, conn: &mut PgConnection, err: &str) -> StoreResult<()> {
+            use prune_state as ps;
+
+            update(ps::table)
+                .filter(ps::id.eq(self.layout.site.id))
+                .filter(ps::run.eq(self.run))
+                .set((
+                    ps::finished_at.eq(diesel::dsl::now),
+                    ps::errored_at.eq(diesel::dsl::now),
+                    ps::error.eq(err),
+                ))
+                .execute(conn)?;
+            Ok(())
+        }
     }
 
     /// A helper to read pruning progress from the database
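
`Tracker::error` presumes the `prune_state` table can record a failure. A diesel `table!` sketch of the shape this hunk implies; the column names come from the code above, while the composite key, SQL types, and nullability are assumptions:

```rust
// Shape of `prune_state` implied by `Tracker::error`. Names are taken
// from the diff; key, types, and nullability are assumed.
diesel::table! {
    prune_state (id, run) {
        id -> Integer,
        run -> Integer,
        // ... progress columns used elsewhere in this module elided ...
        finished_at -> Nullable<Timestamptz>,
        errored_at -> Nullable<Timestamptz>,
        error -> Nullable<Text>,
    }
}
```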