Skip to content

Commit fc759fd

Browse files
committed
jobs: avoid duplicate system.jobs updates
This avoids cases where we issue 2 updates to system.jobs instead of 1. Epic: none Release note: None
1 parent 1370142 commit fc759fd

File tree

1 file changed

+42
-74
lines changed

1 file changed

+42
-74
lines changed

pkg/jobs/update.go

Lines changed: 42 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -203,28 +203,6 @@ WHERE id = $1
203203
return nil
204204
}
205205

206-
// Build a statement of the following form, depending on which properties
207-
// need updating:
208-
//
209-
// UPDATE system.jobs
210-
// SET
211-
// [status = $2,]
212-
// [payload = $y,]
213-
// [progress = $z]
214-
// WHERE
215-
// id = $1
216-
217-
var setters []string
218-
params := []interface{}{j.ID()} // $1 is always the job ID.
219-
addSetter := func(column string, value interface{}) {
220-
params = append(params, value)
221-
setters = append(setters, fmt.Sprintf("%s = $%d", column, len(params)))
222-
}
223-
224-
if ju.md.State != "" {
225-
addSetter("status", ju.md.State)
226-
}
227-
228206
var payloadBytes []byte
229207
if ju.md.Payload != nil {
230208
payload = ju.md.Payload
@@ -246,26 +224,6 @@ WHERE id = $1
246224
}
247225
}
248226

249-
if len(setters) != 0 {
250-
updateStmt := fmt.Sprintf(
251-
"UPDATE system.jobs SET %s WHERE id = $1",
252-
strings.Join(setters, ", "),
253-
)
254-
n, err := u.txn.ExecEx(
255-
ctx, "job-update", u.txn.KV(),
256-
sessiondata.NodeUserSessionDataOverride,
257-
updateStmt, params...,
258-
)
259-
if err != nil {
260-
return err
261-
}
262-
if n != 1 {
263-
return errors.Errorf(
264-
"expected exactly one row affected, but %d rows affected by job update", n,
265-
)
266-
}
267-
}
268-
269227
// Insert the job payload and progress into the system.jobs_info table.
270228
infoStorage := j.InfoStorage(u.txn)
271229
infoStorage.claimChecked = true
@@ -319,55 +277,65 @@ WHERE id = $1
319277
}
320278
}
321279

322-
vals := []interface{}{j.ID()}
323-
324-
var update strings.Builder
280+
// Build a statement of the following form, depending on which properties
281+
// need updating:
282+
//
283+
// UPDATE system.jobs
284+
// SET
285+
// [status = $2,]
286+
// [owner = $y,]
287+
// [error_msg = $z]
288+
// WHERE
289+
// id = $1
290+
var setters []string
291+
params := []interface{}{j.ID()} // $1 is always the job ID.
292+
addSetter := func(column string, value interface{}) {
293+
params = append(params, value)
294+
setters = append(setters, fmt.Sprintf("%s = $%d", column, len(params)))
295+
}
325296

297+
if ju.md.State != "" {
298+
addSetter("status", ju.md.State)
299+
}
326300
if payloadBytes != nil {
327301
if beforePayload.Description != payload.Description {
328-
if update.Len() > 0 {
329-
update.WriteString(", ")
330-
}
331-
vals = append(vals, payload.Description)
332-
fmt.Fprintf(&update, "description = $%d", len(vals))
302+
addSetter("description", payload.Description)
333303
}
334304

335-
if beforePayload.UsernameProto.Decode() != payload.UsernameProto.Decode() {
336-
if update.Len() > 0 {
337-
update.WriteString(", ")
338-
}
339-
vals = append(vals, payload.UsernameProto.Decode().Normalized())
340-
fmt.Fprintf(&update, "owner = $%d", len(vals))
305+
beforeUser := beforePayload.UsernameProto.Decode()
306+
afterUser := payload.UsernameProto.Decode()
307+
if afterUser != beforeUser {
308+
addSetter("owner", afterUser.Normalized())
341309
}
342310

343311
if beforePayload.Error != payload.Error {
344-
if update.Len() > 0 {
345-
update.WriteString(", ")
346-
}
347-
vals = append(vals, payload.Error)
348-
fmt.Fprintf(&update, "error_msg = $%d", len(vals))
312+
addSetter("error_msg", payload.Error)
349313
}
350314

351315
if beforePayload.FinishedMicros != payload.FinishedMicros {
352-
if update.Len() > 0 {
353-
update.WriteString(", ")
354-
}
355-
vals = append(vals, time.UnixMicro(payload.FinishedMicros))
356-
fmt.Fprintf(&update, "finished = $%d", len(vals))
316+
addSetter("finished", time.UnixMicro(payload.FinishedMicros))
357317
}
358-
359318
}
360-
if len(vals) > 1 {
361-
stmt := fmt.Sprintf("UPDATE system.jobs SET %s WHERE id = $1", update.String())
362-
if _, err := u.txn.ExecEx(
363-
ctx, "job-update-row", u.txn.KV(),
319+
320+
if len(setters) != 0 {
321+
updateStmt := fmt.Sprintf(
322+
"UPDATE system.jobs SET %s WHERE id = $1",
323+
strings.Join(setters, ", "),
324+
)
325+
n, err := u.txn.ExecEx(
326+
ctx, "job-update-job", u.txn.KV(),
364327
sessiondata.NodeUserSessionDataOverride,
365-
stmt, vals...,
366-
); err != nil {
328+
updateStmt, params...,
329+
)
330+
if err != nil {
367331
return err
368332
}
333+
if n != 1 {
334+
return errors.Errorf(
335+
"expected exactly one row affected, but %d rows affected by job update", n,
336+
)
337+
}
369338
}
370-
371339
return nil
372340
}
373341

0 commit comments

Comments
 (0)