Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit 48f3cc1

Browse files
author
Matthew Fisher
committed
working status codes
Signed-off-by: Matthew Fisher <[email protected]>
1 parent 86305c2 commit 48f3cc1

File tree

3 files changed

+122
-102
lines changed

3 files changed

+122
-102
lines changed

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> {
1717
module_store_path.push("modules");
1818
let store = FileModuleStore::new(client, &module_store_path);
1919

20-
let provider = Provider::new(store, kubeconfig.clone());
20+
let provider = Provider::new(store, &config, kubeconfig.clone()).await?;
2121
let kubelet = Kubelet::new(provider, kubeconfig, config);
2222
kubelet.start().await
2323
}

src/provider/mod.rs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727
//! kubelet.start().await.unwrap();
2828
//! };
2929
//! ```
30+
use std::path::PathBuf;
3031

3132
use k8s_openapi::api::core::v1::Pod as KubePod;
3233
use kube::api::DeleteParams;
3334
use kube::Api;
3435
use kube::Config as KubeConfig;
35-
use kubelet::handle::{key_from_pod, pod_key};
36+
use kubelet::config::Config as KubeletConfig;
37+
use kubelet::handle::{key_from_pod, pod_key, PodHandle};
3638
use kubelet::module_store::ModuleStore;
3739
use kubelet::provider::ProviderError;
3840
use kubelet::Pod;
@@ -42,26 +44,31 @@ use std::sync::Arc;
4244
use tokio::sync::RwLock;
4345

4446
mod runtime;
45-
use runtime::Runtime;
47+
use runtime::{HandleStopper, LogHandleFactory, Runtime};
4648

4749
const TARGET_WASM32_WASI: &str = "wasm32-wasi";
50+
const LOG_DIR_NAME: &str = "wasm3-logs";
4851

4952
/// Provider provides a Kubelet runtime implementation that executes WASM
5053
/// binaries conforming to the WASI spec
5154
pub struct Provider<S> {
52-
pods: Arc<RwLock<HashMap<String, HashMap<String, Runtime>>>>,
55+
handles: Arc<RwLock<HashMap<String, PodHandle<HandleStopper, LogHandleFactory>>>>,
5356
store: S,
57+
log_path: PathBuf,
5458
kubeconfig: KubeConfig,
5559
}
5660

5761
impl<S: ModuleStore + Send + Sync> Provider<S> {
5862
/// Create a new wasi provider from a module store and a kubelet config
59-
pub fn new(store: S, kubeconfig: KubeConfig) -> Self {
60-
Self {
61-
pods: Default::default(),
63+
pub async fn new(store: S, config: &KubeletConfig, kubeconfig: KubeConfig) -> anyhow::Result<Self> {
64+
let log_path = config.data_dir.join(LOG_DIR_NAME);
65+
tokio::fs::create_dir_all(&log_path).await?;
66+
Ok(Self {
67+
handles: Default::default(),
6268
store,
69+
log_path,
6370
kubeconfig,
64-
}
71+
})
6572
}
6673
}
6774

@@ -77,6 +84,7 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
7784

7885
let pod_name = pod.name();
7986
let mut containers = HashMap::new();
87+
let client = kube::Client::new(self.kubeconfig.clone());
8088

8189
let mut modules = self.store.fetch_pod_modules(&pod).await?;
8290
info!("Starting containers for pod {:?}", pod_name);
@@ -86,11 +94,11 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
8694
.expect("FATAL ERROR: module map not properly populated");
8795

8896
// TODO: expose this as a feature flag (--stack-size)
89-
let mut runtime = Runtime::new(module_data, (1024 * 60) as u32);
97+
let mut runtime = Runtime::new(module_data, (1024 * 60) as u32, self.log_path.clone()).await?;
9098

9199
debug!("Starting container {} on thread", container.name);
92-
runtime.start()?;
93-
containers.insert(container.name.clone(), runtime);
100+
let handle = runtime.start().await?;
101+
containers.insert(container.name.clone(), handle);
94102
}
95103
info!(
96104
"All containers started for pod {:?}. Updating status",
@@ -100,8 +108,11 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
100108
// Wrap this in a block so the write lock goes out of scope when we are done
101109
{
102110
// Grab the entry while we are creating things
103-
let mut pods = self.pods.write().await;
104-
pods.insert(key_from_pod(&pod), containers);
111+
let mut handles = self.handles.write().await;
112+
handles.insert(
113+
key_from_pod(&pod),
114+
PodHandle::new(containers, pod, client, None)?,
115+
);
105116
}
106117

107118
Ok(())
@@ -122,12 +133,10 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
122133
);
123134
trace!("Modified pod spec: {:#?}", pod.as_kube_pod());
124135
if let Some(_timestamp) = pod.deletion_timestamp() {
125-
let mut pods = self.pods.write().await;
126-
match pods.get_mut(&key_from_pod(&pod)) {
136+
let mut handles = self.handles.write().await;
137+
match handles.get_mut(&key_from_pod(&pod)) {
127138
Some(h) => {
128-
for (_name, runtime) in h {
129-
runtime.stop()?;
130-
}
139+
h.stop().await?;
131140
// Follow up with a delete when everything is stopped
132141
let dp = DeleteParams {
133142
grace_period_seconds: Some(0),
@@ -162,8 +171,8 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
162171
}
163172

164173
async fn delete(&self, pod: Pod) -> anyhow::Result<()> {
165-
let mut pods = self.pods.write().await;
166-
match pods.remove(&key_from_pod(&pod)) {
174+
let mut handles = self.handles.write().await;
175+
match handles.remove(&key_from_pod(&pod)) {
167176
Some(_) => debug!(
168177
"Pod {} in namespace {} removed",
169178
pod.name(),
@@ -185,8 +194,8 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
185194
_container_name: String,
186195
_sender: kubelet::LogSender,
187196
) -> anyhow::Result<()> {
188-
let mut pods = self.pods.write().await;
189-
let _containers = pods
197+
let mut handles = self.handles.write().await;
198+
let _containers = handles
190199
.get_mut(&pod_key(&namespace, &pod_name))
191200
.ok_or_else(|| ProviderError::PodNotFound {
192201
pod_name: pod_name.clone(),

src/provider/runtime.rs

Lines changed: 91 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,104 +1,115 @@
1-
use std::error;
2-
use std::fmt;
1+
use std::path::Path;
2+
use std::sync::Arc;
33

4-
use wasm3::Environment;
4+
use tempfile::NamedTempFile;
5+
use tokio::sync::watch::{self, Sender};
6+
use tokio::task::JoinHandle;
7+
use wasm3::{Environment, Module};
8+
use kubelet::handle::{RuntimeHandle, Stop};
9+
use kubelet::status::ContainerStatus;
510

6-
type Result<T> = std::result::Result<T, RuntimeError>;
7-
8-
#[derive(Debug, Clone, PartialEq, Eq)]
9-
pub struct RuntimeError {
10-
kind: RuntimeErrorKind,
11-
}
12-
13-
#[derive(Debug, Clone, PartialEq, Eq)]
14-
enum RuntimeErrorKind {
15-
AlreadyStarted,
16-
CannotCreateRuntime,
17-
CannotLinkWASI,
18-
CannotLoadModule,
19-
NoEntrypoint,
20-
RunFailure,
21-
}
22-
23-
#[derive(Debug, Clone, PartialEq, Eq)]
24-
enum RuntimeStatus {
25-
Running,
26-
Stopped,
27-
}
28-
29-
impl RuntimeError {
30-
fn new(kind: RuntimeErrorKind) -> Self {
31-
Self { kind: kind }
32-
}
33-
34-
fn __description(&self) -> &str {
35-
match self.kind {
36-
RuntimeErrorKind::AlreadyStarted => "runtime already started",
37-
RuntimeErrorKind::CannotCreateRuntime => "cannot create runtime",
38-
RuntimeErrorKind::CannotLinkWASI => "cannot link module to the WASI runtime",
39-
RuntimeErrorKind::CannotLoadModule => "cannot load module",
40-
RuntimeErrorKind::NoEntrypoint => "no entrypoint function called '_start' found",
41-
RuntimeErrorKind::RunFailure => "failure during function call",
42-
}
43-
}
11+
pub struct HandleStopper {
12+
handle: JoinHandle<anyhow::Result<()>>,
4413
}
4514

46-
impl fmt::Display for RuntimeError {
47-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48-
write!(f, "{}", self.__description())
15+
#[async_trait::async_trait]
16+
impl Stop for HandleStopper {
17+
async fn stop(&mut self) -> anyhow::Result<()> {
18+
// no nothing
19+
Ok(())
4920
}
50-
}
5121

52-
impl error::Error for RuntimeError {
53-
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
54-
// source is not tracked
55-
None
22+
async fn wait(&mut self) -> anyhow::Result<()> {
23+
(&mut self.handle).await??;
24+
Ok(())
5625
}
5726
}
5827

5928
/// A runtime context for running a wasm module with wasm3
6029
pub struct Runtime {
6130
module_bytes: Vec<u8>,
6231
stack_size: u32,
63-
current_status: RuntimeStatus,
32+
output: Arc<NamedTempFile>,
6433
}
6534

6635
impl Runtime {
67-
pub fn new(module_bytes: Vec<u8>, stack_size: u32) -> Self {
68-
Self {
36+
pub async fn new<L: AsRef<Path> + Send + Sync + 'static>(module_bytes: Vec<u8>, stack_size: u32, log_dir: L) -> anyhow::Result<Self> {
37+
let temp = tokio::task::spawn_blocking(move || -> anyhow::Result<NamedTempFile> {
38+
Ok(NamedTempFile::new_in(log_dir)?)
39+
})
40+
.await??;
41+
42+
Ok(Self {
6943
module_bytes: module_bytes,
7044
stack_size: stack_size,
71-
current_status: RuntimeStatus::Stopped,
72-
}
45+
output: Arc::new(temp),
46+
})
7347
}
7448

75-
pub fn start(&mut self) -> Result<()> {
76-
if self.current_status == RuntimeStatus::Running {
77-
return Err(RuntimeError::new(RuntimeErrorKind::AlreadyStarted));
78-
}
79-
let env = Environment::new()
80-
.map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotCreateRuntime))?;
81-
let rt = env
82-
.create_runtime(self.stack_size)
83-
.map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotCreateRuntime))?;
84-
let mut module = rt
85-
.parse_and_load_module(&self.module_bytes)
86-
.map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotLoadModule))?;
87-
module.link_wasi().map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotLinkWASI))?;
88-
let func = module
89-
.find_function::<(), ()>("_start")
90-
.map_err(|_| RuntimeError::new(RuntimeErrorKind::NoEntrypoint))?;
91-
self.current_status = RuntimeStatus::Running;
92-
// FIXME: run this in the background
93-
// for now, we block until the function is complete, then call .stop()
94-
func.call()
95-
.map_err(|_| RuntimeError::new(RuntimeErrorKind::RunFailure))?;
96-
self.stop()
49+
pub async fn start(&mut self) -> anyhow::Result<RuntimeHandle<HandleStopper, LogHandleFactory>> {
50+
let temp = self.output.clone();
51+
let output_write = tokio::task::spawn_blocking(move || -> anyhow::Result<std::fs::File> {
52+
Ok(temp.reopen()?)
53+
})
54+
.await??;
55+
56+
let (status_sender, status_recv) = watch::channel(ContainerStatus::Waiting {
57+
timestamp: chrono::Utc::now(),
58+
message: "No status has been received from the process".into(),
59+
});
60+
let handle = spawn_wasm3(self.module_bytes.clone(), self.stack_size, status_sender, output_write).await?;
61+
62+
63+
let log_handle_factory = LogHandleFactory {
64+
temp: self.output.clone(),
65+
};
66+
67+
Ok(RuntimeHandle::new(
68+
HandleStopper{handle},
69+
log_handle_factory,
70+
status_recv,
71+
))
9772
}
73+
}
9874

99-
pub fn stop(&mut self) -> Result<()> {
100-
// it is OK for the runtime to stop an already stopped module. Effectively a no-op
101-
self.current_status = RuntimeStatus::Stopped;
102-
Ok(())
75+
/// Holds our tempfile handle.
76+
pub struct LogHandleFactory {
77+
temp: Arc<NamedTempFile>,
78+
}
79+
80+
impl kubelet::handle::LogHandleFactory<tokio::fs::File> for LogHandleFactory {
81+
/// Creates `tokio::fs::File` on demand for log reading.
82+
fn new_handle(&self) -> tokio::fs::File {
83+
tokio::fs::File::from_std(self.temp.reopen().unwrap())
10384
}
10485
}
86+
87+
// Spawns a running wasmtime instance with the given context and status
88+
// channel. Due to the Instance type not being Send safe, all of the logic
89+
// needs to be done within the spawned task
90+
async fn spawn_wasm3(
91+
module_bytes: Vec<u8>,
92+
stack_size: u32,
93+
status_sender: Sender<ContainerStatus>,
94+
_output_write: std::fs::File, //TODO: hook this up such that log output will be written to the file
95+
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
96+
let handle = tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
97+
let env = Environment::new().expect("cannot create environment");
98+
let rt = env.create_runtime(stack_size).expect("cannot create runtime");
99+
let module = Module::parse(&env, &module_bytes).expect("cannot parse module");
100+
let mut module = rt.load_module(module).expect("cannot load module");
101+
module.link_wasi().expect("cannot link WASI");
102+
let func = module.find_function::<(), ()>("_start").expect("cannot find function '_start' in module");
103+
func.call().expect("cannot call '_start' in module");
104+
status_sender
105+
.broadcast(ContainerStatus::Terminated {
106+
failed: false,
107+
message: "Module run completed".into(),
108+
timestamp: chrono::Utc::now(),
109+
})
110+
.expect("status should be able to send");
111+
Ok(())
112+
});
113+
114+
Ok(handle)
115+
}

0 commit comments

Comments
 (0)