Skip to content

Commit d27a716

Browse files
authored
added plugin state that can be used to share data between different extension point of a plugin (#1299)
* added plugin state that can be used to share data between different extension point of a plugin Signed-off-by: Nir Rozenbaum <[email protected]> * fix license header Signed-off-by: Nir Rozenbaum <[email protected]> --------- Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent bc2d20f commit d27a716

File tree

4 files changed

+186
-23
lines changed

4 files changed

+186
-23
lines changed

pkg/epp/plugins/plugin_state.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package plugins
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
"time"
24+
25+
"sigs.k8s.io/controller-runtime/pkg/log"
26+
27+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
28+
)
29+
30+
const (
	// stalenessThreshold is how long the data of a request may go without being read or written
	// before it is considered stale. Stale data is removed in the next cleanup cycle.
	stalenessThreshold = 5 * time.Minute
	// cleanupInterval is the period at which the cleanup goroutine checks for stale data.
	cleanupInterval = time.Minute
)
38+
39+
// NewPluginState initializes a new PluginState and returns its pointer.
40+
func NewPluginState(ctx context.Context) *PluginState {
41+
pluginState := &PluginState{}
42+
go pluginState.cleanup(ctx)
43+
return pluginState
44+
}
45+
46+
// PluginState provides a mechanism for plugins to store and retrieve arbitrary data by multiple extension points.
47+
// Data stored by the plugin in one extension point can be written, read or altered by another extension point.
48+
// The data stored in PluginState is always stored in the context of a given request.
49+
// If the data hasn't been accessed during "stalenessThreshold", it is cleaned by a cleanup internal mechanism.
50+
//
51+
// Note: PluginState uses a sync.Map to back the storage, because it is thread safe.
52+
// It's aimed to optimize for the "write once and read many times" scenarios.
53+
type PluginState struct {
54+
// key: RequestID, value: map[StateKey]StateData
55+
storage sync.Map
56+
// key: RequestID, value: time.Time
57+
requestToLastAccessTime sync.Map
58+
}
59+
60+
// Read retrieves data with the given "key" in the context of "requestID" from PluginState.
61+
// If the key is not present, ErrNotFound is returned.
62+
func (s *PluginState) Read(requestID string, key StateKey) (StateData, error) {
63+
s.requestToLastAccessTime.Store(requestID, time.Now())
64+
stateMap, ok := s.storage.Load(requestID)
65+
if !ok {
66+
return nil, ErrNotFound
67+
}
68+
69+
stateData := stateMap.(map[StateKey]StateData)
70+
if value, ok := stateData[key]; ok {
71+
return value, nil
72+
}
73+
74+
return nil, ErrNotFound
75+
}
76+
77+
// Write stores the given "val" in PluginState with the given "key" in the context of the given "requestID".
78+
func (s *PluginState) Write(requestID string, key StateKey, val StateData) {
79+
s.requestToLastAccessTime.Store(requestID, time.Now())
80+
var stateData map[StateKey]StateData
81+
stateMap, ok := s.storage.Load(requestID)
82+
if ok {
83+
stateData = stateMap.(map[StateKey]StateData)
84+
} else {
85+
stateData = map[StateKey]StateData{}
86+
}
87+
88+
stateData[key] = val
89+
90+
s.storage.Store(requestID, stateData)
91+
}
92+
93+
// Delete deletes data associated with the given requestID.
94+
// It is possible to call Delete explicitly when the handling of a request is completed
95+
// or alternatively, if the request failed during its processing, a cleanup goroutine will
96+
// clean data of stale requests.
97+
func (s *PluginState) Delete(requestID string) {
98+
s.storage.Delete(requestID)
99+
s.requestToLastAccessTime.Delete(requestID)
100+
}
101+
102+
// cleanup periodically deletes data associated with the given requestID.
103+
func (s *PluginState) cleanup(ctx context.Context) {
104+
ticker := time.NewTicker(cleanupInterval)
105+
defer ticker.Stop()
106+
for {
107+
select {
108+
case <-ctx.Done():
109+
log.FromContext(ctx).V(logutil.DEFAULT).Info("Shutting down plugin state cleanup")
110+
return
111+
case <-ticker.C:
112+
s.requestToLastAccessTime.Range(func(k, v any) bool {
113+
requestID := k.(string)
114+
lastAccessTime := v.(time.Time)
115+
if time.Since(lastAccessTime) > stalenessThreshold {
116+
s.Delete(requestID) // cleanup stale requests (this is safe in sync.Map)
117+
}
118+
return true
119+
})
120+
}
121+
}
122+
123+
}
124+
125+
// ReadPluginStateKey retrieves data with the given key from PluginState and asserts it to type T.
126+
// Returns an error if the key is not found or the type assertion fails.
127+
func ReadPluginStateKey[T StateData](state *PluginState, requestID string, key StateKey) (T, error) {
128+
var zero T
129+
130+
raw, err := state.Read(requestID, key)
131+
if err != nil {
132+
return zero, err
133+
}
134+
135+
val, ok := raw.(T)
136+
if !ok {
137+
return zero, fmt.Errorf("unexpected type for key %q: got %T", key, raw)
138+
}
139+
140+
return val, nil
141+
}

pkg/epp/plugins/shared_state.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package plugins
18+
19+
import (
20+
"errors"
21+
)
22+
23+
// ErrNotFound is the sentinel error returned when no data is stored under the requested key.
var ErrNotFound = errors.New("not found")

// StateKey is the type of the keys under which StateData is stored.
type StateKey string

// StateData is a generic type for arbitrary data stored by plugins.
type StateData interface {
	// Clone returns a copy of the StateData.
	Clone() StateData
}

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (s ServerID) String() string {
9696
}
9797

9898
// compile-time type validation
99-
var _ types.StateData = &SchedulingContextState{}
99+
var _ plugins.StateData = &SchedulingContextState{}
100100

101101
// SchedulingContextState is the state of this plugin to be used during a scheduling cycle.
102102
type SchedulingContextState struct {
@@ -106,7 +106,7 @@ type SchedulingContextState struct {
106106
PrefixCacheServers map[ServerID]int
107107
}
108108

109-
func (s *SchedulingContextState) Clone() types.StateData {
109+
func (s *SchedulingContextState) Clone() plugins.StateData {
110110
prefixHashes := make([]BlockHash, len(s.PrefixHashes))
111111
copy(prefixHashes, s.PrefixHashes)
112112
prefixCacheServers := make(map[ServerID]int, len(s.PrefixCacheServers))
@@ -180,7 +180,7 @@ func (m *Plugin) Score(ctx context.Context, cycleState *types.CycleState, reques
180180
PrefixCacheServers: m.matchLongestPrefix(ctx, hashes),
181181
}
182182

183-
cycleState.Write(types.StateKey(m.TypedName().Type), state)
183+
cycleState.Write(plugins.StateKey(m.TypedName().Type), state)
184184
loggerTrace.Info(fmt.Sprintf("cached servers: %+v", state.PrefixCacheServers), "hashes", state.PrefixHashes)
185185
// calculate the scores of pods
186186
scores := make(map[types.Pod]float64, len(pods))

pkg/epp/scheduling/types/cycle_state.go

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,12 @@ limitations under the License.
1717
package types
1818

1919
import (
20-
"errors"
2120
"fmt"
2221
"sync"
23-
)
2422

25-
var (
26-
// ErrNotFound is the not found error message.
27-
ErrNotFound = errors.New("not found")
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2824
)
2925

30-
// StateData is a generic type for arbitrary data stored in CycleState.
31-
type StateData interface {
32-
// Clone is an interface to make a copy of StateData.
33-
Clone() StateData
34-
}
35-
36-
// StateKey is the type of keys stored in CycleState.
37-
type StateKey string
38-
3926
// NewCycleState initializes a new CycleState and returns its pointer.
4027
func NewCycleState() *CycleState {
4128
return &CycleState{}
@@ -55,30 +42,30 @@ type CycleState struct {
5542
// present, ErrNotFound is returned.
5643
//
5744
// See CycleState for notes on concurrency.
58-
func (c *CycleState) Read(key StateKey) (StateData, error) {
45+
func (c *CycleState) Read(key plugins.StateKey) (plugins.StateData, error) {
5946
if v, ok := c.storage.Load(key); ok {
60-
return v.(StateData), nil
47+
return v.(plugins.StateData), nil
6148
}
62-
return nil, ErrNotFound
49+
return nil, plugins.ErrNotFound
6350
}
6451

6552
// Write stores the given "val" in CycleState with the given "key".
6653
//
6754
// See CycleState for notes on concurrency.
68-
func (c *CycleState) Write(key StateKey, val StateData) {
55+
func (c *CycleState) Write(key plugins.StateKey, val plugins.StateData) {
6956
c.storage.Store(key, val)
7057
}
7158

7259
// Delete deletes data with the given key from CycleState.
7360
//
7461
// See CycleState for notes on concurrency.
75-
func (c *CycleState) Delete(key StateKey) {
62+
func (c *CycleState) Delete(key plugins.StateKey) {
7663
c.storage.Delete(key)
7764
}
7865

7966
// ReadCycleStateKey retrieves data with the given key from CycleState and asserts it to type T.
8067
// Returns an error if the key is not found or the type assertion fails.
81-
func ReadCycleStateKey[T StateData](c *CycleState, key StateKey) (T, error) {
68+
func ReadCycleStateKey[T plugins.StateData](c *CycleState, key plugins.StateKey) (T, error) {
8269
var zero T
8370

8471
raw, err := c.Read(key)

0 commit comments

Comments
 (0)