
Commit 65dc5dd

Merge remote-tracking branch 'upstream/main'
2 parents 5bde392 + d355b36 commit 65dc5dd

7 files changed (+363, -142 lines)

tests/odh/mnist_ray_test.go

Lines changed: 14 additions & 48 deletions
@@ -20,7 +20,6 @@ import (
 	"bytes"
 	"fmt"
 	"testing"
-	"time"
 
 	. "github.com/onsi/gomega"
 	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
@@ -160,52 +159,19 @@ func mnistRay(t *testing.T, numGpus int, gpuResourceName string, rayImage string
 		),
 	)
 
-	// Fetch created raycluster
+	// Try to monitor the Ray job via external dashboard (best-effort)
+	// This provides job status logs and API logs when it works
 	rayClusterName := "mnisttest"
-	rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
-	test.Expect(err).ToNot(HaveOccurred())
-
-	// Initialise raycluster client to interact with raycluster to get rayjob details using REST-API
-	dashboardUrl := GetDashboardUrl(test, namespace, rayCluster)
-	rayClient := GetRayClusterClient(test, dashboardUrl, test.Config().BearerToken)
-
-	// wait until rayjob exists
-	test.Eventually(func() ([]RayJobDetailsResponse, error) {
-		return rayClient.ListJobs()
-	}, TestTimeoutMedium, 1*time.Second).Should(HaveLen(1), "Ray job not found")
-
-	// Get test job-id
-	jobID := GetTestJobId(test, rayClient)
-	test.Expect(jobID).ToNot(BeEmpty())
-
-	// Wait for the job to be succeeded or failed
-	var rayJobStatus string
-	test.T().Logf("Waiting for job to be Succeeded...\n")
-	test.Eventually(func() (string, error) {
-		resp, err := rayClient.GetJobDetails(jobID)
-		if err != nil {
-			return rayJobStatus, err
-		}
-		rayJobStatusVal := resp.Status
-		if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" {
-			test.T().Logf("JobStatus - %s\n", rayJobStatusVal)
-			rayJobStatus = rayJobStatusVal
-			return rayJobStatus, nil
-		}
-		if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" {
-			test.T().Logf("JobStatus - %s...\n", rayJobStatusVal)
-			rayJobStatus = rayJobStatusVal
-		}
-		return rayJobStatus, nil
-	}, TestTimeoutDouble, 1*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time")
-
-	// Store job logs in output directory
-	WriteRayJobAPILogs(test, rayClient, jobID)
-
-	// Assert ray-job status after job execution
-	test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !")
-
-	// Make sure the RayCluster finishes and is deleted
-	test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong).
-		Should(BeEmpty())
+	jobStatus, monitored := TryMonitorRayJob(test, namespace, rayClusterName)
+	if monitored {
+		test.T().Logf("Successfully monitored Ray job via external dashboard, status: %s", jobStatus)
+		test.Expect(jobStatus).To(Equal("SUCCEEDED"), "RayJob failed!")
+	} else {
+		test.T().Logf("Could not monitor Ray job via external dashboard, falling back to RayCluster deletion check")
+	}
+
+	// Wait for the RayCluster to be deleted (primary success indicator from notebook)
+	test.T().Logf("Waiting for notebook to complete and delete the RayCluster...")
+	test.Eventually(RayClusters(test, namespace.Name), TestTimeoutDouble).
+		Should(BeEmpty(), "RayCluster was not deleted - notebook may have failed")
 }
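
Note: both tests now delegate job monitoring to a TryMonitorRayJob helper that is defined outside the files shown in this commit. A minimal sketch of what such a helper could look like follows; it is only an assumption based on the inline monitoring logic removed above (GetDashboardUrl, GetRayClusterClient, GetTestJobId, WriteRayJobAPILogs and the support Test type are taken from that removed code), and it returns ("", false) instead of failing the test when the external dashboard cannot be reached. The actual implementation in the repository may differ.

// Hypothetical sketch only: the real TryMonitorRayJob lives outside this diff.
// It is assumed to wrap the monitoring code removed above and to report
// monitored=false rather than fail the test when the dashboard is unreachable.
func TryMonitorRayJob(test Test, namespace *corev1.Namespace, rayClusterName string) (string, bool) {
	rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
	if err != nil {
		test.T().Logf("Could not fetch RayCluster %s: %v", rayClusterName, err)
		return "", false
	}

	// Build a REST client against the external dashboard route (helpers from the removed code).
	dashboardUrl := GetDashboardUrl(test, namespace, rayCluster)
	rayClient := GetRayClusterClient(test, dashboardUrl, test.Config().BearerToken)

	// Poll until the job reaches a terminal state or the deadline passes,
	// treating every error as transient instead of failing the test.
	deadline := time.Now().Add(TestTimeoutDouble)
	var jobID, status string
	for time.Now().Before(deadline) {
		time.Sleep(1 * time.Second)
		if jobID == "" {
			if jobs, err := rayClient.ListJobs(); err != nil || len(jobs) == 0 {
				continue
			}
			jobID = GetTestJobId(test, rayClient)
			continue
		}
		resp, err := rayClient.GetJobDetails(jobID)
		if err != nil {
			continue
		}
		if resp.Status != status {
			status = resp.Status
			test.T().Logf("JobStatus - %s", status)
		}
		if status == "SUCCEEDED" || status == "FAILED" {
			// Store job logs in the output directory before returning.
			WriteRayJobAPILogs(test, rayClient, jobID)
			return status, true
		}
	}
	return status, false
}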

tests/odh/mnist_raytune_hpo_test.go

Lines changed: 18 additions & 52 deletions
@@ -20,7 +20,6 @@ import (
 	"bytes"
 	"fmt"
 	"testing"
-	"time"
 
 	. "github.com/onsi/gomega"
 	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
@@ -65,11 +64,11 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) {
 					Resources: []v1beta1.ResourceQuota{
 						{
 							Name: corev1.ResourceCPU,
-							NominalQuota: resource.MustParse("8"),
+							NominalQuota: resource.MustParse("12"),
 						},
 						{
 							Name: corev1.ResourceMemory,
-							NominalQuota: resource.MustParse("12Gi"),
+							NominalQuota: resource.MustParse("32Gi"),
 						},
 						{
 							Name: corev1.ResourceName("nvidia.com/gpu"),
@@ -90,9 +89,9 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) {
 	mnist_hpo := ParseAWSArgs(test, readFile(test, "resources/mnist_hpo.py"))
 
 	if numGpus > 0 {
-		mnist_hpo = bytes.Replace(mnist_hpo, []byte("gpu_value=\"has to be specified\""), []byte("gpu_value=\"1\""), 1)
+		mnist_hpo = bytes.Replace(mnist_hpo, []byte("int(\"has to be specified\")"), []byte("1"), 1)
 	} else {
-		mnist_hpo = bytes.Replace(mnist_hpo, []byte("gpu_value=\"has to be specified\""), []byte("gpu_value=\"0\""), 1)
+		mnist_hpo = bytes.Replace(mnist_hpo, []byte("int(\"has to be specified\")"), []byte("0"), 1)
 	}
 
 	config := CreateConfigMap(test, namespace.Name, map[string][]byte{
@@ -143,52 +142,19 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) {
 		),
 	)
 
-	// Fetch created raycluster
+	// Try to monitor the Ray job via external dashboard (best-effort)
+	// This provides job status logs and API logs when it works
 	rayClusterName := "mnisthpotest"
-	rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
-	test.Expect(err).ToNot(HaveOccurred())
-
-	// Initialise raycluster client to interact with raycluster to get rayjob details using REST-API
-	dashboardUrl := GetDashboardUrl(test, namespace, rayCluster)
-	rayClient := GetRayClusterClient(test, dashboardUrl, test.Config().BearerToken)
-
-	// wait until rayjob exists
-	test.Eventually(func() ([]RayJobDetailsResponse, error) {
-		return rayClient.ListJobs()
-	}, TestTimeoutMedium, 1*time.Second).Should(HaveLen(1), "Ray job not found")
-
-	// Get rayjob-ID
-	jobID := GetTestJobId(test, rayClient)
-	test.Expect(jobID).ToNot(BeEmpty())
-
-	// Wait for the job to either succeed or fail
-	var rayJobStatus string
-	test.T().Logf("Waiting for job to be Succeeded...\n")
-	test.Eventually(func() (string, error) {
-		resp, err := rayClient.GetJobDetails(jobID)
-		if err != nil {
-			return rayJobStatus, err
-		}
-		rayJobStatusVal := resp.Status
-		if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" {
-			test.T().Logf("JobStatus - %s\n", rayJobStatusVal)
-			rayJobStatus = rayJobStatusVal
-			return rayJobStatus, nil
-		}
-		if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" {
-			test.T().Logf("JobStatus - %s...\n", rayJobStatusVal)
-			rayJobStatus = rayJobStatusVal
-		}
-		return rayJobStatus, nil
-	}, TestTimeoutDouble, 1*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time")
-
-	// Store job logs in output directory
-	WriteRayJobAPILogs(test, rayClient, jobID)
-
-	// Assert ray-job status after job execution
-	test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !")
-
-	// Make sure the RayCluster finishes and is deleted
-	test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong).
-		Should(BeEmpty())
+	jobStatus, monitored := TryMonitorRayJob(test, namespace, rayClusterName)
+	if monitored {
+		test.T().Logf("Successfully monitored Ray job via external dashboard, status: %s", jobStatus)
+		test.Expect(jobStatus).To(Equal("SUCCEEDED"), "RayJob failed!")
+	} else {
+		test.T().Logf("Could not monitor Ray job via external dashboard, falling back to RayCluster deletion check")
+	}
+
+	// Wait for the RayCluster to be deleted (primary success indicator from notebook)
+	test.T().Logf("Waiting for notebook to complete and delete the RayCluster...")
+	test.Eventually(RayClusters(test, namespace.Name), TestTimeoutDouble).
+		Should(BeEmpty(), "RayCluster was not deleted - notebook may have failed")
 }
Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
-torchvision==0.18.0
+torchvision
 minio

tests/odh/resources/mnist_hpo.py

Lines changed: 1 addition & 1 deletion
@@ -188,7 +188,7 @@ def train_mnist(config):
 if __name__ == "__main__":
     # for early stopping
     sched = AsyncHyperBandScheduler()
-    gpu_value="has to be specified"
+    gpu_value=int("has to be specified")
     resources_per_trial = {"cpu": 1, "gpu": gpu_value}
     tuner = tune.Tuner(
         tune.with_resources(train_mnist, resources=resources_per_trial),

tests/odh/resources/mnist_hpo_raytune.ipynb

Lines changed: 41 additions & 19 deletions
@@ -72,20 +72,21 @@
 "outputs": [],
 "source": [
 "# Create ray cluster\n",
+"# HPO runs num_samples=5 trials, each needing 1 CPU, so we need more resources\n",
 "cluster = Cluster(\n",
 "    ClusterConfiguration(\n",
 "        namespace=namespace,\n",
 "        name='mnisthpotest',\n",
-"        head_cpu_requests=1,\n",
-"        head_cpu_limits=1,\n",
-"        head_memory_requests=6,\n",
-"        head_memory_limits=8,\n",
+"        head_cpu_requests=2,\n",
+"        head_cpu_limits=2,\n",
+"        head_memory_requests=8,\n",
+"        head_memory_limits=10,\n",
 "        head_extended_resource_requests={'nvidia.com/gpu':0},\n",
-"        num_workers=1,\n",
-"        worker_cpu_requests=1,\n",
-"        worker_cpu_limits=1,\n",
-"        worker_memory_requests=1,\n",
-"        worker_memory_limits=4,\n",
+"        num_workers=2,\n",
+"        worker_cpu_requests=2,\n",
+"        worker_cpu_limits=2,\n",
+"        worker_memory_requests=2,\n",
+"        worker_memory_limits=6,\n",
 "        worker_extended_resource_requests={'nvidia.com/gpu':int(num_gpus)},\n",
 "        image=ray_image,\n",
 "        write_to_file=True,\n",
@@ -125,8 +126,8 @@
 "source": [
 "# Bring up the cluster\n",
 "cluster.up()\n",
-"# Wait until status is updated\n",
-"cluster.wait_ready()"
+"# Wait until status is updated (skip dashboard check as route naming changed in kuberay operator)\n",
+"cluster.wait_ready(dashboard_check=False)"
 ]
 },
 {
@@ -160,7 +161,11 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"ray_dashboard = cluster.cluster_dashboard_uri()\n",
+"# Access dashboard directly via internal service (notebook runs inside the cluster)\n",
+"# The service mnisthpotest-head-svc exposes the Ray dashboard on port 8265\n",
+"ray_dashboard = f\"http://mnisthpotest-head-svc.{namespace}.svc.cluster.local:8265\"\n",
+"print(f\"Ray dashboard URL: {ray_dashboard}\")\n",
+"\n",
 "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n",
 "client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n",
 "\n",
@@ -185,17 +190,34 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"finished = False\n",
-"while not finished:\n",
+"import requests\n",
+"status = None\n",
+"error_count = 0\n",
+"max_errors = 60 # Max consecutive errors before giving up\n",
+"\n",
+"while status != \"SUCCEEDED\":\n",
 "    sleep(1)\n",
-"    status = client.get_job_status(submission_id)\n",
-"    finished = (status == \"SUCCEEDED\")\n",
-"if finished:\n",
+"    try:\n",
+"        status = client.get_job_status(submission_id)\n",
+"        error_count = 0 # Reset on success\n",
+"        print(f\"Job status: {status}\")\n",
+"        if status == \"FAILED\":\n",
+"            print(\"Job failed!\")\n",
+"            break\n",
+"    except (RuntimeError, requests.exceptions.ConnectionError, ConnectionError) as e:\n",
+"        error_count += 1\n",
+"        print(f\"Transient error ({error_count}/{max_errors}) checking job status: {type(e).__name__}: {e}\")\n",
+"        if error_count >= max_errors:\n",
+"            print(f\"Too many consecutive errors, giving up\")\n",
+"            break\n",
+"        continue\n",
+"\n",
+"if status == \"SUCCEEDED\":\n",
 "    print(\"Job completed Successfully !\")\n",
 "else:\n",
-"    print(\"Job failed !\")\n",
+"    print(f\"Job did not succeed. Final status: {status}\")\n",
 "\n",
-"sleep(10) # For avoiding race condition(raycluster gets deleted as soon as notebook execution completes after rayjob submission gets succeeded) and to assert rayjob success status before raycluster gets deleted during test excution\""
+"sleep(10) # Brief pause before cleanup"
 ]
 },
 {

tests/odh/resources/mnist_ray_mini.ipynb

Lines changed: 32 additions & 17 deletions
@@ -125,8 +125,8 @@
 "source": [
 "# Bring up the cluster\n",
 "cluster.up()\n",
-"# Wait until status is updated\n",
-"cluster.wait_ready()"
+"# Wait until status is updated (skip dashboard check as route naming changed in kuberay operator)\n",
+"cluster.wait_ready(dashboard_check=False)"
 ]
 },
 {
@@ -160,11 +160,15 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"ray_dashboard = cluster.cluster_dashboard_uri()\n",
+"# Access dashboard directly via internal service (notebook runs inside the cluster)\n",
+"# The service mnisttest-head-svc exposes the Ray dashboard on port 8265\n",
+"ray_dashboard = f\"http://mnisttest-head-svc.{namespace}.svc.cluster.local:8265\"\n",
+"print(f\"Ray dashboard URL: {ray_dashboard}\")\n",
+"\n",
 "header = {\"Authorization\": f\"Bearer {kubernetes_user_bearer_token}\"}\n",
-"client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n",
+"ray_client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\n",
 "\n",
-"submission_id = client.submit_job(\n",
+"submission_id = ray_client.submit_job(\n",
 "    entrypoint=\"python mnist.py\",\n",
 "    runtime_env={\n",
 "        \"env_vars\": {\n",
@@ -186,23 +190,34 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"finished = False\n",
-"while not finished:\n",
+"import requests\n",
+"status = None\n",
+"error_count = 0\n",
+"max_errors = 60 # Max consecutive errors before giving up\n",
+"\n",
+"while status != \"SUCCEEDED\":\n",
 "    sleep(1)\n",
 "    try:\n",
-"        status = client.get_job_status(submission_id)\n",
-"    except RuntimeError:\n",
-"        # At times, the ray dashboard displays a \"RuntimeError: Request failed with status code 504: <html><body><h1>504 Gateway Time-out</h1>\" \n",
-"        # message, leading to a crashloopback error in the notebook pod. However, the ray job continues running and disregards the error. \n",
-"        # Consider eliminating the try-except block when using the updated version of Ray 2.38.\n",
-"        pass\n",
-"    finished = (status == \"SUCCEEDED\")\n",
-"if finished:\n",
+"        status = ray_client.get_job_status(submission_id)\n",
+"        error_count = 0 # Reset on success\n",
+"        print(f\"Job status: {status}\")\n",
+"        if status == \"FAILED\":\n",
+"            print(\"Job failed!\")\n",
+"            break\n",
+"    except (RuntimeError, requests.exceptions.ConnectionError, ConnectionError) as e:\n",
+"        error_count += 1\n",
+"        print(f\"Transient error ({error_count}/{max_errors}) checking job status: {type(e).__name__}: {e}\")\n",
+"        if error_count >= max_errors:\n",
+"            print(f\"Too many consecutive errors, giving up\")\n",
+"            break\n",
+"        continue\n",
+"\n",
+"if status == \"SUCCEEDED\":\n",
 "    print(\"Job completed Successfully !\")\n",
 "else:\n",
-"    print(\"Job failed !\")\n",
+"    print(f\"Job did not succeed. Final status: {status}\")\n",
 "\n",
-"sleep(10) # For avoiding race condition(raycluster gets deleted as soon as notebook execution completes after rayjob submission gets succeeded) and to assert rayjob success status before raycluster gets deleted during test excution\""
+"sleep(10) # Brief pause before cleanup"
 ]
 },
 {
