Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit 86305c2

Browse files
author
Matthew Fisher
committed
working
Signed-off-by: Matthew Fisher <matt.fisher@microsoft.com>
1 parent 7fb17a0 commit 86305c2

File tree

3 files changed

+90
-26
lines changed

3 files changed

+90
-26
lines changed

src/main.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,21 @@ use kubelet::module_store::FileModuleStore;
33
use kubelet::Kubelet;
44

55
mod provider;
6-
76
use provider::Provider;
87

98
#[tokio::main]
109
async fn main() -> anyhow::Result<()> {
11-
// The provider is responsible for all the "back end" logic. If you are creating
12-
// a new Kubelet, all you need to implement is a provider.
1310
let config = Config::new_from_flags(env!("CARGO_PKG_VERSION"));
14-
1511
let kubeconfig = kube::Config::infer().await?;
1612

17-
// Initialize the logger
1813
env_logger::init();
1914

2015
let client = oci_distribution::Client::default();
2116
let mut module_store_path = config.data_dir.join(".oci");
2217
module_store_path.push("modules");
2318
let store = FileModuleStore::new(client, &module_store_path);
2419

25-
let provider = Provider::new(store);
20+
let provider = Provider::new(store, kubeconfig.clone());
2621
let kubelet = Kubelet::new(provider, kubeconfig, config);
2722
kubelet.start().await
2823
}

src/provider/mod.rs

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,15 @@
2828
//! };
2929
//! ```
3030
31+
use k8s_openapi::api::core::v1::Pod as KubePod;
32+
use kube::api::DeleteParams;
33+
use kube::Api;
34+
use kube::Config as KubeConfig;
3135
use kubelet::handle::{key_from_pod, pod_key};
3236
use kubelet::module_store::ModuleStore;
3337
use kubelet::provider::ProviderError;
3438
use kubelet::Pod;
35-
use log::{debug, info};
39+
use log::{debug, error, info, trace};
3640
use std::collections::HashMap;
3741
use std::sync::Arc;
3842
use tokio::sync::RwLock;
@@ -47,14 +51,16 @@ const TARGET_WASM32_WASI: &str = "wasm32-wasi";
4751
pub struct Provider<S> {
4852
pods: Arc<RwLock<HashMap<String, HashMap<String, Runtime>>>>,
4953
store: S,
54+
kubeconfig: KubeConfig,
5055
}
5156

5257
impl<S: ModuleStore + Send + Sync> Provider<S> {
5358
/// Create a new wasi provider from a module store and a kubelet config
54-
pub fn new(store: S) -> Self {
59+
pub fn new(store: S, kubeconfig: KubeConfig) -> Self {
5560
Self {
5661
pods: Default::default(),
5762
store,
63+
kubeconfig,
5864
}
5965
}
6066
}
@@ -79,7 +85,8 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
7985
.remove(&container.name)
8086
.expect("FATAL ERROR: module map not properly populated");
8187

82-
let mut runtime = Runtime::new(module_data, 1 as u32);
88+
// TODO: expose this as a feature flag (--stack-size)
89+
let mut runtime = Runtime::new(module_data, (1024 * 60) as u32);
8390

8491
debug!("Starting container {} on thread", container.name);
8592
runtime.start()?;
@@ -100,12 +107,75 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
100107
Ok(())
101108
}
102109

103-
async fn modify(&self, _pod: Pod) -> anyhow::Result<()> {
104-
unimplemented!()
110+
async fn modify(&self, pod: Pod) -> anyhow::Result<()> {
111+
// The only things we care about are:
112+
// 1. metadata.deletionTimestamp => signal all containers to stop and then mark them
113+
// as terminated
114+
// 2. spec.containers[*].image, spec.initContainers[*].image => stop the currently
115+
// running containers and start new ones?
116+
// 3. spec.activeDeadlineSeconds => Leaving unimplemented for now
117+
// TODO: Determine what the proper behavior should be if labels change
118+
debug!(
119+
"Got pod modified event for {} in namespace {}",
120+
pod.name(),
121+
pod.namespace()
122+
);
123+
trace!("Modified pod spec: {:#?}", pod.as_kube_pod());
124+
if let Some(_timestamp) = pod.deletion_timestamp() {
125+
let mut pods = self.pods.write().await;
126+
match pods.get_mut(&key_from_pod(&pod)) {
127+
Some(h) => {
128+
for (_name, runtime) in h {
129+
runtime.stop()?;
130+
}
131+
// Follow up with a delete when everything is stopped
132+
let dp = DeleteParams {
133+
grace_period_seconds: Some(0),
134+
..Default::default()
135+
};
136+
let pod_client: Api<KubePod> = Api::namespaced(
137+
kube::client::Client::new(self.kubeconfig.clone()),
138+
pod.namespace(),
139+
);
140+
match pod_client.delete(pod.name(), &dp).await {
141+
Ok(_) => Ok(()),
142+
Err(e) => Err(e.into()),
143+
}
144+
}
145+
None => {
146+
// This isn't an error with the pod, so don't return an error (otherwise it will
147+
// get updated in its status). This is an unlikely case to get into and means
148+
// that something is likely out of sync, so just log the error
149+
error!(
150+
"Unable to find pod {} in namespace {} when trying to stop all containers",
151+
pod.name(),
152+
pod.namespace()
153+
);
154+
Ok(())
155+
}
156+
}
157+
} else {
158+
Ok(())
159+
}
160+
// TODO: Implement behavior for stopping old containers and restarting when the container
161+
// image changes
105162
}
106163

107-
async fn delete(&self, _pod: Pod) -> anyhow::Result<()> {
108-
unimplemented!()
164+
async fn delete(&self, pod: Pod) -> anyhow::Result<()> {
165+
let mut pods = self.pods.write().await;
166+
match pods.remove(&key_from_pod(&pod)) {
167+
Some(_) => debug!(
168+
"Pod {} in namespace {} removed",
169+
pod.name(),
170+
pod.namespace()
171+
),
172+
None => info!(
173+
"unable to find pod {} in namespace {}, it was likely already deleted",
174+
pod.name(),
175+
pod.namespace()
176+
),
177+
}
178+
Ok(())
109179
}
110180

111181
async fn logs(
@@ -116,7 +186,7 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
116186
_sender: kubelet::LogSender,
117187
) -> anyhow::Result<()> {
118188
let mut pods = self.pods.write().await;
119-
let _pod = pods
189+
let _containers = pods
120190
.get_mut(&pod_key(&namespace, &pod_name))
121191
.ok_or_else(|| ProviderError::PodNotFound {
122192
pod_name: pod_name.clone(),

src/provider/runtime.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::error;
22
use std::fmt;
33

4-
use wasm3::{Environment, Module};
4+
use wasm3::Environment;
55

66
type Result<T> = std::result::Result<T, RuntimeError>;
77

@@ -14,9 +14,9 @@ pub struct RuntimeError {
1414
enum RuntimeErrorKind {
1515
AlreadyStarted,
1616
CannotCreateRuntime,
17-
CannotParseModule,
17+
CannotLinkWASI,
1818
CannotLoadModule,
19-
NoMainFunction,
19+
NoEntrypoint,
2020
RunFailure,
2121
}
2222

@@ -35,9 +35,9 @@ impl RuntimeError {
3535
match self.kind {
3636
RuntimeErrorKind::AlreadyStarted => "runtime already started",
3737
RuntimeErrorKind::CannotCreateRuntime => "cannot create runtime",
38-
RuntimeErrorKind::CannotParseModule => "cannot parse module",
38+
RuntimeErrorKind::CannotLinkWASI => "cannot link module to the WASI runtime",
3939
RuntimeErrorKind::CannotLoadModule => "cannot load module",
40-
RuntimeErrorKind::NoMainFunction => "no function called 'main' found",
40+
RuntimeErrorKind::NoEntrypoint => "no entrypoint function called '_start' found",
4141
RuntimeErrorKind::RunFailure => "failure during function call",
4242
}
4343
}
@@ -81,14 +81,13 @@ impl Runtime {
8181
let rt = env
8282
.create_runtime(self.stack_size)
8383
.map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotCreateRuntime))?;
84-
let module = Module::parse(&env, &self.module_bytes)
85-
.map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotParseModule))?;
86-
let module = rt
87-
.load_module(module)
84+
let mut module = rt
85+
.parse_and_load_module(&self.module_bytes)
8886
.map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotLoadModule))?;
87+
module.link_wasi().map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotLinkWASI))?;
8988
let func = module
90-
.find_function::<(), ()>("main")
91-
.map_err(|_| RuntimeError::new(RuntimeErrorKind::NoMainFunction))?;
89+
.find_function::<(), ()>("_start")
90+
.map_err(|_| RuntimeError::new(RuntimeErrorKind::NoEntrypoint))?;
9291
self.current_status = RuntimeStatus::Running;
9392
// FIXME: run this in the background
9493
// for now, we block until the function is complete, then call .stop()
@@ -97,7 +96,7 @@ impl Runtime {
9796
self.stop()
9897
}
9998

100-
fn stop(&mut self) -> Result<()> {
99+
pub fn stop(&mut self) -> Result<()> {
101100
// it is OK for the runtime to stop an already stopped module. Effectively a no-op
102101
self.current_status = RuntimeStatus::Stopped;
103102
Ok(())

0 commit comments

Comments
 (0)