Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions internal/core/local_runtime/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,36 @@ func (s *PluginInstance) Stop() {
// Once the subprocess exists itself, STDOUT always close, which results in `CLOSE STDOUT`
func (s *PluginInstance) StartStdout() {
defer func() {
// mark shutdown to stop monitor loop
s.shutdown = true
// best-effort kill process (and its group on supported platforms)
if s.cmd != nil && s.cmd.Process != nil {
pid := s.cmd.Process.Pid
pgid, getpgidErr, groupKillErr := tryKillProcessGroup(s.cmd)
if getpgidErr == nil {
if groupKillErr != nil {
s.WalkNotifiers(func(notifier PluginInstanceNotifier) {
notifier.OnInstanceErrorLog(s, fmt.Errorf("failed to kill process group %d: %s", pgid, groupKillErr.Error()))
})
}
} else {
// fallback to single-process kill
s.WalkNotifiers(func(notifier PluginInstanceNotifier) {
notifier.OnInstanceWarningLog(s, fmt.Sprintf("failed to get pgid for pid %d: %s. fallback to killing process", pid, getpgidErr.Error()))
})
if err := killProcess(s.cmd); err != nil {
s.WalkNotifiers(func(notifier PluginInstanceNotifier) {
notifier.OnInstanceErrorLog(s, fmt.Errorf("failed to kill process %d: %s", pid, err.Error()))
})
}
}
// reap once
if err := s.cmd.Wait(); err != nil {
s.WalkNotifiers(func(notifier PluginInstanceNotifier) {
notifier.OnInstanceErrorLog(s, fmt.Errorf("failed to reap process %d: %s", pid, err.Error()))
})
}
}
// notify shutdown signal
s.WalkNotifiers(func(notifier PluginInstanceNotifier) {
notifier.OnInstanceShutdown(s)
Expand Down Expand Up @@ -171,22 +201,7 @@ func (s *PluginInstance) StartStdout() {
)
})
}

// once reader of stdout is closed, kill subprocess
if err := s.cmd.Process.Kill(); err != nil {
// no need to return here, just log the error, it's perhaps the process was exited already
// and the kill command fails
s.WalkNotifiers(func(notifier PluginInstanceNotifier) {
notifier.OnInstanceErrorLog(s, fmt.Errorf("failed to kill subprocess: %s", err.Error()))
})
}

// collect subprocess, avoid zombie processes
if _, err := s.cmd.Process.Wait(); err != nil {
s.WalkNotifiers(func(notifier PluginInstanceNotifier) {
notifier.OnInstanceErrorLog(s, fmt.Errorf("failed to reap subprocess: %s", err.Error()))
})
}
// process kill and wait are handled in the deferred cleanup above
}

// handles stdout data and notify corresponding listeners
Expand Down
11 changes: 11 additions & 0 deletions internal/core/local_runtime/proc_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//go:build !(darwin || linux || freebsd || netbsd || openbsd || windows)

package local_runtime

import "os/exec"

func setProcGroup(cmd *exec.Cmd) {}

func tryKillProcessGroup(cmd *exec.Cmd) (int, error, error) { return 0, nil, nil }

func killProcess(cmd *exec.Cmd) error { if cmd != nil && cmd.Process != nil { return cmd.Process.Kill() }; return nil }
40 changes: 40 additions & 0 deletions internal/core/local_runtime/proc_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//go:build darwin || linux || freebsd || netbsd || openbsd

package local_runtime

import (
"os/exec"
"syscall"
)

// setProcGroup places the child into its own process group (Unix only)
func setProcGroup(cmd *exec.Cmd) {
if cmd == nil {
return
}
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
}

// tryKillProcessGroup tries to kill the process group and returns diagnostic errors.
// Returns: (pgid, getpgidErr, killErr)
func tryKillProcessGroup(cmd *exec.Cmd) (int, error, error) {
if cmd == nil || cmd.Process == nil {
return 0, syscall.EINVAL, nil
}
pid := cmd.Process.Pid
pgid, err := syscall.Getpgid(pid)
if err != nil {
return 0, err, nil
}
if err := syscall.Kill(-pgid, syscall.SIGKILL); err != nil {
return pgid, nil, err
}
return pgid, nil, nil
}

func killProcess(cmd *exec.Cmd) error {
if cmd == nil || cmd.Process == nil {
return nil
}
return cmd.Process.Kill()
}
23 changes: 23 additions & 0 deletions internal/core/local_runtime/proc_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//go:build windows

package local_runtime

import (
"fmt"
"os/exec"
)

// On Windows, do nothing for process group. Consider Job Objects for full tree control.
func setProcGroup(cmd *exec.Cmd) {}

// tryKillProcessGroup is unsupported on Windows. Return a getpgid-like error for caller to log and fallback.
func tryKillProcessGroup(cmd *exec.Cmd) (int, error, error) {
return 0, fmt.Errorf("process group unsupported on windows"), nil
}

func killProcess(cmd *exec.Cmd) error {
if cmd != nil && cmd.Process != nil {
return cmd.Process.Kill()
}
return nil
}
2 changes: 2 additions & 0 deletions internal/core/local_runtime/subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func (r *LocalPluginRuntime) getInstanceCmd() (*exec.Cmd, error) {
return nil, err
}
cmd = exec.Command(pythonPath, "-m", r.Config.Meta.Runner.Entrypoint)
// ensure child is placed in its own process group on supported platforms
setProcGroup(cmd)

default:
return nil, fmt.Errorf("unsupported language: %s", r.Config.Meta.Runner.Language)
Expand Down
Loading