Skip to content

Commit a4c14cf

Browse files
committed
Launch watchers once per operator process
Operator would pick up a cluster to be installed and not launch watchers. Use a mutex in the cluster context to launch a watcher per cluster object. Signed-off-by: Jakob Naucke <[email protected]>
1 parent 9d84842 commit a4c14cf

File tree

3 files changed

+120
-20
lines changed

3 files changed

+120
-20
lines changed

operator/src/main.rs

Lines changed: 102 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//
44
// SPDX-License-Identifier: MIT
55

6-
use std::sync::Arc;
6+
use std::sync::{Arc, Mutex};
77
use std::time::Duration;
88

99
use anyhow::Result;
@@ -29,6 +29,12 @@ mod trustee;
2929
use crate::conditions::*;
3030
use operator::*;
3131

32+
struct ClusterContext {
33+
client: Client,
34+
/// UID of cluster that watchers are based on
35+
uid: Mutex<Option<String>>,
36+
}
37+
3238
fn is_installed(status: Option<TrustedExecutionClusterStatus>) -> bool {
3339
let chk = |c: &Condition| c.type_ == INSTALLED_CONDITION && c.status == "True";
3440
status
@@ -37,16 +43,53 @@ fn is_installed(status: Option<TrustedExecutionClusterStatus>) -> bool {
3743
.unwrap_or(false)
3844
}
3945

46+
/// Launch watchers. Is run once per TrustedExecutionCluster and operator process.
47+
/// Returns whether watchers were launched.
48+
async fn launch_watchers(
49+
cluster: Arc<TrustedExecutionCluster>,
50+
ctx: Arc<ClusterContext>,
51+
name: &str,
52+
) -> Result<bool> {
53+
let client = ctx.client.clone();
54+
let mut launch_watchers = false;
55+
if let Ok(mut ctx_uid) = ctx.uid.lock() {
56+
let err = format!("TrustedExecutionCluster {name} had no UID");
57+
let cluster_uid = cluster.metadata.uid.clone().expect(&err);
58+
if ctx_uid.is_none() || ctx_uid.clone() != Some(cluster_uid.clone()) {
59+
launch_watchers = true;
60+
*ctx_uid = Some(cluster_uid);
61+
}
62+
} else {
63+
warn!("Failed to acquire lock on context UID store");
64+
}
65+
if launch_watchers {
66+
info!(
67+
"First registration of TrustedExecutionCluster {name} by this operator. \
68+
Launching watchers."
69+
);
70+
register_server::launch_keygen_controller(client.clone()).await;
71+
let owner_reference = generate_owner_reference(&*cluster)?;
72+
let rv_ctx = RvContextData {
73+
client,
74+
owner_reference: owner_reference.clone(),
75+
pcrs_compute_image: cluster.spec.pcrs_compute_image.clone(),
76+
};
77+
reference_values::launch_rv_image_controller(rv_ctx.clone()).await;
78+
reference_values::launch_rv_job_controller(rv_ctx.clone()).await;
79+
}
80+
Ok(launch_watchers)
81+
}
82+
4083
async fn reconcile(
4184
cluster: Arc<TrustedExecutionCluster>,
42-
client: Arc<Client>,
85+
ctx: Arc<ClusterContext>,
4386
) -> Result<Action, ControllerError> {
4487
let generation = cluster.metadata.generation;
4588
let known_address = cluster.spec.public_trustee_addr.is_some();
4689
let address_condition = known_trustee_address_condition(known_address, generation);
4790
let mut conditions = Some(vec![address_condition]);
4891

49-
let kube_client = Arc::unwrap_or_clone(client);
92+
let kube_client = ctx.client.clone();
5093
let err = "trusted execution cluster had no name";
5194
let name = &cluster.metadata.name.clone().expect(err);
5295
let clusters: Api<TrustedExecutionCluster> = Api::default_namespaced(kube_client.clone());
@@ -59,6 +102,7 @@ async fn reconcile(
59102
return Ok(Action::await_change());
60103
}
61104

105+
let _ = launch_watchers(cluster.clone(), ctx, name).await?;
62106
if is_installed(cluster.status.clone()) {
63107
return Ok(Action::await_change());
64108
}
@@ -105,13 +149,6 @@ async fn install_trustee_configuration(
105149
Err(e) => error!("Failed to create the KBS configuration configmap: {e}"),
106150
}
107151

108-
let rv_ctx = RvContextData {
109-
client: client.clone(),
110-
owner_reference: owner_reference.clone(),
111-
pcrs_compute_image: cluster.spec.pcrs_compute_image.clone(),
112-
};
113-
reference_values::launch_rv_image_controller(rv_ctx.clone()).await;
114-
reference_values::launch_rv_job_controller(rv_ctx.clone()).await;
115152
match reference_values::create_pcrs_config_map(client.clone(), owner_reference.clone()).await {
116153
Ok(_) => info!("Created bare configmap for PCRs"),
117154
Err(e) => error!("Failed to create the PCRs configmap: {e}"),
@@ -159,8 +196,6 @@ async fn install_register_server(client: Client, cluster: &TrustedExecutionClust
159196
Err(e) => error!("Failed to create register server service: {e}"),
160197
}
161198

162-
register_server::launch_keygen_controller(client).await;
163-
164199
Ok(())
165200
}
166201

@@ -172,9 +207,12 @@ async fn main() -> Result<()> {
172207
info!("trusted execution clusters operator",);
173208
let cl: Api<TrustedExecutionCluster> = Api::default_namespaced(kube_client.clone());
174209

175-
let client = Arc::new(kube_client);
210+
let ctx = Arc::new(ClusterContext {
211+
client: kube_client,
212+
uid: Mutex::new(None),
213+
});
176214
Controller::new(cl, watcher::Config::default())
177-
.run(reconcile, controller_error_policy, client)
215+
.run(reconcile, controller_error_policy, ctx)
178216
.for_each(controller_info)
179217
.await;
180218

@@ -191,6 +229,47 @@ mod tests {
191229
use super::*;
192230
use trusted_cluster_operator_test_utils::mock_client::*;
193231

232+
fn dummy_cluster_ctx(client: Client) -> ClusterContext {
233+
ClusterContext {
234+
client,
235+
uid: Mutex::new(None),
236+
}
237+
}
238+
239+
#[tokio::test]
240+
async fn test_launch_watchers_create() {
241+
let clos = async |req, ctr| panic!("unexpected API interaction: {req:?}, counter {ctr}");
242+
count_check!(0, clos, |client| {
243+
let cluster = Arc::new(dummy_cluster());
244+
let ctx = Arc::new(dummy_cluster_ctx(client));
245+
assert!(launch_watchers(cluster, ctx, "test").await.unwrap());
246+
});
247+
}
248+
249+
#[tokio::test]
250+
async fn test_launch_watchers_update() {
251+
let clos = async |req, ctr| panic!("unexpected API interaction: {req:?}, counter {ctr}");
252+
count_check!(0, clos, |client| {
253+
let cluster = Arc::new(dummy_cluster());
254+
let mut ctx = dummy_cluster_ctx(client);
255+
ctx.uid = Mutex::new(Some("def".to_string()));
256+
let result = launch_watchers(cluster, Arc::new(ctx), "test");
257+
assert!(result.await.unwrap());
258+
});
259+
}
260+
261+
#[tokio::test]
262+
async fn test_launch_watchers_existing() {
263+
let clos = async |req, ctr| panic!("unexpected API interaction: {req:?}, counter {ctr}");
264+
count_check!(0, clos, |client| {
265+
let cluster = dummy_cluster();
266+
let mut ctx = dummy_cluster_ctx(client);
267+
ctx.uid = Mutex::new(cluster.metadata.uid.clone());
268+
let result = launch_watchers(Arc::new(cluster), Arc::new(ctx), "test");
269+
assert!(!result.await.unwrap());
270+
});
271+
}
272+
194273
#[tokio::test]
195274
async fn test_reconcile_uninstalling() {
196275
let clos = async |req: Request<Body>, ctr| match req.method() {
@@ -203,7 +282,7 @@ mod tests {
203282
count_check!(1, clos, |client| {
204283
let mut cluster = dummy_cluster();
205284
cluster.metadata.deletion_timestamp = Some(Time(Utc::now()));
206-
let result = reconcile(Arc::new(cluster), Arc::new(client)).await;
285+
let result = reconcile(Arc::new(cluster), Arc::new(dummy_cluster_ctx(client))).await;
207286
assert_eq!(result.unwrap(), Action::await_change());
208287
});
209288
}
@@ -218,16 +297,19 @@ mod tests {
218297
metadata: Default::default(),
219298
};
220299
Ok(serde_json::to_string(&object_list).unwrap())
221-
} else if ctr == 1 && req.method() == Method::PATCH {
300+
} else if 1 < ctr && ctr < 4 {
301+
// Watchers
302+
Ok(serde_json::to_string(&dummy_cluster()).unwrap())
303+
} else if ctr == 4 && req.method() == Method::PATCH {
222304
assert_body_contains(req, NOT_INSTALLED_REASON_NON_UNIQUE).await;
223305
Ok(serde_json::to_string(&dummy_cluster()).unwrap())
224306
} else {
225307
panic!("unexpected API interaction: {req:?}, counter {ctr}");
226308
}
227309
};
228-
count_check!(2, clos, |client| {
310+
count_check!(5, clos, |client| {
229311
let cluster = Arc::new(dummy_cluster());
230-
let result = reconcile(cluster, Arc::new(client)).await;
312+
let result = reconcile(cluster, Arc::new(dummy_cluster_ctx(client))).await;
231313
assert_eq!(result.unwrap(), Action::requeue(Duration::from_secs(60)));
232314
});
233315
}
@@ -238,9 +320,9 @@ mod tests {
238320
r if r.method() == Method::GET => Err(StatusCode::INTERNAL_SERVER_ERROR),
239321
_ => panic!("unexpected API interaction: {req:?}"),
240322
};
241-
count_check!(1, clos, |client| {
323+
count_check!(4, clos, |client| {
242324
let cluster = Arc::new(dummy_cluster());
243-
let result = reconcile(cluster, Arc::new(client)).await;
325+
let result = reconcile(cluster, Arc::new(dummy_cluster_ctx(client))).await;
244326
assert!(result.is_err());
245327
});
246328
}

test_utils/src/mock_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ pub fn dummy_cluster() -> TrustedExecutionCluster {
178178
TrustedExecutionCluster {
179179
metadata: ObjectMeta {
180180
name: Some("test".to_string()),
181+
uid: Some("uid".to_string()),
181182
..Default::default()
182183
},
183184
status: None,

tests/attestation.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,3 +346,20 @@ async fn test_vm_restart_operator_existing() -> anyhow::Result<()> {
346346
Ok(())
347347
}
348348
}
349+
350+
virt_test! {
351+
async fn test_vm_restart_operator_new() -> anyhow::Result<()> {
352+
let test_ctx = setup!().await?;
353+
test_ctx.info("Testing operator restart - new VM should boot");
354+
let vm_name = "test-coreos-operator-restart-new";
355+
356+
let deployments: Api<Deployment> =
357+
Api::namespaced(test_ctx.client().clone(), test_ctx.namespace());
358+
deployments.restart("trusted-cluster-operator").await?;
359+
test_ctx.info("Restarted operator deployment");
360+
361+
let _ = SingleAttestationContext::new(vm_name, &test_ctx).await?;
362+
test_ctx.cleanup().await?;
363+
Ok(())
364+
}
365+
}

0 commit comments

Comments
 (0)