Skip to content

Commit d822584

Browse files
committed
refactor(wrtagweb): add jobSSE helpers
1 parent 1fb6caf commit d822584

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

cmd/wrtagweb/main.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ func main() {
121121
}
122122

123123
var sse broadcast[uint64]
124+
jobSSENew := func() { sse.send(0) }
125+
jobSSEUpdate := func(id uint64) { sse.send(id) }
126+
124127
var jobQueue = make(chan uint64, 32_768)
125128

126129
mux := http.NewServeMux()
@@ -184,7 +187,7 @@ func main() {
184187

185188
respTmpl(w, "job-import", struct{ Operation string }{Operation: operationStr})
186189

187-
sse.send(0)
190+
jobSSENew()
188191
jobQueue <- job.ID
189192
})
190193

@@ -221,7 +224,7 @@ func main() {
221224

222225
respTmpl(w, "job", job)
223226

224-
sse.send(0)
227+
jobSSENew()
225228
jobQueue <- job.ID
226229
})
227230

@@ -234,7 +237,7 @@ func main() {
234237
respErrf(w, http.StatusInternalServerError, "error getting job")
235238
return
236239
}
237-
sse.send(0)
240+
jobSSENew()
238241
})
239242

240243
mux.HandleFunc("GET /dirs", func(w http.ResponseWriter, r *http.Request) {
@@ -312,7 +315,7 @@ func main() {
312315
return
313316
}
314317

315-
sse.send(0)
318+
jobSSENew()
316319
jobQueue <- job.ID
317320
})
318321

@@ -363,7 +366,7 @@ func main() {
363366
case <-ctx.Done():
364367
return nil
365368
case jobID := <-jobQueue:
366-
if err := processJob(ctx, cfg, notifs, researchLinkQuerier, *publicURL, db, &sse, jobID); err != nil {
369+
if err := processJob(ctx, cfg, notifs, researchLinkQuerier, *publicURL, db, jobSSEUpdate, jobID); err != nil {
367370
return fmt.Errorf("next job: %w", err)
368371
}
369372
}
@@ -388,7 +391,7 @@ func main() {
388391
}
389392
}
390393

391-
func processJob(ctx context.Context, cfg *wrtag.Config, notifs *notifications.Notifications, researchLinkQuerier *researchlink.Builder, publicURL string, db *sql.DB, sse *broadcast[uint64], jobID uint64) error {
394+
func processJob(ctx context.Context, cfg *wrtag.Config, notifs *notifications.Notifications, researchLinkQuerier *researchlink.Builder, publicURL string, db *sql.DB, jobSSEUpdate func(uint64), jobID uint64) error {
392395
var job Job
393396
err := sqlb.ScanRow(ctx, db, &job, "update jobs set status=? where id=? and status=? returning *", StatusInProgress, jobID, StatusEnqueued)
394397
if errors.Is(err, sql.ErrNoRows) {
@@ -398,8 +401,8 @@ func processJob(ctx context.Context, cfg *wrtag.Config, notifs *notifications.No
398401
return err
399402
}
400403

401-
sse.send(job.ID)
402-
defer sse.send(job.ID)
404+
jobSSEUpdate(job.ID)
405+
defer jobSSEUpdate(job.ID)
403406

404407
op, err := wrtagflag.OperationByName(job.Operation, false)
405408
if err != nil {

0 commit comments

Comments
 (0)