Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,32 @@ type Container struct {
SupplementalGroups []int `json:"supplementalGroupIds,omitempty"`
}

// InitContainer is a container that runs to completion before the main job
// container starts. Init containers are only used when the cluster reports
// supportsInitContainers in its ClusterInfo response.
type InitContainer struct {
// The name of the init container.
Name string `json:"name"`

// The name of the container image to use.
Image string `json:"image"`

// The shell command to run. Mutually exclusive with Exe.
Command string `json:"command,omitempty"`

// The executable to run. Mutually exclusive with Command.
Exe string `json:"exe,omitempty"`

// The arguments to pass to the Command or Exe.
Args []string `json:"args,omitempty"`

// The file system mounts to apply when the init container runs.
Mounts []Mount `json:"mounts,omitempty"`

// The environment variables for the init container.
Env []Env `json:"environment,omitempty"`
}

// Env is an environment variable.
type Env struct {
// The name of the environment variable.
Expand Down Expand Up @@ -361,6 +387,10 @@ type Job struct {
// containers. Optional.
Container *Container `json:"container,omitempty"`

// The init containers to run before the main job container. Only used
// when the cluster reports supportsInitContainers. Optional.
InitContainers []InitContainer `json:"initContainers,omitempty"`

// The host on which the Job was (or is being) run.
Host string `json:"host,omitempty"`

Expand All @@ -371,7 +401,9 @@ type Job struct {
StatusMsg string `json:"statusMessage,omitempty"`

// The standard code/enum for the current status of the Job, if known.
// Optional.
// Optional. Carried in job state responses as part of the Job payload,
// and forwarded explicitly in job status stream responses via
// [launcher.StreamResponseWriter.WriteJobStatus].
StatusCode string `json:"statusCode,omitempty"`

// The process ID of the Job, if applicable. Optional.
Expand Down Expand Up @@ -434,6 +466,11 @@ type Job struct {
// "custom".
Profile string `json:"resourceProfile,omitempty"`

// The persistent identifier for the Launcher instance that submitted
// this Job. Set by the Launcher; plugins must preserve and return
// this value unchanged.
InstanceID string `json:"instanceId,omitempty"`

// Plugin-local storage of job attributes not exposed through Launcher's
// existing API.
Misc map[string]interface{} `json:"-"`
Expand Down Expand Up @@ -462,6 +499,8 @@ func (job *Job) WithFields(fields []string) *Job {
scrubbed.WorkDir = job.WorkDir
case "container":
scrubbed.Container = job.Container
case "initContainers":
scrubbed.InitContainers = job.InitContainers
case "host":
scrubbed.Host = job.Host
case "status":
Expand Down Expand Up @@ -508,6 +547,8 @@ func (job *Job) WithFields(fields []string) *Job {
scrubbed.Metadata = job.Metadata
case "resourceProfile":
scrubbed.Profile = job.Profile
case "instanceId":
scrubbed.InstanceID = job.InstanceID
}
}
return scrubbed
Expand Down Expand Up @@ -584,6 +625,9 @@ type Mount struct {
// The destination path of the mount.
Path string `json:"mountPath"`

// An optional name for the mount.
Name string `json:"name,omitempty"`

// Whether the source path should be mounted with write permissions.
ReadOnly bool `json:"readOnly,omitempty"`

Expand Down Expand Up @@ -803,10 +847,10 @@ type ResourceProfile struct {
Limits []ResourceLimit `json:"limits,omitempty"`

// The submission queue for this profile, if applicable. Optional.
Queue string `json:"queue"`
Queue string `json:"queue,omitempty"`

// Placement constraints for this profile. Optional.
Constraints []PlacementConstraint `json:"placementConstraints"`
Constraints []PlacementConstraint `json:"placementConstraints,omitempty"`
}

// Node represents a Launcher/plugin node when running in a load-balanced scenario.
Expand Down
20 changes: 11 additions & 9 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (r *JobCache) StreamJobStatus(ctx context.Context, w launcher.StreamRespons
done := false
err := r.Lookup(user, id, func(job *api.Job) {
//nolint:errcheck // fire-and-forget convenience wrapper
w.WriteJobStatus(job.ID, job.Status, job.StatusMsg)
w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusCode, job.StatusMsg)
// Break off early if we know there will be no further updates.
if api.TerminalStatus(job.Status) {
done = true
Expand All @@ -275,7 +275,7 @@ func (r *JobCache) StreamJobStatus(ctx context.Context, w launcher.StreamRespons
return
}
//nolint:errcheck // fire-and-forget convenience wrapper
w.WriteJobStatus(j.ID, j.Status, j.StatusMsg)
w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusCode, j.StatusMsg)
}
}
}
Expand All @@ -286,7 +286,7 @@ func (r *JobCache) StreamJobStatuses(ctx context.Context, w launcher.StreamRespo
r.store.JobsForUser(user, nil, func(jobs []*api.Job) {
for _, job := range jobs {
//nolint:errcheck // fire-and-forget convenience wrapper
w.WriteJobStatus(job.ID, job.Status, job.StatusMsg)
w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusCode, job.StatusMsg)
}
})
ch := make(chan *statusUpdate, 1)
Expand All @@ -300,7 +300,7 @@ func (r *JobCache) StreamJobStatuses(ctx context.Context, w launcher.StreamRespo
return
}
//nolint:errcheck // fire-and-forget convenience wrapper
w.WriteJobStatus(j.ID, j.Status, j.StatusMsg)
w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusCode, j.StatusMsg)
}
}
}
Expand Down Expand Up @@ -447,15 +447,17 @@ func (s *subManager) Close() int {
}

type statusUpdate struct {
ID api.JobID
User string
Status string
StatusMsg string
ID api.JobID
User string
Name string
Status string
StatusCode string
StatusMsg string
}

func newStatusUpdateFromJob(job *api.Job) *statusUpdate {
return &statusUpdate{
job.ID, job.User, job.Status, job.StatusMsg,
job.ID, job.User, job.Name, job.Status, job.StatusCode, job.StatusMsg,
}
}

Expand Down
60 changes: 20 additions & 40 deletions internal/protocol/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ func requestForType(rt requestType) (interface{}, error) {
return &JobNetworkRequest{}, nil
case requestClusterInfo:
return &ClusterInfoRequest{}, nil
case requestMultiClusterInfo:
return &MultiClusterInfoRequest{}, nil
case requestSetLoadBalancerNodes:
return &SetLoadBalancerNodesRequest{}, nil
case requestConfigReload:
Expand All @@ -97,7 +95,6 @@ const (
requestJobResourceUtil
requestJobNetwork
requestClusterInfo
requestMultiClusterInfo requestType = 17
requestSetLoadBalancerNodes requestType = 201
requestConfigReload requestType = 202
)
Expand Down Expand Up @@ -207,11 +204,6 @@ type SetLoadBalancerNodesRequest struct {
Nodes []api.Node `json:"nodes"`
}

// MultiClusterInfoRequest is an extension mechanism for supporting multiple clusters.
type MultiClusterInfoRequest struct {
BaseUserRequest
}

// ConfigReloadRequest is the config reload request.
type ConfigReloadRequest struct {
BaseUserRequest
Expand All @@ -230,7 +222,6 @@ const (
responseJobResourceUtil
responseJobNetwork
responseClusterInfo
responseMultiClusterInfo responseType = 17
responseSetLoadBalancerNodes responseType = 201
responseConfigReload responseType = 202
responseMetrics responseType = 203
Expand Down Expand Up @@ -298,18 +289,20 @@ type JobStatusStreamResponse struct {
ID api.JobID `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
Msg string `json:"statusMessage,omitempty"`
Code string `json:"statusCode,omitempty"`
Msg string `json:"statusMessage,omitempty"`
}

// NewJobStatusStreamResponse creates a new job status stream response.
func NewJobStatusStreamResponse(responseID uint64, id, status, msg string) *JobStatusStreamResponse {
func NewJobStatusStreamResponse(responseID uint64, id, name, status, statusCode, msg string) *JobStatusStreamResponse {
base := responseBase{responseJobStatus, 0, responseID}
return &JobStatusStreamResponse{
responseBase: base,
Sequences: []StreamSequence{}, // Ensure we never send null.
ID: api.JobID(id),
Name: name,
Status: status,
Code: statusCode,
Msg: msg,
}
}
Expand Down Expand Up @@ -394,20 +387,20 @@ type ClusterInfoResponse struct {
ClusterInfo
}

// ClusterInfo is the body of a cluster info response; reused for the multicluster extension.
// ClusterInfo is the body of a cluster info response.
type ClusterInfo struct {
Containers bool `json:"supportsContainers"`
Configs []api.JobConfig `json:"config"`
Constraints []api.PlacementConstraint `json:"placementConstraints"`
Queues []string `json:"queues,omitempty"`
DefaultQueue string `json:"defaultQueue,omitempty"`
Limits []api.ResourceLimit `json:"resourceLimits"`
Images []string `json:"images,omitempty"`
DefaultImage string `json:"defaultImage,omitempty"`
AllowUnknown bool `json:"allowUnknownImages"`
Profiles []api.ResourceProfile `json:"resourceProfiles"`
HostNetwork bool `json:"containersUseHostNetwork"`
Name string `json:"name,omitempty"`
Containers bool `json:"supportsContainers"`
InitContainers bool `json:"supportsInitContainers"`
Configs []api.JobConfig `json:"config"`
Constraints []api.PlacementConstraint `json:"placementConstraints"`
Queues []string `json:"queues,omitempty"`
DefaultQueue string `json:"defaultQueue,omitempty"`
Limits []api.ResourceLimit `json:"resourceLimits"`
Images []string `json:"images,omitempty"`
DefaultImage string `json:"defaultImage,omitempty"`
AllowUnknown bool `json:"allowUnknownImages"`
Profiles []api.ResourceProfile `json:"resourceProfiles"`
HostNetwork bool `json:"containersUseHostNetwork"`
}

// NewClusterInfoResponse creates a new cluster info response.
Expand All @@ -428,21 +421,6 @@ func NewClusterInfoResponse(requestID, responseID uint64, cluster ClusterInfo) *
return &ClusterInfoResponse{responseBase: base, ClusterInfo: cluster}
}

// MultiClusterInfoResponse is an extension mechanism for supporting multiple clusters.
type MultiClusterInfoResponse struct {
responseBase
Clusters []ClusterInfo `json:"clusters"`
}

// NewMultiClusterInfoResponse creates a new multicluster info response.
func NewMultiClusterInfoResponse(requestID, responseID uint64, clusters []ClusterInfo) *MultiClusterInfoResponse {
base := responseBase{responseMultiClusterInfo, requestID, responseID}
if clusters == nil {
clusters = []ClusterInfo{} // Ensure we never send null.
}
return &MultiClusterInfoResponse{responseBase: base, Clusters: clusters}
}

// SetLoadBalancerNodesResponse is the set load balanced nodes response.
type SetLoadBalancerNodesResponse = responseBase

Expand Down Expand Up @@ -482,17 +460,19 @@ type HistogramSample struct {
type MetricsResponse struct {
responseBase
UptimeSeconds uint64 `json:"uptimeSeconds"`
MemoryUsageBytes uint64 `json:"memoryUsageBytes"`
ClusterInteractionLatencySample *HistogramSample `json:"clusterInteractionLatencySample,omitempty"`
}

// NewMetricsResponse creates a new metrics response. The requestId and
// responseId are both zero because this message is not a response to a
// request.
func NewMetricsResponse(uptimeSeconds uint64, latency *HistogramSample) *MetricsResponse {
func NewMetricsResponse(uptimeSeconds, memoryBytes uint64, latency *HistogramSample) *MetricsResponse {
base := responseBase{responseMetrics, 0, 0}
return &MetricsResponse{
responseBase: base,
UptimeSeconds: uptimeSeconds,
MemoryUsageBytes: memoryBytes,
ClusterInteractionLatencySample: latency,
}
}
Loading
Loading