Skip to content

Commit 9cf35b3

Browse files
committed
Implement Container Task API
Signed-off-by: Guvenc Gulce <[email protected]>
1 parent 81f4434 commit 9cf35b3

File tree

18 files changed

+814
-24
lines changed

18 files changed

+814
-24
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ members = [
44
"feos/services/vm-service",
55
"feos/services/host-service",
66
"feos/services/image-service",
7+
"feos/services/task-service",
78
"cli",
89
"feos/proto",
910
"feos/utils",

feos/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ feos-utils = { path = "utils" }
1616
vm-service = { path = "services/vm-service" }
1717
host-service = { path = "services/host-service" }
1818
image-service = { path = "services/image-service" }
19+
task-service = { path = "services/task-service" }
1920
feos-proto = { workspace = true }
2021

2122
# Workspace dependencies

feos/proto/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
1717
&[proto_dir],
1818
)?;
1919
Ok(())
20-
}
20+
}

feos/proto/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@ pub mod host_service {
1010
pub mod image_service {
1111
tonic::include_proto!("feos.image.vmm.api.v1");
1212
}
13+
pub mod task_service {
14+
tonic::include_proto!("feos.task.v1");
15+
}

feos/services/image-service/src/filestore.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,7 @@ impl FileStore {
8787
fs::create_dir_all(final_dir).await?;
8888

8989
// Handle both single-blob rootfs (for VMs) and layered rootfs (for containers)
90-
if image_data.layers.len() == 1
91-
&& image_ref.contains("cloud-hypervisor")
92-
{
90+
if image_data.layers.len() == 1 && image_ref.contains("cloud-hypervisor") {
9391
// Assuming this is a single rootfs blob for a VM
9492
let final_disk_path = final_dir.join("disk.image");
9593
fs::write(final_disk_path, &image_data.layers[0]).await?;
@@ -163,4 +161,4 @@ impl FileStore {
163161
);
164162
store
165163
}
166-
}
164+
}

feos/services/image-service/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,4 @@ pub enum FileCommand {
9595
ScanExistingImages {
9696
responder: oneshot::Sender<HashMap<String, ImageInfo>>,
9797
},
98-
}
98+
}

feos/services/image-service/src/worker.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ use feos_proto::image_service::{
99
PullImageResponse,
1010
};
1111
use log::{error, info, warn};
12-
use oci_distribution::{
13-
client::ClientConfig, manifest, secrets::RegistryAuth, Client, Reference,
14-
};
12+
use oci_distribution::{client::ClientConfig, manifest, secrets::RegistryAuth, Client, Reference};
1513
use std::collections::HashMap;
1614
use tokio::sync::{broadcast, mpsc, oneshot};
1715
use tonic::Status;
@@ -257,9 +255,7 @@ async fn pull_oci_data(image_ref: &str) -> Result<PulledImageData, ImageServiceE
257255
let auth = &RegistryAuth::Anonymous;
258256

259257
info!("ImagePuller: pulling manifest and config for {image_ref}");
260-
let (manifest, _, _) = client
261-
.pull_manifest_and_config(&reference, auth)
262-
.await?;
258+
let (manifest, _, _) = client.pull_manifest_and_config(&reference, auth).await?;
263259

264260
let mut config_data = Vec::new();
265261
client
@@ -289,10 +285,7 @@ async fn pull_oci_data(image_ref: &str) -> Result<PulledImageData, ImageServiceE
289285
client
290286
.pull_blob(&reference, &layer, &mut layer_data)
291287
.await?;
292-
info!(
293-
"ImagePuller: pulled layer blob {} bytes",
294-
layer_data.len()
295-
);
288+
info!("ImagePuller: pulled layer blob {} bytes", layer_data.len());
296289
layers.push(layer_data);
297290
}
298291

@@ -411,4 +404,4 @@ pub async fn watch_image_status_stream(
411404
}
412405
}
413406
}
414-
}
407+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "task-service"
3+
version.workspace = true
4+
edition.workspace = true
5+
6+
[dependencies]
7+
feos-proto = { workspace = true }
8+
tokio = { workspace = true }
9+
tokio-stream = { workspace = true }
10+
tonic = { workspace = true }
11+
anyhow = { workspace = true }
12+
log = { workspace = true }
13+
prost = { workspace = true }
14+
thiserror = { workspace = true }
15+
nix = { workspace = true, features = ["signal", "process"] }
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
use crate::error::TaskError;
2+
use crate::Command;
3+
use feos_proto::task_service::{
4+
task_service_server::TaskService, CreateRequest, CreateResponse, DeleteRequest, DeleteResponse,
5+
KillRequest, KillResponse, StartRequest, StartResponse, WaitRequest, WaitResponse,
6+
};
7+
use log::info;
8+
use tokio::sync::{mpsc, oneshot};
9+
use tonic::{Request, Response, Status};
10+
11+
pub struct TaskApiHandler {
12+
dispatcher_tx: mpsc::Sender<Command>,
13+
}
14+
15+
impl TaskApiHandler {
16+
pub fn new(dispatcher_tx: mpsc::Sender<Command>) -> Self {
17+
Self { dispatcher_tx }
18+
}
19+
}
20+
21+
/// Helper function to create a command, send it to the dispatcher, and await the response.
22+
async fn dispatch_and_wait<T, F>(
23+
dispatcher: &mpsc::Sender<Command>,
24+
command_constructor: F,
25+
) -> Result<Response<T>, Status>
26+
where
27+
F: FnOnce(oneshot::Sender<Result<T, TaskError>>) -> Command,
28+
{
29+
let (resp_tx, resp_rx) = oneshot::channel();
30+
let cmd = command_constructor(resp_tx);
31+
32+
dispatcher
33+
.send(cmd)
34+
.await
35+
.map_err(|e| Status::internal(format!("Failed to send command to dispatcher: {}", e)))?;
36+
37+
match resp_rx.await {
38+
Ok(Ok(result)) => Ok(Response::new(result)),
39+
Ok(Err(e)) => Err(e.into()),
40+
Err(_) => Err(Status::internal(
41+
"Dispatcher task dropped response channel.",
42+
)),
43+
}
44+
}
45+
46+
#[tonic::async_trait]
47+
impl TaskService for TaskApiHandler {
48+
async fn create(
49+
&self,
50+
request: Request<CreateRequest>,
51+
) -> Result<Response<CreateResponse>, Status> {
52+
info!(
53+
"API: Received Create request for {}",
54+
request.get_ref().container_id
55+
);
56+
dispatch_and_wait(&self.dispatcher_tx, |responder| Command::Create {
57+
req: request.into_inner(),
58+
responder,
59+
})
60+
.await
61+
}
62+
63+
async fn start(
64+
&self,
65+
request: Request<StartRequest>,
66+
) -> Result<Response<StartResponse>, Status> {
67+
info!(
68+
"API: Received Start request for {}",
69+
request.get_ref().container_id
70+
);
71+
dispatch_and_wait(&self.dispatcher_tx, |responder| Command::Start {
72+
req: request.into_inner(),
73+
responder,
74+
})
75+
.await
76+
}
77+
78+
async fn kill(&self, request: Request<KillRequest>) -> Result<Response<KillResponse>, Status> {
79+
info!(
80+
"API: Received Kill request for {}",
81+
request.get_ref().container_id
82+
);
83+
dispatch_and_wait(&self.dispatcher_tx, |responder| Command::Kill {
84+
req: request.into_inner(),
85+
responder,
86+
})
87+
.await
88+
}
89+
90+
async fn delete(
91+
&self,
92+
request: Request<DeleteRequest>,
93+
) -> Result<Response<DeleteResponse>, Status> {
94+
info!(
95+
"API: Received Delete request for {}",
96+
request.get_ref().container_id
97+
);
98+
dispatch_and_wait(&self.dispatcher_tx, |responder| Command::Delete {
99+
req: request.into_inner(),
100+
responder,
101+
})
102+
.await
103+
}
104+
105+
async fn wait(&self, request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
106+
info!(
107+
"API: Received Wait request for {}",
108+
request.get_ref().container_id
109+
);
110+
dispatch_and_wait(&self.dispatcher_tx, |responder| Command::Wait {
111+
req: request.into_inner(),
112+
responder,
113+
})
114+
.await
115+
}
116+
}

0 commit comments

Comments
 (0)