Skip to content

Commit 7639b9d

Browse files
Future-Outlierkevin85421andrewsykim
authored
[RayJob] Sidecar Mode (#3971)
Signed-off-by: Future-Outlier <[email protected]> Co-authored-by: kaihsun <[email protected]> Co-authored-by: Andrew Sy Kim <[email protected]>
1 parent 9bd31cf commit 7639b9d

File tree

12 files changed

+664
-88
lines changed

12 files changed

+664
-88
lines changed

docs/reference/api.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ _Appears in:_
246246
| `entrypoint` _string_ | Entrypoint represents the command to start execution. | | |
247247
| `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration<br />provided as a multi-line YAML string. | | |
248248
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
249-
| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.<br />In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster. | K8sJobMode | |
249+
| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.<br />In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.<br />In "SidecarMode", the KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job. | K8sJobMode | |
250250
| `entrypointResources` _string_ | EntrypointResources specifies the custom resources and quantities to reserve for the<br />entrypoint command. | | |
251251
| `entrypointNumCpus` _float_ | EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command. | | |
252252
| `entrypointNumGpus` _float_ | EntrypointNumGpus specifies the number of gpus to reserve for the entrypoint command. | | |

ray-operator/apis/ray/v1/rayjob_types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ const (
8282
K8sJobMode JobSubmissionMode = "K8sJobMode" // Submit job via Kubernetes Job
8383
HTTPMode JobSubmissionMode = "HTTPMode" // Submit job via HTTP request
8484
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
85+
SidecarMode JobSubmissionMode = "SidecarMode" // Submit job via a sidecar container in the Ray head Pod
8586
)
8687

8788
type DeletionPolicyType string
@@ -174,6 +175,7 @@ type RayJobSpec struct {
174175
// In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
175176
// In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
176177
// In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.
178+
// In "SidecarMode", the KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job.
177179
// +kubebuilder:default:=K8sJobMode
178180
// +optional
179181
SubmissionMode JobSubmissionMode `json:"submissionMode,omitempty"`
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
apiVersion: ray.io/v1
2+
kind: RayJob
3+
metadata:
4+
name: rayjob-sidecar-mode
5+
spec:
6+
# In SidecarMode, the KubeRay operator injects a container into the Ray head Pod to submit the Ray job and tail logs.
7+
# This will avoid inter-Pod communication, which may cause network issues. For example, some users face WebSocket hangs.
8+
# For more details, see https://github.com/ray-project/kuberay/issues/3928#issuecomment-3187164736.
9+
submissionMode: "SidecarMode"
10+
entrypoint: python /home/ray/samples/sample_code.py
11+
runtimeEnvYAML: |
12+
pip:
13+
- requests==2.26.0
14+
- pendulum==2.1.2
15+
env_vars:
16+
counter_name: "test_counter"
17+
18+
rayClusterSpec:
19+
rayVersion: '2.46.0'
20+
headGroupSpec:
21+
rayStartParams: {}
22+
template:
23+
spec:
24+
containers:
25+
- name: ray-head
26+
image: rayproject/ray:2.46.0
27+
ports:
28+
- containerPort: 6379
29+
name: gcs-server
30+
- containerPort: 8265
31+
name: dashboard
32+
- containerPort: 10001
33+
name: client
34+
resources:
35+
limits:
36+
cpu: "1"
37+
requests:
38+
cpu: "200m"
39+
volumeMounts:
40+
- mountPath: /home/ray/samples
41+
name: code-sample
42+
volumes:
43+
- name: code-sample
44+
configMap:
45+
name: ray-job-code-sample
46+
items:
47+
- key: sample_code.py
48+
path: sample_code.py
49+
workerGroupSpecs:
50+
- replicas: 1
51+
minReplicas: 1
52+
maxReplicas: 5
53+
groupName: small-group
54+
rayStartParams: {}
55+
template:
56+
spec:
57+
containers:
58+
- name: ray-worker
59+
image: rayproject/ray:2.46.0
60+
resources:
61+
limits:
62+
cpu: "1"
63+
requests:
64+
cpu: "200m"
65+
66+
---
67+
apiVersion: v1
68+
kind: ConfigMap
69+
metadata:
70+
name: ray-job-code-sample
71+
data:
72+
sample_code.py: |
73+
import ray
74+
import os
75+
import requests
76+
77+
ray.init()
78+
79+
@ray.remote
80+
class Counter:
81+
def __init__(self):
82+
# Used to verify runtimeEnv
83+
self.name = os.getenv("counter_name")
84+
assert self.name == "test_counter"
85+
self.counter = 0
86+
87+
def inc(self):
88+
self.counter += 1
89+
90+
def get_counter(self):
91+
return "{} got {}".format(self.name, self.counter)
92+
93+
counter = Counter.remote()
94+
95+
for _ in range(5):
96+
ray.get(counter.inc.remote())
97+
print(ray.get(counter.get_counter.remote()))
98+
99+
# Verify that the correct runtime env was used for the job.
100+
assert requests.__version__ == "2.26.0"

ray-operator/controllers/ray/common/job.go

Lines changed: 90 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -54,96 +54,150 @@ func GetMetadataJson(metadata map[string]string, rayVersion string) (string, err
5454
return pkgutils.ConvertByteSliceToString(metadataBytes), nil
5555
}
5656

57-
// GetK8sJobCommand builds the K8s job command for the Ray job.
58-
func GetK8sJobCommand(rayJobInstance *rayv1.RayJob) ([]string, error) {
59-
address := rayJobInstance.Status.DashboardURL
57+
// BuildJobSubmitCommand builds the `ray job submit` command based on submission mode.
58+
func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) ([]string, error) {
59+
var address string
60+
port := utils.DefaultDashboardPort
61+
62+
switch submissionMode {
63+
case rayv1.SidecarMode:
64+
// The sidecar submitter shares the same network namespace as the Ray dashboard,
65+
// so it uses 127.0.0.1 to connect to the Ray dashboard.
66+
rayHeadContainer := rayJobInstance.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex]
67+
port = utils.FindContainerPort(&rayHeadContainer, utils.DashboardPortName, utils.DefaultDashboardPort)
68+
address = "http://127.0.0.1:" + strconv.Itoa(port)
69+
case rayv1.K8sJobMode:
70+
// Submitter is a separate K8s Job; use cluster dashboard address.
71+
address = rayJobInstance.Status.DashboardURL
72+
if !strings.HasPrefix(address, "http://") {
73+
address = "http://" + address
74+
}
75+
default:
76+
return nil, fmt.Errorf("unsupported submission mode for job submit command: %s", submissionMode)
77+
}
78+
79+
var cmd []string
6080
metadata := rayJobInstance.Spec.Metadata
6181
jobId := rayJobInstance.Status.JobId
6282
entrypoint := strings.TrimSpace(rayJobInstance.Spec.Entrypoint)
6383
entrypointNumCpus := rayJobInstance.Spec.EntrypointNumCpus
6484
entrypointNumGpus := rayJobInstance.Spec.EntrypointNumGpus
6585
entrypointResources := rayJobInstance.Spec.EntrypointResources
6686

67-
// add http:// if needed
68-
if !strings.HasPrefix(address, "http://") {
69-
address = "http://" + address
70-
}
71-
87+
// In K8sJobMode, we need to avoid submitting the job twice, since the job submitter might retry.
7288
// `ray job submit` alone doesn't handle duplicated submission gracefully. See https://github.com/ray-project/kuberay/issues/2154.
7389
// In order to deal with that, we use `ray job status` first to check if the jobId has been submitted.
7490
// If the jobId has been submitted, we use `ray job logs` to follow the logs.
7591
// Otherwise, we submit the job with `ray job submit --no-wait` + `ray job logs`. The full shell command looks like this:
7692
// if ! ray job status --address http://$RAY_ADDRESS $RAY_JOB_SUBMISSION_ID >/dev/null 2>&1 ;
7793
// then ray job submit --address http://$RAY_ADDRESS --submission-id $RAY_JOB_SUBMISSION_ID --no-wait -- ... ;
7894
// fi ; ray job logs --address http://$RAY_ADDRESS --follow $RAY_JOB_SUBMISSION_ID
95+
// In Sidecar mode, the sidecar container's restart policy is set to Never, so duplicated submission won't happen.
7996
jobStatusCommand := []string{"ray", "job", "status", "--address", address, jobId, ">/dev/null", "2>&1"}
97+
jobSubmitCommand := []string{"ray", "job", "submit", "--address", address}
8098
jobFollowCommand := []string{"ray", "job", "logs", "--address", address, "--follow", jobId}
81-
jobSubmitCommand := []string{"ray", "job", "submit", "--address", address, "--no-wait"}
82-
k8sJobCommand := append([]string{"if", "!"}, jobStatusCommand...)
83-
k8sJobCommand = append(k8sJobCommand, ";", "then")
84-
k8sJobCommand = append(k8sJobCommand, jobSubmitCommand...)
99+
100+
if submissionMode == rayv1.SidecarMode {
101+
// Wait until Ray Dashboard GCS is healthy before proceeding.
102+
// Use the same Ray Dashboard GCS health check command as the readiness probe
103+
rayDashboardGCSHealthCommand := fmt.Sprintf(
104+
utils.BaseWgetHealthCommand,
105+
utils.DefaultReadinessProbeFailureThreshold,
106+
port,
107+
utils.RayDashboardGCSHealthPath,
108+
)
109+
110+
waitLoop := []string{
111+
"until", rayDashboardGCSHealthCommand, ">/dev/null", "2>&1", ";",
112+
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
113+
}
114+
cmd = append(cmd, waitLoop...)
115+
}
116+
117+
// In Sidecar mode, we only support RayJob level retry, which means that the submitter retry won't happen,
118+
// so we won't have to check if the job has been submitted.
119+
if submissionMode == rayv1.K8sJobMode {
120+
// Only check job status in K8s mode to handle duplicated submission gracefully
121+
cmd = append(cmd, "if", "!")
122+
cmd = append(cmd, jobStatusCommand...)
123+
cmd = append(cmd, ";", "then")
124+
}
125+
126+
cmd = append(cmd, jobSubmitCommand...)
127+
128+
if submissionMode == rayv1.K8sJobMode {
129+
cmd = append(cmd, "--no-wait")
130+
}
85131

86132
runtimeEnvJson, err := getRuntimeEnvJson(rayJobInstance)
87133
if err != nil {
88134
return nil, err
89135
}
90136
if len(runtimeEnvJson) > 0 {
91-
k8sJobCommand = append(k8sJobCommand, "--runtime-env-json", strconv.Quote(runtimeEnvJson))
137+
cmd = append(cmd, "--runtime-env-json", strconv.Quote(runtimeEnvJson))
92138
}
93139

94140
if len(metadata) > 0 {
95141
metadataJson, err := GetMetadataJson(metadata, rayJobInstance.Spec.RayClusterSpec.RayVersion)
96142
if err != nil {
97143
return nil, err
98144
}
99-
k8sJobCommand = append(k8sJobCommand, "--metadata-json", strconv.Quote(metadataJson))
145+
cmd = append(cmd, "--metadata-json", strconv.Quote(metadataJson))
100146
}
101147

102148
if len(jobId) > 0 {
103-
k8sJobCommand = append(k8sJobCommand, "--submission-id", jobId)
149+
cmd = append(cmd, "--submission-id", jobId)
104150
}
105151

106152
if entrypointNumCpus > 0 {
107-
k8sJobCommand = append(k8sJobCommand, "--entrypoint-num-cpus", fmt.Sprintf("%f", entrypointNumCpus))
153+
cmd = append(cmd, "--entrypoint-num-cpus", fmt.Sprintf("%f", entrypointNumCpus))
108154
}
109155

110156
if entrypointNumGpus > 0 {
111-
k8sJobCommand = append(k8sJobCommand, "--entrypoint-num-gpus", fmt.Sprintf("%f", entrypointNumGpus))
157+
cmd = append(cmd, "--entrypoint-num-gpus", fmt.Sprintf("%f", entrypointNumGpus))
112158
}
113159

114160
if len(entrypointResources) > 0 {
115-
k8sJobCommand = append(k8sJobCommand, "--entrypoint-resources", strconv.Quote(entrypointResources))
161+
cmd = append(cmd, "--entrypoint-resources", strconv.Quote(entrypointResources))
116162
}
117163

118164
// "--" is used to separate the entrypoint from the Ray Job CLI command and its arguments.
119-
k8sJobCommand = append(k8sJobCommand, "--", entrypoint, ";", "fi", ";")
120-
k8sJobCommand = append(k8sJobCommand, jobFollowCommand...)
165+
cmd = append(cmd, "--", entrypoint, ";")
166+
if submissionMode == rayv1.K8sJobMode {
167+
cmd = append(cmd, "fi", ";")
168+
cmd = append(cmd, jobFollowCommand...)
169+
}
121170

122-
return k8sJobCommand, nil
171+
return cmd, nil
123172
}
124173

125174
// GetDefaultSubmitterTemplate creates a default submitter template for the Ray job.
126175
func GetDefaultSubmitterTemplate(rayClusterInstance *rayv1.RayCluster) corev1.PodTemplateSpec {
127176
return corev1.PodTemplateSpec{
128177
Spec: corev1.PodSpec{
129178
Containers: []corev1.Container{
130-
{
131-
Name: "ray-job-submitter",
132-
// Use the image of the Ray head to be defensive against version mismatch issues
133-
Image: rayClusterInstance.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Image,
134-
Resources: corev1.ResourceRequirements{
135-
Limits: corev1.ResourceList{
136-
corev1.ResourceCPU: resource.MustParse("1"),
137-
corev1.ResourceMemory: resource.MustParse("1Gi"),
138-
},
139-
Requests: corev1.ResourceList{
140-
corev1.ResourceCPU: resource.MustParse("500m"),
141-
corev1.ResourceMemory: resource.MustParse("200Mi"),
142-
},
143-
},
144-
},
179+
GetDefaultSubmitterContainer(rayClusterInstance),
145180
},
146181
RestartPolicy: corev1.RestartPolicyNever,
147182
},
148183
}
149184
}
185+
186+
// GetDefaultSubmitterContainer creates a default submitter container for the Ray job.
187+
func GetDefaultSubmitterContainer(rayClusterInstance *rayv1.RayCluster) corev1.Container {
188+
return corev1.Container{
189+
Name: utils.SubmitterContainerName,
190+
// Use the image of the Ray head to be defensive against version mismatch issues
191+
Image: rayClusterInstance.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Image,
192+
Resources: corev1.ResourceRequirements{
193+
Limits: corev1.ResourceList{
194+
corev1.ResourceCPU: resource.MustParse("1"),
195+
corev1.ResourceMemory: resource.MustParse("1Gi"),
196+
},
197+
Requests: corev1.ResourceList{
198+
corev1.ResourceCPU: resource.MustParse("500m"),
199+
corev1.ResourceMemory: resource.MustParse("200Mi"),
200+
},
201+
},
202+
}
203+
}

ray-operator/controllers/ray/common/job_test.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package common
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"strconv"
67
"testing"
78

@@ -75,7 +76,7 @@ func TestGetMetadataJson(t *testing.T) {
7576
assert.JSONEq(t, expected, metadataJson)
7677
}
7778

78-
func TestGetK8sJobCommand(t *testing.T) {
79+
func TestBuildJobSubmitCommandWithK8sJobMode(t *testing.T) {
7980
testRayJob := rayJobTemplate()
8081
expected := []string{
8182
"if",
@@ -93,12 +94,51 @@ func TestGetK8sJobCommand(t *testing.T) {
9394
";", "fi", ";",
9495
"ray", "job", "logs", "--address", "http://127.0.0.1:8265", "--follow", "testJobId",
9596
}
96-
command, err := GetK8sJobCommand(testRayJob)
97+
command, err := BuildJobSubmitCommand(testRayJob, rayv1.K8sJobMode)
9798
require.NoError(t, err)
9899
assert.Equal(t, expected, command)
99100
}
100101

101-
func TestGetK8sJobCommandWithYAML(t *testing.T) {
102+
func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
103+
testRayJob := rayJobTemplate()
104+
testRayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers = []corev1.Container{
105+
{
106+
Ports: []corev1.ContainerPort{
107+
{
108+
Name: utils.DashboardPortName,
109+
ContainerPort: utils.DefaultDashboardPort,
110+
},
111+
},
112+
},
113+
}
114+
115+
expected := []string{
116+
"until",
117+
fmt.Sprintf(
118+
utils.BaseWgetHealthCommand,
119+
utils.DefaultReadinessProbeFailureThreshold,
120+
utils.DefaultDashboardPort,
121+
utils.RayDashboardGCSHealthPath,
122+
),
123+
">/dev/null", "2>&1", ";",
124+
"do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at http://127.0.0.1:8265 ..."), ";", "sleep", "2", ";", "done", ";",
125+
"ray", "job", "submit", "--address", "http://127.0.0.1:8265",
126+
"--runtime-env-json", strconv.Quote(`{"test":"test"}`),
127+
"--metadata-json", strconv.Quote(`{"testKey":"testValue"}`),
128+
"--submission-id", "testJobId",
129+
"--entrypoint-num-cpus", "1.000000",
130+
"--entrypoint-num-gpus", "0.500000",
131+
"--entrypoint-resources", strconv.Quote(`{"Custom_1": 1, "Custom_2": 5.5}`),
132+
"--",
133+
"echo no quote 'single quote' \"double quote\"",
134+
";",
135+
}
136+
command, err := BuildJobSubmitCommand(testRayJob, rayv1.SidecarMode)
137+
require.NoError(t, err)
138+
assert.Equal(t, expected, command)
139+
}
140+
141+
func TestBuildJobSubmitCommandWithK8sJobModeAndYAML(t *testing.T) {
102142
rayJobWithYAML := &rayv1.RayJob{
103143
Spec: rayv1.RayJobSpec{
104144
RuntimeEnvYAML: `
@@ -131,7 +171,7 @@ pip: ["python-multipart==0.0.6"]
131171
";", "fi", ";",
132172
"ray", "job", "logs", "--address", "http://127.0.0.1:8265", "--follow", "testJobId",
133173
}
134-
command, err := GetK8sJobCommand(rayJobWithYAML)
174+
command, err := BuildJobSubmitCommand(rayJobWithYAML, rayv1.K8sJobMode)
135175
require.NoError(t, err)
136176

137177
// Ensure the slices are the same length.

0 commit comments

Comments
 (0)