Skip to content

Commit 2d9bfa8

Browse files
elevrankfswain
authored and committed
(feat) initial types and interfaces for pluggable data layer (kubernetes-sigs#1154)
* initial data layer types and interfaces
  Signed-off-by: Etai Lev Ran <[email protected]>
* conform to Go boilerplate (missing empty lines)
  Signed-off-by: Etai Lev Ran <[email protected]>
* address subset of review comments
  Signed-off-by: Etai Lev Ran <[email protected]>
* replace mutex+map with sync.Map
  Signed-off-by: Etai Lev Ran <[email protected]>
* Address review comments:
  - DataSource is no longer tracking Endpoints.
  - Removed unused interfaces.
  - Changes to exported scope.
  - Move Addressable to podinfo.go.
  Signed-off-by: Etai Lev Ran <[email protected]>
* remove duplicate word
  Signed-off-by: Etai Lev Ran <[email protected]>
---------
Signed-off-by: Etai Lev Ran <[email protected]>
1 parent c9c0c0b commit 2d9bfa8

File tree

5 files changed

+448
-0
lines changed

5 files changed

+448
-0
lines changed

pkg/epp/datalayer/attributemap.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datalayer
18+
19+
import (
20+
"sync"
21+
)
22+
23+
// Cloneable is implemented by values that support cloning of themselves
// through the Clone method.
type Cloneable interface {
	// Clone returns a clone of the receiver as a Cloneable.
	Clone() Cloneable
}
27+
28+
// AttributeMap is used to store flexible metadata or traits
29+
// across different aspects of an inference server.
30+
// Stored values must be Cloneable.
31+
type AttributeMap interface {
32+
Put(string, Cloneable)
33+
Get(string) (Cloneable, bool)
34+
Keys() []string
35+
}
36+
37+
// Attributes is a goroutine safe implementation of AttributeMap.
// Entries are kept in a sync.Map.
type Attributes struct {
	data sync.Map
}
41+
42+
// NewAttributes return a new attribute map instance.
43+
func NewAttributes() *Attributes {
44+
return &Attributes{
45+
data: sync.Map{},
46+
}
47+
}
48+
49+
// Put adds (or updates) an attribute in the map.
50+
func (a *Attributes) Put(key string, value Cloneable) {
51+
a.data.Store(key, value) // TODO: Clone into map?
52+
}
53+
54+
// Get returns an attribute from the map.
55+
func (a *Attributes) Get(key string) (Cloneable, bool) {
56+
val, ok := a.data.Load(key)
57+
if !ok {
58+
return nil, false
59+
}
60+
if cloneable, ok := val.(Cloneable); ok {
61+
return cloneable.Clone(), true
62+
}
63+
return nil, false // shouldn't happen since Put accepts Cloneables only
64+
}
65+
66+
// Keys returns an array of all the names of attributes stored in the map.
67+
func (a *Attributes) Keys() []string {
68+
keys := []string{}
69+
a.data.Range(func(key, _ any) bool {
70+
if k, ok := key.(string); ok {
71+
keys = append(keys, k)
72+
}
73+
return true // continue iteration
74+
})
75+
return keys
76+
}
77+
78+
// Clone the attributes object itself.
79+
func (a *Attributes) Clone() *Attributes {
80+
cloned := &Attributes{
81+
data: sync.Map{},
82+
}
83+
84+
a.data.Range(func(k, v interface{}) bool {
85+
cloned.data.Store(k, v)
86+
return true
87+
})
88+
return cloned
89+
}

pkg/epp/datalayer/datasource.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datalayer
18+
19+
import (
20+
"errors"
21+
"fmt"
22+
"reflect"
23+
"sync"
24+
)
25+
26+
// DataSource is an interface required from all data layer data collection
27+
// sources.
28+
type DataSource interface {
29+
// Name returns the name of this datasource.
30+
Name() string
31+
32+
// AddExtractor adds an extractor to the data source.
33+
// The extractor will be called whenever the Collector might
34+
// have some new raw information regarding an endpoint.
35+
// The Extractor's expected input type should be validated against
36+
// the data source's output type upon registration.
37+
AddExtractor(extractor Extractor) error
38+
39+
// Collect is triggered by the data layer framework to fetch potentially new
40+
// data for an endpoint. It passes retrieved data to registered Extractors.
41+
Collect(ep Endpoint)
42+
}
43+
44+
// Extractor is used to convert raw data into relevant data layer information
45+
// for an endpoint. They are called by data sources whenever new data might be
46+
// available. Multiple Extractors can be registered with a source. Extractors
47+
// are expected to save their output with an endpoint so it becomes accessible
48+
// to consumers in other subsystem of the inference gateway (e.g., when making
49+
// scheduling decisions).
50+
type Extractor interface {
51+
// Name returns the name of the extractor.
52+
Name() string
53+
54+
// ExpectedType defines the type expected by the extractor. It must match
55+
// the output type of the data source where the extractor is registered.
56+
ExpectedInputType() reflect.Type
57+
58+
// Extract transforms the data source output into a concrete attribute that
59+
// is stored on the given endpoint.
60+
Extract(data any, ep Endpoint)
61+
}
62+
63+
var (
64+
// defaultDataSources is the system default data source registry.
65+
defaultDataSources = DataSourceRegistry{}
66+
)
67+
68+
// DataSourceRegistry keeps named data sources and makes them accessible to
// other subsystems in the inference gateway. Sources are held in a sync.Map.
type DataSourceRegistry struct {
	sources sync.Map
}
73+
74+
// Register adds a source to the registry.
75+
func (dsr *DataSourceRegistry) Register(src DataSource) error {
76+
if src == nil {
77+
return errors.New("unable to register a nil data source")
78+
}
79+
80+
if _, found := dsr.sources.Load(src.Name()); found {
81+
return fmt.Errorf("unable to register duplicate data source: %s", src.Name())
82+
}
83+
dsr.sources.Store(src.Name(), src)
84+
return nil
85+
}
86+
87+
// GetNamedSource returns the named data source, if found.
88+
func (dsr *DataSourceRegistry) GetNamedSource(name string) (DataSource, bool) {
89+
if name == "" {
90+
return nil, false
91+
}
92+
93+
if val, found := dsr.sources.Load(name); found {
94+
if ds, ok := val.(DataSource); ok {
95+
return ds, true
96+
} // ignore type assertion failures and fall through
97+
}
98+
return nil, false
99+
}
100+
101+
// GetSources returns all sources registered.
102+
func (dsr *DataSourceRegistry) GetSources() []DataSource {
103+
sources := []DataSource{}
104+
dsr.sources.Range(func(_, val any) bool {
105+
if ds, ok := val.(DataSource); ok {
106+
sources = append(sources, ds)
107+
}
108+
return true // continue iteration
109+
})
110+
return sources
111+
}
112+
113+
// RegisterSource adds the data source to the default registry.
114+
func RegisterSource(src DataSource) error {
115+
return defaultDataSources.Register(src)
116+
}
117+
118+
// GetNamedSource returns the named source from the default registry,
119+
// if found.
120+
func GetNamedSource(name string) (DataSource, bool) {
121+
return defaultDataSources.GetNamedSource(name)
122+
}
123+
124+
// GetSources returns all sources in the default registry.
125+
func GetSources() []DataSource {
126+
return defaultDataSources.GetSources()
127+
}
128+
129+
// ValidateExtractorType checks if an extractor can handle the collector's
// output. It returns nil when
//   - both types are identical, or
//   - the extractor input is the empty interface (accepts anything), or
//   - the extractor input is an interface type that the collector output
//     implements.
//
// In every other case, including when either type is nil, an error
// describing the mismatch is returned.
func ValidateExtractorType(collectorOutputType, extractorInputType reflect.Type) error {
	// A nil reflect.Type carries no type information; calling methods on it
	// would panic.
	if collectorOutputType == nil || extractorInputType == nil {
		return fmt.Errorf("extractor input type %v and collector output type %v must both be non-nil",
			extractorInputType, collectorOutputType)
	}

	if collectorOutputType == extractorInputType {
		return nil
	}

	// Only an interface-typed input can be satisfied by a different type.
	// The Kind guard is required: reflect.Type.Implements panics when its
	// argument is not an interface type.
	if extractorInputType.Kind() == reflect.Interface {
		// extractor accepts anything (i.e., interface{})
		if extractorInputType.NumMethod() == 0 {
			return nil
		}
		// check if collector output implements extractor input interface
		if collectorOutputType.Implements(extractorInputType) {
			return nil
		}
	}

	return fmt.Errorf("extractor input type %v cannot handle collector output type %v",
		extractorInputType, collectorOutputType)
}

pkg/epp/datalayer/endpoint.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datalayer
18+
19+
import (
20+
corev1 "k8s.io/api/core/v1"
21+
)
22+
23+
// EndpointPodState allows management of the Pod related attributes.
24+
type EndpointPodState interface {
25+
GetPod() *PodInfo
26+
UpdatePod(*corev1.Pod)
27+
}
28+
29+
// EndpointMetricsState allows management of the Metrics related attributes.
30+
type EndpointMetricsState interface {
31+
GetMetrics() *Metrics
32+
UpdateMetrics(*Metrics)
33+
}
34+
35+
// Endpoint represents an inference serving endpoint and its related attributes.
36+
type Endpoint interface {
37+
EndpointPodState
38+
EndpointMetricsState
39+
AttributeMap
40+
}

pkg/epp/datalayer/metrics.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package datalayer
18+
19+
import (
20+
"fmt"
21+
"time"
22+
)
23+
24+
// Metrics is the latest snapshot of metrics scraped from a pod.
type Metrics struct {
	// ActiveModels is a set of models (including LoRA adapters) that are
	// currently cached to GPU.
	ActiveModels  map[string]int
	WaitingModels map[string]int
	// MaxActiveModels is the maximum number of models that can be loaded to
	// GPU.
	MaxActiveModels         int
	RunningQueueSize        int
	WaitingQueueSize        int
	KVCacheUsagePercent     float64
	KvCacheMaxTokenCapacity int

	// UpdateTime records the last time when the metrics were updated.
	UpdateTime time.Time
}
39+
40+
// NewMetrics initializes a new empty Metrics object.
41+
func NewMetrics() *Metrics {
42+
return &Metrics{
43+
ActiveModels: make(map[string]int),
44+
WaitingModels: make(map[string]int),
45+
}
46+
}
47+
48+
// String returns a string with all Metric information
49+
func (m *Metrics) String() string {
50+
if m == nil {
51+
return ""
52+
}
53+
return fmt.Sprintf("%+v", *m)
54+
}
55+
56+
// Clone creates a copy of Metrics and returns its pointer.
57+
// Clone returns nil if the object being cloned is nil.
58+
func (m *Metrics) Clone() *Metrics {
59+
if m == nil {
60+
return nil
61+
}
62+
activeModels := make(map[string]int, len(m.ActiveModels))
63+
for key, value := range m.ActiveModels {
64+
activeModels[key] = value
65+
}
66+
waitingModels := make(map[string]int, len(m.WaitingModels))
67+
for key, value := range m.WaitingModels {
68+
waitingModels[key] = value
69+
}
70+
return &Metrics{
71+
ActiveModels: activeModels,
72+
WaitingModels: waitingModels,
73+
MaxActiveModels: m.MaxActiveModels,
74+
RunningQueueSize: m.RunningQueueSize,
75+
WaitingQueueSize: m.WaitingQueueSize,
76+
KVCacheUsagePercent: m.KVCacheUsagePercent,
77+
KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity,
78+
UpdateTime: m.UpdateTime,
79+
}
80+
}

0 commit comments

Comments
 (0)