
Commit 7863d9a

DRA scheduler: refactor CEL compilation cache

A better place is the cel package because a) the name can become shorter and b) the cache is tightly coupled with the compiler there. Moving the compilation into the cache simplifies the callers.

1 parent 6f07fa3

8 files changed: +188 −84 lines changed
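
Before the per-file diffs, a quick orientation: callers previously paired a structured.CELCache with their own compile-on-miss helper (see the compileCELExpression removed from allocator.go below); after this commit they hold a single cel.Cache and call GetOrCompile. A minimal, hypothetical sketch of the new caller-side pattern — checkSelector and the example expression are made up for illustration, only cel.NewCache and GetOrCompile come from this commit:

	package main

	import (
		"fmt"

		"k8s.io/dynamic-resource-allocation/cel"
	)

	// checkSelector is a hypothetical helper showing the caller-side
	// pattern after this refactor: one shared cache, one call.
	func checkSelector(cache *cel.Cache, expression string) error {
		// GetOrCompile returns the cached result when available;
		// otherwise it compiles and caches only successful results.
		result := cache.GetOrCompile(expression)
		if result.Error != nil {
			return fmt.Errorf("compile CEL expression %q: %w", expression, result.Error)
		}
		// ... evaluate the compiled expression against a device, as
		// selectorsMatch does in allocator.go ...
		return nil
	}

	func main() {
		// Same sizing as the scheduler plugin below: keep the 10 most
		// recently used compilation results.
		cache := cel.NewCache(10)
		if err := checkSelector(cache, "true"); err != nil {
			fmt.Println(err)
		}
	}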

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 3 additions & 2 deletions
@@ -38,6 +38,7 @@ import (
 	resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
 	"k8s.io/client-go/util/retry"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
+	"k8s.io/dynamic-resource-allocation/cel"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/dynamic-resource-allocation/structured"
 	"k8s.io/klog/v2"
@@ -111,7 +112,7 @@ type DynamicResources struct {
 	clientset        kubernetes.Interface
 	classLister      resourcelisters.DeviceClassLister
 	sliceLister      resourcelisters.ResourceSliceLister
-	celCache         *structured.CELCache
+	celCache         *cel.Cache
 	allocatedDevices *allocatedDevices

 	// claimAssumeCache enables temporarily storing a newer claim object
@@ -192,7 +193,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
 		// This is an LRU cache for compiled CEL expressions. The most
 		// recent 10 of them get reused across different scheduling
 		// cycles.
-		celCache: structured.NewCELCache(10),
+		celCache: cel.NewCache(10),

 		allocatedDevices: newAllocatedDevices(logger),
 	}
Lines changed: 79 additions & 0 deletions (new file, package cel)

@@ -0,0 +1,79 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cel
+
+import (
+	"sync"
+
+	"k8s.io/utils/keymutex"
+	"k8s.io/utils/lru"
+)
+
+// Cache is a thread-safe LRU cache for compiled CEL expressions.
+type Cache struct {
+	compileMutex keymutex.KeyMutex
+	cacheMutex   sync.RWMutex
+	cache        *lru.Cache
+}
+
+// NewCache creates a cache. The maximum number of entries determines
+// how many entries are cached at most before dropping the oldest
+// entry.
+func NewCache(maxCacheEntries int) *Cache {
+	return &Cache{
+		compileMutex: keymutex.NewHashed(0),
+		cache:        lru.New(maxCacheEntries),
+	}
+}
+
+// GetOrCompile checks whether the cache already has a compilation result
+// and returns that if available. Otherwise it compiles, stores successful
+// results and returns the new result.
+func (c *Cache) GetOrCompile(expression string) CompilationResult {
+	// Compiling a CEL expression is expensive enough that it is cheaper
+	// to lock a mutex than doing it several times in parallel.
+	c.compileMutex.LockKey(expression)
+	//nolint:errcheck // Only returns an error for unknown keys, which isn't the case here.
+	defer c.compileMutex.UnlockKey(expression)
+
+	cached := c.get(expression)
+	if cached != nil {
+		return *cached
+	}
+
+	expr := GetCompiler().CompileCELExpression(expression, Options{})
+	if expr.Error == nil {
+		c.add(expression, &expr)
+	}
+	return expr
+}
+
+func (c *Cache) add(expression string, expr *CompilationResult) {
+	c.cacheMutex.Lock()
+	defer c.cacheMutex.Unlock()
+	c.cache.Add(expression, expr)
+}
+
+func (c *Cache) get(expression string) *CompilationResult {
+	c.cacheMutex.RLock()
+	defer c.cacheMutex.RUnlock()
+	expr, found := c.cache.Get(expression)
+	if !found {
+		return nil
+	}
+	return expr.(*CompilationResult)
+}
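
A design note on the new file above: its two locks do different jobs. compileMutex is a keyed mutex that serializes work per expression, so N goroutines asking for the same uncached expression trigger one compilation instead of N; cacheMutex guards the LRU structure itself, whose lookups reorder entries and therefore mutate shared state (which is what TestCacheConcurrency below exercises). A hypothetical sketch of the per-key behavior, assuming only the API shown above (not part of the commit):

	package main

	import (
		"fmt"
		"sync"

		"k8s.io/dynamic-resource-allocation/cel"
	)

	func main() {
		cache := cel.NewCache(10)

		// All goroutines request the same key, so at most one of them
		// compiles; the others block briefly on the per-expression lock
		// and then return the cached result.
		var wg sync.WaitGroup
		for i := 0; i < 100; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				result := cache.GetOrCompile("true")
				if result.Error != nil {
					fmt.Println("unexpected compile error:", result.Error)
				}
			}()
		}
		wg.Wait()
	}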
Lines changed: 98 additions & 0 deletions (new file, package cel tests)

@@ -0,0 +1,98 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cel
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCacheSemantic(t *testing.T) {
+	// Cache two entries.
+	//
+	// Entries are comparable structs with pointers inside. Each
+	// compilation leads to different pointers, so the entries can be
+	// compared by value to figure out whether an entry was cached or
+	// compiled anew.
+	cache := NewCache(2)
+
+	// Successful compilations get cached.
+	resultTrue := cache.GetOrCompile("true")
+	require.Nil(t, resultTrue.Error)
+	resultTrueAgain := cache.GetOrCompile("true")
+	if resultTrue != resultTrueAgain {
+		t.Fatal("result of compiling `true` should have been cached")
+	}
+
+	// Unsuccessful ones don't.
+	resultFailed := cache.GetOrCompile("no-such-variable")
+	require.NotNil(t, resultFailed.Error)
+	resultFailedAgain := cache.GetOrCompile("no-such-variable")
+	if resultFailed == resultFailedAgain {
+		t.Fatal("result of compiling `no-such-variable` should not have been cached")
+	}
+
+	// The cache can hold a second result.
+	resultFalse := cache.GetOrCompile("false")
+	require.Nil(t, resultFalse.Error)
+	resultFalseAgain := cache.GetOrCompile("false")
+	if resultFalse != resultFalseAgain {
+		t.Fatal("result of compiling `false` should have been cached")
+	}
+	resultTrueAgain = cache.GetOrCompile("true")
+	if resultTrue != resultTrueAgain {
+		t.Fatal("result of compiling `true` should still have been cached")
+	}
+
+	// A third result pushes out the least recently used one.
+	resultOther := cache.GetOrCompile("false && true")
+	require.Nil(t, resultOther.Error)
+	resultOtherAgain := cache.GetOrCompile("false && true")
+	if resultOther != resultOtherAgain {
+		t.Fatal("result of compiling `false && true` should have been cached")
+	}
+	resultFalseAgain = cache.GetOrCompile("false")
+	if resultFalse == resultFalseAgain {
+		t.Fatal("result of compiling `false` should have been evicted from the cache")
+	}
+}
+
+func TestCacheConcurrency(t *testing.T) {
+	// There's no guarantee that concurrent use of the cache would really
+	// trigger the race detector in `go test -race`, but in practice
+	// it does when not using the cacheMutex.
+	//
+	// The compileMutex only affects performance and thus cannot be tested
+	// without benchmarking.
+	numWorkers := 10
+
+	cache := NewCache(2)
+	var wg sync.WaitGroup
+	wg.Add(numWorkers)
+	for i := 0; i < numWorkers; i++ {
+		go func(i int) {
+			defer wg.Done()
+			result := cache.GetOrCompile(fmt.Sprintf("%d == %d", i, i))
+			assert.Nil(t, result.Error)
+		}(i)
+	}
+	wg.Wait()
+}

staging/src/k8s.io/dynamic-resource-allocation/go.mod

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,6 @@ godebug default=go1.23

 require (
 	github.com/blang/semver/v4 v4.0.0
-	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
 	github.com/google/cel-go v0.21.0
 	github.com/google/go-cmp v0.6.0
 	github.com/onsi/gomega v1.33.1
@@ -37,6 +36,7 @@ require (
 	github.com/go-openapi/jsonreference v0.20.2 // indirect
 	github.com/go-openapi/swag v0.23.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
+	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/google/gnostic-models v0.6.8 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect

staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go

Lines changed: 3 additions & 25 deletions

@@ -30,7 +30,6 @@ import (
 	draapi "k8s.io/dynamic-resource-allocation/api"
 	"k8s.io/dynamic-resource-allocation/cel"
 	"k8s.io/klog/v2"
-	"k8s.io/utils/keymutex"
 	"k8s.io/utils/ptr"
 )

@@ -46,8 +45,7 @@ type Allocator struct {
 	allocatedDevices sets.Set[DeviceID]
 	classLister      resourcelisters.DeviceClassLister
 	slices           []*resourceapi.ResourceSlice
-	celCache         *CELCache
-	celMutex         keymutex.KeyMutex
+	celCache         *cel.Cache
 }

 // NewAllocator returns an allocator for a certain set of claims or an error if
@@ -60,7 +58,7 @@ func NewAllocator(ctx context.Context,
 	allocatedDevices sets.Set[DeviceID],
 	classLister resourcelisters.DeviceClassLister,
 	slices []*resourceapi.ResourceSlice,
-	celCache *CELCache,
+	celCache *cel.Cache,
 ) (*Allocator, error) {
 	return &Allocator{
 		adminAccessEnabled: adminAccessEnabled,
@@ -69,7 +67,6 @@ func NewAllocator(ctx context.Context,
 		classLister:        classLister,
 		slices:             slices,
 		celCache:           celCache,
-		celMutex:           keymutex.NewHashed(0),
 	}, nil
 }

@@ -78,25 +75,6 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim {
 	return a.claimsToAllocate
 }

-func (a *Allocator) compileCELExpression(expression string) cel.CompilationResult {
-	// Compiling a CEL expression is expensive enough that it is cheaper
-	// to lock a mutex than doing it several times in parallel.
-	a.celMutex.LockKey(expression)
-	//nolint:errcheck // Only returns an error for unknown keys, which isn't the case here.
-	defer a.celMutex.UnlockKey(expression)
-
-	cached := a.celCache.get(expression)
-	if cached != nil {
-		return *cached
-	}
-
-	expr := cel.GetCompiler().CompileCELExpression(expression, cel.Options{})
-	if expr.Error == nil {
-		a.celCache.add(expression, &expr)
-	}
-	return expr
-}
-
 // Allocate calculates the allocation(s) for one particular node.
 //
 // It returns an error only if some fatal problem occurred. These are errors
@@ -713,7 +691,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSli

 func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
 	for i, selector := range selectors {
-		expr := alloc.compileCELExpression(selector.CEL.Expression)
+		expr := alloc.celCache.GetOrCompile(selector.CEL.Expression)
 		if expr.Error != nil {
 			// Could happen if some future apiserver accepted some
 			// future expression and then got downgraded. Normally

staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go

Lines changed: 2 additions & 1 deletion

@@ -37,6 +37,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/dynamic-resource-allocation/cel"
 	"k8s.io/klog/v2/ktesting"
 	"k8s.io/utils/ptr"
 )
@@ -1376,7 +1377,7 @@ func TestAllocator(t *testing.T) {
 			allocatedDevices := slices.Clone(tc.allocatedDevices)
 			slices := slices.Clone(tc.slices)

-			allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, NewCELCache(1))
+			allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, cel.NewCache(1))
 			g.Expect(err).ToNot(gomega.HaveOccurred())

 			results, err := allocator.Allocate(ctx, tc.node)

staging/src/k8s.io/dynamic-resource-allocation/structured/celcache.go

Lines changed: 0 additions & 54 deletions
This file was deleted.
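
For context on what was dropped: judging by the removed call sites in allocator.go above (a.celCache.get, a.celCache.add, NewCELCache), the deleted file held only the locked LRU, leaving compilation and per-expression locking to every caller. A reconstructed sketch of its likely shape — inferred from those call sites, not the verbatim 54 deleted lines:

	// Reconstructed sketch of the old structured.CELCache (inferred
	// from the removed call sites; not the actual deleted source).
	package structured

	import (
		"sync"

		"k8s.io/dynamic-resource-allocation/cel"
		"k8s.io/utils/lru"
	)

	// CELCache stored compilation results keyed by expression; callers
	// such as Allocator.compileCELExpression did the compiling.
	type CELCache struct {
		mutex sync.RWMutex
		cache *lru.Cache
	}

	func NewCELCache(maxCacheEntries int) *CELCache {
		return &CELCache{cache: lru.New(maxCacheEntries)}
	}

	func (c *CELCache) add(expression string, expr *cel.CompilationResult) {
		c.mutex.Lock()
		defer c.mutex.Unlock()
		c.cache.Add(expression, expr)
	}

	func (c *CELCache) get(expression string) *cel.CompilationResult {
		c.mutex.RLock()
		defer c.mutex.RUnlock()
		expr, found := c.cache.Get(expression)
		if !found {
			return nil
		}
		return expr.(*cel.CompilationResult)
	}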

test/integration/scheduler_perf/dra.go

Lines changed: 2 additions & 1 deletion

@@ -34,6 +34,7 @@ import (
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/dynamic-resource-allocation/cel"
 	"k8s.io/dynamic-resource-allocation/structured"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
@@ -291,7 +292,7 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
 		reflect.TypeOf(&v1.Node{}): true,
 	}
 	require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
-	celCache := structured.NewCELCache(10)
+	celCache := cel.NewCache(10)

 	// The set of nodes is assumed to be fixed at this point.
 	nodes, err := nodeLister.List(labels.Everything())
