Skip to content

Commit 4ed02ba

Browse files
committed
Add statusCode to WriteJobStatus and status stream responses
Wire Job.StatusCode through the status streaming path so plugins can forward cluster-specific status codes (e.g. Kubernetes pod phase) to the Launcher, matching C++ plugin behavior.
1 parent ef7d0f2 commit 4ed02ba

File tree

7 files changed

+89
-27
lines changed

7 files changed

+89
-27
lines changed

api/types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,9 @@ type Job struct {
401401
StatusMsg string `json:"statusMessage,omitempty"`
402402

403403
// The standard code/enum for the current status of the Job, if known.
404-
// Optional.
404+
// Optional. Carried in job state responses as part of the Job payload,
405+
// and forwarded explicitly in job status stream responses via
406+
// [launcher.StreamResponseWriter.WriteJobStatus].
405407
StatusCode string `json:"statusCode,omitempty"`
406408

407409
// The process ID of the Job, if applicable. Optional.

cache/cache.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func (r *JobCache) StreamJobStatus(ctx context.Context, w launcher.StreamRespons
249249
done := false
250250
err := r.Lookup(user, id, func(job *api.Job) {
251251
//nolint:errcheck // fire-and-forget convenience wrapper
252-
w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusMsg)
252+
w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusCode, job.StatusMsg)
253253
// Break off early if we know there will be no further updates.
254254
if api.TerminalStatus(job.Status) {
255255
done = true
@@ -275,7 +275,7 @@ func (r *JobCache) StreamJobStatus(ctx context.Context, w launcher.StreamRespons
275275
return
276276
}
277277
//nolint:errcheck // fire-and-forget convenience wrapper
278-
w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusMsg)
278+
w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusCode, j.StatusMsg)
279279
}
280280
}
281281
}
@@ -286,7 +286,7 @@ func (r *JobCache) StreamJobStatuses(ctx context.Context, w launcher.StreamRespo
286286
r.store.JobsForUser(user, nil, func(jobs []*api.Job) {
287287
for _, job := range jobs {
288288
//nolint:errcheck // fire-and-forget convenience wrapper
289-
w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusMsg)
289+
w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusCode, job.StatusMsg)
290290
}
291291
})
292292
ch := make(chan *statusUpdate, 1)
@@ -300,7 +300,7 @@ func (r *JobCache) StreamJobStatuses(ctx context.Context, w launcher.StreamRespo
300300
return
301301
}
302302
//nolint:errcheck // fire-and-forget convenience wrapper
303-
w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusMsg)
303+
w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusCode, j.StatusMsg)
304304
}
305305
}
306306
}
@@ -447,16 +447,17 @@ func (s *subManager) Close() int {
447447
}
448448

449449
type statusUpdate struct {
450-
ID api.JobID
451-
User string
452-
Name string
453-
Status string
454-
StatusMsg string
450+
ID api.JobID
451+
User string
452+
Name string
453+
Status string
454+
StatusCode string
455+
StatusMsg string
455456
}
456457

457458
func newStatusUpdateFromJob(job *api.Job) *statusUpdate {
458459
return &statusUpdate{
459-
job.ID, job.User, job.Name, job.Status, job.StatusMsg,
460+
job.ID, job.User, job.Name, job.Status, job.StatusCode, job.StatusMsg,
460461
}
461462
}
462463

internal/protocol/rpc.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,19 +289,20 @@ type JobStatusStreamResponse struct {
289289
ID api.JobID `json:"id"`
290290
Name string `json:"name"`
291291
Status string `json:"status"`
292-
Msg string `json:"statusMessage,omitempty"`
293292
Code string `json:"statusCode,omitempty"`
293+
Msg string `json:"statusMessage,omitempty"`
294294
}
295295

296296
// NewJobStatusStreamResponse creates a new job status stream response.
297-
func NewJobStatusStreamResponse(responseID uint64, id, name, status, msg string) *JobStatusStreamResponse {
297+
func NewJobStatusStreamResponse(responseID uint64, id, name, status, statusCode, msg string) *JobStatusStreamResponse {
298298
base := responseBase{responseJobStatus, 0, responseID}
299299
return &JobStatusStreamResponse{
300300
responseBase: base,
301301
Sequences: []StreamSequence{}, // Ensure we never send null.
302302
ID: api.JobID(id),
303303
Name: name,
304304
Status: status,
305+
Code: statusCode,
305306
Msg: msg,
306307
}
307308
}

internal/protocol/rpc_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,62 @@ func TestNewMetricsResponse_WithLatency(t *testing.T) {
142142
}
143143
}
144144

145+
func TestNewJobStatusStreamResponse(t *testing.T) {
146+
t.Run("all fields present", func(t *testing.T) {
147+
resp := NewJobStatusStreamResponse(5, "job-1", "My Job", "Running", "PodRunning", "all good")
148+
149+
data, err := json.Marshal(resp)
150+
if err != nil {
151+
t.Fatalf("json.Marshal() error = %v", err)
152+
}
153+
154+
var got map[string]interface{}
155+
if err := json.Unmarshal(data, &got); err != nil {
156+
t.Fatalf("json.Unmarshal() error = %v", err)
157+
}
158+
159+
if mt := int(got["messageType"].(float64)); mt != 3 {
160+
t.Errorf("messageType = %d, want 3", mt)
161+
}
162+
if id := got["id"].(string); id != "job-1" {
163+
t.Errorf("id = %q, want %q", id, "job-1")
164+
}
165+
if name := got["name"].(string); name != "My Job" {
166+
t.Errorf("name = %q, want %q", name, "My Job")
167+
}
168+
if status := got["status"].(string); status != "Running" {
169+
t.Errorf("status = %q, want %q", status, "Running")
170+
}
171+
if code := got["statusCode"].(string); code != "PodRunning" {
172+
t.Errorf("statusCode = %q, want %q", code, "PodRunning")
173+
}
174+
if msg := got["statusMessage"].(string); msg != "all good" {
175+
t.Errorf("statusMessage = %q, want %q", msg, "all good")
176+
}
177+
})
178+
179+
t.Run("statusCode omitted when empty", func(t *testing.T) {
180+
resp := NewJobStatusStreamResponse(5, "job-1", "My Job", "Running", "", "")
181+
182+
data, err := json.Marshal(resp)
183+
if err != nil {
184+
t.Fatalf("json.Marshal() error = %v", err)
185+
}
186+
187+
var got map[string]interface{}
188+
if err := json.Unmarshal(data, &got); err != nil {
189+
t.Fatalf("json.Unmarshal() error = %v", err)
190+
}
191+
192+
if _, ok := got["statusCode"]; ok {
193+
t.Error("statusCode should be omitted when empty")
194+
}
195+
if _, ok := got["statusMessage"]; ok {
196+
t.Error("statusMessage should be omitted when empty")
197+
}
198+
})
199+
}
200+
145201
func TestNewConfigReloadResponse_ErrorTypes(t *testing.T) {
146202
tests := []struct {
147203
name string

launcher/launcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ type ResponseWriter interface {
296296
// StreamResponseWriter is the interface for writing streaming responses.
297297
type StreamResponseWriter interface {
298298
ResponseWriter
299-
WriteJobStatus(id api.JobID, name, status, msg string) error
299+
WriteJobStatus(id api.JobID, name, status, statusCode, msg string) error
300300
WriteJobOutput(output string, outputType api.JobOutput) error
301301
WriteJobResourceUtil(cpuPercent float64, cpuTime float64,
302302
residentMem float64, virtualMem float64) error
@@ -610,13 +610,13 @@ func (w *defaultResponseWriter) WriteJobs(jobs []*api.Job) error {
610610
// non-stream response writer.
611611
var errNotStreamWriter = fmt.Errorf("method called on non-stream response writer")
612612

613-
func (w *defaultResponseWriter) WriteJobStatus(id api.JobID, name, status, msg string) error {
613+
func (w *defaultResponseWriter) WriteJobStatus(id api.JobID, name, status, statusCode, msg string) error {
614614
if w.store == nil {
615615
return errNotStreamWriter
616616
}
617617
rid := w.req.ID()
618618
resp := protocol.NewJobStatusStreamResponse(nextResponseID(), string(id),
619-
name, status, msg)
619+
name, status, statusCode, msg)
620620
resp.Sequences = []protocol.StreamSequence{
621621
{RequestID: rid, SequenceID: w.store.SequenceID(rid)},
622622
}

plugintest/example_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ func TestMockStreamResponseWriter(t *testing.T) {
176176
t.Run("captures status updates", func(t *testing.T) {
177177
w := plugintest.NewMockStreamResponseWriter()
178178

179-
w.WriteJobStatus("job-1", "My Job", api.StatusRunning, "Job started")
180-
w.WriteJobStatus("job-1", "My Job", api.StatusFinished, "Job completed")
179+
w.WriteJobStatus("job-1", "My Job", api.StatusRunning, "", "Job started")
180+
w.WriteJobStatus("job-1", "My Job", api.StatusFinished, "", "Job completed")
181181

182182
plugintest.AssertStatusCount(t, w, 2)
183183

plugintest/mocks.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,11 @@ type MockStreamResponseWriter struct {
212212

213213
// StatusUpdate represents a job status update.
214214
type StatusUpdate struct {
215-
ID api.JobID
216-
Name string
217-
Status string
218-
Message string
215+
ID api.JobID
216+
Name string
217+
Status string
218+
StatusCode string
219+
Message string
219220
}
220221

221222
// OutputChunk represents a chunk of job output.
@@ -249,14 +250,15 @@ func NewMockStreamResponseWriter() *MockStreamResponseWriter {
249250
}
250251

251252
// WriteJobStatus implements launcher.StreamResponseWriter.
252-
func (m *MockStreamResponseWriter) WriteJobStatus(id api.JobID, name, status, msg string) error {
253+
func (m *MockStreamResponseWriter) WriteJobStatus(id api.JobID, name, status, statusCode, msg string) error {
253254
m.mu.Lock()
254255
defer m.mu.Unlock()
255256
m.Statuses = append(m.Statuses, StatusUpdate{
256-
ID: id,
257-
Name: name,
258-
Status: status,
259-
Message: msg,
257+
ID: id,
258+
Name: name,
259+
Status: status,
260+
StatusCode: statusCode,
261+
Message: msg,
260262
})
261263
return nil
262264
}

0 commit comments

Comments
 (0)