Skip to content

Commit 0cfaa35

Browse files
authored
Refactor job run monitoring to use struct with member methods (#3774)
## Changes

Replaced callback builder functions that returned closures capturing stack pointers with a `jobRunMonitor` struct and a single `onProgress` method.

The original code had three separate callback builders (`pullRunIdCallback`, `logDebugCallback`, `logProgressCallback`) that each returned closures with their own state, leading to pointer manipulation on the stack and redundant state tracking.

Benefits:
- Single source of truth for run state (`prevState` tracked in one place)
- Eliminated stack pointer manipulation
- Removed redundant logging (lifecycle state was logged twice)

## Why

I was looking into `libs/cmdio` and how we can refactor it. I suspect we need to remove or refactor the progress logger to make further abstraction possible (e.g., turn it into a "widget" that is started and stopped, like a spinner). Refactoring it is not possible without first changing the only call site, which is this one.

## Tests

Tests pass. Manually performed a job run to…
1 parent 1ec323e commit 0cfaa35

File tree

1 file changed

+41
-69
lines changed

1 file changed

+41
-69
lines changed

bundle/run/job.go

Lines changed: 41 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -86,66 +86,47 @@ func (r *jobRunner) logFailedTasks(ctx context.Context, runId int64) {
8686
}
8787
}
8888

89-
func pullRunIdCallback(runId *int64) func(info *jobs.Run) {
90-
return func(i *jobs.Run) {
91-
if *runId == 0 {
92-
*runId = i.RunId
93-
}
94-
}
89+
// jobRunMonitor tracks state for a single job run and provides callbacks
90+
// for monitoring progress.
91+
type jobRunMonitor struct {
92+
ctx context.Context
93+
prevState *jobs.RunState
94+
progressLogger *cmdio.Logger
9595
}
9696

97-
func logDebugCallback(ctx context.Context, runId *int64) func(info *jobs.Run) {
98-
var prevState *jobs.RunState
99-
return func(i *jobs.Run) {
100-
state := i.State
101-
if state == nil {
102-
return
103-
}
104-
105-
// Log the job run URL as soon as it is available.
106-
if prevState == nil {
107-
log.Infof(ctx, "Run available at %s", i.RunPageUrl)
108-
}
109-
if prevState == nil || prevState.LifeCycleState != state.LifeCycleState {
110-
log.Infof(ctx, "Run status: %s", i.State.LifeCycleState)
111-
prevState = state
112-
}
97+
// onProgress is the single callback that handles all state tracking and logging.
98+
func (m *jobRunMonitor) onProgress(info *jobs.Run) {
99+
state := info.State
100+
if state == nil {
101+
return
113102
}
114-
}
115-
116-
func logProgressCallback(ctx context.Context, progressLogger *cmdio.Logger) func(info *jobs.Run) {
117-
var prevState *jobs.RunState
118-
return func(i *jobs.Run) {
119-
state := i.State
120-
if state == nil {
121-
return
122-
}
123-
124-
if prevState == nil {
125-
progressLogger.Log(progress.NewJobRunUrlEvent(i.RunPageUrl))
126-
}
127103

128-
if prevState != nil && prevState.LifeCycleState == state.LifeCycleState &&
129-
prevState.ResultState == state.ResultState {
130-
return
131-
} else {
132-
prevState = state
133-
}
104+
// First time we see this run.
105+
if m.prevState == nil {
106+
log.Infof(m.ctx, "Run available at %s", info.RunPageUrl)
107+
m.progressLogger.Log(progress.NewJobRunUrlEvent(info.RunPageUrl))
108+
}
134109

135-
event := &progress.JobProgressEvent{
136-
Timestamp: time.Now(),
137-
JobId: i.JobId,
138-
RunId: i.RunId,
139-
RunName: i.RunName,
140-
State: *i.State,
141-
}
110+
// No state change: do not log.
111+
if m.prevState != nil &&
112+
m.prevState.LifeCycleState == state.LifeCycleState &&
113+
m.prevState.ResultState == state.ResultState {
114+
return
115+
}
142116

143-
// log progress events to stderr
144-
progressLogger.Log(event)
117+
// Capture current state as previous state for next call.
118+
m.prevState = state
145119

146-
// log progress events in using the default logger
147-
log.Info(ctx, event.String())
120+
// Log progress event both to the terminal (in place or append), and to the logger.
121+
event := &progress.JobProgressEvent{
122+
Timestamp: time.Now(),
123+
JobId: info.JobId,
124+
RunId: info.RunId,
125+
RunName: info.RunName,
126+
State: *info.State,
148127
}
128+
m.progressLogger.Log(event)
129+
log.Info(m.ctx, event.String())
149130
}
150131

151132
func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, error) {
@@ -154,8 +135,6 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
154135
return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID)
155136
}
156137

157-
runId := new(int64)
158-
159138
err = r.convertPythonParams(opts)
160139
if err != nil {
161140
return nil, err
@@ -172,19 +151,16 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
172151

173152
w := r.bundle.WorkspaceClient()
174153

175-
// gets the run id from inside Jobs.RunNowAndWait
176-
pullRunId := pullRunIdCallback(runId)
177-
178-
// callback to log status updates to the universal log destination.
179-
// Called on every poll request
180-
logDebug := logDebugCallback(ctx, runId)
181-
182154
// callback to log progress events. Called on every poll request
183155
progressLogger, ok := cmdio.FromContext(ctx)
184156
if !ok {
185157
return nil, errors.New("no progress logger found")
186158
}
187-
logProgress := logProgressCallback(ctx, progressLogger)
159+
160+
monitor := &jobRunMonitor{
161+
ctx: ctx,
162+
progressLogger: progressLogger,
163+
}
188164

189165
waiter, err := w.Jobs.RunNow(ctx, *req)
190166
if err != nil {
@@ -199,13 +175,9 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
199175
return nil, err
200176
}
201177

202-
run, err := waiter.OnProgress(func(r *jobs.Run) {
203-
pullRunId(r)
204-
logDebug(r)
205-
logProgress(r)
206-
}).GetWithTimeout(jobRunTimeout)
178+
run, err := waiter.OnProgress(monitor.onProgress).GetWithTimeout(jobRunTimeout)
207179
if err != nil {
208-
r.logFailedTasks(ctx, *runId)
180+
r.logFailedTasks(ctx, waiter.RunId)
209181
}
210182
if err != nil {
211183
return nil, err
@@ -229,7 +201,7 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
229201
// The task completed successfully.
230202
case jobs.RunResultStateSuccess:
231203
log.Infof(ctx, "Run has completed successfully!")
232-
return output.GetJobOutput(ctx, r.bundle.WorkspaceClient(), *runId)
204+
return output.GetJobOutput(ctx, r.bundle.WorkspaceClient(), waiter.RunId)
233205

234206
// The run was stopped after reaching the timeout.
235207
case jobs.RunResultStateTimedout:

0 commit comments

Comments
 (0)