Skip to content

Commit df1b8f0

Browse files
jprendesmxpv
authored andcommitted
use windows activation strategy on all platforms in sync code
Signed-off-by: Jorge Prendes <[email protected]>
1 parent 3598068 commit df1b8f0

File tree

3 files changed

+91
-108
lines changed

3 files changed

+91
-108
lines changed

crates/shim/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ command-fds = "0.3.0"
7777

7878
[target.'cfg(windows)'.dependencies]
7979
mio = { version = "1.0", features = ["os-ext", "os-poll"] }
80-
os_pipe.workspace = true
8180
windows-sys = { version = "0.52.0", features = [
8281
"Win32_Foundation",
8382
"Win32_System_WindowsProgramming",

crates/shim/src/lib.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818

1919
use std::{fs::File, path::PathBuf};
2020
#[cfg(unix)]
21-
use std::{
22-
os::unix::{io::RawFd, net::UnixListener},
23-
path::Path,
24-
};
21+
use std::{os::unix::net::UnixListener, path::Path};
2522

2623
pub use containerd_shim_protos as protos;
2724
#[cfg(unix)]
@@ -150,12 +147,6 @@ pub struct StartOpts {
150147
pub debug: bool,
151148
}
152149

153-
/// The shim process communicates with the containerd server through a communication channel
154-
/// created by containerd. One endpoint of the communication channel is passed to shim process
155-
/// through a file descriptor during forking, which is the fourth(3) file descriptor.
156-
#[cfg(unix)]
157-
const SOCKET_FD: RawFd = 3;
158-
159150
#[cfg(target_os = "linux")]
160151
pub const SOCKET_ROOT: &str = "/run/containerd";
161152

crates/shim/src/synchronous/mod.rs

Lines changed: 90 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ use crate::{
7575
};
7676

7777
cfg_unix! {
78-
use crate::{SOCKET_FD, parse_sockaddr};
79-
use command_fds::{CommandFdExt, FdMapping};
78+
use crate::parse_sockaddr;
8079
use libc::{SIGCHLD, SIGINT, SIGPIPE, SIGTERM};
8180
use nix::{
8281
errno::Errno,
@@ -252,6 +251,12 @@ where
252251
Ok(())
253252
}
254253
_ => {
254+
if flags.socket.is_empty() {
255+
return Err(Error::InvalidArgument(String::from(
256+
"Shim socket cannot be empty",
257+
)));
258+
}
259+
255260
#[cfg(windows)]
256261
util::setup_debugger_event();
257262

@@ -268,11 +273,13 @@ where
268273
let task = Box::new(shim.create_task_service(publisher))
269274
as Box<dyn containerd_shim_protos::Task + Send + Sync + 'static>;
270275
let task_service = create_task(Arc::from(task));
271-
let mut server = create_server(flags)?;
276+
let Some(mut server) = create_server_with_retry(&flags)? else {
277+
signal_server_started();
278+
return Ok(());
279+
};
272280
server = server.register_service(task_service);
273281
server.start()?;
274282

275-
#[cfg(windows)]
276283
signal_server_started();
277284

278285
info!("Shim successfully started, waiting for exit signal...");
@@ -292,22 +299,44 @@ where
292299
}
293300
}
294301

302+
#[cfg(windows)]
295303
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
296-
fn create_server(_flags: args::Flags) -> Result<Server> {
304+
fn create_server(flags: &args::Flags) -> Result<Server> {
305+
start_listener(&flags.socket).map_err(io_error!(e, "starting listener"))?;
297306
let mut server = Server::new();
307+
server = server.bind(&flags.socket)?;
308+
Ok(server)
309+
}
298310

299-
#[cfg(unix)]
300-
{
301-
server = server.add_listener(SOCKET_FD)?;
302-
}
311+
#[cfg(unix)]
312+
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
313+
fn create_server(flags: &args::Flags) -> Result<Server> {
314+
use std::os::fd::IntoRawFd;
315+
let listener = start_listener(&flags.socket).map_err(io_error!(e, "starting listener"))?;
316+
let mut server = Server::new();
317+
server = server.add_listener(listener.into_raw_fd())?;
318+
Ok(server)
319+
}
303320

304-
#[cfg(windows)]
305-
{
306-
let address = socket_address(&_flags.address, &_flags.namespace, &_flags.id);
307-
server = server.bind(address.as_str())?;
308-
}
321+
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
322+
fn create_server_with_retry(flags: &args::Flags) -> Result<Option<Server>> {
323+
// Really try to create a server.
324+
let server = match create_server(flags) {
325+
Ok(server) => server,
326+
Err(Error::IoError { err, .. }) if err.kind() == std::io::ErrorKind::AddrInUse => {
327+
// If the address is already in use then make sure it is up and running and return the address
328+
// This allows for running a single shim per container scenarios
329+
if let Ok(()) = wait_socket_working(&flags.socket, 5, 200) {
330+
write_address(&flags.socket)?;
331+
return Ok(None);
332+
}
333+
remove_socket(&flags.socket)?;
334+
create_server(flags)?
335+
}
336+
Err(e) => return Err(e),
337+
};
309338

310-
Ok(server)
339+
Ok(Some(server))
311340
}
312341

313342
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
@@ -463,96 +492,47 @@ pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result
463492
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
464493
let address = socket_address(&opts.address, &opts.namespace, grouping);
465494

466-
// Create socket and prepare listener.
467-
// On Linux, We'll use `add_listener` when creating TTRPC server, on Windows the value isn't used hence the clippy allow
468-
// (see note below about activation process for windows)
469-
#[allow(clippy::let_unit_value)]
470-
let _listener = match start_listener(&address) {
471-
Ok(l) => l,
472-
Err(e) => {
473-
if e.kind() != std::io::ErrorKind::AddrInUse {
474-
return Err(Error::IoError {
475-
context: "".to_string(),
476-
err: e,
477-
});
478-
};
479-
// If the address is already in use then make sure it is up and running and return the address
480-
// This allows for running a single shim per container scenarios
481-
if let Ok(()) = wait_socket_working(&address, 5, 200) {
482-
write_address(&address)?;
483-
return Ok((0, address));
484-
}
485-
remove_socket(&address)?;
486-
start_listener(&address).map_err(io_error!(e, ""))?
487-
}
488-
};
495+
// Activation pattern comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70
496+
// another way to do it would to create named pipe and pass it to the child process through handle inheritence but that would require duplicating
497+
// the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simplier to specify handle inheritence and this could
498+
// be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented.
489499

490500
let mut command = Command::new(cmd);
491-
command.current_dir(cwd).envs(vars).args([
492-
"-namespace",
493-
&opts.namespace,
494-
"-id",
495-
&opts.id,
496-
"-address",
497-
&opts.address,
498-
]);
501+
command
502+
.current_dir(cwd)
503+
.stdout(Stdio::piped())
504+
.stdin(Stdio::null())
505+
.stderr(Stdio::null())
506+
.envs(vars)
507+
.args([
508+
"-namespace",
509+
&opts.namespace,
510+
"-id",
511+
&opts.id,
512+
"-address",
513+
&opts.address,
514+
"-socket",
515+
&address,
516+
]);
499517

500518
if opts.debug {
501519
command.arg("-debug");
502520
}
503521

504-
#[cfg(unix)]
505-
{
506-
command
507-
.stdout(Stdio::null())
508-
.stdin(Stdio::null())
509-
.stderr(Stdio::null())
510-
.fd_mappings(vec![FdMapping {
511-
parent_fd: _listener.into(),
512-
child_fd: SOCKET_FD,
513-
}])?;
514-
515-
command
516-
.spawn()
517-
.map_err(io_error!(e, "spawn shim"))
518-
.map(|child| {
519-
// Ownership of `listener` has been passed to child.
520-
(child.id(), address)
521-
})
522-
}
523-
522+
// On Windows Rust currently sets the `HANDLE_FLAG_INHERIT` flag to true when using Command::spawn.
523+
// When a child process is spawned by another process (containerd) the child process inherits the parent's stdin, stdout, and stderr handles.
524+
// Due to the HANDLE_FLAG_INHERIT flag being set to true this will cause containerd to hand until the child process closes the handles.
525+
// As a workaround we can Disables inheritance on the io pipe handles.
526+
// This workaround comes from https://github.com/rust-lang/rust/issues/54760#issuecomment-1045940560
524527
#[cfg(windows)]
525-
{
526-
// Activation pattern for Windows comes from the hcsshim: https://github.com/microsoft/hcsshim/blob/v0.10.0-rc.7/cmd/containerd-shim-runhcs-v1/serve.go#L57-L70
527-
// another way to do it would to create named pipe and pass it to the child process through handle inheritence but that would require duplicating
528-
// the logic in Rust's 'command' for process creation. There is an issue in Rust to make it simplier to specify handle inheritence and this could
529-
// be revisited once https://github.com/rust-lang/rust/issues/54760 is implemented.
530-
let (mut reader, writer) = os_pipe::pipe().map_err(io_error!(e, "create pipe"))?;
531-
let stdio_writer = writer.try_clone().unwrap();
532-
533-
command
534-
.stdout(stdio_writer)
535-
.stdin(Stdio::null())
536-
.stderr(Stdio::null());
537-
538-
// On Windows Rust currently sets the `HANDLE_FLAG_INHERIT` flag to true when using Command::spawn.
539-
// When a child process is spawned by another process (containerd) the child process inherits the parent's stdin, stdout, and stderr handles.
540-
// Due to the HANDLE_FLAG_INHERIT flag being set to true this will cause containerd to hand until the child process closes the handles.
541-
// As a workaround we can Disables inheritance on the io pipe handles.
542-
// This workaround comes from https://github.com/rust-lang/rust/issues/54760#issuecomment-1045940560
543-
disable_handle_inheritance();
544-
command
545-
.spawn()
546-
.map_err(io_error!(e, "spawn shim"))
547-
.map(|child| {
548-
// IMPORTANT: we must drop the writer and command to close up handles before we copy the reader to stderr
549-
// AND the shim Start method must NOT write to stdout/stderr
550-
drop(writer);
551-
drop(command);
552-
io::copy(&mut reader, &mut io::stderr()).unwrap();
553-
(child.id(), address)
554-
})
555-
}
528+
disable_handle_inheritance();
529+
530+
let mut child = command.spawn().map_err(io_error!(e, "spawn shim"))?;
531+
532+
let mut reader = child.stdout.take().unwrap();
533+
std::io::copy(&mut reader, &mut std::io::stderr()).unwrap();
534+
535+
Ok((child.id(), address))
556536
}
557537

558538
#[cfg(windows)]
@@ -591,6 +571,19 @@ fn signal_server_started() {
591571
}
592572
}
593573

574+
// This closes the stdout handle which was mapped to the stderr on the first invocation of the shim.
575+
// This releases first process which will give containerd the address of the namedpipe.
576+
#[cfg(unix)]
577+
fn signal_server_started() {
578+
use libc::{dup2, STDERR_FILENO, STDOUT_FILENO};
579+
580+
unsafe {
581+
if dup2(STDERR_FILENO, STDOUT_FILENO) < 0 {
582+
panic!("Error closing pipe: {}", std::io::Error::last_os_error())
583+
}
584+
}
585+
}
586+
594587
#[cfg(test)]
595588
mod tests {
596589
use std::thread;

0 commit comments

Comments
 (0)