Skip to content

Commit 1ed6d6d

Browse files
committed
fix job not stopped after execution finished
Signed-off-by: Patrick Zhao <[email protected]>
1 parent c877c27 commit 1ed6d6d

File tree

4 files changed

+165
-5
lines changed

4 files changed

+165
-5
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2023 The KodeRover Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package mongodb
18+
19+
import (
20+
"context"
21+
22+
"github.com/pkg/errors"
23+
"go.mongodb.org/mongo-driver/mongo"
24+
25+
"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
26+
mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo"
27+
)
28+
29+
// UpdateWorkflowTaskLogColl embeds a *mongo.Collection handle, so the driver's
// collection methods are available directly on it, and keeps the collection's
// name in coll (returned by GetCollectionName).
type UpdateWorkflowTaskLogColl struct {
30+
*mongo.Collection
31+
32+
coll string
33+
}
34+
35+
// UpdateWorkflowTaskLog is the record that UpdateWorkflowTaskLogColl.Create
// inserts. Stored field names follow the bson tags below.
//
// NOTE(review): the unit of StartTime/EndTime is not visible in this file
// (presumably Unix timestamps) — confirm against the caller.
type UpdateWorkflowTaskLog struct {
36+
WorkflowName string `bson:"workflow_name" json:"workflow_name"`
37+
TaskID int64 `bson:"task_id" json:"task_id"`
38+
StartTime int64 `bson:"start_time" json:"start_time"`
39+
EndTime int64 `bson:"end_time" json:"end_time"`
40+
Status string `bson:"status" json:"status"`
41+
// Data is an untyped payload; the caller shown in workflow.go stores the
// whole workflow task here.
Data interface{} `bson:"data" json:"data"`
42+
}
43+
44+
// TableName returns the fixed collection name "update_workflow_task_log".
// It does not read any field of the receiver.
func (c UpdateWorkflowTaskLog) TableName() string {
45+
return "update_workflow_task_log"
46+
}
47+
48+
// NewUpdateWorkflowTaskLogColl returns a wrapper bound to the collection named
// by UpdateWorkflowTaskLog.TableName, inside the database selected by
// config.MongoDatabase(). The same name is recorded in the coll field.
func NewUpdateWorkflowTaskLogColl() *UpdateWorkflowTaskLogColl {
49+
name := UpdateWorkflowTaskLog{}.TableName()
50+
return &UpdateWorkflowTaskLogColl{
51+
Collection: mongotool.Database(config.MongoDatabase()).Collection(name),
52+
coll: name,
53+
}
54+
}
55+
56+
// GetCollectionName returns the collection name stored in c.coll.
func (c *UpdateWorkflowTaskLogColl) GetCollectionName() string {
57+
return c.coll
58+
}
59+
60+
// EnsureIndex is a no-op: it ignores ctx, creates no indexes and always
// returns nil.
//
// NOTE(review): presumably present so this type matches the method set of the
// other collection wrappers in the package — confirm.
func (c *UpdateWorkflowTaskLogColl) EnsureIndex(ctx context.Context) error {
61+
return nil
62+
}
63+
64+
// Create inserts args as a single document via InsertOne.
//
// It returns an error when args is nil; otherwise it returns whatever error
// InsertOne reports (nil on success). The insert result is discarded.
func (c *UpdateWorkflowTaskLogColl) Create(args *UpdateWorkflowTaskLog) error {
65+
if args == nil {
66+
return errors.New("nil UpdateWorkflowTaskLog")
67+
}
68+
69+
// The insert runs with context.Background(), so it carries no deadline or
// cancellation from the caller.
_, err := c.InsertOne(context.Background(), args)
70+
return err
71+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2023 The KodeRover Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package mongodb
18+
19+
import (
20+
"context"
21+
22+
"github.com/pkg/errors"
23+
"go.mongodb.org/mongo-driver/mongo"
24+
25+
"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
26+
mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo"
27+
)
28+
29+
// WaitPodFinishLogColl embeds a *mongo.Collection handle, so the driver's
// collection methods are available directly on it, and keeps the collection's
// name in coll (returned by GetCollectionName).
type WaitPodFinishLogColl struct {
30+
*mongo.Collection
31+
32+
coll string
33+
}
34+
35+
// WaitPodFinishLog is the record that WaitPodFinishLogColl.Create inserts.
// Stored field names follow the bson tags below.
//
// In the caller shown in kubernetes.go, JobStatus is filled from
// job.Status.String() and Status from the pod's phase.
type WaitPodFinishLog struct {
36+
JobName string `bson:"job_name" json:"job_name"`
37+
JobStatus string `bson:"job_status" json:"job_status"`
38+
Namespace string `bson:"namespace" json:"namespace"`
39+
PodName string `bson:"pod_name" json:"pod_name"`
40+
Status string `bson:"status" json:"status"`
41+
}
42+
43+
// TableName returns the fixed collection name "wait_pod_finish_log".
// It does not read any field of the receiver.
func (c WaitPodFinishLog) TableName() string {
44+
return "wait_pod_finish_log"
45+
}
46+
47+
// NewWaitPodFinishLogColl returns a wrapper bound to the collection named by
// WaitPodFinishLog.TableName, inside the database selected by
// config.MongoDatabase(). The same name is recorded in the coll field.
func NewWaitPodFinishLogColl() *WaitPodFinishLogColl {
48+
name := WaitPodFinishLog{}.TableName()
49+
return &WaitPodFinishLogColl{
50+
Collection: mongotool.Database(config.MongoDatabase()).Collection(name),
51+
coll: name,
52+
}
53+
}
54+
55+
// GetCollectionName returns the collection name stored in c.coll.
func (c *WaitPodFinishLogColl) GetCollectionName() string {
56+
return c.coll
57+
}
58+
59+
// EnsureIndex is a no-op: it ignores ctx, creates no indexes and always
// returns nil.
//
// NOTE(review): presumably present so this type matches the method set of the
// other collection wrappers in the package — confirm.
func (c *WaitPodFinishLogColl) EnsureIndex(ctx context.Context) error {
60+
return nil
61+
}
62+
63+
// Create inserts args as a single document via InsertOne.
//
// It returns an error when args is nil; otherwise it returns whatever error
// InsertOne reports (nil on success). The insert result is discarded.
func (c *WaitPodFinishLogColl) Create(args *WaitPodFinishLog) error {
64+
if args == nil {
65+
return errors.New("nil WaitPodFinishLog")
66+
}
67+
68+
// The insert runs with context.Background(), so it carries no deadline or
// cancellation from the caller.
_, err := c.InsertOne(context.Background(), args)
69+
return err
70+
}

pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -829,18 +829,16 @@ func isPodFailed(podName, namespace string, apiReader client.Reader, xl *zap.Sug
829829

830830
func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time.Time, namespace, jobName string, checkFile bool, kubeClient crClient.Client, clientset kubernetes.Interface, restConfig *rest.Config, informer informers.SharedInformerFactory, jobTask *commonmodels.JobTask, ack func(), xl *zap.SugaredLogger) (status config.Status, errMsg string) {
831831
xl.Infof("wait job to end: %s %s", namespace, jobName)
832-
podLister := informer.Core().V1().Pods().Lister().Pods(namespace)
833-
jobLister := informer.Batch().V1().Jobs().Lister().Jobs(namespace)
834-
cmLister := informer.Core().V1().ConfigMaps().Lister().ConfigMaps(namespace)
835832
for {
836833
select {
837834
case <-ctx.Done():
838835
return config.StatusCancelled, ""
839-
840836
case <-taskTimeout:
841837
return config.StatusTimeout, ""
842-
843838
default:
839+
jobLister := informer.Batch().V1().Jobs().Lister().Jobs(namespace)
840+
cmLister := informer.Core().V1().ConfigMaps().Lister().ConfigMaps(namespace)
841+
844842
job, err := jobLister.Get(jobName)
845843
if err != nil {
846844
errMsg := fmt.Sprintf("failed to get job pod job-name=%s %v", jobName, err)
@@ -857,13 +855,23 @@ func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time.
857855
// pod is still running
858856
switch {
859857
case job.Status.Active != 0:
858+
podLister := informer.Core().V1().Pods().Lister().Pods(namespace)
860859
pods, err := podLister.List(labels.Set{"job-name": jobName}.AsSelector())
861860
if err != nil {
862861
errMsg := fmt.Sprintf("failed to find pod with label job-name=%s %v", jobName, err)
863862
xl.Errorf(errMsg)
864863
return config.StatusFailed, errMsg
865864
}
866865
for _, pod := range pods {
866+
commonrepo.NewWaitPodFinishLogColl().Create(
867+
&commonrepo.WaitPodFinishLog{
868+
Namespace: namespace,
869+
PodName: pod.Name,
870+
JobName: jobName,
871+
JobStatus: job.Status.String(),
872+
Status: string(pod.Status.Phase),
873+
})
874+
867875
ipod := wrapper.Pod(pod)
868876
if ipod.Pending() {
869877
continue

pkg/microservice/aslan/core/common/service/workflowcontroller/workflow.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,17 @@ func updateworkflowStatus(workflow *commonmodels.WorkflowTask) {
549549
}
550550

551551
func (c *workflowCtl) updateWorkflowTask() {
552+
c.workflowTaskMutex.Lock()
553+
commonrepo.NewUpdateWorkflowTaskLogColl().Create(&commonrepo.UpdateWorkflowTaskLog{
554+
WorkflowName: c.workflowTask.WorkflowName,
555+
TaskID: c.workflowTask.TaskID,
556+
StartTime: c.workflowTask.StartTime,
557+
EndTime: c.workflowTask.EndTime,
558+
Status: string(c.workflowTask.Status),
559+
Data: c.workflowTask,
560+
})
561+
c.workflowTaskMutex.Unlock()
562+
552563
taskInColl, err := commonrepo.NewworkflowTaskv4Coll().Find(c.workflowTask.WorkflowName, c.workflowTask.TaskID)
553564
if err != nil {
554565
c.logger.Errorf("find workflow task v4 %s failed,error: %v", c.workflowTask.WorkflowName, err)

0 commit comments

Comments
 (0)