Skip to content

Commit bf196bb

Browse files
committed
service: add Trigger(name) to trigger bundle rebuild
Adds the machinery to pool and service to trigger bundle rebuilds by name. If the bundle worker is queued, it'll be pulled into the front of the line, to be picked up next; if it's currently not in the queue, it must be executing: if so, we're overriding the next deadline to be _now_, causing an immediate re-run of the build process. For now, this is only wired up with signal handling: send a HUP signal to rebuild. This is because we haven't discussed proper permissions and HTTP API design for this yet. Signed-off-by: Stephan Renatus <stephan.renatus@gmail.com>
1 parent dd36be1 commit bf196bb

File tree

6 files changed

+216
-56
lines changed

6 files changed

+216
-56
lines changed

cmd/run/run.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"bytes"
55
"context"
66
"os"
7+
"os/signal"
8+
"syscall"
79

810
"github.com/spf13/cobra"
911
"github.com/styrainc/opa-control-plane/cmd"
@@ -73,6 +75,10 @@ func init() {
7375
log.Fatalf("initialize service: %v", err)
7476
}
7577

78+
// NB(sr): Preliminary, not necessarily something we'll want to keep:
79+
// Rebuild all bundles on SIGHUP.
80+
signalTrigger(svc, log)
81+
7682
go func() {
7783
if err := server.New().WithDatabase(svc.Database()).WithReadiness(svc.Ready).Init().ListenAndServe(params.addr); err != nil {
7884
log.Fatalf("failed to start server: %v", err)
@@ -97,3 +103,15 @@ func init() {
97103
run,
98104
)
99105
}
106+
107+
func signalTrigger(s *service.Service, l *logging.Logger) {
108+
sigs := make(chan os.Signal, 1)
109+
signal.Notify(sigs, syscall.SIGHUP)
110+
go func() {
111+
for range sigs {
112+
if err := s.TriggerAll(context.Background()); err != nil {
113+
l.Error(err.Error())
114+
}
115+
}
116+
}()
117+
}

internal/config/config.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"reflect"
1515
"slices"
1616
"sort"
17+
"time"
1718

1819
"github.com/gobwas/glob"
1920
"github.com/pkg/errors"
@@ -40,6 +41,7 @@ type Root struct {
4041
Secrets map[string]*Secret `json:"secrets,omitempty" yaml:"secrets,omitempty"` // Schema validation overrides Secret to object type.
4142
Tokens map[string]*Token `json:"tokens,omitempty" yaml:"tokens,omitempty"`
4243
Database *Database `json:"database,omitempty" yaml:"database,omitempty"`
44+
Service *Service `json:"service,omitempty" yaml:"service,omitempty"`
4345
}
4446

4547
// SetSQLitePersistentByDefault sets the database configuration to use a SQLite
@@ -228,7 +230,7 @@ func (r *Root) Validate() error {
228230
return err
229231
}
230232

231-
var config interface{}
233+
var config any
232234
if err := json.Unmarshal(data, &config); err != nil {
233235
return err
234236
}
@@ -966,6 +968,18 @@ func (a Datasources) Equal(b Datasources) bool {
966968
return setEqual(a, b, func(ds Datasource) string { return ds.Name }, func(a, b Datasource) bool { return a.Equal(&b) })
967969
}
968970

971+
type Service struct {
972+
// ReconfigurationInterval is the duration between configuration checks, i.e. when a change
973+
// to a bundle/stack/source will have an effect on the internal bundle workers.
974+
// String of a duration, e.g. "1m". Defaults to "15s".
975+
ReconfigurationInterval *time.Duration `json:"reconfiguration_interval,omitempty" yaml:"reconfiguration_interval,omitempty"`
976+
977+
// BundleRebuildInterval is the time between bundle builds: After a bundle build as finished,
978+
// OCP will wait _this long_ until it's build again (unless the bundle build is triggered by
979+
// other means). String duration, e.g. "90s". Defaults to "30s".
980+
BundleRebuildInterval *time.Duration `json:"bundle_rebuild_interval,omitempty" yaml:"bundle_rebuild_interval,omitempty"`
981+
}
982+
969983
type Database struct {
970984
SQL *SQLDatabase `json:"sql,omitempty" yaml:"sql,omitempty"`
971985
AWSRDS *AmazonRDS `json:"aws_rds,omitempty" yaml:"aws_rds,omitempty"`

internal/pool/pool.go

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pool
22

33
import (
44
"context"
5+
"fmt"
56
"slices"
67
"sync"
78
"time"
@@ -15,17 +16,20 @@ import (
1516
// the waiting goroutine to process the new task immediately.
1617
type Pool struct {
1718
mu sync.Mutex
18-
tasks []*task
19+
queue []*task
20+
reg map[string]*task
1921
wait chan struct{}
2022
}
2123

2224
type task struct {
25+
name string
2326
fn func(context.Context) time.Time
2427
deadline time.Time
28+
rerun bool
2529
}
2630

2731
func New(workers int) *Pool {
28-
var pool Pool
32+
pool := Pool{reg: make(map[string]*task)}
2933

3034
for range workers {
3135
go pool.work()
@@ -34,8 +38,8 @@ func New(workers int) *Pool {
3438
return &pool
3539
}
3640

37-
func (p *Pool) Add(fn func(context.Context) time.Time) {
38-
p.enqueue(&task{fn: fn, deadline: time.Now()})
41+
func (p *Pool) Add(name string, fn func(context.Context) time.Time) {
42+
p.enqueue(&task{name: name, fn: fn, deadline: time.Now()})
3943
}
4044

4145
// work is the main loop for each worker goroutine.
@@ -46,18 +50,34 @@ func (p *Pool) work() {
4650
}
4751
}
4852

49-
func (p *Pool) enqueue(t *task) {
50-
if t.deadline.IsZero() {
51-
// Task requested removal from the pool.
52-
return
53-
}
54-
53+
// Trigger runs the named task NOW, if it is in the queue, regardless of the
54+
// previous deadline, by pulling it into the front of the queue. If the named
55+
// task is not queued, it's running. In that case, we'll have it override its
56+
// next deadline to NOW, causing an immediate re-run after the current run.
57+
// Subsequent runs will use the deadline returned by the task's `fn`.
58+
func (p *Pool) Trigger(n string) error {
5559
p.mu.Lock()
5660
defer p.mu.Unlock()
5761

62+
if i := slices.IndexFunc(p.queue, func(t *task) bool { return t.name == n }); i != -1 {
63+
p.queue[i].deadline = time.Now()
64+
p.sortAndWake()
65+
return nil
66+
}
67+
// if it's not in p.queue, it must be running at the moment
68+
if t, ok := p.reg[n]; ok {
69+
t.rerun = true
70+
return nil
71+
}
72+
73+
return fmt.Errorf("no task with name %s", n)
74+
}
75+
76+
// sortAndWake is used in multiple places, but always needs to be run
77+
// within a p.mu lock!
78+
func (p *Pool) sortAndWake() {
5879
// Maintain the tasks in deadline order.
59-
p.tasks = append(p.tasks, t)
60-
slices.SortFunc(p.tasks, func(a, b *task) int {
80+
slices.SortFunc(p.queue, func(a, b *task) int {
6181
return a.deadline.Compare(b.deadline)
6282
})
6383

@@ -68,17 +88,31 @@ func (p *Pool) enqueue(t *task) {
6888
}
6989
}
7090

91+
func (p *Pool) enqueue(t *task) {
92+
if t.deadline.IsZero() {
93+
// Task requested removal from the pool.
94+
delete(p.reg, t.name)
95+
return
96+
}
97+
98+
p.mu.Lock()
99+
p.reg[t.name] = t
100+
p.queue = append(p.queue, t)
101+
p.sortAndWake()
102+
p.mu.Unlock()
103+
}
104+
71105
func (p *Pool) dequeue() *task {
72106
p.mu.Lock()
73107
defer p.mu.Unlock()
74108

75109
for {
76110

77111
var t *task
78-
if len(p.tasks) == 0 {
79-
t = &task{deadline: time.Now().Add(time.Hour * 24 * 365)} // Default to a far future deadline
112+
if len(p.queue) == 0 {
113+
t = &task{name: "dummy", deadline: time.Now().Add(time.Hour * 24 * 365)} // Default to a far future deadline
80114
} else {
81-
t = p.tasks[0]
115+
t = p.queue[0]
82116
}
83117

84118
if t.deadline.After(time.Now()) {
@@ -105,12 +139,16 @@ func (p *Pool) dequeue() *task {
105139
break
106140
}
107141

108-
t := p.tasks[0]
109-
p.tasks = slices.Delete(p.tasks, 0, 1)
142+
var t *task
143+
t, p.queue = p.queue[0], p.queue[1:]
110144
return t
111145
}
112146

113147
func (t *task) Execute(ctx context.Context) *task {
114148
t.deadline = t.fn(ctx)
149+
if t.rerun {
150+
t.rerun = false
151+
t.deadline = time.Now()
152+
}
115153
return t
116154
}

internal/pool/pool_test.go

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,79 @@ func TestPool(t *testing.T) {
1010
p := New(2)
1111

1212
// Add a task that returns a deadline in the future.
13-
p.Add(func(_ context.Context) time.Time {
13+
p.Add("a", func(context.Context) time.Time {
1414
return time.Now().Add(100 * time.Millisecond)
1515
})
1616

1717
// Add a task that returns a deadline in the past.
18-
p.Add(func(_ context.Context) time.Time {
18+
p.Add("b", func(context.Context) time.Time {
1919
return time.Now().Add(-100 * time.Millisecond)
2020
})
2121

2222
// Add a task that returns a deadline in the future.
23-
p.Add(func(_ context.Context) time.Time {
23+
p.Add("c", func(context.Context) time.Time {
2424
return time.Now().Add(200 * time.Millisecond)
2525
})
2626

2727
// Wait for a short period to allow tasks to be processed.
2828
time.Sleep(300 * time.Millisecond)
2929

3030
// The pool should have processed all tasks without deadlock.
31+
// If it had gotten stuck, we'd never reach this line.
3132
t.Log("All tasks processed successfully")
3233
}
34+
35+
type run struct {
36+
left int
37+
ran int
38+
sleep time.Duration
39+
deadline time.Duration
40+
}
41+
42+
func (t *run) Execute(context.Context) time.Time {
43+
if t.left > 0 {
44+
time.Sleep(t.sleep)
45+
t.left--
46+
t.ran++
47+
return time.Now().Add(t.deadline)
48+
}
49+
50+
var zero time.Time
51+
return zero // dequeue task
52+
}
53+
54+
func TestTrigger(t *testing.T) {
55+
t.Run("trigger pulls queued task up from", func(t *testing.T) {
56+
p := New(2)
57+
58+
rx := &run{left: 3, deadline: 200 * time.Millisecond}
59+
60+
p.Add("t", rx.Execute) // will run once (run #1), and be queued for 200 ms
61+
62+
_ = p.Trigger("t") // pulled in front, run #2
63+
time.Sleep(50 * time.Millisecond)
64+
_ = p.Trigger("t") // pulled in front, run #3
65+
time.Sleep(300 * time.Millisecond) // no other runs, third run dequeued
66+
67+
if exp, act := 3, rx.ran; exp != act {
68+
t.Errorf("expected counter of %d, got %d", exp, act)
69+
}
70+
})
71+
72+
t.Run("trigger reruns executing task right away", func(t *testing.T) {
73+
p := New(2)
74+
75+
// if it wasn't triggered, we'd not see a second run: the next deadline is 1s
76+
rx := &run{left: 3, sleep: 100 * time.Millisecond, deadline: time.Second}
77+
78+
p.Add("t", rx.Execute) // will run once (run #1), and be queued for 200 ms
79+
time.Sleep(50 * time.Millisecond)
80+
_ = p.Trigger("t") // re-run after it's done, run #2
81+
82+
time.Sleep(300 * time.Millisecond)
83+
84+
if exp, act := 2, rx.ran; exp != act {
85+
t.Errorf("expected counter of %d, got %d", exp, act)
86+
}
87+
})
88+
}

0 commit comments

Comments
 (0)