diff --git a/pkg/custompluginmonitor/plugin/plugin.go b/pkg/custompluginmonitor/plugin/plugin.go index 0b90e9c01..9f9cad32b 100644 --- a/pkg/custompluginmonitor/plugin/plugin.go +++ b/pkg/custompluginmonitor/plugin/plugin.go @@ -17,7 +17,6 @@ limitations under the License. package plugin import ( - "context" "fmt" "io" "os/exec" @@ -143,15 +142,15 @@ func readFromReader(reader io.ReadCloser, maxBytes int64) ([]byte, error) { } func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, output string) { - var ctx context.Context - var cancel context.CancelFunc + isTimeout := false + isHung := false + var timeoutDuration time.Duration if rule.Timeout != nil && *rule.Timeout < *p.config.PluginGlobalConfig.Timeout { - ctx, cancel = context.WithTimeout(context.Background(), *rule.Timeout) + timeoutDuration = *rule.Timeout } else { - ctx, cancel = context.WithTimeout(context.Background(), *p.config.PluginGlobalConfig.Timeout) + timeoutDuration = *p.config.PluginGlobalConfig.Timeout } - defer cancel() cmd := util.Exec(rule.Path, rule.Args...) @@ -170,37 +169,6 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp return cpmtypes.Unknown, "Error in starting plugin. Please check the error log" } - waitChan := make(chan struct{}) - defer close(waitChan) - - var m sync.Mutex - timeout := false - - go func() { - select { - case <-ctx.Done(): - if ctx.Err() == context.Canceled { - return - } - klog.Errorf("Error in running plugin timeout %q", rule.Path) - if cmd.Process == nil || cmd.Process.Pid == 0 { - klog.Errorf("Error in cmd.Process check %q", rule.Path) - break - } - - m.Lock() - timeout = true - m.Unlock() - - err := util.Kill(cmd) - if err != nil { - klog.Errorf("Error in kill process %d, %v", cmd.Process.Pid, err) - } - case <-waitChan: - return - } - }() - var ( wg sync.WaitGroup stdout []byte @@ -220,14 +188,46 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp }() // This will wait for the reads to complete. If the execution times out, the pipes // will be closed and the wait group unblocks. - wg.Wait() + // If the timeout is caused by the plugin process or sub-process hung due to GPU device errors or other reasons, + // wg.Wait() will be blocked forever, so we need to add a timeout to the wait group. + waitChan := make(chan struct{}) + go func() { + wg.Wait() + close(waitChan) + }() + select { + case <-waitChan: + // The reads are done. + break + case <-time.After(timeoutDuration): + klog.Errorf("Waiting for command output timed out when running plugin %q", rule.Path) + isTimeout = true + err := util.Kill(cmd) + if err != nil { + klog.Errorf("Error when killing process %d: %v", cmd.Process.Pid, err) + } else { + klog.Infof("Killed process %d successfully", cmd.Process.Pid) + } - if stdoutErr != nil { + // Check if the process is in D state. If it is, the process is hung and can not be killed. + // It also means that the plugin can not report the correct status, instead reports Unknown status. + // On a GPU machine, a plugin with Python script calling pynvml API may hang in D state due to some GPU device errors. + if util.IsProcessInDState(cmd.Process.Pid) { + klog.Errorf("Process %d is hung in D state", cmd.Process.Pid) + isHung = true + } + } + + if isHung { + return cpmtypes.Unknown, fmt.Sprintf("Process is hung when running plugin %s", rule.Path) + } + + if !isTimeout && stdoutErr != nil { klog.Errorf("Error reading stdout for plugin %q: error - %v", rule.Path, err) return cpmtypes.Unknown, "Error reading stdout for plugin. Please check the error log" } - if stderrErr != nil { + if !isTimeout && stderrErr != nil { klog.Errorf("Error reading stderr for plugin %q: error - %v", rule.Path, err) return cpmtypes.Unknown, "Error reading stderr for plugin. Please check the error log" } @@ -239,16 +239,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp } } - // trim suffix useless bytes - output = string(stdout) - output = strings.TrimSpace(output) - - m.Lock() - cmdKilled := timeout - m.Unlock() - - if cmdKilled { - output = fmt.Sprintf("Timeout when running plugin %q: state - %s. output - %q", rule.Path, cmd.ProcessState.String(), output) + stderrStr := "" + if isTimeout { + output = fmt.Sprintf("Timeout when running plugin %q: state - %s. output - %q", rule.Path, cmd.ProcessState.String(), "") + } else { + // trim suffix useless bytes + output = strings.TrimSpace(string(stdout)) + stderrStr = strings.TrimSpace(string(stderr)) } // cut at position max_output_length if stdout is longer than max_output_length bytes @@ -259,13 +256,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus() switch exitCode { case 0: - logPluginStderr(rule, string(stderr), 3) + logPluginStderr(rule, stderrStr, 3) return cpmtypes.OK, output case 1: - logPluginStderr(rule, string(stderr), 0) + logPluginStderr(rule, stderrStr, 0) return cpmtypes.NonOK, output default: - logPluginStderr(rule, string(stderr), 0) + logPluginStderr(rule, stderrStr, 0) return cpmtypes.Unknown, output } } diff --git a/pkg/util/exec_unix.go b/pkg/util/exec_unix.go index d7311ba3f..a8f52e625 100644 --- a/pkg/util/exec_unix.go +++ b/pkg/util/exec_unix.go @@ -21,6 +21,8 @@ package util import ( "fmt" "os/exec" + "strconv" + "strings" "syscall" ) @@ -42,3 +44,13 @@ func Kill(cmd *exec.Cmd) error { } return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) } + +// IsProcessInDState checks if the process is in D state. +func IsProcessInDState(pid int) bool { + // "-o stat=" is used to get the process state without the header. + cmd := exec.Command("ps", "-o", "stat=", "-p", strconv.Itoa(pid)) + if res, err := cmd.Output(); err == nil && strings.Contains(string(res), "D") { + return true + } + return false +} diff --git a/pkg/util/exec_windows.go b/pkg/util/exec_windows.go index 3e70d6f23..e11cca264 100644 --- a/pkg/util/exec_windows.go +++ b/pkg/util/exec_windows.go @@ -86,3 +86,8 @@ func Kill(cmd *exec.Cmd) error { } return err } + +// IsProcessInDState does not apply to Windows. +func IsProcessInDState(pid int) bool { + return false +}