Skip to content

Commit 6f192ee

Browse files
committed
Add --with-heartbeat flag for heartbeat logging during long-running errands
Errands produce no CLI output while the Agent executes the script. This silence causes CI/CD systems to kill the process due to inactivity timeouts and leaves operators unsure whether the task is still running. Add an opt-in --with-heartbeat flag to `bosh run-errand` that prints periodic heartbeat status lines while a task is processing or queued. bosh run-errand smoke_tests --with-heartbeat bosh run-errand smoke_tests --with-heartbeat=10 Output: Task 185528 | 16:16:23 | Task state: processing (5s elapsed) No Director changes required — uses existing task API fields (state, started_at). TaskHeartbeat is part of the TaskReporter interface with a no-op default on NoopTaskReporter; withHeartbeatInterval gates whether output is actually emitted. Throttling is handled in the reporter via timestamp comparison. Made-with: Cursor
1 parent c5619f3 commit 6f192ee

File tree

13 files changed

+264
-15
lines changed

13 files changed

+264
-15
lines changed

cmd/cmd.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,19 @@ func (c Cmd) Execute() (cmdErr error) {
224224
return NewErrandsCmd(deps.UI, c.deployment()).Run()
225225

226226
case *RunErrandOpts:
227-
director, deployment := c.directorAndDeployment()
227+
sess, ok := c.session().(*SessionImpl)
228+
if !ok {
229+
return fmt.Errorf("internal error: expected *SessionImpl")
230+
}
231+
director, err := sess.Director()
232+
c.panicIfErr(err)
233+
deployment, err := sess.Deployment()
234+
c.panicIfErr(err)
235+
236+
if opts.WithHeartbeat != nil && sess.taskReporter != nil {
237+
sess.taskReporter.EnableWithHeartbeat(time.Duration(*opts.WithHeartbeat) * time.Second)
238+
}
239+
228240
downloader := NewUIDownloader(director, deps.Time, deps.FS, deps.UI)
229241
return NewRunErrandCmd(deployment, downloader, deps.UI).Run(*opts)
230242

@@ -546,7 +558,10 @@ func (c Cmd) config() cmdconf.Config {
546558
}
547559

548560
func (c Cmd) session() Session {
549-
return NewSessionFromOpts(c.BoshOpts, c.config(), c.deps.UI, true, true, c.deps.FS, c.deps.Logger)
561+
return NewSessionImpl(
562+
NewSessionContextImpl(c.BoshOpts, c.config(), c.deps.FS),
563+
c.deps.UI, true, true, c.deps.Logger,
564+
)
550565
}
551566

552567
func (c Cmd) director() boshdir.Director {

cmd/opts/opts.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,8 @@ type RunErrandOpts struct {
713713
KeepAlive bool `long:"keep-alive" description:"Use existing VM to run an errand and keep it after completion"`
714714
WhenChanged bool `long:"when-changed" description:"Run errand only if errand configuration has changed or if the previous run was unsuccessful"`
715715

716+
WithHeartbeat *int `long:"with-heartbeat" description:"Print task state every N seconds while waiting. Use '=' to specify interval" optional:"true" optional-value:"30"`
717+
716718
DownloadLogs bool `long:"download-logs" description:"Download logs"`
717719
LogsDirectory DirOrCWDArg `long:"logs-dir" description:"Destination directory for logs" default:"."`
718720

cmd/opts/opts_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2150,6 +2150,14 @@ var _ = Describe("Opts", func() {
21502150
})
21512151
})
21522152

2153+
Describe("WithHeartbeat", func() {
2154+
It("contains desired values", func() {
2155+
Expect(getStructTagForName("WithHeartbeat", opts)).To(Equal(
2156+
`long:"with-heartbeat" description:"Print task state every N seconds while waiting. Use '=' to specify interval" optional:"true" optional-value:"30"`,
2157+
))
2158+
})
2159+
})
2160+
21532161
Describe("DownloadLogs", func() {
21542162
It("contains desired values", func() {
21552163
Expect(getStructTagForName("DownloadLogs", opts)).To(Equal(

cmd/session.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type SessionImpl struct {
2222

2323
// Memoized
2424
director boshdir.Director
25+
taskReporter *boshuit.ReporterImpl
2526
directorInfo boshdir.Info
2627
directorInfoSet bool
2728
}
@@ -118,10 +119,10 @@ func (c *SessionImpl) Director() (boshdir.Director, error) {
118119
c.ui.PrintLinef("Using environment '%s' as %s", c.Environment(), creds.Description())
119120
}
120121

121-
taskReporter := boshuit.NewReporter(c.ui, true)
122+
c.taskReporter = boshuit.NewReporter(c.ui, true)
122123
fileReporter := boshui.NewFileReporter(c.ui)
123124

124-
director, err := boshdir.NewFactory(c.logger).New(dirConfig, taskReporter, fileReporter)
125+
director, err := boshdir.NewFactory(c.logger).New(dirConfig, c.taskReporter, fileReporter)
125126
if err != nil {
126127
return nil, err
127128
}

director/directorfakes/fake_task_reporter.go

Lines changed: 41 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

director/interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ type TaskReporter interface {
304304
TaskStarted(int)
305305
TaskFinished(int, string)
306306
TaskOutputChunk(int, []byte)
307+
TaskHeartbeat(id int, state string, startedAt int64)
307308
}
308309

309310
//counterfeiter:generate . OrphanDisk

director/noop_reporters.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func NewNoopTaskReporter() NoopTaskReporter {
4343
return NoopTaskReporter{}
4444
}
4545

46-
func (r NoopTaskReporter) TaskStarted(id int) {}
47-
func (r NoopTaskReporter) TaskFinished(id int, state string) {}
48-
func (r NoopTaskReporter) TaskOutputChunk(id int, chunk []byte) {}
46+
func (r NoopTaskReporter) TaskStarted(id int) {}
47+
func (r NoopTaskReporter) TaskFinished(id int, state string) {}
48+
func (r NoopTaskReporter) TaskOutputChunk(id int, chunk []byte) {}
49+
func (r NoopTaskReporter) TaskHeartbeat(id int, state string, startedAt int64) {}

director/task_client_request.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ func NewTaskClientRequest(
2727
}
2828

2929
type taskShortResp struct {
30-
ID int // 165
31-
State string // e.g. "queued", "processing", "done", "error", "cancelled"
30+
ID int `json:"id"`
31+
State string `json:"state"` // e.g. "queued", "processing", "done", "error", "cancelled"
32+
StartedAt int64 `json:"started_at"` // 1440318199
3233
}
3334

3435
func (r taskShortResp) IsRunning() bool {
@@ -111,6 +112,7 @@ func (r TaskClientRequest) WaitForCompletion(id int, type_ string, taskReporter
111112
}
112113

113114
if taskResp.IsRunning() {
115+
taskReporter.TaskHeartbeat(taskResp.ID, taskResp.State, taskResp.StartedAt)
114116
time.Sleep(r.taskCheckStepDuration)
115117
continue
116118
}

director/task_client_request_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,61 @@ var _ = Describe("TaskClientRequest", func() {
246246
})
247247
})
248248

249+
Describe("WaitForCompletion heartbeat", func() {
250+
It("emits a heartbeat for a processing task", func() {
251+
hbReporter := &fakedir.FakeTaskReporter{}
252+
hbReq := buildReq(hbReporter)
253+
254+
server.AppendHandlers(
255+
ghttp.CombineHandlers(
256+
ghttp.VerifyRequest("GET", "/tasks/42"),
257+
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"processing","description":"run errand 'smoke'","started_at":1700000000}`),
258+
),
259+
ghttp.CombineHandlers(
260+
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
261+
ghttp.RespondWith(http.StatusOK, ""),
262+
),
263+
ghttp.CombineHandlers(
264+
ghttp.VerifyRequest("GET", "/tasks/42"),
265+
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"done"}`),
266+
),
267+
ghttp.CombineHandlers(
268+
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
269+
ghttp.RespondWith(http.StatusOK, ""),
270+
),
271+
)
272+
273+
err := hbReq.WaitForCompletion(42, "event", hbReporter)
274+
Expect(err).ToNot(HaveOccurred())
275+
276+
Expect(hbReporter.TaskHeartbeatCallCount()).To(BeNumerically(">=", 1))
277+
id, state, startedAt := hbReporter.TaskHeartbeatArgsForCall(0)
278+
Expect(id).To(Equal(42))
279+
Expect(state).To(Equal("processing"))
280+
Expect(startedAt).To(Equal(int64(1700000000)))
281+
})
282+
283+
It("does not emit heartbeats for tasks that immediately finish", func() {
284+
hbReporter := &fakedir.FakeTaskReporter{}
285+
hbReq := buildReq(hbReporter)
286+
287+
server.AppendHandlers(
288+
ghttp.CombineHandlers(
289+
ghttp.VerifyRequest("GET", "/tasks/42"),
290+
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"done"}`),
291+
),
292+
ghttp.CombineHandlers(
293+
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
294+
ghttp.RespondWith(http.StatusOK, ""),
295+
),
296+
)
297+
298+
err := hbReq.WaitForCompletion(42, "event", hbReporter)
299+
Expect(err).ToNot(HaveOccurred())
300+
Expect(hbReporter.TaskHeartbeatCallCount()).To(Equal(0))
301+
})
302+
})
303+
249304
Describe("WaitForCompletion", func() {
250305
var (
251306
taskReporter *fakedir.FakeTaskReporter

ui/task/interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ type Reporter interface {
44
TaskStarted(int)
55
TaskFinished(int, string)
66
TaskOutputChunk(int, []byte)
7+
TaskHeartbeat(id int, state string, startedAt int64)
78
}
89

910
type Task interface {

0 commit comments

Comments
 (0)