Skip to content

Commit d63d802

Browse files
authored
Merge pull request #146 from Jakob-Naucke/watch-per-operator-process
Launch watchers once per operator process
2 parents d14b8d5 + 24fa8bc commit d63d802

File tree

4 files changed

+183
-29
lines changed

4 files changed

+183
-29
lines changed

operator/src/main.rs

Lines changed: 102 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//
44
// SPDX-License-Identifier: MIT
55

6-
use std::sync::Arc;
6+
use std::sync::{Arc, Mutex};
77
use std::time::Duration;
88

99
use anyhow::Result;
@@ -30,6 +30,12 @@ mod trustee;
3030
use crate::conditions::*;
3131
use operator::*;
3232

33+
/// State shared by all reconciliations run by this operator process.
struct ClusterContext {
34+
client: Client,
35+
/// UID of the cluster for which reference value watchers were last launched; `None` until then
36+
uid: Mutex<Option<String>>,
37+
}
38+
3339
fn is_installed(status: Option<TrustedExecutionClusterStatus>) -> bool {
3440
let chk = |c: &Condition| c.type_ == INSTALLED_CONDITION && c.status == "True";
3541
status
@@ -38,16 +44,52 @@ fn is_installed(status: Option<TrustedExecutionClusterStatus>) -> bool {
3844
.unwrap_or(false)
3945
}
4046

47+
/// Launch reference value-related watchers. Is run once per TrustedExecutionCluster and operator
48+
/// process. Returns whether watchers were launched.
49+
async fn launch_rv_watchers(
50+
cluster: Arc<TrustedExecutionCluster>,
51+
ctx: Arc<ClusterContext>,
52+
name: &str,
53+
) -> Result<bool> {
54+
let client = ctx.client.clone();
55+
let mut launch_watchers = false;
56+
if let Ok(mut ctx_uid) = ctx.uid.lock() {
57+
let err = format!("TrustedExecutionCluster {name} had no UID");
58+
let cluster_uid = cluster.metadata.uid.clone().expect(&err);
59+
if ctx_uid.is_none() || ctx_uid.clone() != Some(cluster_uid.clone()) {
60+
launch_watchers = true;
61+
*ctx_uid = Some(cluster_uid);
62+
}
63+
} else {
64+
warn!("Failed to acquire lock on context UID store");
65+
}
66+
if launch_watchers {
67+
info!(
68+
"First registration of TrustedExecutionCluster {name} by this operator. \
69+
Launching reference value watchers."
70+
);
71+
let owner_reference = generate_owner_reference(&*cluster)?;
72+
let rv_ctx = RvContextData {
73+
client,
74+
owner_reference: owner_reference.clone(),
75+
pcrs_compute_image: cluster.spec.pcrs_compute_image.clone(),
76+
};
77+
reference_values::launch_rv_image_controller(rv_ctx.clone()).await;
78+
reference_values::launch_rv_job_controller(rv_ctx.clone()).await;
79+
}
80+
Ok(launch_watchers)
81+
}
82+
4183
async fn reconcile(
4284
cluster: Arc<TrustedExecutionCluster>,
43-
client: Arc<Client>,
85+
ctx: Arc<ClusterContext>,
4486
) -> Result<Action, ControllerError> {
4587
let generation = cluster.metadata.generation;
4688
let known_address = cluster.spec.public_trustee_addr.is_some();
4789
let address_condition = known_trustee_address_condition(known_address, generation);
4890
let mut conditions = Some(vec![address_condition]);
4991

50-
let kube_client = Arc::unwrap_or_clone(client);
92+
let kube_client = ctx.client.clone();
5193
let err = "trusted execution cluster had no name";
5294
let name = &cluster.metadata.name.clone().expect(err);
5395
let clusters: Api<TrustedExecutionCluster> = Api::default_namespaced(kube_client.clone());
@@ -60,6 +102,7 @@ async fn reconcile(
60102
return Ok(Action::await_change());
61103
}
62104

105+
let _ = launch_rv_watchers(cluster.clone(), ctx, name).await?;
63106
if is_installed(cluster.status.clone()) {
64107
return Ok(Action::await_change());
65108
}
@@ -107,13 +150,6 @@ async fn install_trustee_configuration(
107150
Err(e) => error!("Failed to create the KBS configuration configmap: {e}"),
108151
}
109152

110-
let rv_ctx = RvContextData {
111-
client: client.clone(),
112-
owner_reference: owner_reference.clone(),
113-
pcrs_compute_image: cluster.spec.pcrs_compute_image.clone(),
114-
};
115-
reference_values::launch_rv_image_controller(rv_ctx.clone()).await;
116-
reference_values::launch_rv_job_controller(rv_ctx.clone()).await;
117153
match reference_values::create_pcrs_config_map(client.clone(), owner_reference.clone()).await {
118154
Ok(_) => info!("Created bare configmap for PCRs"),
119155
Err(e) => error!("Failed to create the PCRs configmap: {e}"),
@@ -204,15 +240,18 @@ async fn main() -> Result<()> {
204240
info!("trusted execution clusters operator",);
205241
let cl: Api<TrustedExecutionCluster> = Api::default_namespaced(kube_client.clone());
206242

207-
// Launch all controllers
243+
// Launch all controllers except reference value-related ones
208244
register_server::launch_keygen_controller(kube_client.clone()).await;
209245
attestation_key_register::launch_ak_controller(kube_client.clone()).await;
210246
attestation_key_register::launch_machine_ak_controller(kube_client.clone()).await;
211247
attestation_key_register::launch_secret_ak_controller(kube_client.clone()).await;
212248

213-
let client = Arc::new(kube_client);
249+
let ctx = Arc::new(ClusterContext {
250+
client: kube_client,
251+
uid: Mutex::new(None),
252+
});
214253
Controller::new(cl, watcher::Config::default())
215-
.run(reconcile, controller_error_policy, client)
254+
.run(reconcile, controller_error_policy, ctx)
216255
.for_each(controller_info)
217256
.await;
218257

@@ -229,6 +268,47 @@ mod tests {
229268
use super::*;
230269
use trusted_cluster_operator_test_utils::mock_client::*;
231270

271+
fn dummy_cluster_ctx(client: Client) -> ClusterContext {
272+
ClusterContext {
273+
client,
274+
uid: Mutex::new(None),
275+
}
276+
}
277+
278+
#[tokio::test]
279+
async fn test_launch_watchers_create() {
280+
let clos = async |req, ctr| panic!("unexpected API interaction: {req:?}, counter {ctr}");
281+
count_check!(0, clos, |client| {
282+
let cluster = Arc::new(dummy_cluster());
283+
let ctx = Arc::new(dummy_cluster_ctx(client));
284+
assert!(launch_rv_watchers(cluster, ctx, "test").await.unwrap());
285+
});
286+
}
287+
288+
#[tokio::test]
289+
async fn test_launch_watchers_update() {
290+
let clos = async |req, ctr| panic!("unexpected API interaction: {req:?}, counter {ctr}");
291+
count_check!(0, clos, |client| {
292+
let cluster = Arc::new(dummy_cluster());
293+
let mut ctx = dummy_cluster_ctx(client);
294+
ctx.uid = Mutex::new(Some("def".to_string()));
295+
let result = launch_rv_watchers(cluster, Arc::new(ctx), "test");
296+
assert!(result.await.unwrap());
297+
});
298+
}
299+
300+
#[tokio::test]
301+
async fn test_launch_watchers_existing() {
302+
let clos = async |req, ctr| panic!("unexpected API interaction: {req:?}, counter {ctr}");
303+
count_check!(0, clos, |client| {
304+
let cluster = dummy_cluster();
305+
let mut ctx = dummy_cluster_ctx(client);
306+
ctx.uid = Mutex::new(cluster.metadata.uid.clone());
307+
let result = launch_rv_watchers(Arc::new(cluster), Arc::new(ctx), "test");
308+
assert!(!result.await.unwrap());
309+
});
310+
}
311+
232312
#[tokio::test]
233313
async fn test_reconcile_uninstalling() {
234314
let clos = async |req: Request<Body>, ctr| match req.method() {
@@ -241,7 +321,7 @@ mod tests {
241321
count_check!(1, clos, |client| {
242322
let mut cluster = dummy_cluster();
243323
cluster.metadata.deletion_timestamp = Some(Time(Utc::now()));
244-
let result = reconcile(Arc::new(cluster), Arc::new(client)).await;
324+
let result = reconcile(Arc::new(cluster), Arc::new(dummy_cluster_ctx(client))).await;
245325
assert_eq!(result.unwrap(), Action::await_change());
246326
});
247327
}
@@ -256,16 +336,19 @@ mod tests {
256336
metadata: Default::default(),
257337
};
258338
Ok(serde_json::to_string(&object_list).unwrap())
259-
} else if ctr == 1 && req.method() == Method::PATCH {
339+
} else if 1 < ctr && ctr < 4 {
340+
// Watchers
341+
Ok(serde_json::to_string(&dummy_cluster()).unwrap())
342+
} else if ctr == 4 && req.method() == Method::PATCH {
260343
assert_body_contains(req, NOT_INSTALLED_REASON_NON_UNIQUE).await;
261344
Ok(serde_json::to_string(&dummy_cluster()).unwrap())
262345
} else {
263346
panic!("unexpected API interaction: {req:?}, counter {ctr}");
264347
}
265348
};
266-
count_check!(2, clos, |client| {
349+
count_check!(4, clos, |client| {
267350
let cluster = Arc::new(dummy_cluster());
268-
let result = reconcile(cluster, Arc::new(client)).await;
351+
let result = reconcile(cluster, Arc::new(dummy_cluster_ctx(client))).await;
269352
assert_eq!(result.unwrap(), Action::requeue(Duration::from_secs(60)));
270353
});
271354
}
@@ -276,9 +359,9 @@ mod tests {
276359
r if r.method() == Method::GET => Err(StatusCode::INTERNAL_SERVER_ERROR),
277360
_ => panic!("unexpected API interaction: {req:?}"),
278361
};
279-
count_check!(1, clos, |client| {
362+
count_check!(3, clos, |client| {
280363
let cluster = Arc::new(dummy_cluster());
281-
let result = reconcile(cluster, Arc::new(client)).await;
364+
let result = reconcile(cluster, Arc::new(dummy_cluster_ctx(client))).await;
282365
assert!(result.is_err());
283366
});
284367
}

test_utils/src/mock_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ pub fn dummy_cluster() -> TrustedExecutionCluster {
178178
TrustedExecutionCluster {
179179
metadata: ObjectMeta {
180180
name: Some("test".to_string()),
181+
uid: Some("uid".to_string()),
181182
..Default::default()
182183
},
183184
status: None,

test_utils/src/virt.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -333,12 +333,32 @@ pub async fn wait_for_vm_ssh_ready(
333333
key_path: &Path,
334334
timeout_secs: u64,
335335
) -> anyhow::Result<()> {
336+
wait_for_vm_ssh(namespace, vm_name, key_path, timeout_secs, true).await
337+
}
338+
339+
pub async fn wait_for_vm_ssh_unavail(
340+
namespace: &str,
341+
vm_name: &str,
342+
key_path: &Path,
343+
timeout_secs: u64,
344+
) -> anyhow::Result<()> {
345+
wait_for_vm_ssh(namespace, vm_name, key_path, timeout_secs, false).await
346+
}
347+
348+
async fn wait_for_vm_ssh(
349+
namespace: &str,
350+
vm_name: &str,
351+
key_path: &Path,
352+
timeout_secs: u64,
353+
await_start: bool,
354+
) -> anyhow::Result<()> {
355+
let avail_prefix = if await_start { "" } else { "un" };
336356
let poller = Poller::new()
337357
.with_timeout(Duration::from_secs(timeout_secs))
338358
.with_interval(Duration::from_secs(10))
339359
.with_error_message(format!(
340-
"SSH access to VM {}/{} did not become available after {} seconds",
341-
namespace, vm_name, timeout_secs
360+
"SSH access to VM {}/{} did not become {}available after {} seconds",
361+
namespace, vm_name, avail_prefix, timeout_secs
342362
));
343363

344364
poller
@@ -348,10 +368,10 @@ pub async fn wait_for_vm_ssh_ready(
348368
let key = key_path.to_path_buf();
349369
async move {
350370
// Try a simple command to check if SSH is ready
351-
match virtctl_ssh_exec(&ns, &vm, &key, "echo ready").await {
352-
Ok(_) => Ok(()),
353-
Err(e) => Err(anyhow::anyhow!("SSH not ready yet: {}", e)),
354-
}
371+
let result = virtctl_ssh_exec(&ns, &vm, &key, "echo ready").await;
372+
(result.is_err() ^ await_start)
373+
.then_some(())
374+
.ok_or(anyhow::anyhow!("SSH not desired state yet: {result:?}"))
355375
}
356376
})
357377
.await

tests/attestation.rs

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// SPDX-License-Identifier: MIT
44

5-
use k8s_openapi::api::core::v1::Secret;
5+
use k8s_openapi::api::{apps::v1::Deployment, core::v1::Secret};
66
use kube::Api;
77
use trusted_cluster_operator_lib::{Machine, virtualmachineinstances::VirtualMachineInstance};
88
use trusted_cluster_operator_test_utils::*;
@@ -244,7 +244,8 @@ async fn test_vm_reboot_attestation() -> anyhow::Result<()> {
244244
)
245245
.await;
246246

247-
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
247+
test_ctx.info(format!("Waiting for lack of SSH access after reboot {}", i));
248+
virt::wait_for_vm_ssh_unavail(namespace, vm_name, &att_ctx.key_path, 30).await?;
248249

249250
test_ctx.info(format!("Waiting for SSH access after reboot {}", i));
250251
virt::wait_for_vm_ssh_ready(namespace, vm_name, &att_ctx.key_path, 300).await?;
@@ -274,7 +275,6 @@ async fn test_vm_reboot_attestation() -> anyhow::Result<()> {
274275

275276
virt_test! {
276277
async fn test_vm_reboot_delete_machine() -> anyhow::Result<()> {
277-
use kube::Api;
278278
use trusted_cluster_operator_lib::Machine;
279279

280280
let test_ctx = setup!().await?;
@@ -297,7 +297,8 @@ async fn test_vm_reboot_delete_machine() -> anyhow::Result<()> {
297297
)
298298
.await;
299299

300-
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
300+
test_ctx.info("Waiting for lack of SSH access after reboot");
301+
virt::wait_for_vm_ssh_unavail(test_ctx.namespace(), vm_name, &att_ctx.key_path, 30).await?;
301302

302303
test_ctx.info("Waiting for SSH access after machine removal");
303304
let wait = virt::wait_for_vm_ssh_ready(
@@ -313,3 +314,52 @@ async fn test_vm_reboot_delete_machine() -> anyhow::Result<()> {
313314
Ok(())
314315
}
315316
}
317+
318+
virt_test! {
/// A VM attested before an operator restart must still boot after a reboot that follows the
/// restart.
async fn test_vm_restart_operator_existing() -> anyhow::Result<()> {
    let test_ctx = setup!().await?;
    test_ctx.info("Testing operator restart - existing VM should still boot");
    let vm_name = "test-coreos-operator-restart-existing";
    let att_ctx = SingleAttestationContext::new(vm_name, &test_ctx).await?;
    let namespace = test_ctx.namespace();

    // Restart the operator deployment once the VM has been set up.
    let deployment_api: Api<Deployment> = Api::namespaced(test_ctx.client().clone(), namespace);
    deployment_api.restart("trusted-cluster-operator").await?;

    // The outcome of the reboot command is deliberately not checked.
    let _reboot_outcome = virt::virtctl_ssh_exec(
        namespace,
        vm_name,
        &att_ctx.key_path,
        "sudo systemctl reboot",
    )
    .await;

    test_ctx.info("Waiting for lack of SSH access after reboot");
    virt::wait_for_vm_ssh_unavail(namespace, vm_name, &att_ctx.key_path, 30).await?;

    test_ctx.info("Waiting for SSH access after operator restart & reboot");
    let ssh_ready =
        virt::wait_for_vm_ssh_ready(namespace, vm_name, &att_ctx.key_path, 300).await;
    assert!(ssh_ready.is_ok());

    test_ctx.cleanup().await?;
    Ok(())
}
}
349+
350+
virt_test! {
/// A VM created after an operator restart must still come up via `SingleAttestationContext`.
async fn test_vm_restart_operator_new() -> anyhow::Result<()> {
    let test_ctx = setup!().await?;
    test_ctx.info("Testing operator restart - new VM should boot");
    let vm_name = "test-coreos-operator-restart-new";

    // Restart the operator deployment before any VM exists.
    let deployment_api: Api<Deployment> =
        Api::namespaced(test_ctx.client().clone(), test_ctx.namespace());
    deployment_api.restart("trusted-cluster-operator").await?;
    test_ctx.info("Restarted operator deployment");

    // Only successful construction matters; the context itself is not used further.
    let _ = SingleAttestationContext::new(vm_name, &test_ctx).await?;

    test_ctx.cleanup().await?;
    Ok(())
}
}

0 commit comments

Comments
 (0)