Skip to content

Commit 575d218

Browse files
feat: cleanup timeout for process termination (#207)
Implement a configurable cleanup timeout that controls how long the system waits for process cleanup verification before triggering a forced shutdown. This replaces the hardcoded 10-second timeout with a user-configurable value.

Configuration:
- Add --cleanup-timeout CLI flag (default: 10s, env: COG_CLEANUP_TIMEOUT)
- Flow: CLI → Config → Runner for proper field injection

Implementation changes:
- Replace the hardcoded timeout with the context.WithTimeout() pattern
- Shorten the check interval from 100ms to 10ms for more responsive detection
- Add an injectable verifyFn field to Runner for comprehensive testing
- Update terminology from "ungraceful shutdown" to "forced shutdown"

Testing improvements:
- Migrate time-dependent tests to Go 1.25 synctest for deterministic execution
- Add comprehensive test coverage for timeout scenarios and multiple ForceKill calls
- Use safe high PID values (9999999) and proper mocking to prevent real process operations
- Configure the linter to handle synctest patterns correctly

This enables users to customize cleanup wait times based on their specific workload requirements while maintaining safe defaults.
* Address leaking coglets from test

The test harness now properly cleans up orphaned coglet processes when interrupted.

Key Features:
- Cross-platform compatibility: works on both macOS (Darwin) and Linux
- Robust process discovery: uses pgrep -f coglet as the primary method, falls back to ps with platform-specific flags
- Safe process killing: validates that we can signal processes before attempting to kill them
- Process group cleanup: kills both process groups and individual processes
- Safety measures: never kills ourselves, our parent, or PID 1
- Signal handling: responds to both SIGINT (ctrl+c) and SIGTERM

Implementation Details:
- killAllChildProcesses() finds and kills coglet processes during cleanup
- findCogletProcesses() tries pgrep first, falls back to ps
- Platform-specific ps flags: -ax on macOS, -e on Linux
- Process ownership validation through signal testing before killing
- Integrated into the existing TestMain signal handler

---------

Co-Authored-By: Michael Dwan <[email protected]>
Co-authored-by: Morgan Fainberg <[email protected]>
1 parent efb1bdb commit 575d218

File tree

7 files changed

+676
-113
lines changed

7 files changed

+676
-113
lines changed

.golangci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ linters:
6464
- source: 'defer resp.Body.Close()'
6565
linters:
6666
- errcheck
67+
- source: 'synctest\.Test\(t, func\(t \*testing\.T\)'
68+
linters:
69+
- thelper
6770
settings:
6871
errcheck:
6972
disable-default-exclusions: false

cmd/cog/main.go

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ import (
1919
)
2020

2121
type ServerCmd struct {
22-
Host string `help:"Host address to bind the HTTP server to" default:"0.0.0.0"`
23-
Port int `help:"Port number for the HTTP server" default:"5000"`
24-
UseProcedureMode bool `help:"Enable procedure mode for concurrent predictions" name:"use-procedure-mode"`
25-
AwaitExplicitShutdown bool `help:"Wait for explicit shutdown signal instead of auto-shutdown" name:"await-explicit-shutdown"`
26-
UploadURL string `help:"Base URL for uploading prediction output files" name:"upload-url"`
27-
WorkingDirectory string `help:"Override the working directory for predictions" name:"working-directory"`
28-
RunnerShutdownGracePeriod time.Duration `help:"Grace period before force-killing prediction runners" name:"runner-shutdown-grace-period" default:"600s"`
22+
Host string `help:"Host address to bind the HTTP server to" default:"0.0.0.0" env:"COG_HOST"`
23+
Port int `help:"Port number for the HTTP server" default:"5000" env:"COG_PORT"`
24+
UseProcedureMode bool `help:"Enable procedure mode for concurrent predictions" name:"use-procedure-mode" env:"COG_USE_PROCEDURE_MODE"`
25+
AwaitExplicitShutdown bool `help:"Wait for explicit shutdown signal instead of auto-shutdown" name:"await-explicit-shutdown" env:"COG_AWAIT_EXPLICIT_SHUTDOWN"`
26+
OneShot bool `help:"Enable one-shot mode (single runner, wait for cleanup before ready)" name:"one-shot" env:"COG_ONE_SHOT"`
27+
UploadURL string `help:"Base URL for uploading prediction output files" name:"upload-url" env:"COG_UPLOAD_URL"`
28+
WorkingDirectory string `help:"Override the working directory for predictions" name:"working-directory" env:"COG_WORKING_DIRECTORY"`
29+
RunnerShutdownGracePeriod time.Duration `help:"Grace period before force-killing prediction runners" name:"runner-shutdown-grace-period" default:"600s" env:"COG_RUNNER_SHUTDOWN_GRACE_PERIOD"`
30+
CleanupTimeout time.Duration `help:"Maximum time to wait for process cleanup before hard exit" name:"cleanup-timeout" default:"10s" env:"COG_CLEANUP_TIMEOUT"`
2931
}
3032

3133
type SchemaCmd struct{}
@@ -43,6 +45,12 @@ var logger = util.CreateLogger("cog")
4345
func (s *ServerCmd) Run() error {
4446
log := logger.Sugar()
4547

48+
// One-shot mode requires procedure mode
49+
if s.OneShot && !s.UseProcedureMode {
50+
log.Errorw("one-shot mode requires procedure mode")
51+
return fmt.Errorf("one-shot mode requires procedure mode, use --use-procedure-mode")
52+
}
53+
4654
// Procedure mode implies await explicit shutdown
4755
// i.e. Python process exit should not trigger shutdown
4856
if s.UseProcedureMode {
@@ -51,6 +59,7 @@ func (s *ServerCmd) Run() error {
5159
log.Infow("configuration",
5260
"use-procedure-mode", s.UseProcedureMode,
5361
"await-explicit-shutdown", s.AwaitExplicitShutdown,
62+
"one-shot", s.OneShot,
5463
"upload-url", s.UploadURL,
5564
)
5665

@@ -67,13 +76,18 @@ func (s *ServerCmd) Run() error {
6776
}
6877
}
6978

79+
forceShutdown := make(chan struct{}, 1)
80+
7081
serverCfg := server.Config{
7182
UseProcedureMode: s.UseProcedureMode,
7283
AwaitExplicitShutdown: s.AwaitExplicitShutdown,
84+
OneShot: s.OneShot,
7385
IPCUrl: fmt.Sprintf("http://localhost:%d/_ipc", s.Port),
7486
UploadURL: s.UploadURL,
7587
WorkingDirectory: currentWorkingDirectory,
7688
RunnerShutdownGracePeriod: s.RunnerShutdownGracePeriod,
89+
CleanupTimeout: s.CleanupTimeout,
90+
ForceShutdown: forceShutdown,
7791
}
7892
// FIXME: in non-procedure mode we do not support concurrency in a meaningful way, we
7993
// statically create the runner list sized at 1.
@@ -107,15 +121,20 @@ func (s *ServerCmd) Run() error {
107121
ch := make(chan os.Signal, 1)
108122
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
109123
for {
110-
sig := <-ch
111-
if sig == syscall.SIGTERM && s.AwaitExplicitShutdown {
112-
log.Warnw("ignoring signal to stop", "signal", sig)
113-
} else {
114-
log.Infow("stopping Cog HTTP server", "signal", sig)
115-
if err := h.Stop(); err != nil {
116-
log.Errorw("failed to stop server handler", "error", err)
117-
os.Exit(1)
124+
select {
125+
case sig := <-ch:
126+
if sig == syscall.SIGTERM && s.AwaitExplicitShutdown {
127+
log.Warnw("ignoring signal to stop", "signal", sig)
128+
} else {
129+
log.Infow("stopping Cog HTTP server", "signal", sig)
130+
if err := h.Stop(); err != nil {
131+
log.Errorw("failed to stop server handler", "error", err)
132+
os.Exit(1)
133+
}
118134
}
135+
case <-forceShutdown:
136+
log.Errorw("cleanup timeout reached, forcing ungraceful shutdown")
137+
os.Exit(1)
119138
}
120139
}
121140
}()

internal/server/runner.go

Lines changed: 107 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,32 @@ func (pr *PendingPrediction) sendResponse() {
7878
// killFunc is the function signature for killing processes
7979
type killFunc func(pid int, sig syscall.Signal) error
8080

81+
// verifyProcessGroupTerminatedFunc is the signature of a process-group termination check:
// it receives the pid identifying the group and reports whether the group has terminated.
// The Runner.verifyFn field has this type so the check can be replaced in tests.
82+
type verifyProcessGroupTerminatedFunc func(pid int) bool
83+
84+
// verifyProcessGroupTerminated reports whether the process group identified by pid is gone.
85+
// It returns true only when the signal-0 probe fails with ESRCH; a successful probe or any
// other error (for example EPERM) is treated as "processes may still be running" and yields false.
86+
func verifyProcessGroupTerminated(pid int) bool {
87+
log := logger.Sugar()
88+
89+
// Probe the process group with signal 0; the pid is negated so the call targets the group
// rather than a single process.
90+
// Signal 0 delivers nothing: the call only performs the existence/permission check.
91+
err := syscall.Kill(-pid, 0)
92+
if err != nil {
93+
if errors.Is(err, syscall.ESRCH) {
94+
// ESRCH: nothing matched the group, so it is considered fully terminated
95+
log.Debugw("process group fully terminated", "pid", pid)
96+
return true
97+
}
98+
// Any other error (like EPERM) is inconclusive; conservatively assume processes still exist
99+
log.Debugw("process group verification failed, assuming processes exist", "pid", pid, "error", err)
100+
return false
101+
}
102+
// The probe succeeded, so at least one process in the group still exists
103+
log.Debugw("process group still has running processes", "pid", pid)
104+
return false
105+
}
106+
81107
type Runner struct {
82108
name string
83109
workingDir string
@@ -93,8 +119,12 @@ type Runner struct {
93119
pending map[string]*PendingPrediction
94120
uploadURL string
95121
shutdownGracePeriod time.Duration
96-
killed bool // tracks if we've killed this process instance
97-
killFn killFunc // injectable kill function for testing
122+
cleanupTimeout time.Duration // timeout for process cleanup verification
123+
killed bool // tracks if we've killed this process instance
124+
cleanupSlot chan struct{} // buffered size 1, holds cleanup token (len()=1 means no cleanup, len()=0 means cleanup in progress)
125+
forceShutdown chan<- struct{} // signals that cleanup failed and forced shutdown is needed
126+
killFn killFunc // injectable kill function for testing
127+
verifyFn verifyProcessGroupTerminatedFunc // injectable verification function for testing
98128
mu sync.Mutex
99129
stopped chan bool
100130
}
@@ -130,7 +160,7 @@ func NewRunner(name, cwd string, cfg Config) (*Runner, error) {
130160

131161
cmd.Env = mergeEnv(os.Environ(), cfg.EnvSet, cfg.EnvUnset)
132162

133-
return &Runner{
163+
r := &Runner{
134164
name: name,
135165
workingDir: workingDir,
136166
cmd: *cmd,
@@ -139,9 +169,18 @@ func NewRunner(name, cwd string, cfg Config) (*Runner, error) {
139169
pending: make(map[string]*PendingPrediction),
140170
uploadURL: cfg.UploadURL,
141171
shutdownGracePeriod: cfg.RunnerShutdownGracePeriod,
172+
cleanupTimeout: cfg.CleanupTimeout,
142173
killFn: nil, // nil means use real syscall.Kill
174+
verifyFn: nil, // nil means use real verifyProcessGroupTerminated
175+
cleanupSlot: make(chan struct{}, 1),
176+
forceShutdown: cfg.ForceShutdown,
143177
stopped: make(chan bool),
144-
}, nil
178+
}
179+
180+
// Initialize cleanup slot with token available (no cleanup in progress)
181+
r.cleanupSlot <- struct{}{}
182+
183+
return r, nil
145184
}
146185

147186
func NewProcedureRunner(name, srcDir string, cfg Config) (*Runner, error) {
@@ -193,6 +232,16 @@ func (r *Runner) ForceKill() {
193232

194233
log.Infow("force killing process group", "pid", r.cmd.Process.Pid)
195234

235+
// Try to take cleanup token
236+
gotToken := false
237+
select {
238+
case <-r.cleanupSlot:
239+
gotToken = true
240+
log.Infow("acquired cleanup token", "pid", r.cmd.Process.Pid)
241+
default:
242+
log.Infow("cleanup already in progress, but proceeding with kill", "pid", r.cmd.Process.Pid)
243+
}
244+
196245
// Use injected kill function for testing, or real syscall.Kill
197246
killFn := r.killFn
198247
if killFn == nil {
@@ -204,10 +253,63 @@ func (r *Runner) ForceKill() {
204253
if !errors.Is(err, syscall.ESRCH) {
205254
log.Errorw("failed to kill process group", "pid", r.cmd.Process.Pid, "error", err)
206255
}
256+
// Return token only if we took it and kill failed (unless ESRCH which is OK)
257+
if gotToken && !errors.Is(err, syscall.ESRCH) {
258+
r.cleanupSlot <- struct{}{}
259+
return
260+
}
207261
}
208262

209263
// Mark as killed to prevent PID reuse issues
210264
r.killed = true
265+
266+
// Start verification of process termination in background only if we got the token
267+
if gotToken {
268+
go r.verifyProcessCleanup(r.cmd.Process.Pid)
269+
}
270+
}
271+
272+
// verifyProcessCleanup polls every checkInterval until the process group for pid is
// reported terminated, the runner's stopped channel becomes readable, or cleanupTimeout elapses.
273+
// On confirmed termination it puts the cleanup token back into r.cleanupSlot. On stop it just
// returns. On timeout it makes a non-blocking send on r.forceShutdown. The token is NOT returned
// on the stop and timeout paths.
274+
func (r *Runner) verifyProcessCleanup(pid int) {
275+
log := logger.Sugar()
276+
const checkInterval = 10 * time.Millisecond
277+
278+
// NOTE(review): a zero or negative r.cleanupTimeout yields an already-expired context, so the
// timeout branch would presumably win before the first tick — confirm callers always set it.
ctx, cancel := context.WithTimeout(context.Background(), r.cleanupTimeout)
279+
defer cancel()
280+
281+
start := time.Now()
282+
ticker := time.NewTicker(checkInterval)
283+
defer ticker.Stop()
284+
285+
for {
286+
select {
287+
case <-r.stopped:
288+
// Runner is being stopped: abandon verification without returning the cleanup token
289+
log.Debugw("cleanup verification stopped due to runner shutdown", "pid", pid, "elapsed", time.Since(start))
290+
return
291+
case <-ctx.Done():
292+
// cleanupTimeout elapsed without confirmed termination: request a forced shutdown
293+
log.Errorw("process cleanup verification timed out, signaling forced shutdown",
294+
"pid", pid, "elapsed", time.Since(start))
// Non-blocking send: if nobody can accept the signal right now (full buffer or nil channel),
// it is dropped instead of blocking this goroutine.
295+
select {
296+
case r.forceShutdown <- struct{}{}:
297+
default:
298+
}
299+
return
300+
case <-ticker.C:
// Prefer the injected verifier when one is set; otherwise use the real signal-0 probe.
301+
fn := verifyProcessGroupTerminated
302+
if r.verifyFn != nil {
303+
fn = r.verifyFn
304+
}
305+
if fn(pid) {
306+
// Termination confirmed: hand the cleanup token back to the slot
307+
r.cleanupSlot <- struct{}{}
308+
log.Infow("process cleanup completed successfully, returned cleanup token", "pid", pid, "elapsed", time.Since(start))
309+
return
310+
}
311+
}
312+
}
211313
}
212314

213315
func (r *Runner) Stop() error {
@@ -501,6 +603,7 @@ func (r *Runner) HandleIPC(s IPCStatus) error {
501603
log := logger.Sugar()
502604
switch s {
503605
case IPCStatusReady:
606+
504607
if r.status == StatusStarting {
505608
r.updateSchema()
506609
r.updateSetupResult()

0 commit comments

Comments
 (0)