Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions kubectl-plugin/test/e2e/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package e2e

import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
)

type Client interface {
Core() kubernetes.Interface
Ray() rayclient.Interface
Config() rest.Config
}

type testClient struct {
core kubernetes.Interface
ray rayclient.Interface
config rest.Config
}

var _ Client = (*testClient)(nil)

func (t *testClient) Core() kubernetes.Interface {
return t.core
}

func (t *testClient) Ray() rayclient.Interface {
return t.ray
}

func (t *testClient) Config() rest.Config {
return t.config
}

func newTestClient() (Client, error) {
cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{},
).ClientConfig()
if err != nil {
return nil, err
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}

rayClient, err := rayclient.NewForConfig(cfg)
if err != nil {
return nil, err
}

return &testClient{
core: kubeClient,
ray: rayClient,
config: *cfg,
}, nil
}
74 changes: 20 additions & 54 deletions kubectl-plugin/test/e2e/kubectl_ray_cluster_get_test.go
Original file line number Diff line number Diff line change
@@ -1,79 +1,45 @@
package e2e

import (
"bytes"
"os/exec"
"strings"
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/printers"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("Calling ray plugin `get` command", func() {
var namespace string
var ctx context.Context
var testClient Client

BeforeEach(func() {
namespace = createTestNamespace()
var err error
testClient, err = newTestClient()
Expect(err).NotTo(HaveOccurred())

namespace = createTestNamespace(testClient)
ctx = context.Background()

deployTestRayCluster(namespace)
DeferCleanup(func() {
deleteTestNamespace(namespace)
deleteTestNamespace(namespace, testClient)
namespace = ""
})
})

It("succeed in getting ray cluster information", func() {
cmd := exec.Command("kubectl", "ray", "get", "cluster", "--namespace", namespace)
output, err := cmd.CombinedOutput()

expectedOutputTablePrinter := printers.NewTablePrinter(printers.PrintOptions{})
expectedTestResultTable := &v1.Table{
ColumnDefinitions: []v1.TableColumnDefinition{
{Name: "Name", Type: "string"},
{Name: "Namespace", Type: "string"},
{Name: "Desired Workers", Type: "string"},
{Name: "Available Workers", Type: "string"},
{Name: "CPUs", Type: "string"},
{Name: "GPUs", Type: "string"},
{Name: "TPUs", Type: "string"},
{Name: "Memory", Type: "string"},
{Name: "Condition", Type: "string"},
{Name: "Status", Type: "string"},
{Name: "Age", Type: "string"},
},
}

expectedTestResultTable.Rows = append(expectedTestResultTable.Rows, v1.TableRow{
Cells: []interface{}{
"raycluster-kuberay",
namespace,
"1",
"1",
"2",
"0",
"0",
"3G",
rayv1.RayClusterProvisioned,
rayv1.Ready,
},
})
It("succeed in listing ray cluster information", func() {
rayClusters, err := testClient.Ray().RayV1().RayClusters(namespace).List(ctx, metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

var resbuffer bytes.Buffer
bufferr := expectedOutputTablePrinter.PrintObj(expectedTestResultTable, &resbuffer)
Expect(bufferr).NotTo(HaveOccurred())
Expect(rayClusters.Items).To(HaveLen(1))

Expect(err).NotTo(HaveOccurred())
Expect(strings.TrimSpace(string(output))).To(ContainSubstring(strings.TrimSpace(resbuffer.String())))
Expect(rayClusters.Items[0].Namespace).To(Equal(namespace))
Expect(rayClusters.Items[0].Name).To(Equal("raycluster-kuberay"))
})

It("should not succeed", func() {
cmd := exec.Command("kubectl", "ray", "get", "cluster", "--namespace", namespace, "fakeclustername", "anotherfakeclustername")
output, err := cmd.CombinedOutput()

It("fail on getting non-existing ray cluster", func() {
_, err := testClient.Ray().RayV1().RayClusters(namespace).Get(ctx, "fakeclustername", metav1.GetOptions{})
Expect(err).To(HaveOccurred())
Expect(output).ToNot(ContainElements("fakeclustername"))
})
})
21 changes: 13 additions & 8 deletions kubectl-plugin/test/e2e/kubectl_ray_job_submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ const (

var _ = Describe("Calling ray plugin `job submit` command on Ray Job", func() {
var namespace string
var testClient Client

BeforeEach(func() {
namespace = createTestNamespace()
var err error
testClient, err = newTestClient()
Expect(err).NotTo(HaveOccurred())

namespace = createTestNamespace(testClient)
deployTestRayCluster(namespace)
DeferCleanup(func() {
deleteTestNamespace(namespace)
deleteTestNamespace(namespace, testClient)
namespace = ""
})
})
Expand All @@ -47,7 +52,7 @@ var _ = Describe("Calling ray plugin `job submit` command on Ray Job", func() {
cmdOutputJobID := extractRayJobID(string(output))

// Use kubectl to check status of the rayjob
getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete")
getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete", testClient)
})

It("succeed in submitting RayJob with runtime environment set with working dir", func() {
Expand All @@ -68,7 +73,7 @@ var _ = Describe("Calling ray plugin `job submit` command on Ray Job", func() {
cmdOutputJobID := extractRayJobID(string(output))

// Use kubectl to check status of the rayjob
getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete")
getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete", testClient)
})

It("succeed in submitting RayJob with headNodeSelectors and workerNodeSelectors", func() {
Expand All @@ -94,7 +99,7 @@ var _ = Describe("Calling ray plugin `job submit` command on Ray Job", func() {
// Retrieve the Job ID from the output
cmdOutputJobID := extractRayJobID(string(output))

rayJob := getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete")
rayJob := getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete", testClient)
// Retrieve Job Head Node Selectors
Expect(rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.NodeSelector["kubernetes.io/os"]).To(Equal("linux"))
// Retrieve Job Worker Node Selectors
Expand Down Expand Up @@ -122,7 +127,7 @@ var _ = Describe("Calling ray plugin `job submit` command on Ray Job", func() {
// Retrieve the Job ID from the output
cmdOutputJobID := extractRayJobID(string(output))

rayJob := getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete")
rayJob := getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete", testClient)
Expect(rayJob.Spec.TTLSecondsAfterFinished).To(Equal(int32(0)))
Expect(rayJob.Spec.ShutdownAfterJobFinishes).To(BeTrue())
})
Expand All @@ -147,7 +152,7 @@ var _ = Describe("Calling ray plugin `job submit` command on Ray Job", func() {
// Retrieve the Job ID from the output
cmdOutputJobID := extractRayJobID(string(output))

rayJob := getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete")
rayJob := getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete", testClient)
Expect(rayJob.Spec.TTLSecondsAfterFinished).To(Equal(int32(0)))
Expect(rayJob.Spec.ShutdownAfterJobFinishes).To(BeFalse())
})
Expand All @@ -168,7 +173,7 @@ var _ = Describe("Calling ray plugin `job submit` command on Ray Job", func() {
// Retrieve the Job ID from the output
cmdOutputJobID := extractRayJobID(string(output))

rayJob := getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete")
rayJob := getAndCheckRayJob(namespace, rayJobName, cmdOutputJobID, "SUCCEEDED", "Complete", testClient)
Expect(rayJob.Spec.TTLSecondsAfterFinished).To(Equal(int32(10)))
Expect(rayJob.Spec.ShutdownAfterJobFinishes).To(BeTrue())
})
Expand Down
8 changes: 6 additions & 2 deletions kubectl-plugin/test/e2e/kubectl_ray_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ var _ = Describe("Calling ray plugin `log` command on Ray Cluster", func() {
var namespace string

BeforeEach(func() {
namespace = createTestNamespace()
var err error
testClient, err := newTestClient()
Expect(err).NotTo(HaveOccurred())

namespace = createTestNamespace(testClient)
deployTestRayCluster(namespace)
DeferCleanup(func() {
deleteTestNamespace(namespace)
deleteTestNamespace(namespace, testClient)
namespace = ""
})
})
Expand Down
26 changes: 15 additions & 11 deletions kubectl-plugin/test/e2e/kubectl_ray_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,23 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("Calling ray plugin `session` command", Ordered, func() {
var namespace string
var testClient Client

BeforeEach(func() {
namespace = createTestNamespace()
var err error
testClient, err = newTestClient()
Expect(err).NotTo(HaveOccurred())

namespace = createTestNamespace(testClient)

deployTestRayCluster(namespace)
DeferCleanup(func() {
deleteTestNamespace(namespace)
deleteTestNamespace(namespace, testClient)
namespace = ""
})
})
Expand Down Expand Up @@ -90,22 +97,19 @@ var _ = Describe("Calling ray plugin `session` command", Ordered, func() {
}, 3*time.Second, 500*time.Millisecond).ShouldNot(HaveOccurred())

// Get the current head pod name
cmd := exec.Command("kubectl", "get", "--namespace", namespace, "pod/raycluster-kuberay-head", "-o", "jsonpath={.metadata.uid}")
output, err := cmd.CombinedOutput()
oldPod, err := testClient.Core().CoreV1().Pods(namespace).Get(ctx, "raycluster-kuberay-head", metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
oldPodUID := string(output)
oldPodUID := string(oldPod.UID)
var newPodUID string

// Delete the pod
cmd = exec.Command("kubectl", "delete", "--namespace", namespace, "pod/raycluster-kuberay-head")
err = cmd.Run()
err = testClient.Core().CoreV1().Pods(namespace).Delete(ctx, "raycluster-kuberay-head", metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())

// Wait for the new pod to be created
Eventually(func() error {
cmd := exec.Command("kubectl", "get", "--namespace", namespace, "pod/raycluster-kuberay-head", "-o", "jsonpath={.metadata.uid}")
output, err := cmd.CombinedOutput()
newPodUID = string(output)
newPod, err := testClient.Core().CoreV1().Pods(namespace).Get(ctx, "raycluster-kuberay-head", metav1.GetOptions{})
newPodUID = string(newPod.UID)
if err != nil {
return err
}
Expand All @@ -116,7 +120,7 @@ var _ = Describe("Calling ray plugin `session` command", Ordered, func() {
}, 60*time.Second, 1*time.Second).ShouldNot(HaveOccurred())

// Wait for the new pod to be ready
cmd = exec.Command("kubectl", "wait", "--namespace", namespace, "pod/raycluster-kuberay-head", "--for=condition=Ready", "--timeout=120s")
cmd := exec.Command("kubectl", "wait", "--namespace", namespace, "pod/raycluster-kuberay-head", "--for=condition=Ready", "--timeout=120s")
err = cmd.Run()
Expect(err).NotTo(HaveOccurred())

Expand Down
30 changes: 15 additions & 15 deletions kubectl-plugin/test/e2e/support.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package e2e

import (
"encoding/json"
"context"
"math/rand"
"os/exec"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)
Expand All @@ -22,24 +24,25 @@ func randStringBytes(n int) string {
return string(b)
}

func createTestNamespace() string {
func createTestNamespace(client Client) string {
ctx := context.Background()
GinkgoHelper()
suffix := randStringBytes(5)
ns := "test-ns-" + suffix
cmd := exec.Command("kubectl", "create", "namespace", ns)
err := cmd.Run()
nsObj, err := client.Core().CoreV1().Namespaces().Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(nsObj.Name).To(Equal(ns))
nsWithPrefix := "namespace/" + ns
cmd = exec.Command("kubectl", "wait", "--timeout=20s", "--for", "jsonpath={.status.phase}=Active", nsWithPrefix)
cmd := exec.Command("kubectl", "wait", "--timeout=20s", "--for", "jsonpath={.status.phase}=Active", nsWithPrefix)
err = cmd.Run()
Expect(err).NotTo(HaveOccurred())
return ns
}

func deleteTestNamespace(ns string) {
func deleteTestNamespace(ns string, client Client) {
ctx := context.Background()
GinkgoHelper()
cmd := exec.Command("kubectl", "delete", "namespace", ns)
err := cmd.Run()
err := client.Core().CoreV1().Namespaces().Delete(ctx, ns, metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
}

Expand All @@ -61,18 +64,15 @@ func getAndCheckRayJob(
expectedJobID,
expectedJobStatus,
expectedJobDeploymentStatus string,
client Client,
) (rayjob rayv1.RayJob) {
ctx := context.Background()
GinkgoHelper()
cmd := exec.Command("kubectl", "get", "--namespace", namespace, "rayjob", name, "-o", "json")
output, err := cmd.CombinedOutput()
Expect(err).ToNot(HaveOccurred())

var rayJob rayv1.RayJob
err = json.Unmarshal(output, &rayJob)
rayJob, err := client.Ray().RayV1().RayJobs(namespace).Get(ctx, name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())

Expect(rayJob.Status.JobId).To(Equal(expectedJobID))
Expect(string(rayJob.Status.JobStatus)).To(Equal(expectedJobStatus))
Expect(string(rayJob.Status.JobDeploymentStatus)).To(Equal(expectedJobDeploymentStatus))
return rayJob
return *rayJob
}
Loading