Skip to content

Commit 2fc96c5

Browse files
author
Andrew
committed
stage 1
1 parent 6483333 commit 2fc96c5

File tree

1 file changed

+105
-78
lines changed

1 file changed

+105
-78
lines changed

dsc_lib/src/dscresources/command_resource.rs

Lines changed: 105 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@
33

44
use jsonschema::JSONSchema;
55
use serde_json::Value;
6-
use std::{collections::HashMap, env, io::{Read, Write}, process::{Command, Stdio}};
6+
use std::{collections::HashMap, env, io::{Read, Write}};
77
use crate::{configure::{config_doc::ExecutionKind, {config_result::ResourceGetResult, parameters, Configurator}}, util::parse_input_to_json};
88
use crate::dscerror::DscError;
99
use super::{dscresource::get_diff, invoke_result::{ExportResult, GetResult, ResolveResult, SetResult, TestResult, ValidateResult, ResourceGetResponse, ResourceSetResponse, ResourceTestResponse, get_in_desired_state}, resource_manifest::{ArgKind, InputKind, Kind, ResourceManifest, ReturnKind, SchemaKind}};
1010
use tracing::{error, warn, info, debug, trace};
11+
use tokio::process::Command;
12+
use std::process::Stdio;
13+
use std::process::ExitStatus;
14+
use tokio::io::{BufReader, AsyncBufReadExt, AsyncWriteExt};
15+
use tokio::task::JoinError;
1116

1217
pub const EXIT_PROCESS_TERMINATED: i32 = 0x102;
1318

@@ -553,21 +558,8 @@ pub fn invoke_resolve(resource: &ResourceManifest, cwd: &str, input: &str) -> Re
553558
Ok(result)
554559
}
555560

556-
/// Invoke a command and return the exit code, stdout, and stderr.
557-
///
558-
/// # Arguments
559-
///
560-
/// * `executable` - The command to execute
561-
/// * `args` - Optional arguments to pass to the command
562-
/// * `input` - Optional input to pass to the command
563-
/// * `cwd` - Optional working directory to execute the command in
564-
///
565-
/// # Errors
566-
///
567-
/// Error is returned if the command fails to execute or stdin/stdout/stderr cannot be opened.
568-
#[allow(clippy::implicit_hasher)]
569-
pub fn invoke_command(executable: &str, args: Option<Vec<String>>, input: Option<&str>, cwd: Option<&str>, env: Option<HashMap<String, String>>, exit_codes: &Option<HashMap<i32, String>>) -> Result<(i32, String, String), DscError> {
570-
debug!("Invoking command '{}' with args {:?}", executable, args);
561+
async fn run_process_async(executable: &str, args: Option<Vec<String>>, input: Option<&str>, cwd: Option<&str>, env: Option<HashMap<String, String>>, exit_codes: &Option<HashMap<i32, String>>) -> Result<(i32, String, String), DscError> {
562+
571563
let mut command = Command::new(executable);
572564
if input.is_some() {
573565
command.stdin(Stdio::piped());
@@ -583,62 +575,96 @@ pub fn invoke_command(executable: &str, args: Option<Vec<String>>, input: Option
583575
if let Some(env) = env {
584576
command.envs(env);
585577
}
586-
587578
if executable == "dsc" && env::var("DEBUG_DSC").is_ok() {
588579
// remove this env var from child process as it will fail reading from keyboard to allow attaching
589580
command.env_remove("DEBUG_DSC");
590581
}
591582

592-
let mut child = command.spawn()?;
583+
let mut child = command.spawn().expect("failed to spawn command");
584+
585+
let stdout = child.stdout.take().expect("child did not have a handle to stdout");
586+
let stderr = child.stderr.take().expect("child did not have a handle to stderr");
587+
let mut stdout_reader = BufReader::new(stdout).lines();
588+
let mut stderr_reader = BufReader::new(stderr).lines();
589+
593590
if let Some(input) = input {
594591
trace!("Writing to command STDIN: {input}");
595-
// pipe to child stdin in a scope so that it is dropped before we wait
596-
// otherwise the pipe isn't closed and the child process waits forever
597-
let Some(mut child_stdin) = child.stdin.take() else {
598-
return Err(DscError::CommandOperation("Failed to open stdin".to_string(), executable.to_string()));
599-
};
600-
child_stdin.write_all(input.as_bytes())?;
601-
child_stdin.flush()?;
592+
let mut stdin = child.stdin.take().expect("child did not have a handle to stdin");
593+
stdin.write(input.as_bytes()).await.expect("could not write to stdin");
594+
drop(stdin);
602595
}
603-
604-
let Some(mut child_stdout) = child.stdout.take() else {
605-
return Err(DscError::CommandOperation("Failed to open stdout".to_string(), executable.to_string()));
596+
597+
let child_id: u32 = match child.id() {
598+
Some(id) => id,
599+
None => {
600+
return Err(DscError::CommandOperation("Can't get child process id".to_string(), executable.to_string()));
601+
}
606602
};
607-
let mut stdout_buf = Vec::new();
608-
child_stdout.read_to_end(&mut stdout_buf)?;
609603

610-
let Some(mut child_stderr) = child.stderr.take() else {
611-
return Err(DscError::CommandOperation("Failed to open stderr".to_string(), executable.to_string()));
612-
};
613-
let mut stderr_buf = Vec::new();
614-
child_stderr.read_to_end(&mut stderr_buf)?;
615-
616-
let exit_status = child.wait()?;
617-
let exit_code = exit_status.code().unwrap_or(EXIT_PROCESS_TERMINATED);
618-
let stdout = String::from_utf8_lossy(&stdout_buf).to_string();
619-
let stderr = String::from_utf8_lossy(&stderr_buf).to_string();
620-
if !stdout.is_empty() {
621-
trace!("STDOUT returned: {}", &stdout);
604+
// Ensure the child process is spawned in the runtime so it can
605+
// make progress on its own while we await for any output.
606+
let child_result:Result<ExitStatus, JoinError> = tokio::spawn(async {
607+
let status = child.wait_with_output().await;
608+
return status.unwrap().status
609+
}).await;
610+
611+
let mut stdout_result = String::with_capacity(1024*1024);
612+
while let Some(line) = stdout_reader.next_line().await? {
613+
stdout_result.push_str(&line);
614+
stdout_result.push('\n');
622615
}
623-
let cleaned_stderr = if stderr.is_empty() {
624-
stderr
625-
} else {
626-
trace!("STDERR returned data to be traced");
627-
log_resource_traces(executable, &child.id(), &stderr);
628-
// TODO: remove logged traces from STDERR
629-
String::new()
630-
};
631616

632-
if exit_code != 0 {
633-
if let Some(exit_codes) = exit_codes {
634-
if let Some(error_message) = exit_codes.get(&exit_code) {
635-
return Err(DscError::CommandExitFromManifest(executable.to_string(), exit_code, error_message.to_string()));
617+
let mut filtered_stderr = String::with_capacity(1024*1024);
618+
while let Some(stderr_line) = stderr_reader.next_line().await? {
619+
let filtered_stderr_line = log_stderr_line(executable, &child_id, &stderr_line);
620+
if !filtered_stderr_line.is_empty() {
621+
filtered_stderr.push_str(filtered_stderr_line);
622+
filtered_stderr.push('\n');
623+
}
624+
}
625+
626+
let exit_code = child_result.unwrap().code();
627+
match exit_code {
628+
Some(code) => {
629+
debug!("Process '{executable}' id {child_id} exited with code {code}");
630+
631+
if code != 0 {
632+
if let Some(exit_codes) = exit_codes {
633+
if let Some(error_message) = exit_codes.get(&code) {
634+
return Err(DscError::CommandExitFromManifest(executable.to_string(), code, error_message.to_string()));
635+
}
636+
}
637+
return Err(DscError::Command(executable.to_string(), code, filtered_stderr));
636638
}
639+
640+
Ok((code, stdout_result, filtered_stderr)) },
641+
None => {
642+
debug!("Process '{executable}' id {child_id} terminated by signal");
643+
return Err(DscError::CommandOperation("Process terminated by signal".to_string(), executable.to_string()));
637644
}
638-
return Err(DscError::Command(executable.to_string(), exit_code, cleaned_stderr));
639645
}
646+
}
647+
/// Invoke a command and return the exit code, stdout, and stderr.
648+
///
649+
/// # Arguments
650+
///
651+
/// * `executable` - The command to execute
652+
/// * `args` - Optional arguments to pass to the command
653+
/// * `input` - Optional input to pass to the command
654+
/// * `cwd` - Optional working directory to execute the command in
655+
///
656+
/// # Errors
657+
///
658+
/// Error is returned if the command fails to execute or stdin/stdout/stderr cannot be opened.
659+
#[allow(clippy::implicit_hasher)]
660+
pub fn invoke_command(executable: &str, args: Option<Vec<String>>, input: Option<&str>, cwd: Option<&str>, env: Option<HashMap<String, String>>, exit_codes: &Option<HashMap<i32, String>>) -> Result<(i32, String, String), DscError> {
661+
debug!("Invoking command '{}' with args {:?}", executable, args);
640662

641-
Ok((exit_code, stdout, cleaned_stderr))
663+
tokio::runtime::Builder::new_multi_thread()
664+
.enable_all()
665+
.build()
666+
.unwrap()
667+
.block_on(run_process_async(executable, args, input, cwd, env, exit_codes))
642668
}
643669

644670
fn process_args(args: &Option<Vec<ArgKind>>, value: &str) -> Option<Vec<String>> {
@@ -784,30 +810,31 @@ fn json_to_hashmap(json: &str) -> Result<HashMap<String, String>, DscError> {
784810
///
785811
/// * `process_name` - The name of the process
786812
/// * `process_id` - The ID of the process
787-
/// * `stderr` - The stderr output from the process
788-
pub fn log_resource_traces(process_name: &str, process_id: &u32, stderr: &str)
813+
/// * `trace_line` - The stderr line from the process
814+
pub fn log_stderr_line<'a>(process_name: &str, process_id: &u32, trace_line: &'a str) -> &'a str
789815
{
790-
if !stderr.is_empty()
816+
if !trace_line.is_empty()
791817
{
792-
for trace_line in stderr.lines() {
793-
if let Result::Ok(json_obj) = serde_json::from_str::<Value>(trace_line) {
794-
if let Some(msg) = json_obj.get("Error") {
795-
error!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
796-
} else if let Some(msg) = json_obj.get("Warning") {
797-
warn!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
798-
} else if let Some(msg) = json_obj.get("Info") {
799-
info!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
800-
} else if let Some(msg) = json_obj.get("Debug") {
801-
debug!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
802-
} else if let Some(msg) = json_obj.get("Trace") {
803-
trace!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
804-
} else {
805-
// TODO: deserialize tracing JSON to have better presentation
806-
trace!("Process '{process_name}' id {process_id} : {trace_line}");
807-
};
818+
if let Result::Ok(json_obj) = serde_json::from_str::<Value>(trace_line) {
819+
if let Some(msg) = json_obj.get("Error") {
820+
error!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
821+
} else if let Some(msg) = json_obj.get("Warning") {
822+
warn!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
823+
} else if let Some(msg) = json_obj.get("Info") {
824+
info!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
825+
} else if let Some(msg) = json_obj.get("Debug") {
826+
debug!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
827+
} else if let Some(msg) = json_obj.get("Trace") {
828+
trace!("Process '{process_name}' id {process_id} : {}", msg.as_str().unwrap_or_default());
808829
} else {
809-
trace!("Process '{process_name}' id {process_id} : {trace_line}");
810-
}
830+
// the line is a valid json, but not one of standard trace lines - return it as filtered stderr_line
831+
return trace_line;
832+
};
833+
} else {
834+
// the line is not a valid json - return it as filtered stderr_line
835+
return trace_line;
811836
}
812-
}
837+
};
838+
839+
return "";
813840
}

0 commit comments

Comments
 (0)