diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go
index 87b31579a..0476a69e0 100644
--- a/pkg/workceptor/kubernetes.go
+++ b/pkg/workceptor/kubernetes.go
@@ -21,6 +21,7 @@ import (
 	"github.com/ghjm/cmdline"
 	"github.com/google/shlex"
 	"github.com/spf13/viper"
+	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -55,6 +56,8 @@ type KubeUnit struct {
 	config *rest.Config
 	clientset kubernetes.Interface
 	Pod *corev1.Pod
+	Job *batchv1.Job
+	isJobSpec bool
 	podPendingTimeout time.Duration
 }
 
@@ -79,6 +82,9 @@ type KubeAPIer interface {
 	List(context.Context, kubernetes.Interface, string, metav1.ListOptions) (*corev1.PodList, error)
 	Watch(context.Context, kubernetes.Interface, string, metav1.ListOptions) (watch.Interface, error)
 	Delete(context.Context, kubernetes.Interface, string, string, metav1.DeleteOptions) error
+	CreateJob(context.Context, kubernetes.Interface, string, *batchv1.Job, metav1.CreateOptions) (*batchv1.Job, error)
+	GetJob(context.Context, kubernetes.Interface, string, string, metav1.GetOptions) (*batchv1.Job, error)
+	DeleteJob(context.Context, kubernetes.Interface, string, string, metav1.DeleteOptions) error
 	SubResource(kubernetes.Interface, string, string) *rest.Request
 	InClusterConfig() (*rest.Config, error)
 	NewDefaultClientConfigLoadingRules() *clientcmd.ClientConfigLoadingRules
@@ -129,6 +135,21 @@ func (ku KubeAPIWrapper) Delete(ctx context.Context, clientset kubernetes.Interf
 	return clientset.CoreV1().Pods(namespace).Delete(ctx, name, opts)
 }
 
+// CreateJob creates job in namespace through the clientset's BatchV1 Jobs client.
+func (ku KubeAPIWrapper) CreateJob(ctx context.Context, clientset kubernetes.Interface, namespace string, job *batchv1.Job, opts metav1.CreateOptions) (*batchv1.Job, error) {
+	return clientset.BatchV1().Jobs(namespace).Create(ctx, job, opts)
+}
+
+// GetJob fetches the Job called name from namespace through the clientset's BatchV1 Jobs client.
+func (ku KubeAPIWrapper) GetJob(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, opts metav1.GetOptions) (*batchv1.Job, error) {
+	return clientset.BatchV1().Jobs(namespace).Get(ctx, name, opts)
+}
+
+// DeleteJob deletes the Job called name from namespace through the clientset's BatchV1 Jobs client.
+func (ku KubeAPIWrapper) DeleteJob(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, opts metav1.DeleteOptions) error {
+	return clientset.BatchV1().Jobs(namespace).Delete(ctx, name, opts)
+}
+
 func (ku KubeAPIWrapper) SubResource(clientset kubernetes.Interface, podName string, podNamespace string) *rest.Request {
 	return clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(podNamespace).SubResource("attach")
 }
@@ -227,6 +248,76 @@ func podRunningAndReady(kw KubeUnit) func(event watch.Event) (bool, error) {
 	return inner
 }
 
+// waitForJobPod polls (every 500ms, up to 60 times) for a pod labelled with the name of
+// kw.Job. The first pod found is stored in kw.Pod and recorded in the unit status. It
+// returns an error if listing fails, the unit context ends, or no pod appears in time.
+func (kw *KubeUnit) waitForJobPod() error {
+	jobName := kw.Job.Name
+	namespace := kw.Job.Namespace
+
+	for i := 0; i < 60; i++ {
+		pods, err := kw.KubeAPIWrapperInstance.List(kw.GetContext(), kw.clientset, namespace, metav1.ListOptions{
+			LabelSelector: fmt.Sprintf("job-name=%s", jobName),
+		})
+		if err != nil {
+			return fmt.Errorf("failed to list pods for job %s: %w", jobName, err)
+		}
+
+		if len(pods.Items) > 0 {
+			kw.Pod = &pods.Items[0]
+			kw.UpdateFullStatus(func(status *StatusFileData) {
+				status.State = WorkStatePending
+				status.Detail = "Job created, pod found"
+				status.StdoutSize = 0
+				status.ExtraData.(*KubeExtraData).PodName = kw.Pod.Name
+			})
+
+			return nil
+		}
+
+		// Wait before the next poll, but stop immediately if the unit is canceled.
+		select {
+		case <-kw.GetContext().Done():
+			return fmt.Errorf("waiting for job %s to create a pod: %w", jobName, kw.GetContext().Err())
+		case <-time.After(500 * time.Millisecond):
+		}
+	}
+
+	return fmt.Errorf("job %s did not create any pods within 30 seconds", jobName)
+}
+
+// validateAndSetupWorkerContainer enables Stdin and StdinOnce on the first container named
+// WorkerContainerName and forces RestartPolicyNever. It fails if no such container exists.
+func (kw *KubeUnit) validateAndSetupWorkerContainer(spec *corev1.PodSpec) error {
+	foundWorker := false
+	for i := range spec.Containers {
+		if spec.Containers[i].Name == WorkerContainerName {
+			spec.Containers[i].Stdin = true
+			spec.Containers[i].StdinOnce = true
+			foundWorker = true
+
+			break
+		}
+	}
+	if !foundWorker {
+		return fmt.Errorf("at least one container must be named worker")
+	}
+	spec.RestartPolicy = corev1.RestartPolicyNever
+
+	return nil
+}
+
+// extractMetadata copies a non-empty namespace from objectMeta into ked.KubeNamespace and
+// turns a non-empty name into the generated-name prefix kw.namePrefix.
+func (kw *KubeUnit) extractMetadata(objectMeta *metav1.ObjectMeta, ked *KubeExtraData) {
+	if objectMeta.Namespace != "" {
+		ked.KubeNamespace = objectMeta.Namespace
+	}
+	if objectMeta.Name != "" {
+		kw.namePrefix = objectMeta.Name + "-"
+	}
+}
+
 func (kw *KubeUnit) GetKubeTimeoutStart() time.Duration {
 	// RECEPTOR_KUBE_TIMEOUT_START
 	// default: 1 second
@@ -603,37 +694,55 @@ func (kw *KubeUnit) CreatePod(env map[string]string) error {
 	var objectMeta *metav1.ObjectMeta
 	if ked.KubePod != "" {
 		decode := scheme.Codecs.UniversalDeserializer().Decode
-		_, _, err := decode([]byte(ked.KubePod), nil, pod)
-		if err != nil {
-			return err
-		}
-		foundWorker := false
-		spec = &pod.Spec
-		for i := range spec.Containers {
-			if spec.Containers[i].Name == WorkerContainerName {
-				spec.Containers[i].Stdin = true
-				spec.Containers[i].StdinOnce = true
-				foundWorker = true
 
-				break
+		// Try Job first
+		job := &batchv1.Job{}
+		_, _, err := decode([]byte(ked.KubePod), nil, job)
+		if err == nil && job.Kind == "Job" {
+			kw.isJobSpec = true
+			spec = &job.Spec.Template.Spec
+			kw.extractMetadata(&job.ObjectMeta, ked)
+
+			err = kw.validateAndSetupWorkerContainer(spec)
+			if err != nil {
+				return err
 			}
+
+			// Create Job
+			job.ObjectMeta.Name = ""
+			job.ObjectMeta.GenerateName = kw.namePrefix
+			job.ObjectMeta.Namespace = ked.KubeNamespace
+
+			kw.Job, err = kw.KubeAPIWrapperInstance.CreateJob(kw.GetContext(), kw.clientset, ked.KubeNamespace, job, metav1.CreateOptions{})
+			if err != nil {
+				return err
+			}
+
+			// Wait for Job to create Pod
+			err = kw.waitForJobPod()
+			if err != nil {
+				return err
+			}
+		} else {
+			// Fall back to Pod spec
+			_, _, err := decode([]byte(ked.KubePod), nil, pod)
+			if err != nil {
+				return err
+			}
+
+			spec = &pod.Spec
+			kw.extractMetadata(&pod.ObjectMeta, ked)
+
+			err = kw.validateAndSetupWorkerContainer(spec)
+			if err != nil {
+				return err
+			}
+
+			objectMeta = &pod.ObjectMeta
+			objectMeta.Name = ""
+			objectMeta.GenerateName = kw.namePrefix
+			objectMeta.Namespace = ked.KubeNamespace
 		}
-		if !foundWorker {
-			return fmt.Errorf("at least one container must be named worker")
-		}
-		spec.RestartPolicy = corev1.RestartPolicyNever
-		userNamespace := pod.ObjectMeta.Namespace
-		if userNamespace != "" {
-			ked.KubeNamespace = userNamespace
-		}
-		userPodName := pod.ObjectMeta.Name
-		if userPodName != "" {
-			kw.namePrefix = userPodName + "-"
-		}
-		objectMeta = &pod.ObjectMeta
-		objectMeta.Name = ""
-		objectMeta.GenerateName = kw.namePrefix
-		objectMeta.Namespace = ked.KubeNamespace
 	} else {
 		objectMeta = &metav1.ObjectMeta{
 			GenerateName: kw.namePrefix,
@@ -653,26 +762,29 @@ func (kw *KubeUnit) CreatePod(env map[string]string) error {
 		}
 	}
 
-	pod = &corev1.Pod{
-		ObjectMeta: *objectMeta,
-		Spec: *spec,
-	}
+	// Only create Pod directly if it's not a Job spec (Job already created Pod)
+	if !kw.isJobSpec {
+		pod = &corev1.Pod{
+			ObjectMeta: *objectMeta,
+			Spec: *spec,
+		}
 
-	if env != nil {
-		evs := make([]corev1.EnvVar, 0)
-		for k, v := range env {
-			evs = append(evs, corev1.EnvVar{
-				Name: k,
-				Value: v,
-			})
+		if env != nil {
+			evs := make([]corev1.EnvVar, 0)
+			for k, v := range env {
+				evs = append(evs, corev1.EnvVar{
+					Name: k,
+					Value: v,
+				})
+			}
+			pod.Spec.Containers[0].Env = evs
 		}
-		pod.Spec.Containers[0].Env = evs
-	}
 
-	// get pod and store to kw.Pod
-	kw.Pod, err = kw.KubeAPIWrapperInstance.Create(kw.GetContext(), kw.clientset, ked.KubeNamespace, pod, metav1.CreateOptions{})
-	if err != nil {
-		return err
+		// get pod and store to kw.Pod
+		kw.Pod, err = kw.KubeAPIWrapperInstance.Create(kw.GetContext(), kw.clientset, ked.KubeNamespace, pod, metav1.CreateOptions{})
+		if err != nil {
+			return err
+		}
 	}
 
 	select {
@@ -1788,6 +1900,14 @@ func (kw *KubeUnit) Start() error {
 func (kw *KubeUnit) Cancel() error {
 	kw.CancelContext()
 	kw.UpdateBasicStatus(WorkStateCanceled, "Canceled", -1)
+	if kw.Job != nil {
+		// batch/v1 Jobs orphan their pods on delete unless a propagation policy is given.
+		propagation := metav1.DeletePropagationBackground
+		err := kw.KubeAPIWrapperInstance.DeleteJob(context.Background(), kw.clientset, kw.Job.Namespace, kw.Job.Name, metav1.DeleteOptions{PropagationPolicy: &propagation})
+		if err != nil {
+			kw.GetWorkceptor().nc.GetLogger().Error("Error deleting job %s: %s", kw.Job.Name, err)
+		}
+	}
 	if kw.Pod != nil {
 		err := kw.KubeAPIWrapperInstance.Delete(context.Background(), kw.clientset, kw.Pod.Namespace, kw.Pod.Name, metav1.DeleteOptions{})
 		if err != nil {