@@ -15,24 +15,32 @@ use serde_json::json;
15
15
use std:: sync:: Arc ;
16
16
use tokio:: sync:: watch;
17
17
18
+ /// Enum that allows downstream tasks to know whether this task has had a chance
19
+ /// to read the current chicken switches from the database.
20
+ #[ derive( Debug , Clone , PartialEq , Eq ) ]
21
+ pub enum ReconfiguratorChickenSwitchesLoaderState {
22
+ NotYetLoaded ,
23
+ Loaded ( ReconfiguratorChickenSwitchesView ) ,
24
+ }
25
+
18
26
/// Background task that tracks reconfigurator chicken switches from the DB
19
27
pub struct ChickenSwitchesLoader {
20
28
datastore : Arc < DataStore > ,
21
- tx : watch:: Sender < ReconfiguratorChickenSwitchesView > ,
22
- rx : watch:: Receiver < ReconfiguratorChickenSwitchesView > ,
29
+ tx : watch:: Sender < ReconfiguratorChickenSwitchesLoaderState > ,
23
30
}
24
31
25
32
impl ChickenSwitchesLoader {
26
33
pub fn new ( datastore : Arc < DataStore > ) -> Self {
27
- let ( tx, rx) =
28
- watch:: channel ( ReconfiguratorChickenSwitchesView :: default ( ) ) ;
29
- Self { datastore, tx, rx }
34
+ let ( tx, _rx) = watch:: channel (
35
+ ReconfiguratorChickenSwitchesLoaderState :: NotYetLoaded ,
36
+ ) ;
37
+ Self { datastore, tx }
30
38
}
31
39
32
40
pub fn watcher (
33
41
& self ,
34
- ) -> watch:: Receiver < ReconfiguratorChickenSwitchesView > {
35
- self . rx . clone ( )
42
+ ) -> watch:: Receiver < ReconfiguratorChickenSwitchesLoaderState > {
43
+ self . tx . subscribe ( )
36
44
}
37
45
}
38
46
@@ -55,15 +63,21 @@ impl BackgroundTask for ChickenSwitchesLoader {
55
63
json ! ( { "error" : message } )
56
64
}
57
65
Ok ( switches) => {
58
- let switches = switches. unwrap_or_default ( ) ;
66
+ let switches =
67
+ ReconfiguratorChickenSwitchesLoaderState :: Loaded (
68
+ switches. unwrap_or_default ( ) ,
69
+ ) ;
59
70
let updated = self . tx . send_if_modified ( |s| {
60
71
if * s != switches {
61
- * s = switches;
72
+ * s = switches. clone ( ) ;
62
73
return true ;
63
74
}
64
75
false
65
76
} ) ;
66
- debug ! ( opctx. log, "chicken switches load complete" ) ;
77
+ debug ! (
78
+ opctx. log, "chicken switches load complete" ;
79
+ "switches" => ?switches,
80
+ ) ;
67
81
json ! ( { "chicken_switches_updated" : updated } )
68
82
}
69
83
}
@@ -94,14 +108,35 @@ mod test {
94
108
) ;
95
109
96
110
let mut task = ChickenSwitchesLoader :: new ( datastore. clone ( ) ) ;
111
+
112
+ // Initial state should be `NotYetLoaded`.
113
+ let mut rx = task. watcher ( ) ;
114
+ assert_eq ! (
115
+ * rx. borrow_and_update( ) ,
116
+ ReconfiguratorChickenSwitchesLoaderState :: NotYetLoaded
117
+ ) ;
118
+
119
+ // We haven't inserted anything into the DB, so the initial activation
120
+ // should populate the channel with our default values.
121
+ let default_switches = ReconfiguratorChickenSwitchesView :: default ( ) ;
97
122
let out = task. activate ( & opctx) . await ;
98
- assert_eq ! ( out[ "chicken_switches_updated" ] , false ) ;
123
+ assert_eq ! ( out[ "chicken_switches_updated" ] , true ) ;
124
+ assert ! ( rx. has_changed( ) . unwrap( ) ) ;
125
+ assert_eq ! (
126
+ * rx. borrow_and_update( ) ,
127
+ ReconfiguratorChickenSwitchesLoaderState :: Loaded (
128
+ default_switches. clone( )
129
+ )
130
+ ) ;
131
+
132
+ // Insert an initial set of switches.
133
+ let expected_switches = ReconfiguratorChickenSwitches {
134
+ planner_enabled : !default_switches. switches . planner_enabled ,
135
+ planner_switches : PlannerChickenSwitches :: default ( ) ,
136
+ } ;
99
137
let switches = ReconfiguratorChickenSwitchesParam {
100
138
version : 1 ,
101
- switches : ReconfiguratorChickenSwitches {
102
- planner_enabled : true ,
103
- planner_switches : PlannerChickenSwitches :: default ( ) ,
104
- } ,
139
+ switches : expected_switches,
105
140
} ;
106
141
datastore
107
142
. reconfigurator_chicken_switches_insert_latest_version (
@@ -111,14 +146,31 @@ mod test {
111
146
. unwrap ( ) ;
112
147
let out = task. activate ( & opctx) . await ;
113
148
assert_eq ! ( out[ "chicken_switches_updated" ] , true ) ;
149
+ assert ! ( rx. has_changed( ) . unwrap( ) ) ;
150
+ {
151
+ let view = match rx. borrow_and_update ( ) . clone ( ) {
152
+ ReconfiguratorChickenSwitchesLoaderState :: NotYetLoaded => {
153
+ panic ! ( "unexpected value" )
154
+ }
155
+ ReconfiguratorChickenSwitchesLoaderState :: Loaded ( view) => view,
156
+ } ;
157
+ assert_eq ! ( view. version, 1 ) ;
158
+ assert_eq ! ( view. switches, expected_switches) ;
159
+ }
160
+
161
+ // Activating again should not change things.
114
162
let out = task. activate ( & opctx) . await ;
115
163
assert_eq ! ( out[ "chicken_switches_updated" ] , false ) ;
164
+ assert ! ( !rx. has_changed( ) . unwrap( ) ) ;
165
+
166
+ // Insert a new version.
167
+ let expected_switches = ReconfiguratorChickenSwitches {
168
+ planner_enabled : !expected_switches. planner_enabled ,
169
+ planner_switches : PlannerChickenSwitches :: default ( ) ,
170
+ } ;
116
171
let switches = ReconfiguratorChickenSwitchesParam {
117
172
version : 2 ,
118
- switches : ReconfiguratorChickenSwitches {
119
- planner_enabled : false ,
120
- planner_switches : PlannerChickenSwitches :: default ( ) ,
121
- } ,
173
+ switches : expected_switches,
122
174
} ;
123
175
datastore
124
176
. reconfigurator_chicken_switches_insert_latest_version (
@@ -128,5 +180,16 @@ mod test {
128
180
. unwrap ( ) ;
129
181
let out = task. activate ( & opctx) . await ;
130
182
assert_eq ! ( out[ "chicken_switches_updated" ] , true ) ;
183
+ assert ! ( rx. has_changed( ) . unwrap( ) ) ;
184
+ {
185
+ let view = match rx. borrow_and_update ( ) . clone ( ) {
186
+ ReconfiguratorChickenSwitchesLoaderState :: NotYetLoaded => {
187
+ panic ! ( "unexpected value" )
188
+ }
189
+ ReconfiguratorChickenSwitchesLoaderState :: Loaded ( view) => view,
190
+ } ;
191
+ assert_eq ! ( view. version, 2 ) ;
192
+ assert_eq ! ( view. switches, expected_switches) ;
193
+ }
131
194
}
132
195
}
0 commit comments