Skip to content

Commit 4c9dc9e

Browse files
authored
[RHOAIENG-20548] - Env variable removed from ISVC are not removed from predictor pod (#516) (#520)
* [RHOAIENG-20548] - Env variable removed from ISVC are not removed from predictor pod chore: Fixes the reconcile loop to effectively update the EnvVars and replicas count. Also addresses this issue: https://issues.redhat.com/browse/RHOAIENG-20634 * review comments --------- Signed-off-by: Spolti <[email protected]>
1 parent fd73280 commit 4c9dc9e

File tree

4 files changed

+301
-14
lines changed

4 files changed

+301
-14
lines changed

pkg/controller/v1beta1/inferenceservice/rawkube_controller_test.go

Lines changed: 199 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,20 +74,19 @@ var _ = Describe("v1beta1 inference service controller", func() {
7474
}
7575
)
7676

77-
Context("When creating inference service with raw kube predictor", func() {
78-
configs := map[string]string{
79-
"explainers": `{
77+
configs := map[string]string{
78+
"explainers": `{
8079
"alibi": {
8180
"image": "kserve/alibi-explainer",
8281
"defaultImageVersion": "latest"
8382
}
8483
}`,
85-
"ingress": `{
84+
"ingress": `{
8685
"ingressGateway": "knative-serving/knative-ingress-gateway",
8786
"localGateway": "knative-serving/knative-local-gateway",
8887
"localGatewayService": "knative-local-gateway.istio-system.svc.cluster.local"
8988
}`,
90-
"storageInitializer": `{
89+
"storageInitializer": `{
9190
"image" : "kserve/storage-initializer:latest",
9291
"memoryRequest": "100Mi",
9392
"memoryLimit": "1Gi",
@@ -97,7 +96,8 @@ var _ = Describe("v1beta1 inference service controller", func() {
9796
"caBundleVolumeMountPath": "/etc/ssl/custom-certs",
9897
"enableDirectPvcVolumeMount": false
9998
}`,
100-
}
99+
}
100+
Context("When creating inference service with raw kube predictor", func() {
101101

102102
It("Should have ingress/service/deployment/hpa created", func() {
103103
By("By creating a new InferenceService")
@@ -1383,6 +1383,199 @@ var _ = Describe("v1beta1 inference service controller", func() {
13831383
ShouldNot(Succeed())
13841384
})
13851385
})
1386+
1387+
Context("When updating ISVC envs", func() {
1388+
It("Should reconcile the deployment if isvc envs are updated", func() {
1389+
defaultEnvs := []v1.EnvVar{
1390+
{
1391+
Name: "env1",
1392+
Value: "val1",
1393+
},
1394+
{
1395+
Name: "env2",
1396+
Value: "val2",
1397+
},
1398+
{
1399+
Name: "env3",
1400+
Value: "val3",
1401+
},
1402+
}
1403+
1404+
// Create configmap
1405+
var configMap = &v1.ConfigMap{
1406+
ObjectMeta: metav1.ObjectMeta{
1407+
Name: constants.InferenceServiceConfigMapName,
1408+
Namespace: constants.KServeNamespace,
1409+
},
1410+
Data: configs,
1411+
}
1412+
Expect(k8sClient.Create(context.TODO(), configMap)).NotTo(HaveOccurred())
1413+
defer k8sClient.Delete(context.TODO(), configMap)
1414+
// Create ServingRuntime
1415+
servingRuntime := &v1alpha1.ServingRuntime{
1416+
ObjectMeta: metav1.ObjectMeta{
1417+
Name: "tf-serving-raw",
1418+
Namespace: "default",
1419+
},
1420+
Spec: v1alpha1.ServingRuntimeSpec{
1421+
SupportedModelFormats: []v1alpha1.SupportedModelFormat{
1422+
{
1423+
Name: "tensorflow",
1424+
Version: proto.String("1"),
1425+
AutoSelect: proto.Bool(true),
1426+
},
1427+
},
1428+
ServingRuntimePodSpec: v1alpha1.ServingRuntimePodSpec{
1429+
Containers: []v1.Container{
1430+
{
1431+
Name: "kserve-container",
1432+
Image: "tensorflow/serving:1.14.0",
1433+
Command: []string{"/usr/bin/tensorflow_model_server"},
1434+
Args: []string{
1435+
"--port=9000",
1436+
"--rest_api_port=8080",
1437+
"--model_base_path=/mnt/models",
1438+
"--rest_api_timeout_in_ms=60000",
1439+
},
1440+
Resources: defaultResource,
1441+
},
1442+
},
1443+
},
1444+
Disabled: proto.Bool(false),
1445+
},
1446+
}
1447+
k8sClient.Create(context.TODO(), servingRuntime)
1448+
defer k8sClient.Delete(context.TODO(), servingRuntime)
1449+
serviceName := "raw-test-env"
1450+
var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: serviceName, Namespace: "default"}}
1451+
var serviceKey = expectedRequest.NamespacedName
1452+
// create isvc
1453+
var storageUri = "s3://test/mnist/export"
1454+
isvcOriginal := &v1beta1.InferenceService{
1455+
ObjectMeta: metav1.ObjectMeta{
1456+
Name: serviceKey.Name,
1457+
Namespace: serviceKey.Namespace,
1458+
Annotations: map[string]string{
1459+
"serving.kserve.io/deploymentMode": "RawDeployment",
1460+
"serving.kserve.io/autoscalerClass": "hpa",
1461+
"serving.kserve.io/metrics": "cpu",
1462+
"serving.kserve.io/targetUtilizationPercentage": "75",
1463+
},
1464+
},
1465+
Spec: v1beta1.InferenceServiceSpec{
1466+
Predictor: v1beta1.PredictorSpec{
1467+
ComponentExtensionSpec: v1beta1.ComponentExtensionSpec{
1468+
MinReplicas: v1beta1.GetIntReference(1),
1469+
MaxReplicas: 3,
1470+
},
1471+
Tensorflow: &v1beta1.TFServingSpec{
1472+
PredictorExtensionSpec: v1beta1.PredictorExtensionSpec{
1473+
1474+
StorageURI: &storageUri,
1475+
RuntimeVersion: proto.String("1.14.0"),
1476+
Container: v1.Container{
1477+
Name: constants.InferenceServiceContainerName,
1478+
Resources: defaultResource,
1479+
Env: defaultEnvs,
1480+
},
1481+
},
1482+
},
1483+
},
1484+
},
1485+
}
1486+
1487+
isvcOriginal.DefaultInferenceService(nil, nil, &v1beta1.SecurityConfig{AutoMountServiceAccountToken: false}, nil)
1488+
Expect(k8sClient.Create(ctx, isvcOriginal)).Should(Succeed())
1489+
1490+
inferenceService := &v1beta1.InferenceService{}
1491+
Eventually(func() bool {
1492+
err := k8sClient.Get(ctx, serviceKey, inferenceService)
1493+
if err != nil {
1494+
return false
1495+
}
1496+
return true
1497+
}, timeout, interval).Should(BeTrue())
1498+
1499+
deployed1 := &appsv1.Deployment{}
1500+
predictorDeploymentKey := types.NamespacedName{Name: constants.PredictorServiceName(serviceKey.Name),
1501+
Namespace: serviceKey.Namespace}
1502+
Eventually(func() error {
1503+
return k8sClient.Get(context.TODO(), predictorDeploymentKey, deployed1)
1504+
}, timeout, interval).Should(Succeed())
1505+
Expect(deployed1.Spec.Template.Spec.Containers[0].Env).To(ContainElements(defaultEnvs))
1506+
1507+
// Now, update the isvc with new env
1508+
newEnvs := []v1.EnvVar{
1509+
{
1510+
Name: "newEnv1",
1511+
Value: "newValue1",
1512+
},
1513+
{
1514+
Name: "newEnv2",
1515+
Value: "delete",
1516+
},
1517+
}
1518+
1519+
// Update the isvc to add new envs
1520+
fmt.Fprintln(GinkgoWriter, "### Adding new envs")
1521+
isvcUpdated1 := &v1beta1.InferenceService{}
1522+
Eventually(func() bool {
1523+
// get the latest deployed version
1524+
err := k8sClient.Get(ctx, serviceKey, inferenceService)
1525+
if err != nil {
1526+
return false
1527+
}
1528+
1529+
isvcUpdated1 = inferenceService.DeepCopy()
1530+
isvcUpdated1.Spec.Predictor.Model.Env = append(isvcUpdated1.Spec.Predictor.Model.Env, newEnvs...)
1531+
err = k8sClient.Update(ctx, isvcUpdated1)
1532+
if err != nil {
1533+
return false
1534+
}
1535+
return true
1536+
}, timeout, interval).Should(BeTrue())
1537+
1538+
// The deployment should be reconciled
1539+
deployed2 := &appsv1.Deployment{}
1540+
appendedEnvs := append(defaultEnvs, newEnvs...)
1541+
Eventually(func() []v1.EnvVar {
1542+
_ = k8sClient.Get(context.TODO(), predictorDeploymentKey, deployed2)
1543+
return deployed2.Spec.Template.Spec.Containers[0].Env
1544+
}, timeout, interval).Should(ContainElements(appendedEnvs))
1545+
1546+
// Now remove the default envs and update the isvc
1547+
fmt.Fprintln(GinkgoWriter, "### Removing default envs")
1548+
isvcUpdated2 := &v1beta1.InferenceService{}
1549+
Eventually(func() bool {
1550+
// get the latest deployed version
1551+
err := k8sClient.Get(ctx, serviceKey, isvcUpdated1)
1552+
if err != nil {
1553+
return false
1554+
}
1555+
1556+
isvcUpdated2 = isvcUpdated1.DeepCopy()
1557+
isvcUpdated2.Spec.Predictor.Model.Env = newEnvs
1558+
// Make sure the default envs were removed before updating the isvc
1559+
Expect(isvcUpdated2.Spec.Predictor.Model.Env).ToNot(ContainElements(defaultEnvs))
1560+
1561+
err = k8sClient.Update(ctx, isvcUpdated2)
1562+
if err != nil {
1563+
return false
1564+
}
1565+
return true
1566+
}, timeout, interval).Should(BeTrue())
1567+
1568+
deployed3 := &appsv1.Deployment{}
1569+
Eventually(func() []v1.EnvVar {
1570+
_ = k8sClient.Get(context.TODO(), predictorDeploymentKey, deployed3)
1571+
return deployed3.Spec.Template.Spec.Containers[0].Env
1572+
}, timeout, interval).Should(Not(ContainElements(defaultEnvs)))
1573+
1574+
Expect(deployed3.Spec.Template.Spec.Containers[0].Env).ToNot(ContainElement(HaveField("Value", "env_marked_for_deletion")))
1575+
Expect(deployed3.Spec.Template.Spec.Containers[0].Env).To(ContainElements(newEnvs))
1576+
})
1577+
})
1578+
13861579
Context("When creating inference service with raw kube predictor and empty ingressClassName", func() {
13871580
configs := map[string]string{
13881581
"explainers": `{

pkg/controller/v1beta1/inferenceservice/reconcilers/deployment/deployment_reconciler.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"strings"
2727

2828
"k8s.io/apimachinery/pkg/api/resource"
29+
"k8s.io/apimachinery/pkg/util/strategicpatch"
2930
"k8s.io/client-go/kubernetes"
3031

3132
"github.com/google/go-cmp/cmp/cmpopts"
@@ -40,7 +41,6 @@ import (
4041
"k8s.io/apimachinery/pkg/runtime"
4142
"k8s.io/apimachinery/pkg/types"
4243
"k8s.io/apimachinery/pkg/util/intstr"
43-
"k8s.io/apimachinery/pkg/util/strategicpatch"
4444
"knative.dev/pkg/kmp"
4545
kclient "sigs.k8s.io/controller-runtime/pkg/client"
4646
logf "sigs.k8s.io/controller-runtime/pkg/log"
@@ -412,13 +412,15 @@ func (r *DeploymentReconciler) checkDeploymentExist(client kclient.Client, deplo
412412
log.Error(err, "Failed to perform dry-run update of deployment", "Deployment", deployment.Name)
413413
return constants.CheckResultUnknown, nil, err
414414
}
415+
415416
processedExistingDep := v1beta1utils.RemoveCookieSecretArg(*existingDeployment)
416417
processedNewDep := v1beta1utils.RemoveCookieSecretArg(*deployment)
417-
if diff, err := kmp.SafeDiff(processedNewDep.Spec, processedExistingDep.Spec, ignoreFields); err != nil {
418+
if diff, err := kmp.SafeDiff(processedExistingDep.Spec, processedNewDep.Spec, ignoreFields); err != nil {
419+
log.Error(err, "Failed to diff deployments", "Deployment", deployment.Name)
418420
return constants.CheckResultUnknown, nil, err
419-
} else if diff != "" {
421+
} else if len(diff) > 0 {
420422
log.Info("Deployment Updated", "Diff", diff)
421-
return constants.CheckResultUpdate, existingDeployment, nil
423+
return constants.CheckResultUpdate, processedNewDep, nil
422424
}
423425
return constants.CheckResultExisted, existingDeployment, nil
424426
}
@@ -554,6 +556,7 @@ func addGPUResourceToDeployment(deployment *appsv1.Deployment, targetContainerNa
554556
func (r *DeploymentReconciler) Reconcile() ([]*appsv1.Deployment, error) {
555557
for _, deployment := range r.DeploymentList {
556558
// Reconcile Deployment
559+
originalDeployment := &appsv1.Deployment{}
557560
checkResult, _, err := r.checkDeploymentExist(r.client, deployment)
558561
if err != nil {
559562
return nil, err
@@ -565,26 +568,58 @@ func (r *DeploymentReconciler) Reconcile() ([]*appsv1.Deployment, error) {
565568
case constants.CheckResultCreate:
566569
opErr = r.client.Create(context.TODO(), deployment)
567570
case constants.CheckResultUpdate:
568-
curJson, err := json.Marshal(deployment)
571+
// get the current deployment
572+
_ = r.client.Get(context.TODO(), types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, originalDeployment)
573+
// we need to remove the Replicas field from the deployment spec
574+
originalDeployment.Spec.Replicas = nil
575+
curJson, err := json.Marshal(originalDeployment)
569576
if err != nil {
570577
return nil, err
571578
}
572579

580+
// Check if there are any envs to remove
581+
// If there, its value will be set to "delete" so we can update the patchBytes with
582+
// "patch": "delete"
583+
// The strategic merge patch does not remove items from list just by removing it from the patch,
584+
// to delete lists items using strategic merge patch, the $patch delete pattern is used.
585+
// Example:
586+
// - env:
587+
// - "name": "ENV1",
588+
// "$patch": "delete"
589+
for i, deploymentC := range deployment.Spec.Template.Spec.Containers {
590+
envs := []corev1.EnvVar{}
591+
for _, OriginalC := range originalDeployment.Spec.Template.Spec.Containers {
592+
if deploymentC.Name == OriginalC.Name {
593+
envsToRemove, envsToKeep := utils.CheckEnvsToRemove(deploymentC.Env, OriginalC.Env)
594+
if len(envsToRemove) > 0 {
595+
envs = append(envs, envsToKeep...)
596+
envs = append(envs, envsToRemove...)
597+
} else {
598+
envs = deploymentC.Env
599+
}
600+
}
601+
}
602+
deployment.Spec.Template.Spec.Containers[i].Env = envs
603+
}
604+
573605
// To avoid the conflict between HPA and Deployment,
574606
// we need to remove the Replicas field from the deployment spec
575-
modDeployment := deployment.DeepCopy()
576-
modDeployment.Spec.Replicas = nil
607+
deployment.Spec.Replicas = nil
577608

578-
modJson, err := json.Marshal(modDeployment)
609+
modJson, err := json.Marshal(deployment)
579610
if err != nil {
580611
return nil, err
581612
}
613+
582614
// Generate the strategic merge patch between the current and modified JSON
583615
patchByte, err := strategicpatch.StrategicMergePatch(curJson, modJson, appsv1.Deployment{})
584616
if err != nil {
585617
return nil, err
586618
}
587619

620+
// override the envs that needs to be removed with "$patch": "delete"
621+
patchByte = []byte(strings.ReplaceAll(string(patchByte), "\"value\":\""+utils.PLACEHOLDER_FOR_DELETION+"\"", "\"$patch\":\"delete\""))
622+
588623
// Patch the deployment object with the strategic merge patch
589624
opErr = r.client.Patch(context.TODO(), deployment, kclient.RawPatch(types.StrategicMergePatchType, patchByte))
590625
}

pkg/utils/utils.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,3 +310,29 @@ func IsValidCustomGPUArray(s string) bool {
310310

311311
return true
312312
}
313+
314+
// PLACEHOLDER_FOR_DELETION is the sentinel written into an EnvVar's Value by
// CheckEnvsToRemove to mark that variable as one that should be removed.
const PLACEHOLDER_FOR_DELETION = "env_marked_for_deletion"
315+
316+
// CheckEnvsToRemove checks the current envs against the desired ones and returns the envs that needs to be
317+
// removed from the target env list and envs that need to be kept.
318+
// Returns envsToRemove, envsToKeep
319+
func CheckEnvsToRemove(desired, current []v1.EnvVar) ([]v1.EnvVar, []v1.EnvVar) {
320+
var envsToRemove []v1.EnvVar
321+
var envsToKeep []v1.EnvVar
322+
for _, currentEnv := range current {
323+
found := false
324+
for _, desiredEnv := range desired {
325+
if currentEnv.Name == desiredEnv.Name {
326+
envsToKeep = append(envsToKeep, currentEnv)
327+
found = true
328+
break
329+
}
330+
}
331+
if !found {
332+
// replace the value of the found env to a placeholder to mark it for deletion
333+
currentEnv.Value = PLACEHOLDER_FOR_DELETION
334+
envsToRemove = append(envsToRemove, currentEnv)
335+
}
336+
}
337+
return envsToRemove, envsToKeep
338+
}

0 commit comments

Comments
 (0)