Skip to content

Commit 322d62c

Browse files
committed
fix ray pipeline and lower casing pipeline names in upgrade tests
Signed-off-by: Nelesh Singla <117123879+nsingla@users.noreply.github.com>
1 parent e86d034 commit 322d62c

File tree

4 files changed

+257
-68
lines changed

4 files changed

+257
-68
lines changed

backend/test/v2/api/upgrade_api_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,14 @@ var _ = Describe("Upgrade Test Preparation >", Label(constants.UpgradePreparatio
5151
preparePipelines()
5252
})
5353
It("Create pipeline run", func() {
54-
preparePipelineRun(helloWorldPipelineFileName, "Pipeline 1", "Experiment 1", "Run 1")
54+
preparePipelineRun(helloWorldPipelineFileName, "pipeline-1", "Experiment-1", "Run 1")
5555
})
5656
It("Create pipeline run and wait it to go to RUNNING state", func() {
57-
run := preparePipelineRun(longRunningPipelineFileName, "Pipeline 3", "Experiment 3", "Run 3")
57+
run := preparePipelineRun(longRunningPipelineFileName, "pipeline-3", "Experiment-3", "Run 3")
5858
testutil.WaitForRunToBeInState(runClient, &run.RunID, []run_model.V2beta1RuntimeState{run_model.V2beta1RuntimeStateRUNNING}, nil)
5959
})
6060
It("Create scheduled pipeline run", func() {
61-
prepareScheduledPipelineRun(helloWorldPipelineFileName, "Pipeline 4", "Experiment 4", "Scheduled Run 1")
61+
prepareScheduledPipelineRun(helloWorldPipelineFileName, "pipeline-4", "Experiment-4", "Scheduled Run 1")
6262
})
6363

6464
})
@@ -73,21 +73,21 @@ var _ = Describe("Upgrade Test Verification >", Label(constants.UpgradeVerificat
7373
verifyPipelines()
7474
})
7575
It("Verify pipeline run", func() {
76-
verifyPipelineRun(helloWorldPipelineFileName, "Pipeline 1", "Experiment 1", "Run 1")
76+
verifyPipelineRun(helloWorldPipelineFileName, "pipeline-1", "Experiment-1", "Run 1")
7777
})
7878
It("Verify you can create a pipeline run after upgrade", func() {
79-
preparePipelineRun(helloWorldPipelineFileName, "Pipeline 2", "Experiment 2", "Run 2")
80-
verifyPipelineRun(longRunningPipelineFileName, "Pipeline 2", "Experiment 2", "Run 2")
79+
preparePipelineRun(helloWorldPipelineFileName, "pipeline-2", "Experiment-2", "Run 2")
80+
verifyPipelineRun(longRunningPipelineFileName, "pipeline-2", "Experiment-2", "Run 2")
8181
})
8282
It("Verify pipeline run that was in RUNNING state, still exists after the upgrade", func() {
83-
verifyPipelineRun(longRunningPipelineFileName, "Pipeline 3", "Experiment 3", "Run 3")
83+
verifyPipelineRun(longRunningPipelineFileName, "pipeline-3", "Experiment-3", "Run 3")
8484
})
8585
It("Verify scheduled pipeline run", func() {
86-
verifyScheduledPipelineRun(helloWorldPipelineFileName, "Pipeline 4", "Experiment 4", "Scheduled Run 1")
86+
verifyScheduledPipelineRun(helloWorldPipelineFileName, "pipeline-4", "Experiment-4", "Scheduled Run 1")
8787
})
8888
It("Verify you can create scheduled pipeline run after upgrade", func() {
89-
prepareScheduledPipelineRun(helloWorldPipelineFileName, "Pipeline 5", "Experiment 5", "Scheduled Run 2")
90-
verifyScheduledPipelineRun(helloWorldPipelineFileName, "Pipeline 5", "Experiment 5", "Scheduled Run 2")
89+
prepareScheduledPipelineRun(helloWorldPipelineFileName, "pipeline-5", "Experiment-5", "Scheduled Run 2")
90+
verifyScheduledPipelineRun(helloWorldPipelineFileName, "pipeline-5", "Experiment-5", "Scheduled Run 2")
9191
})
9292
})
9393
})
@@ -214,7 +214,7 @@ func getPipelineAndExperimentForRun(pipelineToUpload string, pipelineName string
214214
}
215215

216216
if uploadedPipeline == nil {
217-
logger.Log("Uploading pipeline from file %s", pipelineFilePath)
217+
logger.Log("Uploading pipeline %s from file %s", pipelineName, pipelineFilePath)
218218
uploadedPipeline, err = pipelineUploadClient.UploadFile(pipelineFilePath, pipelineUploadParams)
219219
Expect(err).To(BeNil(), "Failed to upload pipeline: %s", pipelineFilePath)
220220
logger.Log("Uploaded pipeline from file %s", pipelineFilePath)

test_data/compiled-workflows/ray_integration_compiled.yaml

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ metadata:
66
spec:
77
arguments:
88
parameters:
9-
- name: components-eb7eb51dbee0acc8a8a931f51717a89b74a2dd63c51005e0780004bc92615b8d
9+
- name: components-9ebde03023b0a76c9ab1ee72a6db1c1e510ea58ae1d58274de6ffdaf1c861e07
1010
value: '{"executorLabel":"exec-ray-fn","outputDefinitions":{"parameters":{"Output":{"parameterType":"NUMBER_INTEGER"}}}}'
11-
- name: implementations-eb7eb51dbee0acc8a8a931f51717a89b74a2dd63c51005e0780004bc92615b8d
11+
- name: implementations-9ebde03023b0a76c9ab1ee72a6db1c1e510ea58ae1d58274de6ffdaf1c861e07
1212
value: '{"args":["--executor_input","{{$}}","--function_to_execute","ray_fn"],"command":["sh","-c","\nif
1313
! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3
1414
-m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1
1515
python3 -m pip install --quiet --no-warn-script-location ''codeflare-sdk==0.32.2'' \u0026\u0026 python3
16-
-m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps''
16+
-m pip install --quiet --no-warn-script-location ''kfp==2.15.2'' ''--no-deps''
1717
''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026
1818
\"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\"
1919
\u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3
@@ -22,18 +22,60 @@ spec:
2222
ray_fn() -\u003e int:\n import ray # noqa: PLC0415\n from codeflare_sdk
2323
import generate_cert # noqa: PLC0415\n from codeflare_sdk.ray.cluster
2424
import Cluster, ClusterConfiguration # noqa: PLC0415\n\n cluster = Cluster(\n ClusterConfiguration(\n name=\"raytest\",\n num_workers=1,\n head_cpu_requests=1,\n head_cpu_limits=1,\n head_memory_requests=4,\n head_memory_limits=4,\n worker_cpu_requests=1,\n worker_cpu_limits=1,\n worker_memory_requests=1,\n worker_memory_limits=2,\n #
25-
Corresponds to quay.io/modh/ray:2.47.1-py311-cu121\n image=\"quay.io/modh/ray@sha256:6d076aeb38ab3c34a6a2ef0f58dc667089aa15826fa08a73273c629333e12f1e\",\n verify_tls=False\n )\n )\n\n #
26-
always clean the resources\n cluster.down()\n print(cluster.status())\n cluster.up()\n cluster.wait_ready()\n print(cluster.status())\n print(cluster.details())\n\n ray_dashboard_uri
27-
= cluster.cluster_dashboard_uri()\n ray_cluster_uri = cluster.cluster_uri()\n print(ray_dashboard_uri)\n print(ray_cluster_uri)\n\n #
28-
before proceeding make sure the cluster exists and the uri is not empty\n assert
29-
ray_cluster_uri, \"Ray cluster needs to be started and set before proceeding\"\n\n #
30-
reset the ray context in case there''s already one.\n ray.shutdown()\n #
31-
establish connection to ray cluster\n generate_cert.generate_tls_cert(cluster.config.name,
32-
cluster.config.namespace)\n generate_cert.export_env(cluster.config.name,
33-
cluster.config.namespace)\n ray.init(address=cluster.cluster_uri(), logging_level=\"DEBUG\")\n print(\"Ray
34-
cluster is up and running: \", ray.is_initialized())\n\n @ray.remote\n def
35-
train_fn():\n return 100\n\n result = ray.get(train_fn.remote())\n assert
36-
100 == result\n ray.shutdown()\n cluster.down()\n return result\n\n"],"image":"registry.access.redhat.com/ubi9/python-311:latest"}'
25+
Updated to Ray 2.53.0 compatible image\n image=\"quay.io/modh/ray@sha256:6d076aeb38ab3c34a6a2ef0f58dc667089aa15826fa08a73273c629333e12f1e\",\n verify_tls=False\n )\n )\n\n #
26+
Clean up any existing cluster with the same name first\n print(\"Cleaning
27+
up any existing cluster resources...\")\n try:\n cluster.down()\n print(\"Cleaned
28+
up existing cluster\")\n except Exception as e:\n print(f\"No existing
29+
cluster to clean up (expected): {e}\")\n\n # Create and start the cluster
30+
using current best practice\n print(\"Creating Ray cluster...\")\n cluster.apply()\n\n #
31+
Custom wait logic since wait_ready() can hang\n print(\"Waiting for Ray
32+
cluster to be ready...\")\n import time\n max_wait_time = 300 # 5 minutes
33+
timeout\n wait_interval = 10 # Check every 10 seconds\n elapsed_time
34+
= 0\n\n cluster_ready = False\n while elapsed_time \u003c max_wait_time:\n try:\n print(f\"Checking
35+
cluster readiness... ({elapsed_time}s elapsed)\")\n\n # Try to
36+
get cluster URIs as a readiness check\n dashboard_uri = cluster.cluster_dashboard_uri()\n cluster_uri
37+
= cluster.cluster_uri()\n\n if dashboard_uri and cluster_uri:\n print(f\"Cluster
38+
is ready! Dashboard: {dashboard_uri}\")\n print(f\"Cluster
39+
URI: {cluster_uri}\")\n cluster_ready = True\n break\n else:\n print(\"Cluster
40+
URIs not ready yet, waiting...\")\n\n except Exception as e:\n print(f\"Cluster
41+
not ready yet: {e}\")\n\n time.sleep(wait_interval)\n elapsed_time
42+
+= wait_interval\n\n if not cluster_ready:\n # Try to get more details
43+
for debugging\n try:\n print(\"Cluster details for debugging:\")\n print(cluster.details())\n except
44+
Exception as e:\n print(f\"Could not get cluster details: {e}\")\n raise
45+
RuntimeError(f\"Ray cluster failed to become ready within {max_wait_time}
46+
seconds\")\n\n print(\"Cluster is fully ready!\")\n\n # Get cluster
47+
connection info\n ray_dashboard_uri = cluster.cluster_dashboard_uri()\n ray_cluster_uri
48+
= cluster.cluster_uri()\n print(f\"Ray dashboard URI: {ray_dashboard_uri}\")\n print(f\"Ray
49+
cluster URI: {ray_cluster_uri}\")\n\n # Verify cluster URI is available\n assert
50+
ray_cluster_uri, \"Ray cluster URI is empty - cluster may not be ready\"\n\n #
51+
Set up TLS and connect to Ray cluster\n print(\"Attempting to connect to
52+
Ray cluster...\")\n\n # Try TLS setup first, fall back to direct connection
53+
if it fails\n tls_setup_successful = False\n try:\n print(\"Setting
54+
up TLS certificates...\")\n generate_cert.generate_tls_cert(cluster.config.name,
55+
cluster.config.namespace)\n generate_cert.export_env(cluster.config.name,
56+
cluster.config.namespace)\n print(\"TLS certificates configured successfully\")\n tls_setup_successful
57+
= True\n except Exception as e:\n print(f\"TLS setup failed (will
58+
try direct connection): {e}\")\n print(\"Since cluster was configured
59+
with verify_tls=False, attempting direct connection...\")\n\n # Connect
60+
to the Ray cluster\n try:\n ray.init(address=ray_cluster_uri, logging_level=\"DEBUG\")\n print(f\"Ray
61+
cluster connected: {ray.is_initialized()}\")\n\n if not ray.is_initialized():\n raise
62+
RuntimeError(\"Ray failed to initialize\")\n\n except Exception as e:\n print(f\"Ray
63+
connection failed: {e}\")\n if tls_setup_successful:\n print(\"Connection
64+
failed even with TLS setup\")\n else:\n print(\"Trying alternative
65+
connection approaches...\")\n\n # Alternative: Try connecting without
66+
explicit address (auto-discovery)\n try:\n ray.shutdown() #
67+
Clean any previous attempts\n print(\"Attempting Ray auto-discovery
68+
connection...\")\n ray.init(logging_level=\"DEBUG\")\n print(f\"Ray
69+
auto-discovery connection: {ray.is_initialized()}\")\n except Exception
70+
as e2:\n print(f\"Auto-discovery also failed: {e2}\")\n raise
71+
RuntimeError(f\"All Ray connection attempts failed. Original error: {e}\")\n\n #
72+
Verify cluster resources\n print(f\"Ray cluster resources: {ray.cluster_resources()}\")\n\n #
73+
Define and run remote function\n @ray.remote\n def train_fn():\n return
74+
100\n\n print(\"Executing remote Ray function...\")\n result = ray.get(train_fn.remote())\n print(f\"Ray
75+
function result: {result}\")\n assert result == 100, f\"Expected 100, got
76+
{result}\"\n\n # Clean shutdown\n print(\"Shutting down Ray connection...\")\n ray.shutdown()\n\n print(\"Cleaning
77+
up Ray cluster...\")\n cluster.down()\n print(\"Ray cluster cleanup
78+
completed\")\n\n return result\n\n"],"image":"registry.access.redhat.com/ubi9/python-311:latest"}'
3779
- name: components-root
3880
value: '{"dag":{"tasks":{"ray-fn":{"cachingOptions":{},"componentRef":{"name":"comp-ray-fn"},"taskInfo":{"name":"ray-fn"}}}}}'
3981
entrypoint: entrypoint
@@ -233,11 +275,11 @@ spec:
233275
- arguments:
234276
parameters:
235277
- name: component
236-
value: '{{workflow.parameters.components-eb7eb51dbee0acc8a8a931f51717a89b74a2dd63c51005e0780004bc92615b8d}}'
278+
value: '{{workflow.parameters.components-9ebde03023b0a76c9ab1ee72a6db1c1e510ea58ae1d58274de6ffdaf1c861e07}}'
237279
- name: task
238280
value: '{"cachingOptions":{},"componentRef":{"name":"comp-ray-fn"},"taskInfo":{"name":"ray-fn"}}'
239281
- name: container
240-
value: '{{workflow.parameters.implementations-eb7eb51dbee0acc8a8a931f51717a89b74a2dd63c51005e0780004bc92615b8d}}'
282+
value: '{{workflow.parameters.implementations-9ebde03023b0a76c9ab1ee72a6db1c1e510ea58ae1d58274de6ffdaf1c861e07}}'
241283
- name: task-name
242284
value: ray-fn
243285
- name: parent-dag-id

test_data/sdk_compiled_pipelines/valid/integration/ray_integration.py

Lines changed: 117 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,44 +20,141 @@ def ray_fn() -> int:
2020
worker_cpu_limits=1,
2121
worker_memory_requests=1,
2222
worker_memory_limits=2,
23-
# Corresponds to quay.io/modh/ray:2.47.1-py311-cu121
2423
image="quay.io/modh/ray@sha256:6d076aeb38ab3c34a6a2ef0f58dc667089aa15826fa08a73273c629333e12f1e",
2524
verify_tls=False
2625
)
2726
)
2827

29-
# always clean the resources
30-
cluster.down()
31-
print(cluster.status())
32-
cluster.up()
33-
cluster.wait_ready()
34-
print(cluster.status())
35-
print(cluster.details())
28+
# Clean up any existing cluster with the same name first
29+
print("Cleaning up any existing cluster resources...")
30+
try:
31+
cluster.down()
32+
print("Cleaned up existing cluster")
33+
except Exception as e:
34+
print(f"No existing cluster to clean up (expected): {e}")
35+
36+
# Create and start the cluster using current best practice
37+
print("Creating Ray cluster...")
38+
cluster.apply()
39+
40+
# Custom wait logic since wait_ready() can hang
41+
print("Waiting for Ray cluster to be ready...")
42+
import time
43+
max_wait_time = 300 # 5 minutes timeout
44+
wait_interval = 10 # Check every 10 seconds
45+
elapsed_time = 0
46+
47+
cluster_ready = False
48+
while elapsed_time < max_wait_time:
49+
try:
50+
print(f"Checking cluster readiness... ({elapsed_time}s elapsed)")
51+
52+
# Try to get cluster URIs as a readiness check
53+
dashboard_uri = cluster.cluster_dashboard_uri()
54+
cluster_uri = cluster.cluster_uri()
55+
56+
if dashboard_uri and cluster_uri:
57+
print(f"Cluster is ready! Dashboard: {dashboard_uri}")
58+
print(f"Cluster URI: {cluster_uri}")
59+
cluster_ready = True
60+
break
61+
else:
62+
print("Cluster URIs not ready yet, waiting...")
63+
64+
except (ConnectionError, TimeoutError, RuntimeError, AttributeError) as e:
65+
print(f"Cluster not ready yet: {e}")
66+
except Exception as e:
67+
print(f"Unexpected error checking cluster readiness: {e}")
68+
69+
time.sleep(wait_interval)
70+
elapsed_time += wait_interval
71+
72+
if not cluster_ready:
73+
print("Cluster details for debugging:")
74+
print(cluster.details())
75+
raise RuntimeError(f"Ray cluster failed to become ready within {max_wait_time} seconds")
76+
77+
print("Cluster is fully ready!")
3678

79+
# Get cluster connection info
3780
ray_dashboard_uri = cluster.cluster_dashboard_uri()
3881
ray_cluster_uri = cluster.cluster_uri()
39-
print(ray_dashboard_uri)
40-
print(ray_cluster_uri)
82+
print(f"Ray dashboard URI: {ray_dashboard_uri}")
83+
print(f"Ray cluster URI: {ray_cluster_uri}")
4184

42-
# before proceeding make sure the cluster exists and the uri is not empty
43-
assert ray_cluster_uri, "Ray cluster needs to be started and set before proceeding"
85+
# Verify cluster URI is available
86+
assert ray_cluster_uri, "Ray cluster URI is empty - cluster may not be ready"
4487

45-
# reset the ray context in case there's already one.
46-
ray.shutdown()
47-
# establish connection to ray cluster
48-
generate_cert.generate_tls_cert(cluster.config.name, cluster.config.namespace)
49-
generate_cert.export_env(cluster.config.name, cluster.config.namespace)
50-
ray.init(address=cluster.cluster_uri(), logging_level="DEBUG")
51-
print("Ray cluster is up and running: ", ray.is_initialized())
88+
# Set up TLS and connect to Ray cluster
89+
print("Attempting to connect to Ray cluster...")
90+
91+
# Try TLS setup first, fall back to direct connection if it fails
92+
tls_setup_successful = False
93+
try:
94+
print("Setting up TLS certificates...")
95+
generate_cert.generate_tls_cert(cluster.config.name, cluster.config.namespace)
96+
generate_cert.export_env(cluster.config.name, cluster.config.namespace)
97+
print("TLS certificates configured successfully")
98+
tls_setup_successful = True
99+
except (OSError, PermissionError, ValueError, RuntimeError) as e:
100+
print(f"TLS setup failed (will try direct connection): {e}")
101+
print("Since cluster was configured with verify_tls=False, attempting direct connection...")
102+
except Exception as e:
103+
print(f"Unexpected TLS setup error (will try direct connection): {e}")
104+
print("Since cluster was configured with verify_tls=False, attempting direct connection...")
105+
106+
# Connect to the Ray cluster
107+
try:
108+
ray.init(address=ray_cluster_uri, logging_level="DEBUG")
109+
print(f"Ray cluster connected: {ray.is_initialized()}")
110+
111+
if not ray.is_initialized():
112+
raise RuntimeError("Ray failed to initialize")
113+
114+
except (ConnectionError, TimeoutError, RuntimeError, OSError) as e:
115+
print(f"Ray connection failed: {e}")
116+
if tls_setup_successful:
117+
print("Connection failed even with TLS setup")
118+
else:
119+
print("Trying alternative connection approaches...")
52120

121+
# Alternative: Try connecting without explicit address (auto-discovery)
122+
try:
123+
ray.shutdown() # Clean any previous attempts
124+
print("Attempting Ray auto-discovery connection...")
125+
ray.init(logging_level="DEBUG")
126+
print(f"Ray auto-discovery connection: {ray.is_initialized()}")
127+
except (ConnectionError, TimeoutError, RuntimeError, OSError) as e2:
128+
print(f"Auto-discovery also failed: {e2}")
129+
raise RuntimeError(f"All Ray connection attempts failed. Original error: {e}")
130+
except Exception as e2:
131+
print(f"Unexpected auto-discovery error: {e2}")
132+
raise RuntimeError(f"All Ray connection attempts failed. Original error: {e}")
133+
except Exception as e:
134+
print(f"Unexpected Ray connection error: {e}")
135+
raise RuntimeError(f"Ray connection failed with unexpected error: {e}")
136+
137+
# Verify cluster resources
138+
print(f"Ray cluster resources: {ray.cluster_resources()}")
139+
140+
# Define and run remote function
53141
@ray.remote
54142
def train_fn():
55143
return 100
56144

145+
print("Executing remote Ray function...")
57146
result = ray.get(train_fn.remote())
58-
assert 100 == result
147+
print(f"Ray function result: {result}")
148+
assert result == 100, f"Expected 100, got {result}"
149+
150+
# Clean shutdown
151+
print("Shutting down Ray connection...")
59152
ray.shutdown()
153+
154+
print("Cleaning up Ray cluster...")
60155
cluster.down()
156+
print("Ray cluster cleanup completed")
157+
61158
return result
62159

63160

0 commit comments

Comments
 (0)