Skip to content

Commit 277a1a6

Browse files
jprendesmxpv
authored andcommitted
use windows activation strategy on async code
Signed-off-by: Jorge Prendes <[email protected]>
1 parent df1b8f0 commit 277a1a6

File tree

1 file changed

+79
-42
lines changed
  • crates/shim/src/asynchronous

1 file changed

+79
-42
lines changed

crates/shim/src/asynchronous/mod.rs

Lines changed: 79 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@ use std::{
2020
io::Read,
2121
os::unix::{fs::FileTypeExt, net::UnixListener},
2222
path::Path,
23-
process::{self, Command, Stdio},
23+
process::{self, Command as StdCommand, Stdio},
2424
sync::{
2525
atomic::{AtomicBool, Ordering},
2626
Arc,
2727
},
2828
};
2929

3030
use async_trait::async_trait;
31-
use command_fds::{CommandFdExt, FdMapping};
3231
use containerd_shim_protos::{
3332
api::DeleteResponse,
3433
protobuf::{well_known_types::any::Any, Message, MessageField},
@@ -50,7 +49,7 @@ use nix::{
5049
};
5150
use oci_spec::runtime::Features;
5251
use signal_hook_tokio::Signals;
53-
use tokio::{io::AsyncWriteExt, sync::Notify};
52+
use tokio::{io::AsyncWriteExt, process::Command, sync::Notify};
5453
use which::which;
5554

5655
const DEFAULT_BINARY_NAME: &str = "runc";
@@ -61,7 +60,7 @@ use crate::{
6160
error::{Error, Result},
6261
logger, parse_sockaddr, reap, socket_address,
6362
util::{asyncify, read_file_to_str, write_str_to_file},
64-
Config, Flags, StartOpts, SOCKET_FD, TTRPC_ADDRESS,
63+
Config, Flags, StartOpts, TTRPC_ADDRESS,
6564
};
6665

6766
pub mod monitor;
@@ -142,7 +141,10 @@ pub fn run_info() -> Result<RuntimeInfo> {
142141
let binary_path = which(binary_name).unwrap();
143142

144143
// get features
145-
let output = Command::new(binary_path).arg("features").output().unwrap();
144+
let output = StdCommand::new(binary_path)
145+
.arg("features")
146+
.output()
147+
.unwrap();
146148

147149
let features: Features = serde_json::from_str(&String::from_utf8_lossy(&output.stdout))?;
148150

@@ -215,6 +217,12 @@ where
215217
Ok(())
216218
}
217219
_ => {
220+
if flags.socket.is_empty() {
221+
return Err(Error::InvalidArgument(String::from(
222+
"Shim socket cannot be empty",
223+
)));
224+
}
225+
218226
if !config.no_setup_logger {
219227
logger::init(
220228
flags.debug,
@@ -228,11 +236,15 @@ where
228236
let task = Box::new(shim.create_task_service(publisher).await)
229237
as Box<dyn containerd_shim_protos::shim_async::Task + Send + Sync>;
230238
let task_service = create_task(Arc::from(task));
231-
let mut server = Server::new().register_service(task_service);
232-
server = server.add_listener(SOCKET_FD)?;
233-
server = server.set_domain_unix();
239+
let Some(mut server) = create_server_with_retry(&flags).await? else {
240+
signal_server_started();
241+
return Ok(());
242+
};
243+
server = server.register_service(task_service);
234244
server.start().await?;
235245

246+
signal_server_started();
247+
236248
info!("Shim successfully started, waiting for exit signal...");
237249
tokio::spawn(async move {
238250
handle_signals(signals).await;
@@ -296,61 +308,86 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) ->
296308
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
297309
let address = socket_address(&opts.address, &opts.namespace, grouping);
298310

299-
// Create socket and prepare listener.
300-
// We'll use `add_listener` when creating TTRPC server.
301-
let listener = match start_listener(&address).await {
302-
Ok(l) => l,
303-
Err(e) => {
304-
if let Error::IoError {
305-
err: ref io_err, ..
306-
} = e
307-
{
308-
if io_err.kind() != std::io::ErrorKind::AddrInUse {
309-
return Err(e);
310-
};
311-
}
312-
if let Ok(()) = wait_socket_working(&address, 5, 200).await {
313-
write_str_to_file("address", &address).await?;
314-
return Ok(address);
315-
}
316-
remove_socket(&address).await?;
317-
start_listener(&address).await?
318-
}
319-
};
311+
// Activation pattern comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70
312+
// another way to do it would to create named pipe and pass it to the child process through handle inheritence but that would require duplicating
313+
// the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simplier to specify handle inheritence and this could
314+
// be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented.
320315

321-
// tokio::process::Command do not have method `fd_mappings`,
322-
// and the `spawn()` is also not an async method,
323-
// so we use the std::process::Command here
324316
let mut command = Command::new(cmd);
325-
326317
command
327318
.current_dir(cwd)
328-
.stdout(Stdio::null())
319+
.stdout(Stdio::piped())
329320
.stdin(Stdio::null())
330321
.stderr(Stdio::null())
322+
.envs(vars)
331323
.args([
332324
"-namespace",
333325
&opts.namespace,
334326
"-id",
335327
&opts.id,
336328
"-address",
337329
&opts.address,
338-
])
339-
.fd_mappings(vec![FdMapping {
340-
parent_fd: listener.into(),
341-
child_fd: SOCKET_FD,
342-
}])?;
330+
"-socket",
331+
&address,
332+
]);
333+
343334
if opts.debug {
344335
command.arg("-debug");
345336
}
346-
command.envs(vars);
347337

348-
let _child = command.spawn().map_err(io_error!(e, "spawn shim"))?;
338+
let mut child = command.spawn().map_err(io_error!(e, "spawn shim"))?;
339+
349340
#[cfg(target_os = "linux")]
350-
crate::cgroup::set_cgroup_and_oom_score(_child.id())?;
341+
crate::cgroup::set_cgroup_and_oom_score(child.id().unwrap())?;
342+
343+
let mut reader = child.stdout.take().unwrap();
344+
tokio::io::copy(&mut reader, &mut tokio::io::stderr())
345+
.await
346+
.unwrap();
347+
351348
Ok(address)
352349
}
353350

351+
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
352+
async fn create_server(flags: &args::Flags) -> Result<Server> {
353+
use std::os::fd::IntoRawFd;
354+
let listener = start_listener(&flags.socket).await?;
355+
let mut server = Server::new();
356+
server = server.add_listener(listener.into_raw_fd())?;
357+
server = server.set_domain_unix();
358+
Ok(server)
359+
}
360+
361+
async fn create_server_with_retry(flags: &args::Flags) -> Result<Option<Server>> {
362+
// Really try to create a server.
363+
let server = match create_server(flags).await {
364+
Ok(server) => server,
365+
Err(Error::IoError { err, .. }) if err.kind() == std::io::ErrorKind::AddrInUse => {
366+
// If the address is already in use then make sure it is up and running and return the address
367+
// This allows for running a single shim per container scenarios
368+
if let Ok(()) = wait_socket_working(&flags.socket, 5, 200).await {
369+
write_str_to_file("address", &flags.socket).await?;
370+
return Ok(None);
371+
}
372+
remove_socket(&flags.socket).await?;
373+
create_server(flags).await?
374+
}
375+
Err(e) => return Err(e),
376+
};
377+
378+
Ok(Some(server))
379+
}
380+
381+
fn signal_server_started() {
382+
use libc::{dup2, STDERR_FILENO, STDOUT_FILENO};
383+
384+
unsafe {
385+
if dup2(STDERR_FILENO, STDOUT_FILENO) < 0 {
386+
panic!("Error closing pipe: {}", std::io::Error::last_os_error())
387+
}
388+
}
389+
}
390+
354391
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
355392
fn setup_signals_tokio(config: &Config) -> Signals {
356393
if config.no_reaper {

0 commit comments

Comments
 (0)