Skip to content

Commit a1650d6

Browse files
committed
Simplify supervisor, support agent-triggered broker restart
1 parent 7caded7 commit a1650d6

File tree

7 files changed

+145
-80
lines changed

7 files changed

+145
-80
lines changed

agent/cmd/relay.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ var RelayCommand = &cobra.Command{
6666
}
6767

6868
fmt.Println("Starting agent")
69-
startAgent(cmd, buildRelayStack(cmd, config, info))
69+
startAgent(buildRelayStack(cmd, config, info))
7070
},
7171
}
7272

agent/cmd/serve.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ var serveCmd = &cobra.Command{
3535
}
3636

3737
config.Print()
38-
startAgent(cmd, buildServeStack(cmd, config))
38+
startAgent(buildServeStack(cmd, config))
3939
fmt.Println("Server stopped")
4040
},
4141
}

agent/cmd/stack.go

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,11 @@ import (
1515
"github.com/spf13/cobra"
1616
"go.uber.org/fx"
1717
"go.uber.org/zap"
18-
"go.uber.org/zap/zapcore"
1918
)
2019

21-
func getLoggingLevel(cmd *cobra.Command) zapcore.Level {
22-
23-
if ok, _ := cmd.Flags().GetBool("verbose"); ok {
24-
return zap.DebugLevel
25-
}
26-
return zap.InfoLevel
27-
}
28-
29-
func startAgent(cmd *cobra.Command, opts fx.Option) {
30-
options := []fx.Option{
31-
opts,
32-
}
33-
34-
if getLoggingLevel(cmd) != zap.DebugLevel {
35-
options = append(options, fx.NopLogger)
36-
}
37-
20+
func startAgent(opts fx.Option) {
3821
app := fx.New(
39-
options...,
22+
opts,
4023
)
4124

4225
noBanner := os.Getenv("NO_BANNER")
@@ -47,17 +30,34 @@ func startAgent(cmd *cobra.Command, opts fx.Option) {
4730
}
4831

4932
func buildCoreAgentStack(cmd *cobra.Command, cfg config.AgentConfig) fx.Option {
50-
return fx.Options(
51-
fx.Provide(func() *zap.Logger {
33+
34+
if ok, _ := cmd.Flags().GetBool("verbose"); ok {
35+
cfg.VerboseOutput = true
36+
}
37+
38+
options := []fx.Option{}
39+
40+
if !cfg.VerboseOutput {
41+
options = append(options, fx.NopLogger)
42+
}
43+
44+
stackOptions := fx.Options(
45+
fx.Supply(cfg),
46+
fx.Provide(func(config config.AgentConfig) *zap.Logger {
5247
cfg := zap.NewDevelopmentConfig()
53-
cfg.Level = zap.NewAtomicLevelAt(getLoggingLevel(cmd))
48+
49+
loggingLevel := zap.InfoLevel
50+
if config.VerboseOutput {
51+
loggingLevel = zap.DebugLevel
52+
}
53+
54+
cfg.Level = zap.NewAtomicLevelAt(loggingLevel)
5455
logger, err := cfg.Build()
5556
if err != nil {
5657
panic(err)
5758
}
5859
return logger
5960
}),
60-
fx.Supply(cfg),
6161
fx.Invoke(func(config config.AgentConfig, logger *zap.Logger) {
6262
if config.CortexApiToken == "" && !config.DryRun {
6363
logger.Fatal("Cannot start agent: either CORTEX_API_TOKEN or DRYRUN is required")
@@ -67,6 +67,13 @@ func buildCoreAgentStack(cmd *cobra.Command, cfg config.AgentConfig) fx.Option {
6767
fx.Provide(handler.NewHandlerManager),
6868
fx.Invoke(createAxonAgent),
6969
)
70+
71+
options = append(options, stackOptions)
72+
73+
return fx.Options(
74+
options...,
75+
)
76+
7077
}
7178

7279
func createAxonAgent(

agent/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type AgentConfig struct {
2828
WebhookServerPort int
2929
EnableApiProxy bool
3030
FailWaitTime time.Duration
31+
VerboseOutput bool
3132
}
3233

3334
func (ac AgentConfig) Print() {

agent/server/snykbroker/relay_instance_manager.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const defaultSnykBroker = "snyk-broker"
2121

2222
type RelayInstanceManager interface {
2323
Start() error
24+
Restart() error
2425
Close() error
2526
}
2627

@@ -63,7 +64,7 @@ func NewRelayInstanceManager(
6364
}
6465

6566
func (r *relayInstanceManager) RegisterRoutes(mux *http.ServeMux) error {
66-
mux.Handle(fmt.Sprintf("%s/reregister", cortexHttp.AxonPathRoot), r)
67+
mux.Handle(fmt.Sprintf("%s/broker/restart", cortexHttp.AxonPathRoot), r)
6768
return nil
6869
}
6970

@@ -73,7 +74,7 @@ func (r *relayInstanceManager) ServeHTTP(w http.ResponseWriter, req *http.Reques
7374
return
7475
}
7576

76-
err := r.restart()
77+
err := r.Restart()
7778
if err != nil {
7879
r.logger.Error("Unable to reregister", zap.Error(err))
7980
w.WriteHeader(http.StatusInternalServerError)
@@ -85,16 +86,19 @@ func (r *relayInstanceManager) ServeHTTP(w http.ResponseWriter, req *http.Reques
8586

8687
var errSkipBroker = errors.New("NoBrokerToken")
8788

88-
func (r *relayInstanceManager) restart() error {
89+
func (r *relayInstanceManager) Restart() error {
90+
91+
r.logger.Info("Restarting broker, shutting down existing broker")
8992
// re-register and restart supervisor
9093
err := r.Close()
9194
if err != nil {
92-
r.logger.Error("unable to close supervisor on /reregister", zap.Error(err))
95+
r.logger.Error("unable to close supervisor on Restart", zap.Error(err))
9396
}
9497

98+
r.logger.Info("Restarting broker")
9599
err = r.Start()
96100
if err != nil {
97-
return fmt.Errorf("unable to start supervisor on /reregister: %w", err)
101+
return fmt.Errorf("unable to start supervisor on Restart: %w", err)
98102
}
99103
return nil
100104
}
@@ -196,15 +200,21 @@ func (r *relayInstanceManager) Start() error {
196200
zap.String("acceptFile", acceptFile),
197201
)
198202

203+
brokerEnv := map[string]string{
204+
"ACCEPT": acceptFile,
205+
"BROKER_SERVER_URL": uri,
206+
"BROKER_TOKEN": token,
207+
"PORT": "7343",
208+
}
209+
210+
if r.config.VerboseOutput {
211+
brokerEnv["LOG_LEVEL"] = "debug"
212+
}
213+
199214
r.supervisor = NewSupervisor(
200215
executable,
201216
args,
202-
map[string]string{
203-
"ACCEPT": acceptFile,
204-
"BROKER_SERVER_URL": uri,
205-
"BROKER_TOKEN": token,
206-
"PORT": "7343",
207-
},
217+
brokerEnv,
208218
r.config.FailWaitTime,
209219
)
210220
r.startCount.Add(1)

agent/server/snykbroker/supervisor.go

Lines changed: 65 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ type Supervisor struct {
2727
cmd *exec.Cmd
2828
env map[string]string
2929
done chan struct{}
30+
stopFunc func()
31+
pid int
32+
killed bool
33+
runCount int
3034
lastError error
3135
}
3236

@@ -43,32 +47,19 @@ func NewSupervisor(
4347
args: args,
4448
env: env,
4549
fastFailTime: fastFailTime * 2,
50+
done: make(chan struct{}),
51+
stopFunc: func() {},
4652
}
4753
}
4854

4955
var errKilled = errors.New("killed")
5056
var errMaxRetries = errors.New("max retries reached")
5157

52-
func (b *Supervisor) trigger() (func(error), error) {
53-
b.Lock()
54-
if b.done != nil {
55-
b.Unlock()
56-
return nil, fmt.Errorf("can't call start when already running")
57-
}
58-
b.done = make(chan struct{})
59-
b.Unlock()
58+
func (b *Supervisor) Start(maxRetries int, window time.Duration) error {
6059

61-
finish := func(err error) {
62-
b.Lock()
63-
defer b.Unlock()
64-
b.lastError = err
65-
close(b.done)
66-
b.done = nil
60+
if b.runCount > 0 {
61+
return errors.New("already started, cannot start again")
6762
}
68-
return finish, nil
69-
}
70-
71-
func (b *Supervisor) Start(maxRetries int, window time.Duration) error {
7263

7364
if err := b.runExecutionLoop(maxRetries, window); err != nil {
7465
return err
@@ -80,32 +71,34 @@ func (b *Supervisor) runExecutionLoop(maxRetries int, window time.Duration) erro
8071

8172
tracker := newEventTracker()
8273
startTime := time.Now()
83-
runCount := 0
8474

85-
finish, err := b.trigger()
86-
if err != nil {
87-
return err
75+
finish := func(err error) {
76+
b.Lock()
77+
defer b.Unlock()
78+
b.lastError = err
79+
if b.done != nil {
80+
close(b.done)
81+
}
8882
}
8983

9084
fastfail := make(chan struct{})
9185
// we run this off thread, looping to restart
9286
// the process if it crashes, but exiting
93-
// if too many happen in the restart windo
87+
// if too many happen in the restart window
88+
b.runCount = 1
9489
go func() {
9590
defer close(fastfail)
9691
for maxRetries > 0 {
9792
tracker.AddEvent()
98-
runCount++
9993
err := b.runCommand()
10094
runTime := time.Since(startTime)
10195

10296
if errors.Is(err, errKilled) {
103-
fmt.Println("Process killed")
10497
finish(nil)
10598
return
10699
}
107100

108-
if err != nil && runCount == 1 && runTime < b.fastFailTime {
101+
if err != nil && b.runCount == 1 && runTime < b.fastFailTime {
109102
finish(fmt.Errorf("run failed immediately: %v", err))
110103
return
111104
}
@@ -123,7 +116,7 @@ func (b *Supervisor) runExecutionLoop(maxRetries int, window time.Duration) erro
123116
}
124117
return
125118
}
126-
119+
b.runCount++
127120
}
128121
}()
129122

@@ -170,12 +163,10 @@ func (b *Supervisor) runWatchdog(pid int) func() {
170163
}
171164
}
172165

166+
// runCommand runs a single command and returns the result of the command after it exits,
167+
// or errKilled if it was intentionally shut down.
173168
func (b *Supervisor) runCommand() error {
174169

175-
if b.cmd != nil {
176-
panic("Command already running")
177-
}
178-
179170
cmd := exec.Command(b.executable)
180171
cmd.Args = append(cmd.Args, b.args...)
181172

@@ -199,38 +190,67 @@ func (b *Supervisor) runCommand() error {
199190
}
200191
}()
201192

193+
// sigChan triggers shutdown
202194
sigChan := make(chan os.Signal, 1)
203195
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
204-
killed := false
196+
197+
running := make(chan struct{})
198+
205199
go func() {
206200
<-sigChan
207-
err := cmd.Process.Kill()
208-
if err != nil {
209-
fmt.Printf("Error killing process: %v\n", err)
210-
}
211-
killed = true
201+
b.stopFunc()
212202
}()
213203

214-
b.cmd = cmd
204+
killed := false
205+
206+
// our stopfunc allows anyone to close the process
207+
// and wait for it to finish
208+
b.stopFunc = func() {
209+
killed = true
210+
211+
// if process is running trigger a kill
212+
if cmd.Process != nil {
213+
err := cmd.Process.Kill()
214+
if err != nil {
215+
fmt.Printf("Error killing process: %v\n", err)
216+
}
217+
}
218+
219+
// wait for it to actually finish
220+
<-running
221+
}
222+
215223
err := cmd.Start()
216224

217225
if err != nil {
218226
return err
219227
}
228+
pid := cmd.Process.Pid
229+
b.pid = pid
220230

221231
// We want to make sure the broker is killed if the agent dies or is killed. This is
222232
// mostly useful in the debugger but prevents port from being held open.
223233
cancelWatchdog := b.runWatchdog(cmd.Process.Pid)
224234
defer cancelWatchdog()
225235

226-
cmd.Wait()
227-
b.cmd = nil
236+
// wait in the background for the process to exit, then reset pid/stopFunc and signal completion via running
237+
go func() {
238+
cmd.Wait()
239+
b.pid = 0
240+
b.stopFunc = func() {}
241+
close(running)
242+
running = nil
243+
}()
244+
245+
// block until the process is done
246+
<-running
228247
stopStdOut()
229248
stopStdErr()
230249
wg.Wait()
231250

232251
if killed {
233252
err = errKilled
253+
fmt.Printf("Process %v (pid=%v) killed\n", cmd.Path, pid)
234254
}
235255

236256
if err == nil && cmd.ProcessState.ExitCode() != 0 {
@@ -239,11 +259,12 @@ func (b *Supervisor) runCommand() error {
239259
return err
240260
}
241261

262+
func (b *Supervisor) Pid() int {
263+
return b.pid
264+
}
265+
242266
func (b *Supervisor) Close() error {
243-
if b.cmd != nil && b.cmd.Process != nil {
244-
b.cmd.Process.Kill()
245-
}
246-
b.cmd = nil
267+
b.stopFunc()
247268
return nil
248269
}
249270

0 commit comments

Comments
 (0)