@@ -19,8 +19,8 @@ use nexus_types::deployment::BlueprintSource;
19
19
use nexus_types:: deployment:: PlanningReport ;
20
20
use nexus_types:: deployment:: { Blueprint , BlueprintTarget } ;
21
21
use nexus_types:: internal_api:: background:: BlueprintPlannerStatus ;
22
+ use nexus_types:: inventory:: Collection ;
22
23
use omicron_common:: api:: external:: LookupType ;
23
- use omicron_uuid_kinds:: CollectionUuid ;
24
24
use omicron_uuid_kinds:: GenericUuid as _;
25
25
use serde_json:: json;
26
26
use slog_error_chain:: InlineErrorChain ;
@@ -31,7 +31,7 @@ use tokio::sync::watch::{self, Receiver, Sender};
31
31
pub struct BlueprintPlanner {
32
32
datastore : Arc < DataStore > ,
33
33
rx_config : Receiver < ReconfiguratorConfigLoaderState > ,
34
- rx_inventory : Receiver < Option < CollectionUuid > > ,
34
+ rx_inventory : Receiver < Option < Arc < Collection > > > ,
35
35
rx_blueprint : Receiver < Option < Arc < ( BlueprintTarget , Blueprint ) > > > ,
36
36
tx_blueprint : Sender < Option < Arc < ( BlueprintTarget , Blueprint ) > > > ,
37
37
blueprint_limit : u64 ,
@@ -57,7 +57,7 @@ impl BlueprintPlanner {
57
57
pub fn new (
58
58
datastore : Arc < DataStore > ,
59
59
rx_config : Receiver < ReconfiguratorConfigLoaderState > ,
60
- rx_inventory : Receiver < Option < CollectionUuid > > ,
60
+ rx_inventory : Receiver < Option < Arc < Collection > > > ,
61
61
rx_blueprint : Receiver < Option < Arc < ( BlueprintTarget , Blueprint ) > > > ,
62
62
) -> Self {
63
63
let ( tx_blueprint, _) = watch:: channel ( None ) ;
@@ -114,10 +114,12 @@ impl BlueprintPlanner {
114
114
let ( target, parent) = & * loaded;
115
115
let parent_blueprint_id = parent. id ;
116
116
117
- // Get the inventory most recently seen by the collection
118
- // background task. The value is `Copy`, so with the deref
119
- // we don't block the channel.
120
- let Some ( collection_id) = * self . rx_inventory . borrow_and_update ( ) else {
117
+ // Get the inventory most recently seen by the inventory loader
118
+ // background task. We clone the Arc to avoid keeping the channel locked
119
+ // for the rest of our execution.
120
+ let Some ( collection) =
121
+ self . rx_inventory . borrow_and_update ( ) . as_ref ( ) . map ( Arc :: clone)
122
+ else {
121
123
warn ! (
122
124
& opctx. log,
123
125
"blueprint planning skipped" ;
@@ -127,25 +129,6 @@ impl BlueprintPlanner {
127
129
"no inventory collection available" ,
128
130
) ) ;
129
131
} ;
130
- let collection = match self
131
- . datastore
132
- . inventory_collection_read ( opctx, collection_id)
133
- . await
134
- {
135
- Ok ( collection) => collection,
136
- Err ( error) => {
137
- error ! (
138
- & opctx. log,
139
- "can't read inventory collection" ;
140
- "collection_id" => %collection_id,
141
- "error" => %error,
142
- ) ;
143
- return BlueprintPlannerStatus :: Error ( format ! (
144
- "can't read inventory collection {}: {}" ,
145
- collection_id, error
146
- ) ) ;
147
- }
148
- } ;
149
132
150
133
// Assemble the planning context.
151
134
let input = match PlanningInputFromDb :: assemble (
@@ -438,6 +421,7 @@ mod test {
438
421
use crate :: app:: background:: tasks:: blueprint_execution:: BlueprintExecutor ;
439
422
use crate :: app:: background:: tasks:: blueprint_load:: TargetBlueprintLoader ;
440
423
use crate :: app:: background:: tasks:: inventory_collection:: InventoryCollector ;
424
+ use crate :: app:: background:: tasks:: inventory_load:: InventoryLoader ;
441
425
use crate :: app:: { background:: Activator , quiesce:: NexusQuiesceHandle } ;
442
426
use assert_matches:: assert_matches;
443
427
use nexus_inventory:: now_db_precision;
@@ -467,10 +451,10 @@ mod test {
467
451
468
452
// Spin up the blueprint loader background task.
469
453
let ( tx_loader, _) = watch:: channel ( None ) ;
470
- let mut loader =
454
+ let mut bp_loader =
471
455
TargetBlueprintLoader :: new ( datastore. clone ( ) , tx_loader) ;
472
- let mut rx_loader = loader . watcher ( ) ;
473
- loader . activate ( & opctx) . await ;
456
+ let mut rx_loader = bp_loader . watcher ( ) ;
457
+ bp_loader . activate ( & opctx) . await ;
474
458
let ( _initial_target, initial_blueprint) = & * rx_loader
475
459
. borrow_and_update ( )
476
460
. clone ( )
@@ -490,9 +474,14 @@ mod test {
490
474
1 ,
491
475
false ,
492
476
) ;
493
- let rx_collector = collector. watcher ( ) ;
494
477
collector. activate ( & opctx) . await ;
495
478
479
+ // Spin up the inventory loader background task.
480
+ let mut inv_loader =
481
+ InventoryLoader :: new ( datastore. clone ( ) , watch:: Sender :: new ( None ) ) ;
482
+ let rx_inventory = inv_loader. watcher ( ) ;
483
+ inv_loader. activate ( & opctx) . await ;
484
+
496
485
// Enable the planner
497
486
let ( _tx, rx_config_loader) = watch:: channel (
498
487
ReconfiguratorConfigLoaderState :: Loaded ( ReconfiguratorConfigView {
@@ -509,10 +498,9 @@ mod test {
509
498
let mut planner = BlueprintPlanner :: new (
510
499
datastore. clone ( ) ,
511
500
rx_config_loader,
512
- rx_collector ,
501
+ rx_inventory ,
513
502
rx_loader. clone ( ) ,
514
503
) ;
515
- let _rx_planner = planner. watcher ( ) ;
516
504
517
505
// On activation, the planner should run successfully and generate
518
506
// a new target blueprint.
@@ -536,7 +524,7 @@ mod test {
536
524
} ;
537
525
538
526
// Load and check the new target blueprint.
539
- loader . activate ( & opctx) . await ;
527
+ bp_loader . activate ( & opctx) . await ;
540
528
let ( target, blueprint) = & * rx_loader
541
529
. borrow_and_update ( )
542
530
. clone ( )
@@ -571,7 +559,7 @@ mod test {
571
559
. expect ( "can't enable execution" ) ;
572
560
573
561
// Ping the loader again so it gets the updated target.
574
- loader . activate ( & opctx) . await ;
562
+ bp_loader . activate ( & opctx) . await ;
575
563
let ( target, blueprint) = & * rx_loader
576
564
. borrow_and_update ( )
577
565
. clone ( )
@@ -584,6 +572,7 @@ mod test {
584
572
585
573
// Trigger an inventory collection.
586
574
collector. activate ( & opctx) . await ;
575
+ inv_loader. activate ( & opctx) . await ;
587
576
588
577
// Execute the plan.
589
578
let ( dummy_tx, _dummy_rx) = watch:: channel ( PendingMgsUpdates :: new ( ) ) ;
0 commit comments