Skip to content

Commit eb7dc7c

Browse files
authored
Add fallback logic (#1122)
1 parent b208f2e commit eb7dc7c

File tree

2 files changed

+56
-29
lines changed

2 files changed

+56
-29
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,20 +239,26 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
239239
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
240240
}
241241
// primary profile is used to set destination
242-
// TODO should use multiple destinations according to epp protocol. current code assumes a single target
243-
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPods[0].GetPod()
244-
245242
pool, err := d.datastore.PoolGet()
246243
if err != nil {
247244
return reqCtx, err
248245
}
246+
targetPods := []*backend.Pod{}
249247
targetPort := int(pool.Spec.TargetPortNumber)
248+
targetEndpoints := []string{}
249+
250+
for _, pod := range result.ProfileResults[result.PrimaryProfileName].TargetPods {
251+
curPod := pod.GetPod()
252+
curEndpoint := net.JoinHostPort(curPod.Address, strconv.Itoa(targetPort))
253+
targetPods = append(targetPods, curPod)
254+
targetEndpoints = append(targetEndpoints, curEndpoint)
255+
}
250256

251-
endpoint := net.JoinHostPort(targetPod.Address, strconv.Itoa(targetPort))
252-
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)
257+
multiEndpointString := strings.Join(targetEndpoints, ",")
258+
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", multiEndpointString)
253259

254-
reqCtx.TargetPod = targetPod
255-
reqCtx.TargetEndpoint = endpoint
260+
reqCtx.TargetPod = targetPods[0]
261+
reqCtx.TargetEndpoint = multiEndpointString
256262

257263
d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)
258264

@@ -274,6 +280,8 @@ func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestC
274280
Headers: reqCtx.Response.Headers,
275281
}
276282

283+
// TODO: to extend fallback functionality, handle cases where target pod is unavailable
284+
// https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224
277285
d.runPostResponsePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)
278286

279287
return reqCtx, nil

pkg/epp/requestcontrol/director_test.go

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package requestcontrol
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
2223
"testing"
2324
"time"
2425

@@ -29,7 +30,6 @@ import (
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031
"k8s.io/apimachinery/pkg/runtime"
3132
"k8s.io/apimachinery/pkg/types"
32-
k8stypes "k8s.io/apimachinery/pkg/types"
3333
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3434
"sigs.k8s.io/controller-runtime/pkg/client/fake"
3535

@@ -109,26 +109,29 @@ func TestDirector_HandleRequest(t *testing.T) {
109109
},
110110
}
111111

112-
// Pod setup
113-
testPod := &corev1.Pod{
114-
ObjectMeta: metav1.ObjectMeta{
115-
Name: "pod1",
116-
Namespace: "default",
117-
Labels: map[string]string{"app": "inference"},
118-
},
119-
Status: corev1.PodStatus{
120-
PodIP: "192.168.1.100",
121-
Phase: corev1.PodRunning,
122-
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
123-
},
124-
}
125112
scheme := runtime.NewScheme()
126113
_ = clientgoscheme.AddToScheme(scheme)
127114
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()
128115
if err := ds.PoolSet(ctx, fakeClient, pool); err != nil {
129116
t.Fatalf("Error while setting inference pool: %v", err)
130117
}
131-
ds.PodUpdateOrAddIfNotExist(testPod)
118+
119+
for i := range 5 {
120+
// Pod setup
121+
testPod := &corev1.Pod{
122+
ObjectMeta: metav1.ObjectMeta{
123+
Name: fmt.Sprintf("pod%v", i+1),
124+
Namespace: "default",
125+
Labels: map[string]string{"app": "inference"},
126+
},
127+
Status: corev1.PodStatus{
128+
PodIP: fmt.Sprintf("192.168.%v.100", i+1),
129+
Phase: corev1.PodRunning,
130+
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
131+
},
132+
}
133+
ds.PodUpdateOrAddIfNotExist(testPod)
134+
}
132135

133136
defaultSuccessfulScheduleResults := &schedulingtypes.SchedulingResult{
134137
ProfileResults: map[string]*schedulingtypes.ProfileRunResult{
@@ -138,7 +141,23 @@ func TestDirector_HandleRequest(t *testing.T) {
138141
Pod: &schedulingtypes.PodMetrics{
139142
Pod: &backend.Pod{
140143
Address: "192.168.1.100",
141-
NamespacedName: k8stypes.NamespacedName{Name: "pod1", Namespace: "default"},
144+
NamespacedName: types.NamespacedName{Name: "pod1", Namespace: "default"},
145+
},
146+
},
147+
},
148+
&schedulingtypes.ScoredPod{
149+
Pod: &schedulingtypes.PodMetrics{
150+
Pod: &backend.Pod{
151+
Address: "192.168.2.100",
152+
NamespacedName: types.NamespacedName{Name: "pod2", Namespace: "default"},
153+
},
154+
},
155+
},
156+
&schedulingtypes.ScoredPod{
157+
Pod: &schedulingtypes.PodMetrics{
158+
Pod: &backend.Pod{
159+
Address: "192.168.4.100",
160+
NamespacedName: types.NamespacedName{Name: "pod4", Namespace: "default"},
142161
},
143162
},
144163
},
@@ -174,7 +193,7 @@ func TestDirector_HandleRequest(t *testing.T) {
174193
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
175194
Address: "192.168.1.100",
176195
},
177-
TargetEndpoint: "192.168.1.100:8000",
196+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
178197
},
179198
wantMutatedBodyModel: model,
180199
},
@@ -199,7 +218,7 @@ func TestDirector_HandleRequest(t *testing.T) {
199218
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
200219
Address: "192.168.1.100",
201220
},
202-
TargetEndpoint: "192.168.1.100:8000",
221+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
203222
},
204223
wantMutatedBodyModel: model,
205224
},
@@ -228,7 +247,7 @@ func TestDirector_HandleRequest(t *testing.T) {
228247
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
229248
Address: "192.168.1.100",
230249
},
231-
TargetEndpoint: "192.168.1.100:8000",
250+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
232251
},
233252
wantMutatedBodyModel: model,
234253
},
@@ -249,7 +268,7 @@ func TestDirector_HandleRequest(t *testing.T) {
249268
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
250269
Address: "192.168.1.100",
251270
},
252-
TargetEndpoint: "192.168.1.100:8000",
271+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
253272
},
254273
wantMutatedBodyModel: modelSheddable,
255274
},
@@ -270,7 +289,7 @@ func TestDirector_HandleRequest(t *testing.T) {
270289
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
271290
Address: "192.168.1.100",
272291
},
273-
TargetEndpoint: "192.168.1.100:8000",
292+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
274293
},
275294
wantMutatedBodyModel: "resolved-target-model-A",
276295
},
@@ -286,7 +305,7 @@ func TestDirector_HandleRequest(t *testing.T) {
286305
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
287306
Address: "192.168.1.100",
288307
},
289-
TargetEndpoint: "192.168.1.100:8000",
308+
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
290309
},
291310
wantMutatedBodyModel: "food-review-1",
292311
reqBodyMap: map[string]any{

0 commit comments

Comments
 (0)