Skip to content

Commit 12afb36

Browse files
committed
pkg/settings/cresettings: add PerWorkflow.ChainAllowed
1 parent 39a3be0 commit 12afb36

File tree

14 files changed

+476
-39
lines changed

14 files changed

+476
-39
lines changed

pkg/contexts/chains.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package contexts
2+
3+
import (
4+
"context"
5+
"errors"
6+
)
7+
8+
const chainSelectorCtxKey key = "chainSelectorCtx"
9+
10+
// WithChainSelector returns a new context that includes the chain selector.
11+
// Use ChainSelectorValue to get the value.
12+
func WithChainSelector(ctx context.Context, cs uint64) context.Context {
13+
return context.WithValue(ctx, chainSelectorCtxKey, cs)
14+
}
15+
16+
// ChainSelectorValue returns the chain selector, if one was set via WithChainSelector.
17+
func ChainSelectorValue(ctx context.Context) (uint64, error) {
18+
val := Value[uint64](ctx, chainSelectorCtxKey)
19+
if val == 0 {
20+
return 0, errors.New("context missing chain selector")
21+
}
22+
return val, nil
23+
}

pkg/settings/cresettings/defaults.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@
4141
"ConsensusCallsLimit": "2000",
4242
"LogLineLimit": "1kb",
4343
"LogEventLimit": "1000",
44+
"ChainAllowed": {
45+
"Default": "false",
46+
"Values": {}
47+
},
4448
"CRONTrigger": {
4549
"FastestScheduleInterval": "30s",
4650
"RateLimit": "every30s:1"

pkg/settings/cresettings/defaults.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ ConsensusCallsLimit = '2000'
4242
LogLineLimit = '1kb'
4343
LogEventLimit = '1000'
4444

45+
[PerWorkflow.ChainAllowed]
46+
Default = 'false'
47+
48+
[PerWorkflow.ChainAllowed.Values]
49+
4550
[PerWorkflow.CRONTrigger]
4651
FastestScheduleInterval = '30s'
4752
RateLimit = 'every30s:1'

pkg/settings/cresettings/settings.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ var Default = Schema{
103103
ConsensusCallsLimit: Int(2000),
104104
LogLineLimit: Size(config.KByte),
105105
LogEventLimit: Int(1_000),
106+
ChainAllowed: PerChainSelector(Bool(false), map[string]bool{
107+
// none by default
108+
}),
106109

107110
CRONTrigger: cronTrigger{
108111
FastestScheduleInterval: Duration(30 * time.Second),
@@ -210,6 +213,8 @@ type Workflows struct {
210213
LogLineLimit Setting[config.Size]
211214
LogEventLimit Setting[int] `unit:"{log}"`
212215

216+
ChainAllowed SettingMap[bool]
217+
213218
CRONTrigger cronTrigger
214219
HTTPTrigger httpTrigger
215220
LogTrigger logTrigger

pkg/settings/cresettings/settings_test.go

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ func TestSchema_Unmarshal(t *testing.T) {
7474
},
7575
"PerWorkflow": {
7676
"WASMMemoryLimit": "250mb",
77+
"ChainAllowed": {
78+
"Default": "false",
79+
"Values": {
80+
"1": "true"
81+
}
82+
},
7783
"CRONTrigger": {
7884
"RateLimit": "every10s:5"
7985
},
@@ -103,6 +109,10 @@ func TestSchema_Unmarshal(t *testing.T) {
103109
assert.Equal(t, 48*time.Hour, cfg.PerOrg.ZeroBalancePruningTimeout.DefaultValue)
104110
assert.Equal(t, 99, cfg.PerOwner.WorkflowExecutionConcurrencyLimit.DefaultValue)
105111
assert.Equal(t, 250*config.MByte, cfg.PerWorkflow.WASMMemoryLimit.DefaultValue)
112+
assert.Equal(t, false, cfg.PerWorkflow.ChainAllowed.Default.DefaultValue)
113+
assert.Equal(t, "true", cfg.PerWorkflow.ChainAllowed.Values["1"])
114+
assert.NotNil(t, cfg.PerWorkflow.ChainAllowed.Default.Parse)
115+
assert.NotNil(t, cfg.PerWorkflow.ChainAllowed.KeyFromCtx)
106116
assert.Equal(t, config.Rate{Limit: rate.Every(10 * time.Second), Burst: 5}, cfg.PerWorkflow.CRONTrigger.RateLimit.DefaultValue)
107117
assert.Equal(t, config.Rate{Limit: rate.Every(30 * time.Second), Burst: 3}, cfg.PerWorkflow.HTTPTrigger.RateLimit.DefaultValue)
108118
assert.Equal(t, config.Rate{Limit: rate.Every(13 * time.Second), Burst: 6}, cfg.PerWorkflow.LogTrigger.EventRateLimit.DefaultValue)
@@ -142,11 +152,6 @@ func TestDefaultGetter(t *testing.T) {
142152
}`)
143153
reinit() // set default vars
144154

145-
_ = `
146-
[workflow.test-wf-id]
147-
PerWorkflow.HTTPAction.CallLimit = 20
148-
`
149-
150155
// Default unchanged
151156
got, err = limit.GetOrDefault(ctx, DefaultGetter)
152157
require.NoError(t, err)
@@ -158,3 +163,70 @@ PerWorkflow.HTTPAction.CallLimit = 20
158163
require.Equal(t, 20, got)
159164

160165
}
166+
167+
func TestDefaultGetter_SettingMap(t *testing.T) {
168+
limit := Default.PerWorkflow.ChainAllowed
169+
170+
ctx := contexts.WithCRE(t.Context(), contexts.CRE{Owner: "owner-id", Workflow: "foo"})
171+
ctx = contexts.WithChainSelector(ctx, 1234)
172+
overrideCtx := contexts.WithCRE(t.Context(), contexts.CRE{Owner: "owner-id", Workflow: "test-wf-id"})
173+
overrideCtx = contexts.WithChainSelector(overrideCtx, 1234)
174+
175+
// None allowed by default
176+
got, err := limit.GetOrDefault(ctx, DefaultGetter)
177+
require.NoError(t, err)
178+
require.False(t, got)
179+
got, err = limit.GetOrDefault(overrideCtx, DefaultGetter)
180+
require.NoError(t, err)
181+
require.False(t, got)
182+
183+
t.Cleanup(reinit) // restore default vars
184+
185+
// Org override to allow
186+
t.Setenv(envNameSettings, `{
187+
"workflow": {
188+
"test-wf-id": {
189+
"PerWorkflow": {
190+
"ChainAllowed": {
191+
"Values": {
192+
"1234": "true"
193+
}
194+
}
195+
}
196+
}
197+
}
198+
}`)
199+
reinit() // set default vars
200+
got, err = limit.GetOrDefault(ctx, DefaultGetter)
201+
require.NoError(t, err)
202+
require.False(t, got)
203+
got, err = limit.GetOrDefault(overrideCtx, DefaultGetter)
204+
require.NoError(t, err)
205+
require.True(t, got)
206+
207+
// Org override to allow by default, but disallow some
208+
t.Setenv(envNameSettings, `{
209+
"workflow": {
210+
"test-wf-id": {
211+
"PerWorkflow": {
212+
"ChainAllowed": {
213+
"Default": true,
214+
"Values": {
215+
"1234": "false"
216+
}
217+
}
218+
}
219+
}
220+
}
221+
}`)
222+
reinit() // set default vars
223+
got, err = limit.GetOrDefault(ctx, DefaultGetter)
224+
require.NoError(t, err)
225+
require.False(t, got)
226+
got, err = limit.GetOrDefault(overrideCtx, DefaultGetter)
227+
require.NoError(t, err)
228+
require.False(t, got)
229+
got, err = limit.GetOrDefault(contexts.WithChainSelector(overrideCtx, 42), DefaultGetter)
230+
require.NoError(t, err)
231+
require.True(t, got)
232+
}

pkg/settings/json.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ import (
88
"io/fs"
99
"maps"
1010
"slices"
11+
"strconv"
1112
"strings"
1213
)
1314

1415
// CombineJSONFiles reads a set of JSON config files and combines them in to one file. The expected inputs are:
15-
// - global.json
16-
// - org/*.json
17-
// - owner/*.json
18-
// - workflow/*.json
16+
// - global.json
17+
// - org/*.json
18+
// - owner/*.json
19+
// - workflow/*.json
20+
//
1921
// The directory and file names translate to keys in the JSON structure, while the file extensions are discarded.
2022
// For example: owner/0x1234.json:Foo.Bar becomes owner.0x1234.Foo.Bar
2123
func CombineJSONFiles(files fs.FS) ([]byte, error) {
@@ -152,11 +154,17 @@ func (s *jsonSettings) get(key string) (string, error) {
152154
}
153155

154156
field := parts[len(parts)-1]
155-
switch t := m[field].(type) {
156-
case string:
157-
return t, nil
158-
case json.Number:
159-
return t.String(), nil
157+
if val, ok := m[field]; ok {
158+
switch t := val.(type) {
159+
case string:
160+
return t, nil
161+
case json.Number:
162+
return t.String(), nil
163+
case bool:
164+
return strconv.FormatBool(t), nil
165+
default:
166+
return "", fmt.Errorf("non-string value: %s: %t(%v)", key, val, val)
167+
}
160168
}
161169
return "", nil // no value
162170
}
@@ -166,7 +174,7 @@ type jsonGetter struct {
166174
}
167175

168176
// NewJSONGetter returns a static Getter backed by the given JSON.
169-
//TODO https://smartcontract-it.atlassian.net/browse/CAPPL-775
177+
// TODO https://smartcontract-it.atlassian.net/browse/CAPPL-775
170178
// NewJSONRegistry with polling & subscriptions
171179
func NewJSONGetter(b []byte) (Getter, error) {
172180
s, err := newJSONSettings(b)

pkg/settings/keys.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ func (s Scope) rawKeys(ctx context.Context, key string) (keys []string, err erro
1717
if i.IsTenantRequired() {
1818
err = errors.Join(err, fmt.Errorf("empty %s key", i))
1919
}
20+
} else {
21+
keys = append(keys, i.String()+"."+tenant+"."+key)
2022
}
21-
keys = append(keys, i.String()+"."+tenant+"."+key)
2223
}
2324
keys = append(keys, ScopeGlobal.String()+"."+key) // ScopeGlobal
2425
return

pkg/settings/limits/bound.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,8 @@ func newBoundLimiter[N Number](f Factory, bound settings.Setting[N]) (BoundLimit
5454
updater: newUpdater[N](nil, func(ctx context.Context) (N, error) {
5555
return bound.GetOrDefault(ctx, f.Settings)
5656
}, nil),
57-
defaultBound: bound.DefaultValue,
58-
key: bound.Key,
59-
scope: bound.Scope,
57+
key: bound.Key,
58+
scope: bound.Scope,
6059
}
6160
b.updater.recordLimit = func(ctx context.Context, n N) { b.recordBound(ctx, n) }
6261

@@ -115,7 +114,6 @@ func newBoundLimiter[N Number](f Factory, bound settings.Setting[N]) (BoundLimit
115114

116115
type boundLimiter[N Number] struct {
117116
*updater[N]
118-
defaultBound N
119117

120118
key string // optional
121119
scope settings.Scope

pkg/settings/limits/errors.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,24 @@ func (e ErrorQueueFull) Error() string {
144144
}
145145

146146
var ErrQueueEmpty = fmt.Errorf("queue is empty")
147+
148+
type ErrorNotAllowed struct {
149+
Key string
150+
151+
Scope settings.Scope
152+
Tenant string
153+
}
154+
155+
func (e ErrorNotAllowed) GRPCStatus() *status.Status {
156+
return status.New(codes.PermissionDenied, e.Error())
157+
}
158+
159+
func (e ErrorNotAllowed) Is(target error) bool {
160+
_, ok := target.(ErrorNotAllowed) //nolint:errcheck // implementing errors.Is
161+
return ok
162+
}
163+
164+
func (e ErrorNotAllowed) Error() string {
165+
which, who := errArgs(e.Key, e.Scope, e.Tenant)
166+
return fmt.Sprintf("%slimited%s: not allowed", which, who)
167+
}

pkg/settings/limits/factory.go

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ func (f Factory) NewRateLimiter(rate settings.Setting[config.Rate]) (RateLimiter
3131

3232
// MakeRateLimiter creates a RateLimiter for the given rate and configured by the Factory.
3333
// If Meter is set, the following metrics will be emitted
34-
// - rate.*.limit - float gauge
35-
// - rate.*.burst - int gauge
36-
// - rate.*.usage - int counter
37-
// - rate.*.denied - int histogram
34+
// - rate.*.limit - float gauge
35+
// - rate.*.burst - int gauge
36+
// - rate.*.usage - int counter
37+
// - rate.*.denied - int histogram
3838
func (f Factory) MakeRateLimiter(rate settings.Setting[config.Rate]) (RateLimiter, error) {
3939
if rate.Scope == settings.ScopeGlobal {
4040
return f.globalRateLimiter(rate)
@@ -49,10 +49,11 @@ func (f Factory) NewTimeLimiter(timeout settings.Setting[time.Duration]) (TimeLi
4949

5050
// MakeTimeLimiter returns a TimeLimiter for given timeout, and configured by the Factory.
5151
// If Meter is set, the following metrics will be emitted
52-
// - time.*.limit - float gauge
53-
// - time.*.runtime - float gauge
54-
// - time.*.success - int counter
55-
// - time.*.timeout - int counter
52+
// - time.*.limit - float gauge
53+
// - time.*.runtime - float gauge
54+
// - time.*.success - int counter
55+
// - time.*.timeout - int counter
56+
//
5657
// Note: Unit will be ignored. All TimeLimiters emit seconds as "s".
5758
func (f Factory) MakeTimeLimiter(timeout settings.Setting[time.Duration]) (TimeLimiter, error) {
5859
return f.newTimeLimiter(timeout)
@@ -65,10 +66,10 @@ func NewResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (Res
6566

6667
// MakeResourcePoolLimiter returns a ResourcePoolLimiter for the given limit, and configured by the Factory.
6768
// If Meter is set, the following metrics will be emitted
68-
// - resource.*.limit - gauge
69-
// - resource.*.usage - gauge
70-
// - resource.*.amount - histogram
71-
// - resource.*.denied - histogram
69+
// - resource.*.limit - gauge
70+
// - resource.*.usage - gauge
71+
// - resource.*.amount - histogram
72+
// - resource.*.denied - histogram
7273
func MakeResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (ResourcePoolLimiter[N], error) {
7374
if limit.Scope == settings.ScopeGlobal {
7475
return newGlobalResourcePoolLimiter(f, limit)
@@ -78,21 +79,32 @@ func MakeResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]) (Re
7879

7980
// MakeBoundLimiter returns a BoundLimiter for the given bound and configured by the Factory.
8081
// If Meter is set, the following metrics will be emitted
81-
// - bound.*.limit - gauge
82-
// - bound.*.usage - histogram
83-
// - bound.*.denied - histogram
82+
// - bound.*.limit - gauge
83+
// - bound.*.usage - histogram
84+
// - bound.*.denied - histogram
8485
func MakeBoundLimiter[N Number](f Factory, bound settings.Setting[N]) (BoundLimiter[N], error) {
8586
return newBoundLimiter(f, bound)
8687
}
8788

8889
// MakeQueueLimiter returns a QueueLimiter for the given limit and configured by the Factory.
8990
// If Meter is set, the following metrics will be emitted
90-
// - queue.*.limit - int gauge
91-
// - queue.*.usage - int gauge
92-
// - queue.*.denied - int histogram
91+
// - queue.*.limit - int gauge
92+
// - queue.*.usage - int gauge
93+
// - queue.*.denied - int histogram
9394
func MakeQueueLimiter[T any](f Factory, limit settings.Setting[int]) (QueueLimiter[T], error) {
9495
if limit.Scope == settings.ScopeGlobal {
9596
return newUnscopedQueue[T](f, limit)
9697
}
9798
return newScopedQueue[T](f, limit)
9899
}
100+
101+
// MakeGateLimiter returns a GateLimiter for the given limit and configured by the factory.
102+
// If Meter is set, the following metrics will be emitted
103+
// - gate.*.limit - int gauge
104+
// - gate.*.usage - int counter
105+
// - gate.*.denied - int counter
106+
//
107+
// OPT: accept an interface for limit
108+
func MakeGateLimiter(f Factory, limit settings.SettingMap[bool]) (GateLimiter, error) {
109+
return newGateLimiter(f, limit)
110+
}

0 commit comments

Comments
 (0)