Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 148 additions & 45 deletions pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ghjm/cmdline"
"github.com/google/shlex"
"github.com/spf13/viper"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -55,6 +56,8 @@ type KubeUnit struct {
config *rest.Config
clientset kubernetes.Interface
Pod *corev1.Pod
Job *batchv1.Job
isJobSpec bool
podPendingTimeout time.Duration
}

Expand All @@ -79,6 +82,9 @@ type KubeAPIer interface {
List(context.Context, kubernetes.Interface, string, metav1.ListOptions) (*corev1.PodList, error)
Watch(context.Context, kubernetes.Interface, string, metav1.ListOptions) (watch.Interface, error)
Delete(context.Context, kubernetes.Interface, string, string, metav1.DeleteOptions) error
CreateJob(context.Context, kubernetes.Interface, string, *batchv1.Job, metav1.CreateOptions) (*batchv1.Job, error)
GetJob(context.Context, kubernetes.Interface, string, string, metav1.GetOptions) (*batchv1.Job, error)
DeleteJob(context.Context, kubernetes.Interface, string, string, metav1.DeleteOptions) error
SubResource(kubernetes.Interface, string, string) *rest.Request
InClusterConfig() (*rest.Config, error)
NewDefaultClientConfigLoadingRules() *clientcmd.ClientConfigLoadingRules
Expand Down Expand Up @@ -129,6 +135,18 @@ func (ku KubeAPIWrapper) Delete(ctx context.Context, clientset kubernetes.Interf
return clientset.CoreV1().Pods(namespace).Delete(ctx, name, opts)
}

func (ku KubeAPIWrapper) CreateJob(ctx context.Context, clientset kubernetes.Interface, namespace string, job *batchv1.Job, opts metav1.CreateOptions) (*batchv1.Job, error) {
return clientset.BatchV1().Jobs(namespace).Create(ctx, job, opts)
}

func (ku KubeAPIWrapper) GetJob(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, opts metav1.GetOptions) (*batchv1.Job, error) {
return clientset.BatchV1().Jobs(namespace).Get(ctx, name, opts)
}

func (ku KubeAPIWrapper) DeleteJob(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, opts metav1.DeleteOptions) error {
return clientset.BatchV1().Jobs(namespace).Delete(ctx, name, opts)
}

func (ku KubeAPIWrapper) SubResource(clientset kubernetes.Interface, podName string, podNamespace string) *rest.Request {
return clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(podNamespace).SubResource("attach")
}
Expand Down Expand Up @@ -227,6 +245,64 @@ func podRunningAndReady(kw KubeUnit) func(event watch.Event) (bool, error) {
return inner
}

func (kw *KubeUnit) waitForJobPod() error {
jobName := kw.Job.Name
namespace := kw.Job.Namespace

for i := 0; i < 60; i++ {
pods, err := kw.KubeAPIWrapperInstance.List(kw.GetContext(), kw.clientset, namespace, metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", jobName),
})
if err != nil {
return fmt.Errorf("failed to list pods for job %s: %w", jobName, err)
}

if len(pods.Items) > 0 {
kw.Pod = &pods.Items[0]
kw.UpdateFullStatus(func(status *StatusFileData) {
status.State = WorkStatePending
status.Detail = "Job created, pod found"
status.StdoutSize = 0
status.ExtraData.(*KubeExtraData).PodName = kw.Pod.Name
})

return nil
}

time.Sleep(500 * time.Millisecond)
}

return fmt.Errorf("job %s did not create any pods within 30 seconds", jobName)
}

func (kw *KubeUnit) validateAndSetupWorkerContainer(spec *corev1.PodSpec) error {
foundWorker := false
for i := range spec.Containers {
if spec.Containers[i].Name == WorkerContainerName {
spec.Containers[i].Stdin = true
spec.Containers[i].StdinOnce = true
foundWorker = true

break
}
}
if !foundWorker {
return fmt.Errorf("at least one container must be named worker")
}
spec.RestartPolicy = corev1.RestartPolicyNever

return nil
}

func (kw *KubeUnit) extractMetadata(objectMeta *metav1.ObjectMeta, ked *KubeExtraData) {
if objectMeta.Namespace != "" {
ked.KubeNamespace = objectMeta.Namespace
}
if objectMeta.Name != "" {
kw.namePrefix = objectMeta.Name + "-"
}
}

func (kw *KubeUnit) GetKubeTimeoutStart() time.Duration {
// RECEPTOR_KUBE_TIMEOUT_START
// default: 1 second
Expand Down Expand Up @@ -603,37 +679,55 @@ func (kw *KubeUnit) CreatePod(env map[string]string) error {
var objectMeta *metav1.ObjectMeta
if ked.KubePod != "" {
decode := scheme.Codecs.UniversalDeserializer().Decode
_, _, err := decode([]byte(ked.KubePod), nil, pod)
if err != nil {
return err
}
foundWorker := false
spec = &pod.Spec
for i := range spec.Containers {
if spec.Containers[i].Name == WorkerContainerName {
spec.Containers[i].Stdin = true
spec.Containers[i].StdinOnce = true
foundWorker = true

break
// Try Job first
job := &batchv1.Job{}
_, _, err := decode([]byte(ked.KubePod), nil, job)
if err == nil && job.Kind == "Job" {
kw.isJobSpec = true
spec = &job.Spec.Template.Spec
kw.extractMetadata(&job.ObjectMeta, ked)

err = kw.validateAndSetupWorkerContainer(spec)
if err != nil {
return err
}

// Create Job
job.ObjectMeta.Name = ""
job.ObjectMeta.GenerateName = kw.namePrefix
job.ObjectMeta.Namespace = ked.KubeNamespace

kw.Job, err = kw.KubeAPIWrapperInstance.CreateJob(kw.GetContext(), kw.clientset, ked.KubeNamespace, job, metav1.CreateOptions{})
if err != nil {
return err
}

// Wait for Job to create Pod
err = kw.waitForJobPod()
if err != nil {
return err
}
} else {
// Fall back to Pod spec
_, _, err := decode([]byte(ked.KubePod), nil, pod)
if err != nil {
return err
}

spec = &pod.Spec
kw.extractMetadata(&pod.ObjectMeta, ked)

err = kw.validateAndSetupWorkerContainer(spec)
if err != nil {
return err
}

objectMeta = &pod.ObjectMeta
objectMeta.Name = ""
objectMeta.GenerateName = kw.namePrefix
objectMeta.Namespace = ked.KubeNamespace
}
if !foundWorker {
return fmt.Errorf("at least one container must be named worker")
}
spec.RestartPolicy = corev1.RestartPolicyNever
userNamespace := pod.ObjectMeta.Namespace
if userNamespace != "" {
ked.KubeNamespace = userNamespace
}
userPodName := pod.ObjectMeta.Name
if userPodName != "" {
kw.namePrefix = userPodName + "-"
}
objectMeta = &pod.ObjectMeta
objectMeta.Name = ""
objectMeta.GenerateName = kw.namePrefix
objectMeta.Namespace = ked.KubeNamespace
} else {
objectMeta = &metav1.ObjectMeta{
GenerateName: kw.namePrefix,
Expand All @@ -653,26 +747,29 @@ func (kw *KubeUnit) CreatePod(env map[string]string) error {
}
}

pod = &corev1.Pod{
ObjectMeta: *objectMeta,
Spec: *spec,
}
// Only create Pod directly if it's not a Job spec (Job already created Pod)
if !kw.isJobSpec {
pod = &corev1.Pod{
ObjectMeta: *objectMeta,
Spec: *spec,
}

if env != nil {
evs := make([]corev1.EnvVar, 0)
for k, v := range env {
evs = append(evs, corev1.EnvVar{
Name: k,
Value: v,
})
if env != nil {
evs := make([]corev1.EnvVar, 0)
for k, v := range env {
evs = append(evs, corev1.EnvVar{
Name: k,
Value: v,
})
}
pod.Spec.Containers[0].Env = evs
}
pod.Spec.Containers[0].Env = evs
}

// get pod and store to kw.Pod
kw.Pod, err = kw.KubeAPIWrapperInstance.Create(kw.GetContext(), kw.clientset, ked.KubeNamespace, pod, metav1.CreateOptions{})
if err != nil {
return err
// get pod and store to kw.Pod
kw.Pod, err = kw.KubeAPIWrapperInstance.Create(kw.GetContext(), kw.clientset, ked.KubeNamespace, pod, metav1.CreateOptions{})
if err != nil {
return err
}
}

select {
Expand Down Expand Up @@ -1788,6 +1885,12 @@ func (kw *KubeUnit) Start() error {
func (kw *KubeUnit) Cancel() error {
kw.CancelContext()
kw.UpdateBasicStatus(WorkStateCanceled, "Canceled", -1)
if kw.Job != nil {
err := kw.KubeAPIWrapperInstance.DeleteJob(context.Background(), kw.clientset, kw.Job.Namespace, kw.Job.Name, metav1.DeleteOptions{})
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Error("Error deleting job %s: %s", kw.Job.Name, err)
}
}
if kw.Pod != nil {
err := kw.KubeAPIWrapperInstance.Delete(context.Background(), kw.clientset, kw.Pod.Namespace, kw.Pod.Name, metav1.DeleteOptions{})
if err != nil {
Expand Down
Loading