diff --git a/api/types.go b/api/types.go index c18d98d..9e512e9 100644 --- a/api/types.go +++ b/api/types.go @@ -301,6 +301,32 @@ type Container struct { SupplementalGroups []int `json:"supplementalGroupIds,omitempty"` } +// InitContainer is a container that runs to completion before the main job +// container starts. Init containers are only used when the cluster reports +// supportsInitContainers in its ClusterInfo response. +type InitContainer struct { + // The name of the init container. + Name string `json:"name"` + + // The name of the container image to use. + Image string `json:"image"` + + // The shell command to run. Mutually exclusive with Exe. + Command string `json:"command,omitempty"` + + // The executable to run. Mutually exclusive with Command. + Exe string `json:"exe,omitempty"` + + // The arguments to pass to the Command or Exe. + Args []string `json:"args,omitempty"` + + // The file system mounts to apply when the init container runs. + Mounts []Mount `json:"mounts,omitempty"` + + // The environment variables for the init container. + Env []Env `json:"environment,omitempty"` +} + // Env is an environment variable. type Env struct { // The name of the environment variable. @@ -361,6 +387,10 @@ type Job struct { // containers. Optional. Container *Container `json:"container,omitempty"` + // The init containers to run before the main job container. Only used + // when the cluster reports supportsInitContainers. Optional. + InitContainers []InitContainer `json:"initContainers,omitempty"` + // The host on which the Job was (or is being) run. Host string `json:"host,omitempty"` @@ -371,7 +401,9 @@ type Job struct { StatusMsg string `json:"statusMessage,omitempty"` // The standard code/enum for the current status of the Job, if known. - // Optional. + // Optional. Carried in job state responses as part of the Job payload, + // and forwarded explicitly in job status stream responses via + // [launcher.StreamResponseWriter.WriteJobStatus]. StatusCode string `json:"statusCode,omitempty"` // The process ID of the Job, if applicable. Optional. @@ -434,6 +466,11 @@ type Job struct { // "custom". Profile string `json:"resourceProfile,omitempty"` + // The persistent identifier for the Launcher instance that submitted + // this Job. Set by the Launcher; plugins must preserve and return + // this value unchanged. + InstanceID string `json:"instanceId,omitempty"` + // Plugin-local storage of job attributes not exposed through Launcher's // existing API. Misc map[string]interface{} `json:"-"` @@ -462,6 +499,8 @@ func (job *Job) WithFields(fields []string) *Job { scrubbed.WorkDir = job.WorkDir case "container": scrubbed.Container = job.Container + case "initContainers": + scrubbed.InitContainers = job.InitContainers case "host": scrubbed.Host = job.Host case "status": @@ -508,6 +547,8 @@ func (job *Job) WithFields(fields []string) *Job { scrubbed.Metadata = job.Metadata case "resourceProfile": scrubbed.Profile = job.Profile + case "instanceId": + scrubbed.InstanceID = job.InstanceID } } return scrubbed @@ -584,6 +625,9 @@ type Mount struct { // The destination path of the mount. Path string `json:"mountPath"` + // An optional name for the mount. + Name string `json:"name,omitempty"` + // Whether the source path should be mounted with write permissions. ReadOnly bool `json:"readOnly,omitempty"` @@ -803,10 +847,10 @@ type ResourceProfile struct { Limits []ResourceLimit `json:"limits,omitempty"` // The submission queue for this profile, if applicable. Optional. - Queue string `json:"queue"` + Queue string `json:"queue,omitempty"` // Placement constraints for this profile. Optional. - Constraints []PlacementConstraint `json:"placementConstraints"` + Constraints []PlacementConstraint `json:"placementConstraints,omitempty"` } // Node represents a Launcher/plugin node when running in a load-balanced scenario. diff --git a/cache/cache.go b/cache/cache.go index 10da0b8..07f22fb 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -249,7 +249,7 @@ func (r *JobCache) StreamJobStatus(ctx context.Context, w launcher.StreamRespons done := false err := r.Lookup(user, id, func(job *api.Job) { //nolint:errcheck // fire-and-forget convenience wrapper - w.WriteJobStatus(job.ID, job.Status, job.StatusMsg) + w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusCode, job.StatusMsg) // Break off early if we know there will be no further updates. if api.TerminalStatus(job.Status) { done = true @@ -275,7 +275,7 @@ func (r *JobCache) StreamJobStatus(ctx context.Context, w launcher.StreamRespons return } //nolint:errcheck // fire-and-forget convenience wrapper - w.WriteJobStatus(j.ID, j.Status, j.StatusMsg) + w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusCode, j.StatusMsg) } } } @@ -286,7 +286,7 @@ func (r *JobCache) StreamJobStatuses(ctx context.Context, w launcher.StreamRespo r.store.JobsForUser(user, nil, func(jobs []*api.Job) { for _, job := range jobs { //nolint:errcheck // fire-and-forget convenience wrapper - w.WriteJobStatus(job.ID, job.Status, job.StatusMsg) + w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusCode, job.StatusMsg) } }) ch := make(chan *statusUpdate, 1) @@ -300,7 +300,7 @@ func (r *JobCache) StreamJobStatuses(ctx context.Context, w launcher.StreamRespo return } //nolint:errcheck // fire-and-forget convenience wrapper - w.WriteJobStatus(j.ID, j.Status, j.StatusMsg) + w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusCode, j.StatusMsg) } } } @@ -447,15 +447,17 @@ func (s *subManager) Close() int { } type statusUpdate struct { - ID api.JobID - User string - Status string - StatusMsg string + ID api.JobID + User string + Name string + Status string + StatusCode string + StatusMsg string } func newStatusUpdateFromJob(job *api.Job) *statusUpdate { return &statusUpdate{ - job.ID, job.User, job.Status, job.StatusMsg, + job.ID, job.User, job.Name, job.Status, job.StatusCode, job.StatusMsg, } } diff --git a/internal/protocol/rpc.go b/internal/protocol/rpc.go index 48542dc..34e3a50 100644 --- a/internal/protocol/rpc.go +++ b/internal/protocol/rpc.go @@ -73,8 +73,6 @@ func requestForType(rt requestType) (interface{}, error) { return &JobNetworkRequest{}, nil case requestClusterInfo: return &ClusterInfoRequest{}, nil - case requestMultiClusterInfo: - return &MultiClusterInfoRequest{}, nil case requestSetLoadBalancerNodes: return &SetLoadBalancerNodesRequest{}, nil case requestConfigReload: @@ -97,7 +95,6 @@ const ( requestJobResourceUtil requestJobNetwork requestClusterInfo - requestMultiClusterInfo requestType = 17 requestSetLoadBalancerNodes requestType = 201 requestConfigReload requestType = 202 ) @@ -207,11 +204,6 @@ type SetLoadBalancerNodesRequest struct { Nodes []api.Node `json:"nodes"` } -// MultiClusterInfoRequest is an extension mechanism for supporting multiple clusters. -type MultiClusterInfoRequest struct { - BaseUserRequest -} - // ConfigReloadRequest is the config reload request. type ConfigReloadRequest struct { BaseUserRequest @@ -230,7 +222,6 @@ const ( responseJobResourceUtil responseJobNetwork responseClusterInfo - responseMultiClusterInfo responseType = 17 responseSetLoadBalancerNodes responseType = 201 responseConfigReload responseType = 202 responseMetrics responseType = 203 @@ -298,18 +289,20 @@ type JobStatusStreamResponse struct { ID api.JobID `json:"id"` Name string `json:"name"` Status string `json:"status"` - Msg string `json:"statusMessage,omitempty"` Code string `json:"statusCode,omitempty"` + Msg string `json:"statusMessage,omitempty"` } // NewJobStatusStreamResponse creates a new job status stream response. -func NewJobStatusStreamResponse(responseID uint64, id, status, msg string) *JobStatusStreamResponse { +func NewJobStatusStreamResponse(responseID uint64, id, name, status, statusCode, msg string) *JobStatusStreamResponse { base := responseBase{responseJobStatus, 0, responseID} return &JobStatusStreamResponse{ responseBase: base, Sequences: []StreamSequence{}, // Ensure we never send null. ID: api.JobID(id), + Name: name, Status: status, + Code: statusCode, Msg: msg, } } @@ -394,20 +387,20 @@ type ClusterInfoResponse struct { ClusterInfo } -// ClusterInfo is the body of a cluster info response; reused for the multicluster extension. +// ClusterInfo is the body of a cluster info response. type ClusterInfo struct { - Containers bool `json:"supportsContainers"` - Configs []api.JobConfig `json:"config"` - Constraints []api.PlacementConstraint `json:"placementConstraints"` - Queues []string `json:"queues,omitempty"` - DefaultQueue string `json:"defaultQueue,omitempty"` - Limits []api.ResourceLimit `json:"resourceLimits"` - Images []string `json:"images,omitempty"` - DefaultImage string `json:"defaultImage,omitempty"` - AllowUnknown bool `json:"allowUnknownImages"` - Profiles []api.ResourceProfile `json:"resourceProfiles"` - HostNetwork bool `json:"containersUseHostNetwork"` - Name string `json:"name,omitempty"` + Containers bool `json:"supportsContainers"` + InitContainers bool `json:"supportsInitContainers"` + Configs []api.JobConfig `json:"config"` + Constraints []api.PlacementConstraint `json:"placementConstraints"` + Queues []string `json:"queues,omitempty"` + DefaultQueue string `json:"defaultQueue,omitempty"` + Limits []api.ResourceLimit `json:"resourceLimits"` + Images []string `json:"images,omitempty"` + DefaultImage string `json:"defaultImage,omitempty"` + AllowUnknown bool `json:"allowUnknownImages"` + Profiles []api.ResourceProfile `json:"resourceProfiles"` + HostNetwork bool `json:"containersUseHostNetwork"` } // NewClusterInfoResponse creates a new cluster info response. @@ -428,21 +421,6 @@ func NewClusterInfoResponse(requestID, responseID uint64, cluster ClusterInfo) * return &ClusterInfoResponse{responseBase: base, ClusterInfo: cluster} } -// MultiClusterInfoResponse is an extension mechanism for supporting multiple clusters. -type MultiClusterInfoResponse struct { - responseBase - Clusters []ClusterInfo `json:"clusters"` -} - -// NewMultiClusterInfoResponse creates a new multicluster info response. -func NewMultiClusterInfoResponse(requestID, responseID uint64, clusters []ClusterInfo) *MultiClusterInfoResponse { - base := responseBase{responseMultiClusterInfo, requestID, responseID} - if clusters == nil { - clusters = []ClusterInfo{} // Ensure we never send null. - } - return &MultiClusterInfoResponse{responseBase: base, Clusters: clusters} -} - // SetLoadBalancerNodesResponse is the set load balanced nodes response. type SetLoadBalancerNodesResponse = responseBase @@ -482,17 +460,19 @@ type HistogramSample struct { type MetricsResponse struct { responseBase UptimeSeconds uint64 `json:"uptimeSeconds"` + MemoryUsageBytes uint64 `json:"memoryUsageBytes"` ClusterInteractionLatencySample *HistogramSample `json:"clusterInteractionLatencySample,omitempty"` } // NewMetricsResponse creates a new metrics response. The requestId and // responseId are both zero because this message is not a response to a // request. -func NewMetricsResponse(uptimeSeconds uint64, latency *HistogramSample) *MetricsResponse { +func NewMetricsResponse(uptimeSeconds, memoryBytes uint64, latency *HistogramSample) *MetricsResponse { base := responseBase{responseMetrics, 0, 0} return &MetricsResponse{ responseBase: base, UptimeSeconds: uptimeSeconds, + MemoryUsageBytes: memoryBytes, ClusterInteractionLatencySample: latency, } } diff --git a/internal/protocol/rpc_test.go b/internal/protocol/rpc_test.go index ae868b5..44cd6eb 100644 --- a/internal/protocol/rpc_test.go +++ b/internal/protocol/rpc_test.go @@ -62,7 +62,7 @@ func TestNewConfigReloadResponse_Success(t *testing.T) { } func TestNewMetricsResponse_Basic(t *testing.T) { - resp := NewMetricsResponse(3600, nil) + resp := NewMetricsResponse(3600, 0, nil) data, err := json.Marshal(resp) if err != nil { @@ -86,6 +86,9 @@ func TestNewMetricsResponse_Basic(t *testing.T) { if uptime := uint64(got["uptimeSeconds"].(float64)); uptime != 3600 { t.Errorf("uptimeSeconds = %d, want 3600", uptime) } + if mem := uint64(got["memoryUsageBytes"].(float64)); mem != 0 { + t.Errorf("memoryUsageBytes = %d, want 0", mem) + } if _, ok := got["clusterInteractionLatencySample"]; ok { t.Error("clusterInteractionLatencySample should be omitted when nil") } @@ -96,7 +99,7 @@ func TestNewMetricsResponse_WithLatency(t *testing.T) { Buckets: []float64{0, 2, 3, 0, 0, 0, 0, 0, 0, 0}, Sum: 1.52, } - resp := NewMetricsResponse(120, latency) + resp := NewMetricsResponse(120, 1024*1024, latency) data, err := json.Marshal(resp) if err != nil { @@ -114,6 +117,9 @@ func TestNewMetricsResponse_WithLatency(t *testing.T) { if uptime := uint64(got["uptimeSeconds"].(float64)); uptime != 120 { t.Errorf("uptimeSeconds = %d, want 120", uptime) } + if mem := uint64(got["memoryUsageBytes"].(float64)); mem != 1024*1024 { + t.Errorf("memoryUsageBytes = %d, want %d", mem, 1024*1024) + } sample, ok := got["clusterInteractionLatencySample"].(map[string]interface{}) if !ok { @@ -136,6 +142,62 @@ func TestNewMetricsResponse_WithLatency(t *testing.T) { } } +func TestNewJobStatusStreamResponse(t *testing.T) { + t.Run("all fields present", func(t *testing.T) { + resp := NewJobStatusStreamResponse(5, "job-1", "My Job", "Running", "PodRunning", "all good") + + data, err := json.Marshal(resp) + if err != nil { + t.Fatalf("json.Marshal() error = %v", err) + } + + var got map[string]interface{} + if err := json.Unmarshal(data, &got); err != nil { + t.Fatalf("json.Unmarshal() error = %v", err) + } + + if mt := int(got["messageType"].(float64)); mt != 3 { + t.Errorf("messageType = %d, want 3", mt) + } + if id := got["id"].(string); id != "job-1" { + t.Errorf("id = %q, want %q", id, "job-1") + } + if name := got["name"].(string); name != "My Job" { + t.Errorf("name = %q, want %q", name, "My Job") + } + if status := got["status"].(string); status != "Running" { + t.Errorf("status = %q, want %q", status, "Running") + } + if code := got["statusCode"].(string); code != "PodRunning" { + t.Errorf("statusCode = %q, want %q", code, "PodRunning") + } + if msg := got["statusMessage"].(string); msg != "all good" { + t.Errorf("statusMessage = %q, want %q", msg, "all good") + } + }) + + t.Run("statusCode omitted when empty", func(t *testing.T) { + resp := NewJobStatusStreamResponse(5, "job-1", "My Job", "Running", "", "") + + data, err := json.Marshal(resp) + if err != nil { + t.Fatalf("json.Marshal() error = %v", err) + } + + var got map[string]interface{} + if err := json.Unmarshal(data, &got); err != nil { + t.Fatalf("json.Unmarshal() error = %v", err) + } + + if _, ok := got["statusCode"]; ok { + t.Error("statusCode should be omitted when empty") + } + if _, ok := got["statusMessage"]; ok { + t.Error("statusMessage should be omitted when empty") + } + }) +} + func TestNewConfigReloadResponse_ErrorTypes(t *testing.T) { tests := []struct { name string diff --git a/launcher/launcher.go b/launcher/launcher.go index cac44ae..4082c2c 100644 --- a/launcher/launcher.go +++ b/launcher/launcher.go @@ -212,14 +212,6 @@ type BootstrappedPlugin interface { Bootstrap(ctx context.Context, w ResponseWriter) } -// MultiClusterPlugin can be implemented by plugins that allow job submission to -// more than one cluster. Note that this is an extension mechanism not supported -// by all Launcher implementations. -type MultiClusterPlugin interface { - Plugin - GetClusters(ctx context.Context, w MultiClusterResponseWriter, user string) -} - // LoadBalancedPlugin can be implemented by plugins that must be aware of other // nodes. type LoadBalancedPlugin interface { @@ -304,20 +296,13 @@ type ResponseWriter interface { // StreamResponseWriter is the interface for writing streaming responses. type StreamResponseWriter interface { ResponseWriter - WriteJobStatus(id api.JobID, status, msg string) error + WriteJobStatus(id api.JobID, name, status, statusCode, msg string) error WriteJobOutput(output string, outputType api.JobOutput) error WriteJobResourceUtil(cpuPercent float64, cpuTime float64, residentMem float64, virtualMem float64) error Close() error } -// MultiClusterResponseWriter is the writer for multicluster responses. -type MultiClusterResponseWriter interface { - ResponseWriter - // WriteClusters sends Launcher information about available clusters. - WriteClusters([]ClusterOptions) error -} - // ClusterOptions describes the capabilities and configuration of a cluster. type ClusterOptions struct { Constraints []api.PlacementConstraint @@ -327,23 +312,22 @@ type ClusterOptions struct { ImageOpt ImageOptions Configs []api.JobConfig Profiles []api.ResourceProfile - Name string } func (o *ClusterOptions) toProtocol() protocol.ClusterInfo { return protocol.ClusterInfo{ - Containers: len(o.ImageOpt.Images) != 0, - Constraints: o.Constraints, - Queues: o.Queues, - DefaultQueue: o.DefaultQueue, - Limits: o.Limits, - Images: o.ImageOpt.Images, - DefaultImage: o.ImageOpt.Default, - AllowUnknown: o.ImageOpt.AllowUnknown, - Configs: o.Configs, - Profiles: o.Profiles, - HostNetwork: o.ImageOpt.HostNetwork, - Name: o.Name, + Containers: len(o.ImageOpt.Images) != 0, + InitContainers: o.ImageOpt.InitContainers, + Constraints: o.Constraints, + Queues: o.Queues, + DefaultQueue: o.DefaultQueue, + Limits: o.Limits, + Images: o.ImageOpt.Images, + DefaultImage: o.ImageOpt.Default, + AllowUnknown: o.ImageOpt.AllowUnknown, + Configs: o.Configs, + Profiles: o.Profiles, + HostNetwork: o.ImageOpt.HostNetwork, } } @@ -357,6 +341,10 @@ type ImageOptions struct { // specify exposed ports. This is common when using Singularity or other // HPC container solutions. HostNetwork bool + + // When true, the cluster supports init containers — containers that run + // to completion before the main job container starts. + InitContainers bool } // Errorf creates an error with the corresponding plugin API code. @@ -522,16 +510,6 @@ func createHandler(ctx context.Context, lgr *slog.Logger, p Plugin, metricsInter case *protocol.ClusterInfoRequest: w = newResponseWriter(req, ch) p.ClusterInfo(ctx, w, r.Username) - case *protocol.MultiClusterInfoRequest: - w = newResponseWriter(req, ch) - mcPlugin, ok := p.(MultiClusterPlugin) - if !ok { - // Servers must allow multicluster requests to return a - // single-cluster response. - p.ClusterInfo(ctx, w, r.Username) - return - } - mcPlugin.GetClusters(ctx, w, r.Username) case *protocol.SetLoadBalancerNodesRequest: w = newResponseWriter(req, ch) lbPlugin, ok := p.(LoadBalancedPlugin) @@ -632,13 +610,13 @@ func (w *defaultResponseWriter) WriteJobs(jobs []*api.Job) error { // non-stream response writer. var errNotStreamWriter = fmt.Errorf("method called on non-stream response writer") -func (w *defaultResponseWriter) WriteJobStatus(id api.JobID, status, msg string) error { +func (w *defaultResponseWriter) WriteJobStatus(id api.JobID, name, status, statusCode, msg string) error { if w.store == nil { return errNotStreamWriter } rid := w.req.ID() resp := protocol.NewJobStatusStreamResponse(nextResponseID(), string(id), - status, msg) + name, status, statusCode, msg) resp.Sequences = []protocol.StreamSequence{ {RequestID: rid, SequenceID: w.store.SequenceID(rid)}, } @@ -714,16 +692,6 @@ func (w *defaultResponseWriter) WriteClusterInfo(o ClusterOptions) error { return w.sendResponse(resp) } -func (w *defaultResponseWriter) WriteClusters(o []ClusterOptions) error { - clusters := make([]protocol.ClusterInfo, len(o)) - for i := range o { - clusters[i] = o[i].toProtocol() - } - resp := protocol.NewMultiClusterInfoResponse(w.req.ID(), nextResponseID(), - clusters) - return w.sendResponse(resp) -} - func (w *defaultResponseWriter) WriteHeartbeat() error { return w.sendResponse(protocol.NewHeartbeatResponse()) } diff --git a/launcher/metrics.go b/launcher/metrics.go index 32ec6da..5d29d43 100644 --- a/launcher/metrics.go +++ b/launcher/metrics.go @@ -35,6 +35,13 @@ type PluginMetrics struct { // snapshot here via [Histogram.Drain]. When nil, no latency data is // reported. ClusterInteractionLatency *protocol.HistogramSample + + // MemoryUsageBytes is the current memory usage of the plugin process in + // bytes. This field is always serialized on the wire (no omitempty). + // Return zero if usage is unavailable or cannot be determined; the + // Launcher treats zero as unknown rather than as a measurement of + // zero bytes. + MemoryUsageBytes uint64 } // Histogram is a thread-safe histogram that accumulates observations locally @@ -162,9 +169,9 @@ func metricsLoop(ctx context.Context, lgr *slog.Logger, ch chan<- interface{}, p // Metrics() implementation does not prevent uptime from being reported. func metricsOnce(ctx context.Context, lgr *slog.Logger, ch chan<- interface{}, mp MetricsPlugin, startTime time.Time) { uptime := uint64(time.Since(startTime).Seconds()) - latency := collectPluginMetrics(ctx, lgr, mp) + pm := collectPluginMetrics(ctx, lgr, mp) - resp := protocol.NewMetricsResponse(uptime, latency) + resp := protocol.NewMetricsResponse(uptime, pm.MemoryUsageBytes, pm.ClusterInteractionLatency) select { case ch <- resp: default: @@ -173,19 +180,18 @@ func metricsOnce(ctx context.Context, lgr *slog.Logger, ch chan<- interface{}, m } // collectPluginMetrics calls the plugin's Metrics method, recovering from -// panics. Returns nil if the plugin is nil or panics. -func collectPluginMetrics(ctx context.Context, lgr *slog.Logger, mp MetricsPlugin) (latency *protocol.HistogramSample) { +// panics. Returns a zero PluginMetrics if the plugin is nil or panics. +func collectPluginMetrics(ctx context.Context, lgr *slog.Logger, mp MetricsPlugin) (pm PluginMetrics) { if mp == nil { - return nil + return } defer func() { if r := recover(); r != nil { lgr.Error("Panic in plugin Metrics() call", + "plugin", fmt.Sprintf("%T", mp), "panic", r, "stack", string(debug.Stack())) - latency = nil } }() - metrics := mp.Metrics(ctx) - return metrics.ClusterInteractionLatency + return mp.Metrics(ctx) } diff --git a/plugintest/builders.go b/plugintest/builders.go index d5ffe82..5c59923 100644 --- a/plugintest/builders.go +++ b/plugintest/builders.go @@ -479,12 +479,6 @@ func NewClusterOptions() *ClusterOptionsBuilder { } } -// WithName sets the cluster name. -func (b *ClusterOptionsBuilder) WithName(name string) *ClusterOptionsBuilder { - b.opts.Name = name - return b -} - // WithQueue adds a queue. func (b *ClusterOptionsBuilder) WithQueue(queue string) *ClusterOptionsBuilder { b.opts.Queues = append(b.opts.Queues, queue) diff --git a/plugintest/example_test.go b/plugintest/example_test.go index 994e82d..f7d2d25 100644 --- a/plugintest/example_test.go +++ b/plugintest/example_test.go @@ -176,8 +176,8 @@ func TestMockStreamResponseWriter(t *testing.T) { t.Run("captures status updates", func(t *testing.T) { w := plugintest.NewMockStreamResponseWriter() - w.WriteJobStatus("job-1", api.StatusRunning, "Job started") - w.WriteJobStatus("job-1", api.StatusFinished, "Job completed") + w.WriteJobStatus("job-1", "My Job", api.StatusRunning, "", "Job started") + w.WriteJobStatus("job-1", "My Job", api.StatusFinished, "", "Job completed") plugintest.AssertStatusCount(t, w, 2) diff --git a/plugintest/mocks.go b/plugintest/mocks.go index e79e181..782d3f6 100644 --- a/plugintest/mocks.go +++ b/plugintest/mocks.go @@ -28,9 +28,6 @@ type MockResponseWriter struct { // ClusterInfo contains the cluster info written via WriteClusterInfo. ClusterInfo *launcher.ClusterOptions - // Clusters contains all clusters written via WriteClusters (for multicluster). - Clusters []launcher.ClusterOptions - // ConfigReloadResults contains all config reload responses written via WriteConfigReload. ConfigReloadResults []ConfigReloadResult } @@ -54,7 +51,6 @@ func NewMockResponseWriter() *MockResponseWriter { Jobs: [][]*api.Job{}, ControlResults: []ControlResult{}, Networks: []NetworkInfo{}, - Clusters: []launcher.ClusterOptions{}, ConfigReloadResults: []ConfigReloadResult{}, } } @@ -118,14 +114,6 @@ func (m *MockResponseWriter) WriteClusterInfo(opts launcher.ClusterOptions) erro return nil } -// WriteClusters implements launcher.MultiClusterResponseWriter. -func (m *MockResponseWriter) WriteClusters(clusters []launcher.ClusterOptions) error { - m.mu.Lock() - defer m.mu.Unlock() - m.Clusters = clusters - return nil -} - // ConfigReloadResult represents a config reload operation result. type ConfigReloadResult struct { ErrorType api.ConfigReloadErrorType @@ -200,7 +188,6 @@ func (m *MockResponseWriter) Reset() { m.ControlResults = []ControlResult{} m.Networks = []NetworkInfo{} m.ClusterInfo = nil - m.Clusters = []launcher.ClusterOptions{} m.ConfigReloadResults = []ConfigReloadResult{} } @@ -225,9 +212,11 @@ type MockStreamResponseWriter struct { // StatusUpdate represents a job status update. type StatusUpdate struct { - ID api.JobID - Status string - Message string + ID api.JobID + Name string + Status string + StatusCode string + Message string } // OutputChunk represents a chunk of job output. @@ -252,7 +241,6 @@ func NewMockStreamResponseWriter() *MockStreamResponseWriter { Jobs: [][]*api.Job{}, ControlResults: []ControlResult{}, Networks: []NetworkInfo{}, - Clusters: []launcher.ClusterOptions{}, }, Statuses: []StatusUpdate{}, Outputs: []OutputChunk{}, @@ -262,13 +250,15 @@ func NewMockStreamResponseWriter() *MockStreamResponseWriter { } // WriteJobStatus implements launcher.StreamResponseWriter. -func (m *MockStreamResponseWriter) WriteJobStatus(id api.JobID, status, msg string) error { +func (m *MockStreamResponseWriter) WriteJobStatus(id api.JobID, name, status, statusCode, msg string) error { m.mu.Lock() defer m.mu.Unlock() m.Statuses = append(m.Statuses, StatusUpdate{ - ID: id, - Status: status, - Message: msg, + ID: id, + Name: name, + Status: status, + StatusCode: statusCode, + Message: msg, }) return nil }