Skip to content

Commit 1417db9

Browse files
authored
feat(agent): implement DVC remote exec detached mode (#1567)
Adds fire-and-forget remote execution via the now proto DVC. Previously, all execution types (except Run) waited for the process exit code and tracked the execution session, but this behavior is not always what the user expects. This PR changes that and adds an option to specify if fire and forget mode is needed (return result right after process is spawned. Issue: [ARC-411](https://devolutions.atlassian.net/browse/ARC-411)
1 parent 7e0ab86 commit 1417db9

File tree

4 files changed

+134
-71
lines changed

4 files changed

+134
-71
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

devolutions-session/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ win-api-wrappers = { path = "../crates/win-api-wrappers", optional = true }
4444

4545
[dependencies.now-proto-pdu]
4646
optional = true
47-
version = "0.3.2"
47+
version = "0.4.1"
4848
features = ["std"]
4949

5050
[target.'cfg(windows)'.build-dependencies]

devolutions-session/src/dvc/process.rs

Lines changed: 92 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ pub enum ServerChannelEvent {
9898
pub struct WinApiProcessCtx {
9999
session_id: u32,
100100

101-
io_notification_tx: Sender<ServerChannelEvent>,
102-
103101
stdout_read_pipe: Option<Pipe>,
104102
stderr_read_pipe: Option<Pipe>,
105103
stdin_write_pipe: Option<Pipe>,
@@ -123,7 +121,7 @@ impl WinApiProcessCtx {
123121
Ok(())
124122
}
125123

126-
pub fn process_cancel(&mut self) -> Result<(), ExecError> {
124+
pub fn process_cancel(&mut self, io_notification_tx: Sender<ServerChannelEvent>) -> Result<(), ExecError> {
127125
info!(
128126
session_id = self.session_id,
129127
"Cancelling process execution by user request"
@@ -135,15 +133,18 @@ impl WinApiProcessCtx {
135133

136134
// Acknowledge client that cancel request has been processed
137135
// successfully.
138-
self.io_notification_tx
139-
.blocking_send(ServerChannelEvent::SessionCancelSuccess {
140-
session_id: self.session_id,
141-
})?;
136+
io_notification_tx.blocking_send(ServerChannelEvent::SessionCancelSuccess {
137+
session_id: self.session_id,
138+
})?;
142139

143140
Ok(())
144141
}
145142

146-
pub fn wait(mut self, mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>) -> Result<u32, ExecError> {
143+
pub fn wait(
144+
mut self,
145+
mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>,
146+
io_notification_tx: Sender<ServerChannelEvent>,
147+
) -> Result<u32, ExecError> {
147148
let session_id = self.session_id;
148149

149150
info!(session_id, "Waiting for process to exit");
@@ -153,8 +154,7 @@ impl WinApiProcessCtx {
153154
const WAIT_OBJECT_INPUT_MESSAGE: WAIT_EVENT = WAIT_OBJECT_0;
154155
const WAIT_OBJECT_PROCESS_EXIT: WAIT_EVENT = WAIT_EVENT(WAIT_OBJECT_0.0 + 1);
155156

156-
self.io_notification_tx
157-
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
157+
io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
158158

159159
loop {
160160
// SAFETY: No preconditions.
@@ -179,7 +179,7 @@ impl WinApiProcessCtx {
179179
return Err(ExecError::Aborted);
180180
}
181181
ProcessIoInputEvent::CancelExecution => {
182-
self.process_cancel()?;
182+
self.process_cancel(io_notification_tx.clone())?;
183183

184184
// wait for process to exit
185185
continue;
@@ -209,6 +209,7 @@ impl WinApiProcessCtx {
209209
pub fn wait_with_io_redirection(
210210
mut self,
211211
mut input_event_rx: WinapiSignaledReceiver<ProcessIoInputEvent>,
212+
io_notification_tx: Sender<ServerChannelEvent>,
212213
) -> Result<u32, ExecError> {
213214
let session_id = self.session_id;
214215

@@ -277,8 +278,7 @@ impl WinApiProcessCtx {
277278

278279
// Signal client side about started execution
279280

280-
self.io_notification_tx
281-
.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
281+
io_notification_tx.blocking_send(ServerChannelEvent::SessionStarted { session_id })?;
282282

283283
info!(session_id, "Process IO is ready for async loop execution");
284284
loop {
@@ -304,7 +304,7 @@ impl WinApiProcessCtx {
304304
return Err(ExecError::Aborted);
305305
}
306306
ProcessIoInputEvent::CancelExecution => {
307-
self.process_cancel()?;
307+
self.process_cancel(io_notification_tx.clone())?;
308308

309309
// wait for process to exit
310310
continue;
@@ -369,26 +369,24 @@ impl WinApiProcessCtx {
369369
// EOF on stdout pipe, close it and send EOF message to message_tx
370370
self.stdout_read_pipe = None;
371371

372-
self.io_notification_tx
373-
.blocking_send(ServerChannelEvent::SessionDataOut {
374-
session_id,
375-
stream: NowExecDataStreamKind::Stdout,
376-
last: true,
377-
data: Vec::new(),
378-
})?;
372+
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
373+
session_id,
374+
stream: NowExecDataStreamKind::Stdout,
375+
last: true,
376+
data: Vec::new(),
377+
})?;
379378
}
380379
_code => return Err(err.into()),
381380
}
382381
continue;
383382
}
384383

385-
self.io_notification_tx
386-
.blocking_send(ServerChannelEvent::SessionDataOut {
387-
session_id,
388-
stream: NowExecDataStreamKind::Stdout,
389-
last: false,
390-
data: stdout_buffer[..bytes_read as usize].to_vec(),
391-
})?;
384+
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
385+
session_id,
386+
stream: NowExecDataStreamKind::Stdout,
387+
last: false,
388+
data: stdout_buffer[..bytes_read as usize].to_vec(),
389+
})?;
392390

393391
// Schedule next overlapped read
394392
// SAFETY: pipe is valid to read from, as long as it is not closed.
@@ -432,26 +430,24 @@ impl WinApiProcessCtx {
432430
ERROR_HANDLE_EOF | ERROR_BROKEN_PIPE => {
433431
// EOF on stderr pipe, close it and send EOF message to message_tx
434432
self.stderr_read_pipe = None;
435-
self.io_notification_tx
436-
.blocking_send(ServerChannelEvent::SessionDataOut {
437-
session_id,
438-
stream: NowExecDataStreamKind::Stderr,
439-
last: true,
440-
data: Vec::new(),
441-
})?;
433+
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
434+
session_id,
435+
stream: NowExecDataStreamKind::Stderr,
436+
last: true,
437+
data: Vec::new(),
438+
})?;
442439
}
443440
_code => return Err(err.into()),
444441
}
445442
continue;
446443
}
447444

448-
self.io_notification_tx
449-
.blocking_send(ServerChannelEvent::SessionDataOut {
450-
session_id,
451-
stream: NowExecDataStreamKind::Stderr,
452-
last: false,
453-
data: stderr_buffer[..bytes_read as usize].to_vec(),
454-
})?;
445+
io_notification_tx.blocking_send(ServerChannelEvent::SessionDataOut {
446+
session_id,
447+
stream: NowExecDataStreamKind::Stderr,
448+
last: false,
449+
data: stderr_buffer[..bytes_read as usize].to_vec(),
450+
})?;
455451

456452
// Schedule next overlapped read
457453
// SAFETY: pipe is valid to read from, as long as it is not closed.
@@ -527,12 +523,12 @@ impl WinApiProcessBuilder {
527523
self
528524
}
529525

530-
/// Starts process execution and spawns IO thread to redirect stdio to/from dvc.
531-
pub fn run(
526+
fn run_impl(
532527
mut self,
533528
session_id: u32,
534-
io_notification_tx: Sender<ServerChannelEvent>,
535-
) -> Result<WinApiProcess, ExecError> {
529+
io_notification_tx: Option<Sender<ServerChannelEvent>>,
530+
detached: bool,
531+
) -> Result<Option<WinApiProcess>, ExecError> {
536532
let command_line = format!("\"{}\" {}", self.executable, self.command_line)
537533
.trim_end()
538534
.to_owned();
@@ -557,31 +553,42 @@ impl WinApiProcessBuilder {
557553
let io_redirection = self.enable_io_redirection;
558554

559555
let process_ctx = if io_redirection {
560-
prepare_process_with_io_redirection(
561-
session_id,
562-
command_line,
563-
current_directory,
564-
self.env,
565-
io_notification_tx.clone(),
566-
)?
556+
prepare_process_with_io_redirection(session_id, command_line, current_directory, self.env)?
567557
} else {
568-
prepare_process(
569-
session_id,
570-
command_line,
571-
current_directory,
572-
self.env,
573-
io_notification_tx.clone(),
574-
)?
558+
prepare_process(session_id, command_line, current_directory, self.env)?
575559
};
576560

561+
// For detached mode, spawn a thread that waits for process exit and keeps temp files alive
562+
if detached && !temp_files.is_empty() {
563+
std::thread::spawn(move || {
564+
let _temp_files = temp_files;
565+
566+
// Wait for process to exit (indefinitely)
567+
if let Err(error) = process_ctx.process.wait(None) {
568+
error!(%error, session_id, "Failed to wait for detached process");
569+
return;
570+
}
571+
572+
info!(session_id, "Detached process exited");
573+
574+
// Temp files will be cleaned up when this thread exits
575+
});
576+
577+
info!(session_id, "Detached process started successfully");
578+
return Ok(None);
579+
}
580+
577581
// Create channel for `task` -> `Process IO thread` communication
578582
let (input_event_tx, input_event_rx) = winapi_signaled_mpsc_channel()?;
579583

584+
let io_notification_tx =
585+
io_notification_tx.expect("BUG: io_notification_tx must be Some for non-detached mode");
586+
580587
let join_handle = std::thread::spawn(move || {
581588
let run_result = if io_redirection {
582-
process_ctx.wait_with_io_redirection(input_event_rx)
589+
process_ctx.wait_with_io_redirection(input_event_rx, io_notification_tx.clone())
583590
} else {
584-
process_ctx.wait(input_event_rx)
591+
process_ctx.wait(input_event_rx, io_notification_tx.clone())
585592
};
586593

587594
let notification = match run_result {
@@ -594,11 +601,31 @@ impl WinApiProcessBuilder {
594601
}
595602
});
596603

597-
Ok(WinApiProcess {
604+
Ok(Some(WinApiProcess {
598605
input_event_tx,
599606
join_handle,
600607
_temp_files: temp_files,
601-
})
608+
}))
609+
}
610+
611+
/// Starts process execution and spawns IO thread to redirect stdio to/from dvc.
612+
pub fn run(
613+
self,
614+
session_id: u32,
615+
io_notification_tx: Sender<ServerChannelEvent>,
616+
) -> Result<WinApiProcess, ExecError> {
617+
Ok(self
618+
.run_impl(session_id, Some(io_notification_tx), false)?
619+
.expect("result should be non-optional when running in non-detached mode"))
620+
}
621+
622+
/// Starts process in detached mode (fire-and-forget).
623+
/// No IO redirection. Process exit is monitored in a background thread to manage temp file cleanup.
624+
/// Returns immediately after spawning.
625+
pub fn run_detached(self, session_id: u32) -> Result<(), ExecError> {
626+
// Result always empty and therefore ignored in detached mode.
627+
self.run_impl(session_id, None, true)?;
628+
Ok(())
602629
}
603630
}
604631

@@ -607,7 +634,6 @@ fn prepare_process(
607634
mut command_line: WideString,
608635
current_directory: WideString,
609636
env: HashMap<String, String>,
610-
io_notification_tx: Sender<ServerChannelEvent>,
611637
) -> Result<WinApiProcessCtx, ExecError> {
612638
let mut process_information = PROCESS_INFORMATION::default();
613639

@@ -620,6 +646,7 @@ fn prepare_process(
620646
let environment_block = (!env.is_empty()).then(|| make_environment_block(env)).transpose()?;
621647

622648
let mut creation_flags = NORMAL_PRIORITY_CLASS | CREATE_NEW_PROCESS_GROUP | CREATE_NEW_CONSOLE;
649+
623650
if environment_block.is_some() {
624651
creation_flags |= CREATE_UNICODE_ENVIRONMENT;
625652
}
@@ -657,7 +684,6 @@ fn prepare_process(
657684

658685
Ok(WinApiProcessCtx {
659686
session_id,
660-
io_notification_tx,
661687
stdout_read_pipe: None,
662688
stderr_read_pipe: None,
663689
stdin_write_pipe: None,
@@ -671,7 +697,6 @@ fn prepare_process_with_io_redirection(
671697
mut command_line: WideString,
672698
current_directory: WideString,
673699
env: HashMap<String, String>,
674-
io_notification_tx: Sender<ServerChannelEvent>,
675700
) -> Result<WinApiProcessCtx, ExecError> {
676701
let mut process_information = PROCESS_INFORMATION::default();
677702

@@ -741,7 +766,6 @@ fn prepare_process_with_io_redirection(
741766

742767
let process_ctx = WinApiProcessCtx {
743768
session_id,
744-
io_notification_tx,
745769
stdout_read_pipe: Some(stdout_read_pipe),
746770
stderr_read_pipe: Some(stderr_read_pipe),
747771
stdin_write_pipe: Some(stdin_write_pipe),

0 commit comments

Comments
 (0)