Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pkg/job/runtime_v2/job/single/kube_single_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package single
import (
"context"
"fmt"
"reflect"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -142,7 +143,10 @@ func (sp *KubeSingleJob) AddEventListener(ctx context.Context, listenerType stri
}

func filterFunc(obj interface{}) bool {
job := obj.(*unstructured.Unstructured)
job, ok := obj.(*unstructured.Unstructured)
if !ok {
return false
}
labels := job.GetLabels()
jobName := job.GetLabels()
if labels != nil && labels[pfschema.JobOwnerLabel] == pfschema.JobOwnerValue {
Expand All @@ -158,7 +162,12 @@ func filterFunc(obj interface{}) bool {

// JobStatus get single job status, message from interface{}, and covert to JobStatus
func (sp *KubeSingleJob) JobStatus(obj interface{}) (api.StatusInfo, error) {
unObj := obj.(*unstructured.Unstructured)
unObj, ok := obj.(*unstructured.Unstructured)
if !ok {
err := fmt.Errorf("interface {} is %v, not Unstructured", reflect.TypeOf(obj))
log.Errorf("convert unstructured object failed. err: %v", err)
return api.StatusInfo{}, err
}
// convert to Pod struct
job := &v1.Pod{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unObj.Object, job); err != nil {
Expand Down
21 changes: 17 additions & 4 deletions pkg/job/runtime_v2/job/util/kuberuntime/job_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package kuberuntime

import (
"fmt"
"reflect"
"strings"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
k8sschema "k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -89,7 +90,11 @@ func getJobStatus(statusInfo api.StatusInfo, annotations map[string]string) api.
}

func JobAddFunc(obj interface{}, getStatusFunc api.GetStatusFunc) (*api.JobSyncInfo, error) {
jobObj := obj.(*unstructured.Unstructured)
jobObj, ok := obj.(*unstructured.Unstructured)
if !ok {
err := fmt.Errorf("interface {} is %v, not Unstructured", reflect.TypeOf(obj))
return nil, err
}
gvk := jobObj.GroupVersionKind()

log.Infof("begin add %s job. jobName: %s, namespace: %s", gvk.String(), jobObj.GetName(), jobObj.GetNamespace())
Expand Down Expand Up @@ -180,7 +185,12 @@ func JobUpdateFunc(old, new interface{}, getStatusFunc api.GetStatusFunc) (*api.
}

func JobDeleteFunc(obj interface{}, getStatusFunc api.GetStatusFunc) (*api.JobSyncInfo, error) {
jobObj := obj.(*unstructured.Unstructured)
jobObj, ok := obj.(*unstructured.Unstructured)
if !ok {
err := fmt.Errorf("interface {} is %v, not Unstructured", reflect.TypeOf(obj))
log.Errorf("convert unstructured object failed. err: %v", err)
return nil, err
}
// get job id and GroupVersionKind
gvk := jobObj.GroupVersionKind()
labels := jobObj.GetLabels()
Expand Down Expand Up @@ -325,7 +335,10 @@ func handlePendingPod(podStatus *v1.PodStatus, jobName, podName, namespace strin
}

func TaskUpdateFunc(obj interface{}, action schema.ActionType, taskQueue workqueue.RateLimitingInterface) {
podObj := obj.(*unstructured.Unstructured)
podObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return
}
uid := podObj.GetUID()
name := podObj.GetName()
namespace := podObj.GetNamespace()
Expand Down
13 changes: 11 additions & 2 deletions pkg/job/runtime_v2/job/util/kuberuntime/kube_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ func (kj *KubeBaseJob) delete(obj interface{}) {

// ResponsibleForJob filter job belong to PaddleFlow
func ResponsibleForJob(obj interface{}) bool {
job := obj.(*unstructured.Unstructured)
job, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Errorf("interface {} is %v, not Unstructured", reflect.TypeOf(obj))
return false
}
labels := job.GetLabels()
if labels != nil && labels[schema.JobOwnerLabel] == schema.JobOwnerValue {
log.Debugf("responsible for handle job. jobName:[%s]", job.GetName())
Expand Down Expand Up @@ -864,7 +868,12 @@ func updateKubeJobPriority(jobInfo *api.PFJob, runtimeClient framework.RuntimeCl
log.Errorf("get pod group for job %s failed, err: %v", jobInfo.ID, err)
return err
}
unObj := obj.(*unstructured.Unstructured)
unObj, ok := obj.(*unstructured.Unstructured)
if !ok {
err := fmt.Errorf("interface {} is %v, not Unstructured", reflect.TypeOf(obj))
log.Errorf("get pod group for job %s failed, err: %v", jobInfo.ID, err)
return err
}
oldPG := &schedulingv1beta1.PodGroup{}
if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unObj.Object, oldPG); err != nil {
log.Errorf("convert unstructured object [%v] to pod group failed. err: %v", obj, err)
Expand Down