Skip to content

Commit 455eb3f

Browse files
authored
[kubectl-plugin] Generate submission_id in job_submit.go (#3693)
1 parent be22ecf commit 455eb3f

File tree

1 file changed

+29
-58
lines changed

1 file changed

+29
-58
lines changed

kubectl-plugin/pkg/cmd/job/job_submit.go

Lines changed: 29 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package job
33
import (
44
"bufio"
55
"context"
6+
"crypto/rand"
67
"fmt"
78
"log"
9+
"math/big"
810
"net/http"
911
"os"
1012
"os/exec"
@@ -27,21 +29,17 @@ import (
2729
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
2830
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/generation"
2931
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
30-
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
3132
rayscheme "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
3233
)
3334

3435
const (
3536
dashboardAddr = "http://localhost:8265"
3637
clusterTimeout = 120.0
3738
portforwardtimeout = 60.0
38-
jobIDTimeout = 60.0
39-
jobIDPollInterval = 1.0
4039
)
4140

4241
type SubmitJobOptions struct {
4342
cmdFactory cmdutil.Factory
44-
dashboardClient utils.RayDashboardClientInterface
4543
ioStreams *genericiooptions.IOStreams
4644
RayJob *rayv1.RayJob
4745
workerNodeSelectors map[string]string
@@ -471,10 +469,14 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
471469
}
472470
fmt.Printf("Portforwarding started on %s\n", dashboardAddr)
473471

474-
// Initialize dashboard client after port-forwarding is ready
475-
options.dashboardClient = &utils.RayDashboardClient{}
476-
if err := options.dashboardClient.InitClient(portforwardctx, strings.TrimPrefix(dashboardAddr, "http://"), nil); err != nil {
477-
return fmt.Errorf("failed to initialize dashboard client: %w", err)
472+
// If submission ID is not provided by the user, generate one.
473+
if options.submissionID == "" {
474+
generatedID, err := generateSubmissionID()
475+
if err != nil {
476+
return fmt.Errorf("failed to generate submission ID: %w", err)
477+
}
478+
options.submissionID = generatedID
479+
fmt.Printf("Generated submission ID for Ray job: %s\n", options.submissionID)
478480
}
479481

480482
// Submitting ray job to cluster
@@ -503,38 +505,7 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
503505
}
504506
}()
505507

506-
var rayJobID string
507-
if options.submissionID != "" {
508-
rayJobID = options.submissionID
509-
} else {
510-
// Create a channel to receive rayJobID from the API
511-
rayJobIDChan := make(chan string)
512-
513-
// Poll the API for the rayJobID
514-
go func() {
515-
pollStart := time.Now()
516-
for {
517-
jobID, err := options.getJobIDViaAPI(portforwardctx)
518-
if err == nil {
519-
rayJobIDChan <- jobID
520-
break
521-
}
522-
if time.Since(pollStart).Seconds() > jobIDTimeout {
523-
close(rayJobIDChan)
524-
break
525-
}
526-
sleepDur := time.Duration(jobIDPollInterval * float64(time.Second))
527-
time.Sleep(sleepDur)
528-
}
529-
}()
530-
531-
// Wait till rayJobID is populated or the timeout occurs
532-
jobID, ok := <-rayJobIDChan
533-
if !ok {
534-
return fmt.Errorf("submit failed: timeout waiting for job ID from API after %v", jobIDTimeout)
535-
}
536-
rayJobID = jobID
537-
}
508+
rayJobID := options.submissionID
538509

539510
rayCmdStdOutScanner := bufio.NewScanner(rayCmdStdOut)
540511
rayCmdStdErrScanner := bufio.NewScanner(rayCmdStdErr)
@@ -693,24 +664,6 @@ func (options *SubmitJobOptions) raySubmitCmd() ([]string, error) {
693664
return raySubmitCmd, nil
694665
}
695666

696-
// Get the job ID from the dashboard API
697-
func (options *SubmitJobOptions) getJobIDViaAPI(ctx context.Context) (string, error) {
698-
jobs, err := options.dashboardClient.ListJobs(ctx)
699-
if err != nil {
700-
return "", fmt.Errorf("failed to list jobs via dashboard client: %w", err)
701-
}
702-
703-
if jobs == nil || len(*jobs) == 0 {
704-
return "", fmt.Errorf("no jobs returned from dashboard")
705-
}
706-
707-
// Basically, there is only one job in the list, so we can just return the first one.
708-
for _, job := range *jobs {
709-
return job.SubmissionId, nil
710-
}
711-
return "", fmt.Errorf("no jobs found from dashboard")
712-
}
713-
714667
// Decode RayJob YAML if we decide to submit job using kube client
715668
func decodeRayJobYaml(rayJobFilePath string) (*rayv1.RayJob, error) {
716669
decodedRayJob := &rayv1.RayJob{}
@@ -751,3 +704,21 @@ func runtimeEnvHasWorkingDir(runtimePath string) (string, error) {
751704
func isRayClusterReady(rayCluster *rayv1.RayCluster) bool {
752705
return meta.IsStatusConditionTrue(rayCluster.Status.Conditions, "Ready") || rayCluster.Status.State == rayv1.Ready
753706
}
707+
708+
// Generates a 16-character random ID with a prefix, mimicking Ray Job submission_id.
709+
// ref: ray/python/ray/dashboard/modules/job/job_manager.py
710+
func generateSubmissionID() (string, error) {
711+
// ASCII letters and digits, excluding confusing characters I, l, O, 0, o.
712+
const possibleChars = "abcdefghijkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ123456789"
713+
714+
idRunes := make([]rune, 16)
715+
for i := range idRunes {
716+
// Securely generate a random index.
717+
idx, err := rand.Int(rand.Reader, big.NewInt(int64(len(possibleChars))))
718+
if err != nil {
719+
return "", err
720+
}
721+
idRunes[i] = rune(possibleChars[idx.Int64()])
722+
}
723+
return fmt.Sprintf("raysubmit_%s", string(idRunes)), nil
724+
}

0 commit comments

Comments
 (0)