Skip to content

Commit 0a6d772

Browse files
committed
Allow to additionally delay a reconnect after kill
1 parent 73e5f45 commit 0a6d772

File tree

1 file changed

+44
-24
lines changed

1 file changed

+44
-24
lines changed

process/process.go

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"fmt"
1111
"io"
12+
"math/rand/v2"
1213
"os"
1314
"os/exec"
1415
"runtime"
@@ -37,8 +38,11 @@ type Process interface {
3738
Stop(wait bool) error
3839

3940
// Kill stops the process such that it will restart
40-
// automatically if it is defined to do so.
41-
Kill(wait bool, reason string) error
41+
// automatically if it is defined to do so. The reason will be written
42+
// to the report as the last line. The delayReconnect will add an
43+
// additional duration to the reconnect timeout for the next reconnect,
44+
// in case the process should reconnect.
45+
Kill(wait bool, reason string, delayReconnect time.Duration) error
4246

4347
// IsRunning returns whether the process is currently
4448
// running or not.
@@ -231,16 +235,17 @@ type process struct {
231235
timer *time.Timer
232236
lock sync.Mutex
233237
}
234-
timeout time.Duration
235-
stopTimer *time.Timer
236-
stopTimerLock sync.Mutex
237-
stopReason string
238-
stopReasonLock sync.Mutex
239-
killTimer *time.Timer
240-
killTimerLock sync.Mutex
241-
logger log.Logger
242-
debuglogger log.Logger
243-
callbacks struct {
238+
timeout time.Duration
239+
stopTimer *time.Timer
240+
stopTimerLock sync.Mutex
241+
stopReason string
242+
stopDelayReconnect time.Duration
243+
stopReasonLock sync.Mutex
244+
killTimer *time.Timer
245+
killTimerLock sync.Mutex
246+
logger log.Logger
247+
debuglogger log.Logger
248+
callbacks struct {
244249
onBeforeStart func(args []string) ([]string, error)
245250
onStart func()
246251
onExit func(state string)
@@ -339,7 +344,9 @@ func New(config Config) (Process, error) {
339344
"gpudecoder": gpudecoder,
340345
"gpumemmory": gpumemory,
341346
}).Warn().Log("Killed because limits are exceeded")
342-
p.Kill(false, fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory))
347+
348+
reason := fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory)
349+
p.Kill(false, reason, time.Duration(rand.IntN(30))*time.Second)
343350
},
344351
})
345352
if err != nil {
@@ -597,9 +604,9 @@ func (p *process) Limit(cpu, memory, gpu bool) error {
597604
}
598605

599606
p.logger.Warn().WithFields(log.Fields{
600-
"limit_cpu": cpu,
601-
"limit_memory": memory,
602-
"limit_gpumemory": gpu,
607+
"limit_cpu": cpu,
608+
"limit_memory": memory,
609+
"limit_gpu": gpu,
603610
}).Log("Limiter triggered")
604611

605612
return p.limits.Limit(cpu, memory, gpu)
@@ -718,7 +725,7 @@ func (p *process) start() error {
718725
if p.stopTimer == nil {
719726
// Only create a new timer if there isn't already one running
720727
p.stopTimer = time.AfterFunc(p.timeout, func() {
721-
p.Kill(false, fmt.Sprintf("Killed because timeout triggered (%s)", p.timeout))
728+
p.Kill(false, fmt.Sprintf("Killed because timeout triggered (%s)", p.timeout), 0)
722729

723730
p.stopTimerLock.Lock()
724731
p.stopTimer.Stop()
@@ -765,7 +772,7 @@ func (p *process) Stop(wait bool) error {
765772
return nil
766773
}
767774

768-
err := p.stop(wait, "")
775+
err := p.stop(wait, "", 0)
769776
if err != nil {
770777
p.debuglogger.WithFields(log.Fields{
771778
"state": p.getStateString(),
@@ -779,20 +786,20 @@ func (p *process) Stop(wait bool) error {
779786

780787
// Kill will stop the process without changing the order such that it
781788
// will restart automatically if enabled.
782-
func (p *process) Kill(wait bool, reason string) error {
789+
func (p *process) Kill(wait bool, reason string, delayReconnect time.Duration) error {
783790
// If the process is currently not running, we don't need
784791
// to do anything.
785792
if !p.isRunning() {
786793
return nil
787794
}
788795

789-
err := p.stop(wait, reason)
796+
err := p.stop(wait, reason, delayReconnect)
790797

791798
return err
792799
}
793800

794801
// stop will stop a process considering the current order and state.
795-
func (p *process) stop(wait bool, reason string) error {
802+
func (p *process) stop(wait bool, reason string, delayReconnect time.Duration) error {
796803
// Stop the restart timer
797804
p.unreconnect()
798805

@@ -801,7 +808,7 @@ func (p *process) stop(wait bool, reason string) error {
801808
return nil
802809
}
803810

804-
// If the process in starting state, wait until the process has been started
811+
// If the process is in starting state, wait until the process has been started
805812
start := time.Now()
806813
for {
807814
if time.Since(start) > 5*time.Second {
@@ -825,6 +832,7 @@ func (p *process) stop(wait bool, reason string) error {
825832

826833
p.stopReasonLock.Lock()
827834
p.stopReason = reason
835+
p.stopDelayReconnect = delayReconnect
828836
p.stopReasonLock.Unlock()
829837

830838
p.logger.Info().Log("Stopping")
@@ -956,7 +964,7 @@ func (p *process) staler(ctx context.Context) {
956964
d := t.Sub(last)
957965
if d.Seconds() > timeout.Seconds() {
958966
p.logger.Info().Log("Stale timeout after %s (%.2fs).", timeout, d.Seconds())
959-
p.stop(false, fmt.Sprintf("Stale timeout after %s, no output received from process", timeout))
967+
p.stop(false, fmt.Sprintf("Stale timeout after %s, no output received from process", timeout), 0)
960968
return
961969
}
962970
}
@@ -1116,7 +1124,19 @@ func (p *process) waiter() {
11161124

11171125
// Restart the process
11181126
if p.getOrder() == "start" && enableReconnect {
1119-
p.reconnect(p.delay(state))
1127+
delayReconnect := time.Duration(0)
1128+
1129+
p.stopReasonLock.Lock()
1130+
delayReconnect = p.stopDelayReconnect
1131+
p.stopDelayReconnect = time.Duration(0)
1132+
p.stopReasonLock.Unlock()
1133+
1134+
delay := p.delay(state)
1135+
if delay > time.Duration(0) {
1136+
delay += delayReconnect
1137+
}
1138+
1139+
p.reconnect(delay)
11201140
}
11211141

11221142
// Call the onExit callback

0 commit comments

Comments
 (0)