Skip to content

Commit 8dc40a5

Browse files
authored
feat: Added a factory function for the DecisionTree filter (#1053)
* Added a factory function for the DecisionTreeFilter Signed-off-by: Shmuel Kallner <[email protected]> * Added tests of the factory function of the DecisionTreeFilter Signed-off-by: Shmuel Kallner <[email protected]> * Registered the factory function of the DecisionTreeFilter Signed-off-by: Shmuel Kallner <[email protected]> * Refactored the configuration loading Signed-off-by: Shmuel Kallner <[email protected]> --------- Signed-off-by: Shmuel Kallner <[email protected]>
1 parent 5f64c1d commit 8dc40a5

File tree

4 files changed

+396
-23
lines changed

4 files changed

+396
-23
lines changed

cmd/epp/runner/register.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
// RegisterAllPlugins registers the factory functions of all known plugins
2929
func RegisterAllPlugins() {
30+
plugins.Register(filter.DecisionTreeFilterType, filter.DecisionTreeFilterFactory)
3031
plugins.Register(filter.LeastKVCacheFilterType, filter.LeastKVCacheFilterFactory)
3132
plugins.Register(filter.LeastQueueFilterType, filter.LeastQueueFilterFactory)
3233
plugins.Register(filter.LoraAffinityFilterType, filter.LoraAffinityFilterFactory)

cmd/epp/runner/runner.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -216,29 +216,10 @@ func (r *Runner) Run(ctx context.Context) error {
216216
return err
217217
}
218218

219-
if len(*configText) != 0 || len(*configFile) != 0 {
220-
theConfig, err := loader.LoadConfig([]byte(*configText), *configFile)
221-
if err != nil {
222-
setupLog.Error(err, "Failed to load the configuration")
223-
return err
224-
}
225-
226-
epp := newEppHandle()
227-
228-
err = loader.LoadPluginReferences(theConfig.Plugins, epp)
229-
if err != nil {
230-
setupLog.Error(err, "Failed to instantiate the plugins")
231-
return err
232-
}
233-
234-
r.schedulerConfig, err = loader.LoadSchedulerConfig(theConfig.SchedulingProfiles, epp)
235-
if err != nil {
236-
setupLog.Error(err, "Failed to create Scheduler configuration")
237-
return err
238-
}
239-
240-
// Add requestControl plugins
241-
r.requestControlConfig.AddPlugins(epp.Plugins().GetAllPlugins()...)
219+
err = r.parseConfiguration()
220+
if err != nil {
221+
setupLog.Error(err, "Failed to parse the configuration")
222+
return err
242223
}
243224

244225
// --- Initialize Core EPP Components ---
@@ -328,6 +309,31 @@ func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) {
328309
return scheduler, nil
329310
}
330311

312+
func (r *Runner) parseConfiguration() error {
313+
if len(*configText) != 0 || len(*configFile) != 0 {
314+
theConfig, err := loader.LoadConfig([]byte(*configText), *configFile)
315+
if err != nil {
316+
return fmt.Errorf("failed to load the configuration - %w", err)
317+
}
318+
319+
epp := newEppHandle()
320+
321+
err = loader.LoadPluginReferences(theConfig.Plugins, epp)
322+
if err != nil {
323+
return fmt.Errorf("failed to instantiate the plugins - %w", err)
324+
}
325+
326+
r.schedulerConfig, err = loader.LoadSchedulerConfig(theConfig.SchedulingProfiles, epp)
327+
if err != nil {
328+
return fmt.Errorf("failed to create Scheduler configuration - %w", err)
329+
}
330+
331+
// Add requestControl plugins
332+
r.requestControlConfig.AddPlugins(epp.Plugins().GetAllPlugins()...)
333+
}
334+
return nil
335+
}
336+
331337
func initLogging(opts *zap.Options) {
332338
// Unless -zap-log-level is explicitly set, use -v
333339
useV := true

pkg/epp/scheduling/framework/plugins/filter/decision_tree_filter.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,21 @@ package filter
1818

1919
import (
2020
"context"
21+
"encoding/json"
22+
"errors"
23+
"fmt"
2124

2225
"sigs.k8s.io/controller-runtime/pkg/log"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2327
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2428
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2529
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2630
)
2731

32+
const (
33+
DecisionTreeFilterType = "decision-tree"
34+
)
35+
2836
// compile-time type assertion
2937
var _ framework.Filter = &DecisionTreeFilter{}
3038

@@ -47,6 +55,82 @@ type DecisionTreeFilter struct {
4755
NextOnSuccessOrFailure framework.Filter
4856
}
4957

58+
type decisionTreeFilterParameters struct {
59+
Current *decisionTreeFilterEntry `json:"current"`
60+
NextOnSuccess *decisionTreeFilterEntry `json:"nextOnSuccess"`
61+
NextOnFailure *decisionTreeFilterEntry `json:"nextOnFailure"`
62+
NextOnSuccessOrFailure *decisionTreeFilterEntry `json:"nextOnSuccessOrFailure"`
63+
}
64+
65+
type decisionTreeFilterEntry struct {
66+
PluginRef *string `json:"pluginRef"`
67+
DecisionTree *decisionTreeFilterParameters `json:"decisionTree"`
68+
}
69+
70+
func DecisionTreeFilterFactory(name string, rawParameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
71+
parameters := decisionTreeFilterParameters{}
72+
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
73+
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", name, err)
74+
}
75+
return loadDecisionTree(&parameters, handle)
76+
}
77+
78+
func loadDecisionTree(parameters *decisionTreeFilterParameters, handle plugins.Handle) (*DecisionTreeFilter, error) {
79+
result := &DecisionTreeFilter{}
80+
var err error
81+
82+
if parameters.Current == nil {
83+
return nil, errors.New("a current filter must be specified")
84+
}
85+
result.Current, err = loadDecisionTreeEntry(parameters.Current, handle)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
if parameters.NextOnSuccess != nil {
91+
result.NextOnSuccess, err = loadDecisionTreeEntry(parameters.NextOnSuccess, handle)
92+
if err != nil {
93+
return nil, err
94+
}
95+
}
96+
97+
if parameters.NextOnFailure != nil {
98+
result.NextOnFailure, err = loadDecisionTreeEntry(parameters.NextOnFailure, handle)
99+
if err != nil {
100+
return nil, err
101+
}
102+
}
103+
104+
if parameters.NextOnSuccessOrFailure != nil {
105+
result.NextOnSuccessOrFailure, err = loadDecisionTreeEntry(parameters.NextOnSuccessOrFailure, handle)
106+
if err != nil {
107+
return nil, err
108+
}
109+
}
110+
111+
return result, nil
112+
}
113+
114+
func loadDecisionTreeEntry(entry *decisionTreeFilterEntry, handle plugins.Handle) (framework.Filter, error) {
115+
if entry.PluginRef != nil && entry.DecisionTree != nil {
116+
return nil, errors.New("both pluginRef and decisionTree may not be specified")
117+
}
118+
119+
if entry.PluginRef != nil {
120+
instance := handle.Plugins().Plugin(*entry.PluginRef)
121+
if instance == nil {
122+
return nil, errors.New(*entry.PluginRef + " is a reference to an undefined Plugin")
123+
}
124+
if theFilter, ok := instance.(framework.Filter); ok {
125+
return theFilter, nil
126+
}
127+
return nil, errors.New(*entry.PluginRef + " is not a filter")
128+
} else if entry.DecisionTree != nil {
129+
return loadDecisionTree(entry.DecisionTree, handle)
130+
}
131+
return nil, errors.New("either pluginRef or decisionTree must be specified")
132+
}
133+
50134
// Type returns the type of the filter.
51135
func (f *DecisionTreeFilter) Type() string {
52136
if f == nil {

0 commit comments

Comments
 (0)