Skip to content

Commit f47d941

Browse files
author
Dev Agent
committed
fix Revision delete using logical deletion
1 parent 34e72db commit f47d941

File tree

9 files changed

+432
-121
lines changed

9 files changed

+432
-121
lines changed

api/handler/model_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestModelHandler_CreateInferenceVersion_Success(t *testing.T) {
2828

2929
req := &types.CreateInferenceVersionReq{
3030
CommitID: "test-commit",
31-
InitialTraffic: 50,
31+
TrafficPercent: 50,
3232
}
3333
body, _ := json.Marshal(req)
3434
w := httptest.NewRecorder()
@@ -52,7 +52,7 @@ func TestModelHandler_CreateInferenceVersion_InvalidID(t *testing.T) {
5252

5353
req := &types.CreateInferenceVersionReq{
5454
CommitID: "test-commit",
55-
InitialTraffic: 50,
55+
TrafficPercent: 50,
5656
}
5757
body, _ := json.Marshal(req)
5858
w := httptest.NewRecorder()
@@ -96,7 +96,7 @@ func TestModelHandler_CreateInferenceVersion_ServiceError(t *testing.T) {
9696

9797
req := &types.CreateInferenceVersionReq{
9898
CommitID: "test-commit",
99-
InitialTraffic: 50,
99+
TrafficPercent: 50,
100100
}
101101
body, _ := json.Marshal(req)
102102
w := httptest.NewRecorder()

builder/deploy/imagerunner/remote_runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ func (h *RemoteRunner) SetVersionsTraffic(ctx context.Context, clusterID, svcNam
525525
if err != nil {
526526
return err
527527
}
528-
url := fmt.Sprintf("%s/api/v1/service/%s/versions/traffic", remote, svcName)
528+
url := fmt.Sprintf("%s/api/v1/service/%s/versions/traffic?cluster_id=%s", remote, svcName, clusterID)
529529
response, err := h.doRequest(ctx, http.MethodPut, url, req)
530530
if err != nil {
531531
return fmt.Errorf("failed to update traffic, %w", err)

common/errorx/error_serverless.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,15 @@ var (
113113
//
114114
// zh-HK: 流量百分比無效
115115
ErrTrafficInvalid error = CustomError{prefix: errServerlessPrefix, code: codeTrafficInvalidErr}
116+
117+
//Description: no other valid revision except
118+
//
119+
//Description_ZH: 没有其他有效修订版本
120+
//
121+
//en-US: no other valid revision except
122+
//
123+
//zh-CN: 没有其他有效修订版本
124+
//
125+
//zh-HK: 沒有其他有效修訂版本
126+
ErrNoOtherValidRevision error = CustomError{prefix: errServerlessPrefix, code: codeTrafficInvalidErr}
116127
)

common/types/model.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ type CreateInferenceVersionReq struct {
472472
DeployId int64 `json:"-"`
473473
CommitID string `json:"commit_id"`
474474

475-
InitialTraffic int `json:"initial_traffic"`
475+
TrafficPercent int `json:"traffic_percent"`
476476
}
477477

478478
type ListInferenceVersionsResp struct {

component/model.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,7 +1418,7 @@ func (c *modelComponentImpl) CreateInferenceVersion(ctx context.Context, req typ
14181418
return errorx.ErrDeployStatusNotMatchErr
14191419
}
14201420

1421-
if req.InitialTraffic > 100 || req.InitialTraffic < 0 {
1421+
if req.TrafficPercent > 100 || req.TrafficPercent < 0 {
14221422
return errorx.ErrTrafficInvalid
14231423
}
14241424

@@ -1436,7 +1436,7 @@ func (c *modelComponentImpl) CreateInferenceVersion(ctx context.Context, req typ
14361436
ClusterID: deploy.ClusterID,
14371437
SvcName: deploy.SvcName,
14381438
Commit: req.CommitID,
1439-
InitialTraffic: req.InitialTraffic,
1439+
InitialTraffic: req.TrafficPercent,
14401440
})
14411441
}
14421442

@@ -1446,9 +1446,6 @@ func (c *modelComponentImpl) ListInferenceVersions(ctx context.Context, id int64
14461446
return nil, errorx.ErrDeployNotFoundErr
14471447
}
14481448
var resp = []types.ListInferenceVersionsResp{}
1449-
if deploy.Status != dcommon.Running {
1450-
return resp, nil
1451-
}
14521449

14531450
versions, err := c.imageRunner.ListKsvcVersions(ctx, deploy.ClusterID, deploy.SvcName)
14541451
if err != nil {

runner/component/service.go

Lines changed: 89 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,7 @@ func (s *serviceComponentImpl) updateServiceInDB(svc v1.Service, clusterID strin
823823
}
824824
deployment, err := s.getDeploymentByServiceName(ctx, svc, clusterID)
825825
if err != nil {
826-
slog.Error("failed to get deployment ", slog.Any("service", svc.Name), slog.Any("error", err))
826+
slog.ErrorContext(ctx, "failed to get deployment ", slog.Any("service", svc.Name), slog.Any("error", err))
827827
}
828828

829829
oldService.Endpoint = svc.Status.URL.String()
@@ -873,17 +873,18 @@ func (s *serviceComponentImpl) getServiceStatus(ctx context.Context, ks v1.Servi
873873
serviceCondition := ks.Status.GetCondition(v1.ServiceConditionReady)
874874
cluster, err := s.clusterPool.GetClusterByID(ctx, clusterID)
875875
if err != nil {
876+
slog.ErrorContext(ctx, "fail to get cluster", slog.Any("ksvc_name", ks.Name), slog.Any("error", err))
876877
return resp, fmt.Errorf("fail to get cluster,error: %v ", err)
877878
}
878-
slog.Info("get service condition in getServiceStatus",
879-
slog.Any("svc", ks.Name), slog.Any("condition", serviceCondition))
879+
slog.InfoContext(ctx, "get service condition in getServiceStatus",
880+
slog.Any("ksvc_name", ks.Name), slog.Any("condition", serviceCondition))
880881
if serviceCondition != nil {
881882
status, _, err := GetServiceExternalStatus(ctx, cluster, &ks, ks.Namespace)
882883
if err != nil {
883884
return resp, fmt.Errorf("fail to get service external status, error: %w", err)
884885
}
885-
slog.Info("get service external status in getServiceStatus",
886-
slog.Any("svc", ks.Name), slog.Any("status", status))
886+
slog.InfoContext(ctx, "get service external status in getServiceStatus",
887+
slog.Any("ksvc_name", ks.Name), slog.Any("status", status))
887888
serviceCondition.Status = status
888889
}
889890

@@ -901,8 +902,8 @@ func (s *serviceComponentImpl) getServiceStatus(ctx context.Context, ks v1.Servi
901902
resp.Code = common.Deploying
902903
}
903904
case serviceCondition.Status == corev1.ConditionTrue:
904-
slog.Debug("get instance info in getServiceStatus for corev1.ConditionTrue",
905-
slog.Any("svc", ks.Name), slog.Any("instance info", instInfo))
905+
slog.DebugContext(ctx, "get instance info in getServiceStatus for corev1.ConditionTrue",
906+
slog.Any("ksvc_name", ks.Name), slog.Any("instance info", instInfo))
906907
resp.Code = common.Running
907908
if len(instInfo.Instances) == 0 {
908909
resp.Code = common.Sleeping
@@ -1466,11 +1467,13 @@ func (s *serviceComponentImpl) reportServiceLog(msg string, ksvc *database.Knati
14661467
func (s *serviceComponentImpl) CreateRevisions(ctx context.Context, req types.CreateRevisionReq) error {
14671468
cluster, err := s.clusterPool.GetClusterByID(ctx, req.ClusterID)
14681469
if err != nil {
1470+
slog.ErrorContext(ctx, "fail to get cluster", slog.String("svcName", req.SvcName), slog.String("clusterID", req.ClusterID), slog.Any("error", err))
14691471
return fmt.Errorf("fail to get cluster, error %v ", err)
14701472
}
14711473

14721474
ksvc, err := getServices(ctx, cluster, s.k8sNameSpace, req.SvcName)
14731475
if err != nil {
1476+
slog.ErrorContext(ctx, "fail to get service", slog.String("svcName", req.SvcName), slog.String("clusterID", req.ClusterID), slog.Any("error", err))
14741477
return err
14751478
}
14761479

@@ -1494,6 +1497,9 @@ func (s *serviceComponentImpl) CreateRevisions(ctx context.Context, req types.Cr
14941497
duplicateRev := ""
14951498
for _, rev := range revisionList.Items {
14961499
if rev.IsReady() {
1500+
if isRevisionDeleted(&rev) {
1501+
continue
1502+
}
14971503
totalReadyRev++
14981504
}
14991505
if rev.Labels != nil && rev.Labels[CommitId] == req.Commit {
@@ -1521,30 +1527,28 @@ func (s *serviceComponentImpl) CreateRevisions(ctx context.Context, req types.Cr
15211527
// enable automatic revision cleanup (enabled by default, this line makes it explicit)
15221528
ksvc.Annotations["serving.knative.dev/revisionRetentionPolicy"] = "automatic"
15231529
// keep maxScaleInt revisions
1524-
ksvc.Annotations["serving.knative.dev/maxRetainedRevisions"] = ksvc.Spec.Template.Annotations[KeyMaxScale]
1525-
1526-
if req.InitialTraffic > 0 {
1527-
traffic := []v1.TrafficTarget{}
1528-
if req.InitialTraffic == 100 {
1529-
traffic = append(traffic, v1.TrafficTarget{
1530-
Percent: utils.Int64Ptr(int64(req.InitialTraffic)),
1531-
})
1532-
} else {
1533-
remainPercent := int64(100 - req.InitialTraffic)
1534-
traffic = append(traffic, v1.TrafficTarget{
1535-
Percent: utils.Int64Ptr(int64(req.InitialTraffic)),
1536-
})
1537-
1538-
revisionName := getKsvcMaxPercentRevisionName(ksvc)
1539-
1540-
traffic = append(traffic, v1.TrafficTarget{
1541-
LatestRevision: utils.BoolPtr(false),
1542-
Percent: utils.Int64Ptr(remainPercent),
1543-
RevisionName: revisionName,
1544-
})
1545-
}
1546-
ksvc.Spec.Traffic = traffic
1530+
ksvc.Annotations["serving.knative.dev/maxRetainedRevisions"] = "2"
1531+
1532+
traffic := []v1.TrafficTarget{}
1533+
if req.InitialTraffic == 100 {
1534+
traffic = append(traffic, v1.TrafficTarget{
1535+
Percent: utils.Int64Ptr(int64(req.InitialTraffic)),
1536+
})
1537+
} else {
1538+
remainPercent := int64(100 - req.InitialTraffic)
1539+
traffic = append(traffic, v1.TrafficTarget{
1540+
Percent: utils.Int64Ptr(int64(req.InitialTraffic)),
1541+
})
1542+
1543+
revisionName := getKsvcMaxPercentRevisionName(ksvc)
1544+
1545+
traffic = append(traffic, v1.TrafficTarget{
1546+
LatestRevision: utils.BoolPtr(false),
1547+
Percent: utils.Int64Ptr(remainPercent),
1548+
RevisionName: revisionName,
1549+
})
15471550
}
1551+
ksvc.Spec.Traffic = traffic
15481552

15491553
_, err = cluster.KnativeClient.ServingV1().Services(s.k8sNameSpace).Update(ctx, ksvc, metav1.UpdateOptions{})
15501554

@@ -1554,7 +1558,7 @@ func (s *serviceComponentImpl) CreateRevisions(ctx context.Context, req types.Cr
15541558
func getKsvcMaxPercentRevisionName(ksvc *v1.Service) string {
15551559
var maxPercent int64 = 0
15561560
revisionName := ""
1557-
for _, t := range ksvc.Spec.Traffic {
1561+
for _, t := range ksvc.Status.Traffic {
15581562
if t.Percent != nil && *t.Percent > maxPercent && t.RevisionName != "" {
15591563
maxPercent = *t.Percent
15601564
revisionName = t.RevisionName
@@ -1571,21 +1575,25 @@ func getKsvcMaxPercentRevisionName(ksvc *v1.Service) string {
15711575
func (s *serviceComponentImpl) SetVersionsTraffic(ctx context.Context, clusterId string, svcName string, req []types.TrafficReq) error {
15721576
cluster, err := s.clusterPool.GetClusterByID(ctx, clusterId)
15731577
if err != nil {
1578+
slog.ErrorContext(ctx, "fail to get cluster", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
15741579
return fmt.Errorf("fail to get cluster, error %v ", err)
15751580
}
15761581

15771582
ksvc, err := getServices(ctx, cluster, s.k8sNameSpace, svcName)
15781583
if err != nil {
1584+
slog.ErrorContext(ctx, "fail to get service", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
15791585
return fmt.Errorf("fail to get service %s, error %v ", svcName, err)
15801586
}
15811587

15821588
revisionList, err := getRevisionList(ctx, cluster, s.k8sNameSpace, svcName)
15831589
if err != nil {
1590+
slog.ErrorContext(ctx, "fail to get revisions", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
15841591
return fmt.Errorf("fail to get revisions %s, error %v ", svcName, err)
15851592
}
15861593

15871594
commitToRevisionMap, err := buildCommitRevisionMap(revisionList)
15881595
if err != nil {
1596+
slog.ErrorContext(ctx, "fail to build commit revision map", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
15891597
return fmt.Errorf("fail to build commit revision map, error %w", err)
15901598
}
15911599

@@ -1596,11 +1604,13 @@ func (s *serviceComponentImpl) SetVersionsTraffic(ctx context.Context, clusterId
15961604

15971605
trafficTargets, err := buildTrafficTargetsByCommit(ctx, req, commitToRevisionMap)
15981606
if err != nil {
1607+
slog.ErrorContext(ctx, "fail to build traffic targets", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
15991608
return fmt.Errorf("fail to build traffic targets, error %w", err)
16001609
}
16011610
ksvc.Spec.Traffic = trafficTargets
16021611
_, err = cluster.KnativeClient.ServingV1().Services(s.k8sNameSpace).Update(ctx, ksvc, metav1.UpdateOptions{})
16031612
if err != nil {
1613+
slog.ErrorContext(ctx, "fail to update service", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
16041614
return fmt.Errorf("fail to update service %s, error %v ", svcName, err)
16051615
}
16061616

@@ -1610,6 +1620,7 @@ func (s *serviceComponentImpl) SetVersionsTraffic(ctx context.Context, clusterId
16101620
func (s *serviceComponentImpl) ListVersions(ctx context.Context, clusterId string, svcName string) ([]types.KsvcRevisionInfo, error) {
16111621
revisionList, err := s.revisionStore.ListRevisions(ctx, svcName)
16121622
if err != nil {
1623+
slog.ErrorContext(ctx, "fail to get revisions", slog.String("svcName", svcName), slog.Any("error", err))
16131624
return nil, fmt.Errorf("fail to get revisions %s, error %v ", svcName, err)
16141625
}
16151626

@@ -1631,27 +1642,67 @@ func (s *serviceComponentImpl) ListVersions(ctx context.Context, clusterId strin
16311642
func (s *serviceComponentImpl) DeleteKsvcVersion(ctx context.Context, clusterId string, svcName string, commitID string) error {
16321643
cluster, err := s.clusterPool.GetClusterByID(ctx, clusterId)
16331644
if err != nil {
1645+
slog.ErrorContext(ctx, "fail to get cluster", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
16341646
return fmt.Errorf("fail to get cluster, error %v ", err)
16351647
}
16361648

16371649
rev, err := s.revisionStore.QueryRevision(ctx, svcName, commitID)
16381650
if err != nil {
1639-
slog.ErrorContext(ctx, "fail to get revision", slog.String("commitID", commitID), slog.Any("error", err))
1651+
slog.ErrorContext(ctx, "fail to get revision", slog.String("svcName", svcName), slog.String("commitID", commitID), slog.Any("error", err))
16401652
return err
16411653
}
1654+
ksvc, err := getServices(ctx, cluster, s.k8sNameSpace, svcName)
1655+
if err != nil {
1656+
slog.ErrorContext(ctx, "fail to get service", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
1657+
return errorx.ErrDeployNotFoundErr
1658+
}
16421659

1643-
if rev == nil {
1644-
return errorx.ErrDatabaseNoRows
1660+
hasOtherRevision := false
1661+
for _, item := range ksvc.Spec.Traffic {
1662+
if item.RevisionName != rev.RevisionName {
1663+
hasOtherRevision = true
1664+
break
1665+
}
16451666
}
16461667

1647-
if rev.TrafficPercent > 0 {
1648-
return errorx.ErrTrafficPercentNotZero
1668+
if !hasOtherRevision {
1669+
slog.ErrorContext(ctx, "no other valid revision, can not delete", slog.String("delRevision", rev.RevisionName))
1670+
return errorx.ErrNoOtherValidRevision
16491671
}
16501672

1651-
err = cluster.KnativeClient.ServingV1().Revisions(s.k8sNameSpace).Delete(ctx, rev.RevisionName, metav1.DeleteOptions{})
1673+
newTraffic, isValid := TrafficFixAlgorithm(ksvc.Spec.Traffic, rev.RevisionName)
1674+
if !isValid {
1675+
slog.ErrorContext(ctx, "traffic fix failed", slog.String("svcName", svcName))
1676+
return fmt.Errorf("traffic fix for %s failed", svcName)
1677+
}
1678+
ksvc.Spec.Traffic = newTraffic
1679+
_, err = cluster.KnativeClient.ServingV1().Services(s.k8sNameSpace).Update(ctx, ksvc, metav1.UpdateOptions{})
16521680
if err != nil {
1653-
return fmt.Errorf("fail to delete revision %s, error %v ", rev.RevisionName, err)
1681+
slog.ErrorContext(ctx, "fail to update service", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
1682+
return fmt.Errorf("fail to update service %s, error %v ", svcName, err)
16541683
}
16551684

1685+
revision, err := cluster.KnativeClient.
1686+
ServingV1().
1687+
Revisions(s.k8sNameSpace).
1688+
Get(ctx, rev.RevisionName, metav1.GetOptions{})
1689+
if err != nil {
1690+
slog.ErrorContext(ctx, "fail to get revision", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
1691+
return err
1692+
}
1693+
1694+
if revision.Annotations == nil {
1695+
revision.Annotations = map[string]string{}
1696+
}
1697+
revision.Annotations[RevisionState] = "deleted"
1698+
1699+
_, err = cluster.KnativeClient.
1700+
ServingV1().
1701+
Revisions(s.k8sNameSpace).
1702+
Update(ctx, revision, metav1.UpdateOptions{})
1703+
if err != nil {
1704+
slog.ErrorContext(ctx, "fail to update revision", slog.String("svcName", svcName), slog.String("clusterID", clusterId), slog.Any("error", err))
1705+
return err
1706+
}
16561707
return nil
16571708
}

runner/component/service_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -972,7 +972,7 @@ func TestServiceComponent_DeleteKsvcVersion(t *testing.T) {
972972
}, nil)
973973

974974
err := sc.DeleteKsvcVersion(ctx, "test", "test-service", "commit1")
975-
require.ErrorContains(t, err, "revisions.serving.knative.dev \"test-service-001\" not found")
975+
require.Contains(t, err.Error(), "SERVERLESS-ERR-1")
976976

977977
// Test case 2: Revision not found
978978
rss.EXPECT().QueryRevision(ctx, "test-service", "nonexistent").Return(nil, sql.ErrNoRows)
@@ -993,5 +993,5 @@ func TestServiceComponent_DeleteKsvcVersion(t *testing.T) {
993993

994994
err = sc.DeleteKsvcVersion(ctx, "test", "test-service", "commit2")
995995
require.Error(t, err)
996-
require.Equal(t, errorx.ErrTrafficPercentNotZero, err)
996+
require.Equal(t, errorx.ErrDeployNotFoundErr, err)
997997
}

0 commit comments

Comments
 (0)