Skip to content

Commit 164f278

Browse files
hdefazio and Jooho authored
Stop and resume an inference graph [Raw Deployment] (kserve#4637)
Signed-off-by: Hannah DeFazio <h2defazio@gmail.com>
Co-authored-by: Jooho Lee <jlee@redhat.com>
1 parent 64d775b commit 164f278

File tree

2 files changed

+213
-20
lines changed

2 files changed

+213
-20
lines changed

pkg/controller/v1alpha1/inferencegraph/controller.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,13 @@ func (r *InferenceGraphReconciler) Reconcile(ctx context.Context, req ctrl.Reque
156156
return reconcile.Result{}, err
157157
}
158158
// resolve service urls
159-
for node, router := range graph.Spec.Nodes {
160-
for i, route := range router.Steps {
161-
isvc := v1beta1.InferenceService{}
162-
if route.ServiceName != "" {
159+
if !forceStopRuntime {
160+
for node, router := range graph.Spec.Nodes {
161+
for i, route := range router.Steps {
162+
isvc := v1beta1.InferenceService{}
163+
if route.ServiceName == "" {
164+
continue
165+
}
163166
err := r.Client.Get(ctx, types.NamespacedName{Namespace: graph.Namespace, Name: route.ServiceName}, &isvc)
164167
if err == nil {
165168
if graph.Spec.Nodes[node].Steps[i].ServiceURL == "" {
@@ -178,6 +181,7 @@ func (r *InferenceGraphReconciler) Reconcile(ctx context.Context, req ctrl.Reque
178181
}
179182
}
180183
}
184+
181185
isvcConfigMap, err := v1beta1.GetInferenceServiceConfigMap(ctx, r.Clientset)
182186
if err != nil {
183187
r.Log.Error(err, "unable to get configmap", "name", constants.InferenceServiceConfigMapName, "namespace", constants.KServeNamespace)
@@ -198,18 +202,22 @@ func (r *InferenceGraphReconciler) Reconcile(ctx context.Context, req ctrl.Reque
198202
}
199203

200204
r.Log.Info("Inference graph raw", "deployment conditions", deployment.Status.Conditions)
201-
igAvailable := false
202-
for _, con := range deployment.Status.Conditions {
203-
if con.Type == appsv1.DeploymentAvailable {
204-
igAvailable = true
205-
break
205+
if !forceStopRuntime {
206+
// Check if the deployment is ready. If not, requeue
207+
igAvailable := false
208+
for _, con := range deployment.Status.Conditions {
209+
if con.Type == appsv1.DeploymentAvailable {
210+
igAvailable = true
211+
break
212+
}
213+
}
214+
if !igAvailable {
215+
// If Deployment resource not yet available, IG is not available as well. Reconcile again.
216+
return reconcile.Result{Requeue: true}, errors.Wrapf(err,
217+
"Failed to find inference graph deployment %s", graph.Name)
206218
}
207219
}
208-
if !igAvailable {
209-
// If Deployment resource not yet available, IG is not available as well. Reconcile again.
210-
return reconcile.Result{Requeue: true}, errors.Wrapf(err,
211-
"Failed to find inference graph deployment %s", graph.Name)
212-
}
220+
213221
logger.Info("Inference graph raw before propagate status")
214222
PropagateRawStatus(&graph.Status, deployment, url)
215223
} else {
@@ -271,8 +279,8 @@ func (r *InferenceGraphReconciler) Reconcile(ctx context.Context, req ctrl.Reque
271279
Status: corev1.ConditionFalse,
272280
}
273281
graph.Status.Conditions = append(graph.Status.Conditions, defaultStoppedCondition)
282+
existingStoppedCondition = &defaultStoppedCondition
274283
}
275-
existingStoppedCondition = graph.Status.GetCondition(v1beta1.Stopped)
276284
if forceStopRuntime {
277285
// If the graph's stopped condition is not set or
278286
// If the graph is currently running, update its status to signal that it should be stopped

pkg/controller/v1alpha1/inferencegraph/controller_test.go

Lines changed: 190 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,7 +1099,7 @@ var _ = Describe("Inference Graph controller test", func() {
10991099
Consistently(func() bool {
11001100
err := k8sClient.Get(ctx, objKey, obj)
11011101
return apierr.IsNotFound(err)
1102-
}, time.Second*10, interval).Should(BeTrue(), "%T %s should not be created", obj, objKey.Name)
1102+
}, timeout, interval).Should(BeTrue(), "%T %s should not be created", obj, objKey.Name)
11031103
}
11041104

11051105
// Wait for any Kubernetes object to be not found.
@@ -1157,7 +1157,7 @@ var _ = Describe("Inference Graph controller test", func() {
11571157
return ig
11581158
}
11591159

1160-
It("Should keep the knative service when the annotation is set to false", func() {
1160+
It("Should keep the knative service when the StopAnnotationKey annotation is set to false", func() {
11611161
ctx, cancel := context.WithCancel(context.Background())
11621162
DeferCleanup(cancel)
11631163

@@ -1183,7 +1183,7 @@ var _ = Describe("Inference Graph controller test", func() {
11831183
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionFalse)
11841184
})
11851185

1186-
It("Should not create the knative service when the annotation is set to true", func() {
1186+
It("Should not create the knative service when the StopAnnotationKey annotation is set to true", func() {
11871187
ctx, cancel := context.WithCancel(context.Background())
11881188
DeferCleanup(cancel)
11891189

@@ -1208,7 +1208,7 @@ var _ = Describe("Inference Graph controller test", func() {
12081208
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionTrue)
12091209
})
12101210

1211-
It("Should delete the knative service when the annotation is updated to true on an existing IG", func() {
1211+
It("Should delete the knative service when the StopAnnotationKey annotation is updated to true on an existing IG", func() {
12121212
ctx, cancel := context.WithCancel(context.Background())
12131213
DeferCleanup(cancel)
12141214

@@ -1247,7 +1247,7 @@ var _ = Describe("Inference Graph controller test", func() {
12471247
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionTrue)
12481248
})
12491249

1250-
It("Should create the knative service when the annotation is updated to false on an existing IG", func() {
1250+
It("Should create the knative service when the StopAnnotationKey annotation is updated to false on an existing IG", func() {
12511251
ctx, cancel := context.WithCancel(context.Background())
12521252
DeferCleanup(cancel)
12531253

@@ -1284,6 +1284,191 @@ var _ = Describe("Inference Graph controller test", func() {
12841284
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionFalse)
12851285
})
12861286
})
1287+
1288+
Describe("in Raw Deployment mode", func() {
1289+
// --- Default values ---
1290+
defaultRawIG := func(serviceKey types.NamespacedName) *v1alpha1.InferenceGraph {
1291+
ig := &v1alpha1.InferenceGraph{
1292+
ObjectMeta: metav1.ObjectMeta{
1293+
Name: serviceKey.Name,
1294+
Namespace: serviceKey.Namespace,
1295+
Annotations: map[string]string{
1296+
"serving.kserve.io/deploymentMode": string(constants.RawDeployment),
1297+
},
1298+
},
1299+
Spec: v1alpha1.InferenceGraphSpec{
1300+
Nodes: map[string]v1alpha1.InferenceRouter{
1301+
v1alpha1.GraphRootNodeName: {
1302+
RouterType: v1alpha1.Sequence,
1303+
Steps: []v1alpha1.InferenceStep{
1304+
{
1305+
InferenceTarget: v1alpha1.InferenceTarget{
1306+
ServiceURL: "http://someservice.example.com",
1307+
},
1308+
},
1309+
},
1310+
},
1311+
},
1312+
},
1313+
}
1314+
return ig
1315+
}
1316+
1317+
// --- Reusable Check Functions ---
1318+
// Updates the status of the deployment to Ready
1319+
// and applies the change to the Kubernetes API server.
1320+
expectDeploymentToBeReady := func(ctx context.Context, serviceKey types.NamespacedName) {
1321+
actualDeployment := &appsv1.Deployment{}
1322+
Eventually(func() error { return k8sClient.Get(ctx, serviceKey, actualDeployment) }, timeout).
1323+
Should(Succeed())
1324+
1325+
updatedDeployment := actualDeployment.DeepCopy()
1326+
updatedDeployment.Status.Conditions = []appsv1.DeploymentCondition{
1327+
{
1328+
Type: appsv1.DeploymentAvailable,
1329+
Status: corev1.ConditionTrue,
1330+
},
1331+
}
1332+
Expect(k8sClient.Status().Update(ctx, updatedDeployment)).NotTo(HaveOccurred())
1333+
}
1334+
1335+
It("Should keep the service/deployment when the StopAnnotationKey annotation is set to false", func() {
1336+
ctx, cancel := context.WithCancel(context.Background())
1337+
DeferCleanup(cancel)
1338+
1339+
// Config map
1340+
configMap := createIGConfigMap()
1341+
Expect(k8sClient.Create(context.TODO(), configMap)).NotTo(HaveOccurred())
1342+
defer k8sClient.Delete(context.TODO(), configMap)
1343+
1344+
// Define InferenceGraph
1345+
serviceNamespace := "default"
1346+
graphName := "stop-raw-false-ig"
1347+
graphExpectedRequest := reconcile.Request{NamespacedName: types.NamespacedName{Name: graphName, Namespace: serviceNamespace}}
1348+
graphServiceKey := graphExpectedRequest.NamespacedName
1349+
ig := defaultRawIG(graphServiceKey)
1350+
ig.Annotations[constants.StopAnnotationKey] = "false"
1351+
Expect(k8sClient.Create(ctx, ig)).Should(Succeed())
1352+
defer k8sClient.Delete(ctx, ig)
1353+
1354+
// Check the inference graph
1355+
expectResourceToExist(context.Background(), &appsv1.Deployment{}, graphServiceKey)
1356+
expectDeploymentToBeReady(context.Background(), graphServiceKey)
1357+
1358+
expectResourceToExist(context.Background(), &corev1.Service{}, graphServiceKey)
1359+
expectIGToExist(context.Background(), graphServiceKey)
1360+
1361+
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionFalse)
1362+
})
1363+
1364+
It("Should not create the service/deployment when the StopAnnotationKey annotation is set to true", func() {
1365+
ctx, cancel := context.WithCancel(context.Background())
1366+
DeferCleanup(cancel)
1367+
1368+
configMap := createIGConfigMap()
1369+
Expect(k8sClient.Create(ctx, configMap)).NotTo(HaveOccurred())
1370+
defer k8sClient.Delete(ctx, configMap)
1371+
1372+
graphName := "stop-raw-true-ig"
1373+
serviceNamespace := "default"
1374+
expectedRequest := reconcile.Request{NamespacedName: types.NamespacedName{Name: graphName, Namespace: serviceNamespace}}
1375+
graphServiceKey := expectedRequest.NamespacedName
1376+
ig := defaultRawIG(graphServiceKey)
1377+
ig.Annotations[constants.StopAnnotationKey] = "true"
1378+
Expect(k8sClient.Create(context.Background(), ig)).Should(Succeed())
1379+
defer k8sClient.Delete(context.Background(), ig)
1380+
1381+
// Check that the service and deployment were not created
1382+
expectResourceIsDeleted(context.Background(), &corev1.Service{}, graphServiceKey)
1383+
expectResourceIsDeleted(context.Background(), &appsv1.Deployment{}, graphServiceKey)
1384+
1385+
// Check the inference graph
1386+
expectIGToExist(context.Background(), graphServiceKey)
1387+
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionTrue)
1388+
})
1389+
1390+
It("Should delete the service/deployment when the StopAnnotationKey annotation is updated to true on an existing IG", func() {
1391+
ctx, cancel := context.WithCancel(context.Background())
1392+
DeferCleanup(cancel)
1393+
1394+
// Config map
1395+
configMap := createIGConfigMap()
1396+
Expect(k8sClient.Create(context.TODO(), configMap)).NotTo(HaveOccurred())
1397+
defer k8sClient.Delete(context.TODO(), configMap)
1398+
1399+
// Define InferenceGraph
1400+
serviceNamespace := "default"
1401+
graphName := "stop-raw-update-true-ig"
1402+
graphExpectedRequest := reconcile.Request{NamespacedName: types.NamespacedName{Name: graphName, Namespace: serviceNamespace}}
1403+
graphServiceKey := graphExpectedRequest.NamespacedName
1404+
ig := defaultRawIG(graphServiceKey)
1405+
ig.Annotations[constants.StopAnnotationKey] = "false"
1406+
Expect(k8sClient.Create(ctx, ig)).Should(Succeed())
1407+
defer k8sClient.Delete(ctx, ig)
1408+
1409+
// Check the inference graph
1410+
expectResourceToExist(context.Background(), &appsv1.Deployment{}, graphServiceKey)
1411+
expectDeploymentToBeReady(context.Background(), graphServiceKey)
1412+
expectResourceToExist(context.Background(), &corev1.Service{}, graphServiceKey)
1413+
expectIGToExist(context.Background(), graphServiceKey)
1414+
1415+
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionFalse)
1416+
1417+
// Stop the inference graph
1418+
actualIG := expectIGToExist(ctx, graphServiceKey)
1419+
updatedIG := actualIG.DeepCopy()
1420+
updatedIG.Annotations[constants.StopAnnotationKey] = "true"
1421+
Expect(k8sClient.Update(ctx, updatedIG)).NotTo(HaveOccurred())
1422+
1423+
// Check that the service and deployment were deleted
1424+
expectResourceToBeDeleted(context.Background(), &corev1.Service{}, graphServiceKey)
1425+
expectResourceToBeDeleted(context.Background(), &appsv1.Deployment{}, graphServiceKey)
1426+
1427+
// Check the inference graph
1428+
expectIGToExist(context.Background(), graphServiceKey)
1429+
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionTrue)
1430+
})
1431+
1432+
It("Should create the service/deployment when the StopAnnotationKey annotation is updated to false on an existing IG", func() {
1433+
ctx, cancel := context.WithCancel(context.Background())
1434+
DeferCleanup(cancel)
1435+
1436+
configMap := createIGConfigMap()
1437+
Expect(k8sClient.Create(ctx, configMap)).NotTo(HaveOccurred())
1438+
defer k8sClient.Delete(ctx, configMap)
1439+
1440+
graphName := "stop-raw-update-false-ig"
1441+
serviceNamespace := "default"
1442+
expectedRequest := reconcile.Request{NamespacedName: types.NamespacedName{Name: graphName, Namespace: serviceNamespace}}
1443+
graphServiceKey := expectedRequest.NamespacedName
1444+
ig := defaultRawIG(graphServiceKey)
1445+
ig.Annotations[constants.StopAnnotationKey] = "true"
1446+
Expect(k8sClient.Create(context.Background(), ig)).Should(Succeed())
1447+
defer k8sClient.Delete(context.Background(), ig)
1448+
1449+
// Check that the service and deployment were not created
1450+
expectResourceIsDeleted(context.Background(), &corev1.Service{}, graphServiceKey)
1451+
expectResourceIsDeleted(context.Background(), &appsv1.Deployment{}, graphServiceKey)
1452+
1453+
// Check the inference graph
1454+
expectIGToExist(context.Background(), graphServiceKey)
1455+
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionTrue)
1456+
1457+
// Resume the inference graph
1458+
actualIG := expectIGToExist(ctx, graphServiceKey)
1459+
updatedIG := actualIG.DeepCopy()
1460+
updatedIG.Annotations[constants.StopAnnotationKey] = "false"
1461+
Expect(k8sClient.Update(ctx, updatedIG)).NotTo(HaveOccurred())
1462+
1463+
// Check the inference graph
1464+
expectResourceToExist(context.Background(), &appsv1.Deployment{}, graphServiceKey)
1465+
expectDeploymentToBeReady(context.Background(), graphServiceKey)
1466+
expectResourceToExist(context.Background(), &corev1.Service{}, graphServiceKey)
1467+
expectIGToExist(context.Background(), graphServiceKey)
1468+
1469+
expectIGConditionStatus(ctx, graphServiceKey, v1beta1.Stopped, corev1.ConditionFalse)
1470+
})
1471+
})
12871472
})
12881473

12891474
Context("When creating an IG with tolerations in the spec", func() {

0 commit comments

Comments
 (0)