Skip to content

Commit 9719291

Browse files
authored
Merge pull request #11 from posit-dev/fix-missing-protocol-fields
Sync Go SDK with Plugin API v3.7.0 protocol fields
2 parents b6e6355 + 4ed02ba commit 9719291

File tree

9 files changed

+188
-142
lines changed

9 files changed

+188
-142
lines changed

api/types.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,32 @@ type Container struct {
301301
SupplementalGroups []int `json:"supplementalGroupIds,omitempty"`
302302
}
303303

304+
// InitContainer is a container that runs to completion before the main job
305+
// container starts. Init containers are only used when the cluster reports
306+
// supportsInitContainers in its ClusterInfo response.
307+
type InitContainer struct {
308+
// The name of the init container.
309+
Name string `json:"name"`
310+
311+
// The name of the container image to use.
312+
Image string `json:"image"`
313+
314+
// The shell command to run. Mutually exclusive with Exe.
315+
Command string `json:"command,omitempty"`
316+
317+
// The executable to run. Mutually exclusive with Command.
318+
Exe string `json:"exe,omitempty"`
319+
320+
// The arguments to pass to the Command or Exe.
321+
Args []string `json:"args,omitempty"`
322+
323+
// The file system mounts to apply when the init container runs.
324+
Mounts []Mount `json:"mounts,omitempty"`
325+
326+
// The environment variables for the init container.
327+
Env []Env `json:"environment,omitempty"`
328+
}
329+
304330
// Env is an environment variable.
305331
type Env struct {
306332
// The name of the environment variable.
@@ -361,6 +387,10 @@ type Job struct {
361387
// containers. Optional.
362388
Container *Container `json:"container,omitempty"`
363389

390+
// The init containers to run before the main job container. Only used
391+
// when the cluster reports supportsInitContainers. Optional.
392+
InitContainers []InitContainer `json:"initContainers,omitempty"`
393+
364394
// The host on which the Job was (or is being) run.
365395
Host string `json:"host,omitempty"`
366396

@@ -371,7 +401,9 @@ type Job struct {
371401
StatusMsg string `json:"statusMessage,omitempty"`
372402

373403
// The standard code/enum for the current status of the Job, if known.
374-
// Optional.
404+
// Optional. Carried in job state responses as part of the Job payload,
405+
// and forwarded explicitly in job status stream responses via
406+
// [launcher.StreamResponseWriter.WriteJobStatus].
375407
StatusCode string `json:"statusCode,omitempty"`
376408

377409
// The process ID of the Job, if applicable. Optional.
@@ -434,6 +466,11 @@ type Job struct {
434466
// "custom".
435467
Profile string `json:"resourceProfile,omitempty"`
436468

469+
// The persistent identifier for the Launcher instance that submitted
470+
// this Job. Set by the Launcher; plugins must preserve and return
471+
// this value unchanged.
472+
InstanceID string `json:"instanceId,omitempty"`
473+
437474
// Plugin-local storage of job attributes not exposed through Launcher's
438475
// existing API.
439476
Misc map[string]interface{} `json:"-"`
@@ -462,6 +499,8 @@ func (job *Job) WithFields(fields []string) *Job {
462499
scrubbed.WorkDir = job.WorkDir
463500
case "container":
464501
scrubbed.Container = job.Container
502+
case "initContainers":
503+
scrubbed.InitContainers = job.InitContainers
465504
case "host":
466505
scrubbed.Host = job.Host
467506
case "status":
@@ -508,6 +547,8 @@ func (job *Job) WithFields(fields []string) *Job {
508547
scrubbed.Metadata = job.Metadata
509548
case "resourceProfile":
510549
scrubbed.Profile = job.Profile
550+
case "instanceId":
551+
scrubbed.InstanceID = job.InstanceID
511552
}
512553
}
513554
return scrubbed
@@ -584,6 +625,9 @@ type Mount struct {
584625
// The destination path of the mount.
585626
Path string `json:"mountPath"`
586627

628+
// An optional name for the mount.
629+
Name string `json:"name,omitempty"`
630+
587631
// Whether the source path should be mounted with write permissions.
588632
ReadOnly bool `json:"readOnly,omitempty"`
589633

@@ -803,10 +847,10 @@ type ResourceProfile struct {
803847
Limits []ResourceLimit `json:"limits,omitempty"`
804848

805849
// The submission queue for this profile, if applicable. Optional.
806-
Queue string `json:"queue"`
850+
Queue string `json:"queue,omitempty"`
807851

808852
// Placement constraints for this profile. Optional.
809-
Constraints []PlacementConstraint `json:"placementConstraints"`
853+
Constraints []PlacementConstraint `json:"placementConstraints,omitempty"`
810854
}
811855

812856
// Node represents a Launcher/plugin node when running in a load-balanced scenario.

cache/cache.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func (r *JobCache) StreamJobStatus(ctx context.Context, w launcher.StreamRespons
249249
done := false
250250
err := r.Lookup(user, id, func(job *api.Job) {
251251
//nolint:errcheck // fire-and-forget convenience wrapper
252-
w.WriteJobStatus(job.ID, job.Status, job.StatusMsg)
252+
w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusCode, job.StatusMsg)
253253
// Break off early if we know there will be no further updates.
254254
if api.TerminalStatus(job.Status) {
255255
done = true
@@ -275,7 +275,7 @@ func (r *JobCache) StreamJobStatus(ctx context.Context, w launcher.StreamRespons
275275
return
276276
}
277277
//nolint:errcheck // fire-and-forget convenience wrapper
278-
w.WriteJobStatus(j.ID, j.Status, j.StatusMsg)
278+
w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusCode, j.StatusMsg)
279279
}
280280
}
281281
}
@@ -286,7 +286,7 @@ func (r *JobCache) StreamJobStatuses(ctx context.Context, w launcher.StreamRespo
286286
r.store.JobsForUser(user, nil, func(jobs []*api.Job) {
287287
for _, job := range jobs {
288288
//nolint:errcheck // fire-and-forget convenience wrapper
289-
w.WriteJobStatus(job.ID, job.Status, job.StatusMsg)
289+
w.WriteJobStatus(job.ID, job.Name, job.Status, job.StatusCode, job.StatusMsg)
290290
}
291291
})
292292
ch := make(chan *statusUpdate, 1)
@@ -300,7 +300,7 @@ func (r *JobCache) StreamJobStatuses(ctx context.Context, w launcher.StreamRespo
300300
return
301301
}
302302
//nolint:errcheck // fire-and-forget convenience wrapper
303-
w.WriteJobStatus(j.ID, j.Status, j.StatusMsg)
303+
w.WriteJobStatus(j.ID, j.Name, j.Status, j.StatusCode, j.StatusMsg)
304304
}
305305
}
306306
}
@@ -447,15 +447,17 @@ func (s *subManager) Close() int {
447447
}
448448

449449
type statusUpdate struct {
450-
ID api.JobID
451-
User string
452-
Status string
453-
StatusMsg string
450+
ID api.JobID
451+
User string
452+
Name string
453+
Status string
454+
StatusCode string
455+
StatusMsg string
454456
}
455457

456458
func newStatusUpdateFromJob(job *api.Job) *statusUpdate {
457459
return &statusUpdate{
458-
job.ID, job.User, job.Status, job.StatusMsg,
460+
job.ID, job.User, job.Name, job.Status, job.StatusCode, job.StatusMsg,
459461
}
460462
}
461463

internal/protocol/rpc.go

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ func requestForType(rt requestType) (interface{}, error) {
7373
return &JobNetworkRequest{}, nil
7474
case requestClusterInfo:
7575
return &ClusterInfoRequest{}, nil
76-
case requestMultiClusterInfo:
77-
return &MultiClusterInfoRequest{}, nil
7876
case requestSetLoadBalancerNodes:
7977
return &SetLoadBalancerNodesRequest{}, nil
8078
case requestConfigReload:
@@ -97,7 +95,6 @@ const (
9795
requestJobResourceUtil
9896
requestJobNetwork
9997
requestClusterInfo
100-
requestMultiClusterInfo requestType = 17
10198
requestSetLoadBalancerNodes requestType = 201
10299
requestConfigReload requestType = 202
103100
)
@@ -207,11 +204,6 @@ type SetLoadBalancerNodesRequest struct {
207204
Nodes []api.Node `json:"nodes"`
208205
}
209206

210-
// MultiClusterInfoRequest is an extension mechanism for supporting multiple clusters.
211-
type MultiClusterInfoRequest struct {
212-
BaseUserRequest
213-
}
214-
215207
// ConfigReloadRequest is the config reload request.
216208
type ConfigReloadRequest struct {
217209
BaseUserRequest
@@ -230,7 +222,6 @@ const (
230222
responseJobResourceUtil
231223
responseJobNetwork
232224
responseClusterInfo
233-
responseMultiClusterInfo responseType = 17
234225
responseSetLoadBalancerNodes responseType = 201
235226
responseConfigReload responseType = 202
236227
responseMetrics responseType = 203
@@ -298,18 +289,20 @@ type JobStatusStreamResponse struct {
298289
ID api.JobID `json:"id"`
299290
Name string `json:"name"`
300291
Status string `json:"status"`
301-
Msg string `json:"statusMessage,omitempty"`
302292
Code string `json:"statusCode,omitempty"`
293+
Msg string `json:"statusMessage,omitempty"`
303294
}
304295

305296
// NewJobStatusStreamResponse creates a new job status stream response.
306-
func NewJobStatusStreamResponse(responseID uint64, id, status, msg string) *JobStatusStreamResponse {
297+
func NewJobStatusStreamResponse(responseID uint64, id, name, status, statusCode, msg string) *JobStatusStreamResponse {
307298
base := responseBase{responseJobStatus, 0, responseID}
308299
return &JobStatusStreamResponse{
309300
responseBase: base,
310301
Sequences: []StreamSequence{}, // Ensure we never send null.
311302
ID: api.JobID(id),
303+
Name: name,
312304
Status: status,
305+
Code: statusCode,
313306
Msg: msg,
314307
}
315308
}
@@ -394,20 +387,20 @@ type ClusterInfoResponse struct {
394387
ClusterInfo
395388
}
396389

397-
// ClusterInfo is the body of a cluster info response; reused for the multicluster extension.
390+
// ClusterInfo is the body of a cluster info response.
398391
type ClusterInfo struct {
399-
Containers bool `json:"supportsContainers"`
400-
Configs []api.JobConfig `json:"config"`
401-
Constraints []api.PlacementConstraint `json:"placementConstraints"`
402-
Queues []string `json:"queues,omitempty"`
403-
DefaultQueue string `json:"defaultQueue,omitempty"`
404-
Limits []api.ResourceLimit `json:"resourceLimits"`
405-
Images []string `json:"images,omitempty"`
406-
DefaultImage string `json:"defaultImage,omitempty"`
407-
AllowUnknown bool `json:"allowUnknownImages"`
408-
Profiles []api.ResourceProfile `json:"resourceProfiles"`
409-
HostNetwork bool `json:"containersUseHostNetwork"`
410-
Name string `json:"name,omitempty"`
392+
Containers bool `json:"supportsContainers"`
393+
InitContainers bool `json:"supportsInitContainers"`
394+
Configs []api.JobConfig `json:"config"`
395+
Constraints []api.PlacementConstraint `json:"placementConstraints"`
396+
Queues []string `json:"queues,omitempty"`
397+
DefaultQueue string `json:"defaultQueue,omitempty"`
398+
Limits []api.ResourceLimit `json:"resourceLimits"`
399+
Images []string `json:"images,omitempty"`
400+
DefaultImage string `json:"defaultImage,omitempty"`
401+
AllowUnknown bool `json:"allowUnknownImages"`
402+
Profiles []api.ResourceProfile `json:"resourceProfiles"`
403+
HostNetwork bool `json:"containersUseHostNetwork"`
411404
}
412405

413406
// NewClusterInfoResponse creates a new cluster info response.
@@ -428,21 +421,6 @@ func NewClusterInfoResponse(requestID, responseID uint64, cluster ClusterInfo) *
428421
return &ClusterInfoResponse{responseBase: base, ClusterInfo: cluster}
429422
}
430423

431-
// MultiClusterInfoResponse is an extension mechanism for supporting multiple clusters.
432-
type MultiClusterInfoResponse struct {
433-
responseBase
434-
Clusters []ClusterInfo `json:"clusters"`
435-
}
436-
437-
// NewMultiClusterInfoResponse creates a new multicluster info response.
438-
func NewMultiClusterInfoResponse(requestID, responseID uint64, clusters []ClusterInfo) *MultiClusterInfoResponse {
439-
base := responseBase{responseMultiClusterInfo, requestID, responseID}
440-
if clusters == nil {
441-
clusters = []ClusterInfo{} // Ensure we never send null.
442-
}
443-
return &MultiClusterInfoResponse{responseBase: base, Clusters: clusters}
444-
}
445-
446424
// SetLoadBalancerNodesResponse is the set load balanced nodes response.
447425
type SetLoadBalancerNodesResponse = responseBase
448426

@@ -482,17 +460,19 @@ type HistogramSample struct {
482460
type MetricsResponse struct {
483461
responseBase
484462
UptimeSeconds uint64 `json:"uptimeSeconds"`
463+
MemoryUsageBytes uint64 `json:"memoryUsageBytes"`
485464
ClusterInteractionLatencySample *HistogramSample `json:"clusterInteractionLatencySample,omitempty"`
486465
}
487466

488467
// NewMetricsResponse creates a new metrics response. The requestId and
489468
// responseId are both zero because this message is not a response to a
490469
// request.
491-
func NewMetricsResponse(uptimeSeconds uint64, latency *HistogramSample) *MetricsResponse {
470+
func NewMetricsResponse(uptimeSeconds, memoryBytes uint64, latency *HistogramSample) *MetricsResponse {
492471
base := responseBase{responseMetrics, 0, 0}
493472
return &MetricsResponse{
494473
responseBase: base,
495474
UptimeSeconds: uptimeSeconds,
475+
MemoryUsageBytes: memoryBytes,
496476
ClusterInteractionLatencySample: latency,
497477
}
498478
}

0 commit comments

Comments
 (0)