@@ -3,6 +3,7 @@ package harmonytask
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "os"
6
7
"strconv"
7
8
"sync/atomic"
8
9
"time"
@@ -23,6 +24,8 @@ var POLL_NEXT_DURATION = 100 * time.Millisecond // After scheduling a task, wait
23
24
var CLEANUP_FREQUENCY = 5 * time .Minute // Check for dead workers this often * everyone
24
25
var FOLLOW_FREQUENCY = 1 * time .Minute // Check for work to follow this often
25
26
27
+ var ExitStatusRestartRequest = 100
28
+
26
29
type TaskTypeDetails struct {
27
30
// Max returns how many tasks this machine can run of this type.
28
31
// Nil (default)/Zero or less means unrestricted.
@@ -57,6 +60,16 @@ type TaskTypeDetails struct {
57
60
// CanAccept() can read taskEngine's WorkOrigin string to learn about a task.
58
61
// Ex: make new CC sectors, clean-up, or retrying pipelines that failed in later states.
59
62
IAmBored func (AddTaskFunc ) error
63
+
64
+ // CanYield is true if the task should yield when the node is not schedulable.
65
+ // This is implied for background tasks.
66
+ CanYield bool
67
+
68
+ // SchedOverrides is a map of task names which, when running while the node is not schedulable,
69
+ // allow this task to continue being scheduled. This is useful in pipelines where a long-running
70
+ // task would block a short-running task from being scheduled, blocking other related pipelines on
71
+ // other machines.
72
+ SchedulingOverrides map [string ]bool
60
73
}
61
74
62
75
// TaskInterface must be implemented in order to have a task used by harmonytask.
@@ -126,6 +139,9 @@ type TaskEngine struct {
126
139
follows map [string ][]followStruct
127
140
hostAndPort string
128
141
142
+ // runtime flags
143
+ yieldBackground atomic.Bool
144
+
129
145
// synchronous to the single-threaded poller
130
146
lastFollowTime time.Time
131
147
lastCleanup atomic.Value
@@ -283,20 +299,24 @@ func (e *TaskEngine) poller() {
283
299
nextWait = POLL_DURATION
284
300
285
301
// Check if the machine is schedulable
286
- schedulable , err := e .schedulable ()
302
+ schedulable , err := e .checkNodeFlags ()
287
303
if err != nil {
288
304
log .Error ("Unable to check schedulable status: " , err )
289
305
continue
290
306
}
307
+
308
+ e .yieldBackground .Store (! schedulable )
309
+
310
+ accepted := e .pollerTryAllWork (schedulable )
311
+ if accepted {
312
+ nextWait = POLL_NEXT_DURATION
313
+ }
314
+
291
315
if ! schedulable {
292
316
log .Debugf ("Machine %s is not schedulable. Please check the cordon status." , e .hostAndPort )
293
317
continue
294
318
}
295
319
296
- accepted := e .pollerTryAllWork ()
297
- if accepted {
298
- nextWait = POLL_NEXT_DURATION
299
- }
300
320
if time .Since (e .lastFollowTime ) > FOLLOW_FREQUENCY {
301
321
e .followWorkInDB ()
302
322
}
@@ -361,12 +381,40 @@ func (e *TaskEngine) followWorkInDB() {
361
381
}
362
382
363
383
// pollerTryAllWork starts the next 1 task
364
- func (e * TaskEngine ) pollerTryAllWork () bool {
384
+ func (e * TaskEngine ) pollerTryAllWork (schedulable bool ) bool {
365
385
if time .Since (e .lastCleanup .Load ().(time.Time )) > CLEANUP_FREQUENCY {
366
386
e .lastCleanup .Store (time .Now ())
367
387
resources .CleanupMachines (e .ctx , e .db )
368
388
}
369
389
for _ , v := range e .handlers {
390
+ if ! schedulable {
391
+ if v .TaskTypeDetails .SchedulingOverrides == nil {
392
+ continue
393
+ }
394
+
395
+ // Override the schedulable flag if the task has any assigned overrides
396
+ var foundOverride bool
397
+ for relatedTaskName := range v .TaskTypeDetails .SchedulingOverrides {
398
+ var assignedOverrideTasks []int
399
+ err := e .db .Select (e .ctx , & assignedOverrideTasks , `SELECT id
400
+ FROM harmony_task
401
+ WHERE owner_id = $1 AND name=$2
402
+ ORDER BY update_time LIMIT 1` , e .ownerID , relatedTaskName )
403
+ if err != nil {
404
+ log .Error ("Unable to read assigned overrides " , err )
405
+ break
406
+ }
407
+ if len (assignedOverrideTasks ) > 0 {
408
+ log .Infow ("found override, scheduling despite schedulable=false flag" , "ownerID" , e .ownerID , "relatedTaskName" , relatedTaskName , "assignedOverrideTasks" , assignedOverrideTasks )
409
+ foundOverride = true
410
+ break
411
+ }
412
+ }
413
+ if ! foundOverride {
414
+ continue
415
+ }
416
+ }
417
+
370
418
if err := v .AssertMachineHasCapacity (); err != nil {
371
419
log .Debugf ("skipped scheduling %s type tasks on due to %s" , v .Name , err .Error ())
372
420
continue
@@ -407,6 +455,11 @@ func (e *TaskEngine) pollerTryAllWork() bool {
407
455
log .Warn ("Work not accepted for " + strconv .Itoa (len (unownedTasks )) + " " + v .Name + " task(s)" )
408
456
}
409
457
}
458
+
459
+ if ! schedulable {
460
+ return false
461
+ }
462
+
410
463
// if no work was accepted, are we bored? Then find work in priority order.
411
464
for _ , v := range e .handlers {
412
465
v := v
@@ -462,15 +515,43 @@ func (e *TaskEngine) Host() string {
462
515
return e .hostAndPort
463
516
}
464
517
465
- func (e * TaskEngine ) schedulable () (bool , error ) {
518
+ func (e * TaskEngine ) checkNodeFlags () (bool , error ) {
466
519
var unschedulable bool
467
- err := e .db .QueryRow (e .ctx , `SELECT unschedulable FROM harmony_machines WHERE host_and_port=$1` , e .hostAndPort ).Scan (& unschedulable )
520
+ var restartRequest * time.Time
521
+ err := e .db .QueryRow (e .ctx , `SELECT unschedulable, restart_request FROM harmony_machines WHERE host_and_port=$1` , e .hostAndPort ).Scan (& unschedulable , & restartRequest )
468
522
if err != nil {
469
523
return false , err
470
524
}
525
+
526
+ if restartRequest != nil {
527
+ e .restartIfNoTasksPending (* restartRequest )
528
+ }
529
+
471
530
return ! unschedulable , nil
472
531
}
473
532
533
+ func (e * TaskEngine ) restartIfNoTasksPending (pendingSince time.Time ) {
534
+ var tasksPending int
535
+ err := e .db .QueryRow (e .ctx , `SELECT COUNT(*) FROM harmony_task WHERE owner_id=$1` , e .ownerID ).Scan (& tasksPending )
536
+ if err != nil {
537
+ log .Error ("Unable to check for tasks pending: " , err )
538
+ return
539
+ }
540
+ if tasksPending == 0 {
541
+ log .Infow ("no tasks pending, restarting" , "ownerID" , e .ownerID , "pendingSince" , pendingSince , "took" , time .Since (pendingSince ))
542
+
543
+ // unset the flags first
544
+ _ , err = e .db .Exec (e .ctx , `UPDATE harmony_machines SET restart_request=NULL, unschedulable=FALSE WHERE host_and_port=$1` , e .hostAndPort )
545
+ if err != nil {
546
+ log .Error ("Unable to unset restart request: " , err )
547
+ return
548
+ }
549
+
550
+ // then exit
551
+ os .Exit (ExitStatusRestartRequest )
552
+ }
553
+ }
554
+
474
555
// About the Registry
475
556
// This registry exists for the benefit of "static methods" of TaskInterface extensions.
476
557
// For example, GetSPID(db, taskID) (int, err) is a static method that can be called
0 commit comments