Skip to content

Commit 36c10db

Browse files
committed
tenants: post-rebase adaptations
Signed-off-by: Stephan Renatus <stephan.renatus@gmail.com>
1 parent 3dd8f31 commit 36c10db

File tree

6 files changed

+50
-36
lines changed

6 files changed

+50
-36
lines changed

cmd/run/run.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package cmd
22

33
import (
4-
"context"
54
"os"
65

76
"github.com/open-policy-agent/opa-control-plane/cmd"

config/schema.json

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@
310310
"administrator",
311311
"viewer",
312312
"owner",
313-
"stack_owner"
313+
"stack_owner",
314+
"automation"
314315
],
315316
"type": "string"
316317
}
@@ -337,16 +338,10 @@
337338
"type": "string"
338339
},
339340
"bundle_rebuild_interval": {
340-
"type": [
341-
"null",
342-
"integer"
343-
]
341+
"type": "string"
344342
},
345343
"reconfiguration_interval": {
346-
"type": [
347-
"null",
348-
"integer"
349-
]
344+
"type": "string"
350345
}
351346
},
352347
"type": "object"

internal/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,12 +1075,12 @@ type Service struct {
10751075
// ReconfigurationInterval is the duration between configuration checks, i.e. when a change
10761076
// to a bundle/stack/source will have an effect on the internal bundle workers.
10771077
// String of a duration, e.g. "1m". Defaults to "15s".
1078-
ReconfigurationInterval *time.Duration `json:"reconfiguration_interval,omitempty" yaml:"reconfiguration_interval,omitempty"`
1078+
ReconfigurationInterval Duration `json:"reconfiguration_interval,omitempty"`
10791079

10801080
// BundleRebuildInterval is the time between bundle builds: After a bundle build as finished,
10811081
// OCP will wait _this long_ until it's build again (unless the bundle build is triggered by
10821082
// other means). String duration, e.g. "90s". Defaults to "30s".
1083-
BundleRebuildInterval *time.Duration `json:"bundle_rebuild_interval,omitempty" yaml:"bundle_rebuild_interval,omitempty"`
1083+
BundleRebuildInterval Duration `json:"bundle_rebuild_interval,omitempty"`
10841084

10851085
_ struct{} `additionalProperties:"false"`
10861086
}

internal/pool/pool.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,24 @@ import (
1717
type Pool struct {
1818
mu sync.Mutex
1919
queue []*task
20-
reg map[string]*task
20+
reg map[tenantName]*task
2121
wait chan struct{}
2222
}
2323

24+
type tenantName struct {
25+
tenant string
26+
name string
27+
}
28+
2429
type task struct {
25-
name string
30+
name tenantName
2631
fn func(context.Context) time.Time
2732
deadline time.Time
2833
rerun bool
2934
}
3035

3136
func New(workers int) *Pool {
32-
pool := Pool{reg: make(map[string]*task)}
37+
pool := Pool{reg: make(map[tenantName]*task)}
3338

3439
for range workers {
3540
go pool.work()
@@ -38,8 +43,8 @@ func New(workers int) *Pool {
3843
return &pool
3944
}
4045

41-
func (p *Pool) Add(name string, fn func(context.Context) time.Time) {
42-
p.enqueue(&task{name: name, fn: fn, deadline: time.Now()})
46+
func (p *Pool) Add(tenant, name string, fn func(context.Context) time.Time) {
47+
p.enqueue(&task{name: tenantName{name: name, tenant: tenant}, fn: fn, deadline: time.Now()})
4348
}
4449

4550
// work is the main loop for each worker goroutine.
@@ -55,22 +60,22 @@ func (p *Pool) work() {
5560
// task is not queued, it's running. In that case, we'll have it override its
5661
// next deadline to NOW, causing an immediate re-run after the current run.
5762
// Subsequent runs will use the deadline returned by the task's `fn`.
58-
func (p *Pool) Trigger(n string) error {
63+
func (p *Pool) Trigger(tenant, name string) error {
5964
p.mu.Lock()
6065
defer p.mu.Unlock()
6166

62-
if i := slices.IndexFunc(p.queue, func(t *task) bool { return t.name == n }); i != -1 {
67+
if i := slices.IndexFunc(p.queue, func(t *task) bool { return t.name.name == name && t.name.tenant == tenant }); i != -1 {
6368
p.queue[i].deadline = time.Now()
6469
p.sortAndWake()
6570
return nil
6671
}
6772
// if it's not in p.queue, it must be running at the moment
68-
if t, ok := p.reg[n]; ok {
73+
if t, ok := p.reg[tenantName{tenant: tenant, name: name}]; ok {
6974
t.rerun = true
7075
return nil
7176
}
7277

73-
return fmt.Errorf("no task with name %s", n)
78+
return fmt.Errorf("no task with name %s (tenant %s)", name, tenant)
7479
}
7580

7681
// sortAndWake is used in multiple places, but always needs to be run
@@ -110,7 +115,10 @@ func (p *Pool) dequeue() *task {
110115

111116
var t *task
112117
if len(p.queue) == 0 {
113-
t = &task{name: "dummy", deadline: time.Now().Add(time.Hour * 24 * 365)} // Default to a far future deadline
118+
t = &task{
119+
name: tenantName{tenant: "dummy", name: "dummy"},
120+
deadline: time.Now().Add(time.Hour * 24 * 365), // Default to a far future deadline
121+
}
114122
} else {
115123
t = p.queue[0]
116124
}

internal/server/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ func (s *Server) v1BundlesPost(w http.ResponseWriter, r *http.Request) {
201201
return
202202
}
203203

204-
if err := s.svc.Trigger(ctx, s.auth(r), name); err != nil {
204+
principal, tenant := s.auth(r)
205+
if err := s.svc.Trigger(ctx, principal, tenant, name); err != nil {
205206
errorAuto(w, err)
206207
return
207208
}

internal/service/service.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,19 @@ var (
4646
defaultStackMountPrefix = ast.DefaultRootRef.Append(ast.StringTerm("stacks"))
4747
)
4848

49+
type TenantName struct {
50+
Tenant, Name string
51+
}
52+
53+
func (tn *TenantName) String() string {
54+
return tn.Tenant + ":" + tn.Name
55+
}
56+
4957
type Service struct {
5058
config *config.Root
5159
persistenceDir string
5260
pool *pool.Pool
53-
workers map[string]*BundleWorker
61+
workers map[TenantName]*BundleWorker
5462
readyMutex sync.Mutex
5563
ready bool
5664
failures map[string]Status
@@ -111,7 +119,7 @@ type Status struct {
111119
func New() *Service {
112120
return &Service{
113121
pool: pool.New(10),
114-
workers: make(map[string]*BundleWorker),
122+
workers: make(map[TenantName]*BundleWorker),
115123
failures: make(map[string]Status),
116124
noninteractive: true,
117125
migrateDB: false,
@@ -129,11 +137,11 @@ func (s *Service) WithConfig(config *config.Root) *Service {
129137
s.config = config
130138
s.database = *s.database.WithConfig(config.Database)
131139
if s.config.Service != nil {
132-
if s.config.Service.ReconfigurationInterval != nil {
133-
s.reconfigurationInterval = *s.config.Service.ReconfigurationInterval
140+
if s.config.Service.ReconfigurationInterval != 0 {
141+
s.reconfigurationInterval = time.Duration(s.config.Service.ReconfigurationInterval)
134142
}
135-
if s.config.Service.BundleRebuildInterval != nil {
136-
s.buildInterval = *s.config.Service.BundleRebuildInterval
143+
if s.config.Service.BundleRebuildInterval != 0 {
144+
s.buildInterval = time.Duration(s.config.Service.BundleRebuildInterval)
137145
}
138146
}
139147
return s
@@ -220,6 +228,8 @@ shutdown:
220228
for _, w := range s.workers {
221229
s.report.Bundles[w.bundleConfig.Name] = w.status
222230
}
231+
232+
// For singleshot, we don't need to worry about tenants:
223233
maps.Copy(s.report.Bundles, s.failures)
224234
}
225235

@@ -230,9 +240,10 @@ func (s *Service) Report() *Report {
230240
return s.report
231241
}
232242

233-
func (s *Service) Trigger(ctx context.Context, principal, name string) error {
243+
func (s *Service) Trigger(ctx context.Context, principal, tenant, name string) error {
234244
a := authz.Access{
235245
Principal: principal,
246+
Tenant: tenant,
236247
Resource: "bundles",
237248
Permission: "bundles.trigger",
238249
Name: name,
@@ -241,7 +252,7 @@ func (s *Service) Trigger(ctx context.Context, principal, name string) error {
241252
return err
242253
}
243254

244-
err := s.pool.Trigger(name)
255+
err := s.pool.Trigger(tenant, name)
245256
if err != nil {
246257
s.log.Errorf("trigger bundle build for %s: %v", name, err)
247258
} else {
@@ -316,9 +327,9 @@ func (s *Service) launchWorkers(ctx context.Context) {
316327
return
317328
}
318329

319-
activeBundles := make(map[string]struct{})
330+
activeBundles := make(map[TenantName]struct{})
320331
for _, b := range bundles {
321-
bName := tenant + "_" + b.Name
332+
bName := TenantName{Tenant: tenant, Name: b.Name}
322333
activeBundles[bName] = struct{}{}
323334
}
324335

@@ -353,7 +364,7 @@ func (s *Service) launchWorkers(ctx context.Context) {
353364
failures := make(map[string]Status)
354365

355366
for _, b := range bundles {
356-
bName := tenant + "_" + b.Name
367+
bName := TenantName{Tenant: tenant, Name: b.Name}
357368
if w, ok := s.workers[bName]; ok {
358369
w.UpdateConfig(b, sourceDefs, stacks)
359370
continue
@@ -392,7 +403,7 @@ func (s *Service) launchWorkers(ctx context.Context) {
392403

393404
syncs := []Synchronizer{}
394405
sources := []*builder.Source{&root.Source}
395-
bundleDir := join(s.persistenceDir, md5sum(bName))
406+
bundleDir := join(s.persistenceDir, md5sum(bName.String()))
396407

397408
for _, dep := range deps {
398409
// NB(sr): dep.Name could contain a `:` which cause build errors in OPA's bundle build machinery
@@ -421,7 +432,7 @@ func (s *Service) launchWorkers(ctx context.Context) {
421432
WithStorage(storage).
422433
WithInterval(b.Interval).
423434
WithSingleShot(s.singleShot)
424-
s.pool.Add(b.Name, w.Execute) // TODO(sr): name not unique
435+
s.pool.Add(tenant, b.Name, w.Execute)
425436

426437
s.workers[bName] = w
427438
}

0 commit comments

Comments
 (0)