
Commit b6e6355

Add Plugin API v3.7.0 metrics support (#9)
Implement periodic plugin metrics reporting for the Launcher Plugin API v3.7.0. Plugins can now report uptime and cluster interaction latency histograms, which the Launcher exposes on its Prometheus /metrics endpoint.

- Add MetricsPlugin optional interface and PluginMetrics type
- Add thread-safe Histogram with swap-on-drain pattern for lock-free reads
- Add metricsLoop with non-blocking sends, panic recovery, and context cancellation
- Add MetricsResponse (message type 203) to the wire protocol
- Add RunMetrics conformance scenario for plugin validation
- Add --plugin-metrics-interval-seconds flag to DefaultOptions
- Bump API version from 3.6.0 to 3.7.0
- Update docs (GUIDE, API, ARCHITECTURE, CHANGELOG) and examples
1 parent 36469a3 commit b6e6355

File tree

16 files changed: +1066, -449 lines


CHANGELOG.md

Lines changed: 11 additions & 0 deletions
@@ -8,6 +8,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 
 ### Added
+- **Plugin API 3.7.0**: Plugin metrics framework. Plugins can now report periodic metrics to the Launcher for Prometheus exposition.
+  - `MetricsPlugin` optional interface in `launcher` package — plugins implement `Metrics(ctx context.Context) PluginMetrics` to report custom metrics
+  - `PluginMetrics` struct with `ClusterInteractionLatency` field for scheduler command latency histograms
+  - `Histogram` type for thread-safe metric accumulation with `Observe()` and `Drain()` methods
+  - `ClusterInteractionLatencyBuckets` variable with standard bucket boundaries matching the Launcher
+  - `MetricsInterval` field on `Runtime` and `DefaultOptions` for configuring the collection interval
+  - `--plugin-metrics-interval-seconds` CLI flag (default: 60, 0 to disable)
+  - Framework automatically reports `uptimeSeconds`; custom metrics are opt-in via `MetricsPlugin`
+  - `RunMetrics` conformance test scenario for validating `MetricsPlugin` implementations
+  - Protocol support for metrics response (message type 203) in `internal/protocol`
+  - New dependency: `github.com/prometheus/client_golang` for histogram accumulation
 - **Plugin API 3.6.0**: Config reload support. The Launcher can now request plugins to reload configuration at runtime without restarting.
   - `ConfigReloadablePlugin` optional interface in `launcher` package — plugins implement `ReloadConfig(ctx context.Context) error` to handle reload requests
   - `ConfigReloadError` type for classified reload failures (Load, Validate, Save)

api/types.go

Lines changed: 1 addition & 1 deletion
@@ -835,7 +835,7 @@ type Version struct {
 
 // APIVersion is the Launcher plugin API version supported by the types defined
 // in this package.
-var APIVersion = Version{Major: 3, Minor: 6, Patch: 0}
+var APIVersion = Version{Major: 3, Minor: 7, Patch: 0}
 
 // ConfigReloadErrorType classifies config reload errors.
 type ConfigReloadErrorType int

conformance/conformance_test.go

Lines changed: 21 additions & 4 deletions
@@ -19,9 +19,10 @@ import (
 // testPlugin is a minimal in-memory plugin for testing the conformance suite
 // itself. It uses the cache package to handle storage and streaming.
 type testPlugin struct {
-	cache  *cache.JobCache
-	nextID int32
-	wg     sync.WaitGroup
+	cache   *cache.JobCache
+	nextID  int32
+	wg      sync.WaitGroup
+	latency *launcher.Histogram
 }
 
 func newTestPlugin(t *testing.T) *testPlugin {
@@ -31,7 +32,10 @@ func newTestPlugin(t *testing.T) *testPlugin {
 	if err != nil {
 		t.Fatalf("failed to create job cache: %v", err)
 	}
-	tp := &testPlugin{cache: c}
+	tp := &testPlugin{
+		cache:   c,
+		latency: launcher.NewHistogram(launcher.ClusterInteractionLatencyBuckets),
+	}
 	t.Cleanup(func() {
 		tp.wg.Wait()
 		_ = c.Close()
@@ -183,6 +187,12 @@ func (p *testPlugin) GetJobNetwork(_ context.Context, w launcher.ResponseWriter,
 	}
 }
 
+func (p *testPlugin) Metrics(_ context.Context) launcher.PluginMetrics {
+	return launcher.PluginMetrics{
+		ClusterInteractionLatency: p.latency.Drain(),
+	}
+}
+
 func (p *testPlugin) ClusterInfo(_ context.Context, w launcher.ResponseWriter, _ string) {
 	w.WriteClusterInfo(launcher.ClusterOptions{
 		Queues: []string{"default"},
@@ -324,6 +334,13 @@ func TestRunFieldFiltering(t *testing.T) {
 	})
 }
 
+func TestRunMetrics(t *testing.T) {
+	p := newTestPlugin(t)
+	conformance.RunMetrics(t, p, conformance.MetricsOpts{
+		Timeout: 2 * time.Second,
+	})
+}
+
 func TestRunControlInvalidState(t *testing.T) {
 	p := newTestPlugin(t)
 	conformance.RunControlInvalidState(t, p, "testuser", conformance.InvalidStateOpts{

conformance/scenarios.go

Lines changed: 50 additions & 0 deletions
@@ -638,6 +638,56 @@ func RunControlInvalidState(t *testing.T, p launcher.Plugin, user string, opts I
 	}
 }
 
+// MetricsOpts configures the [RunMetrics] scenario.
+type MetricsOpts struct {
+	// Timeout for the Metrics call. Default: 1s.
+	Timeout time.Duration
+}
+
+// RunMetrics verifies that a [launcher.MetricsPlugin] implementation returns
+// promptly and produces valid metrics data.
+func RunMetrics(t *testing.T, p launcher.Plugin, opts MetricsOpts) {
+	t.Helper()
+	timeout := defaultTimeout(opts.Timeout, time.Second)
+
+	mp, ok := p.(launcher.MetricsPlugin)
+	if !ok {
+		t.Skip("plugin does not implement MetricsPlugin")
+	}
+
+	t.Run("ReturnsWithinTimeout", func(t *testing.T) {
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		defer cancel()
+
+		done := make(chan launcher.PluginMetrics, 1)
+		go func() {
+			done <- mp.Metrics(ctx)
+		}()
+
+		select {
+		case <-done:
+			// Good — Metrics returned promptly.
+		case <-ctx.Done():
+			t.Fatal("Metrics() did not return within timeout")
+		}
+	})
+
+	t.Run("LatencyBucketsNonNegative", func(t *testing.T) {
+		metrics := mp.Metrics(context.Background())
+		if metrics.ClusterInteractionLatency == nil {
+			t.Skip("no cluster interaction latency reported")
+		}
+		for i, v := range metrics.ClusterInteractionLatency.Buckets {
+			if v < 0 {
+				t.Errorf("bucket[%d] = %v, want >= 0", i, v)
+			}
+		}
+		if metrics.ClusterInteractionLatency.Sum < 0 {
+			t.Errorf("sum = %v, want >= 0", metrics.ClusterInteractionLatency.Sum)
+		}
+	})
+}
+
 // assertExitCode checks that the job's exit code is in the list of
 // acceptable values.
 func assertExitCode(t *testing.T, job *api.Job, acceptable []int) {

docs/API.md

Lines changed: 88 additions & 1 deletion
@@ -66,6 +66,34 @@ Every response from the plugin includes:
 | 7 | Get Job Resource Util | Stream resource utilization metrics for a job. |
 | 8 | Get Job Network | Get network information (hostname, IPs) for a job. |
 | 9 | Get Cluster Info | Get cluster capabilities and configuration. |
+| 17 | Multi Cluster Info | Get capabilities for multiple clusters. |
+| 201 | Set Load Balancer Nodes | Update load-balanced node list. |
+| 202 | Config Reload | Request configuration reload. |
+| 203 | Metrics Response | Periodic plugin metrics (plugin-initiated, no request). |
+
+### Metrics response (type 203)
+
+Unlike all other protocol messages, the plugin initiates the metrics response. The plugin sends it periodically on a timer (controlled by `--plugin-metrics-interval-seconds`) without any corresponding request from the Launcher. Both `requestId` and `responseId` are zero.
+
+```json
+{
+  "messageType": 203,
+  "requestId": 0,
+  "responseId": 0,
+  "uptimeSeconds": 3600,
+  "clusterInteractionLatencySample": {
+    "buckets": [0, 2, 3, 0, 0, 0, 0, 0, 0, 0],
+    "sum": 1.52
+  }
+}
+```
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `uptimeSeconds` | uint64 | Seconds since the plugin started. Always present. |
+| `clusterInteractionLatencySample` | object | Histogram snapshot of cluster interaction latency. Optional. |
+| `clusterInteractionLatencySample.buckets` | []float64 | Per-bucket observation counts (non-cumulative). |
+| `clusterInteractionLatencySample.sum` | float64 | Sum of all observed values. |
 
 ### Stream responses
 
@@ -239,6 +267,58 @@ Returns information about cluster capabilities (queues, resource limits, contain
 - `w` - ResponseWriter to send cluster info
 - `user` - Username requesting info
 
+### Type: MetricsPlugin (optional interface)
+
+```go
+type MetricsPlugin interface {
+	Plugin
+	Metrics(ctx context.Context) PluginMetrics
+}
+```
+
+Plugins that want to report custom metrics to the Launcher implement this interface. The `Metrics` method is called periodically (controlled by `--plugin-metrics-interval-seconds`). All plugins automatically report `uptimeSeconds`; implement this interface only for additional plugin-specific metrics like cluster interaction latency.
+
+Implementations should return quickly and avoid blocking I/O.
+
+### Type: PluginMetrics
+
+```go
+type PluginMetrics struct {
+	ClusterInteractionLatency *protocol.HistogramSample
+}
+```
+
+Contains metrics data collected by a plugin.
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `ClusterInteractionLatency` | `*protocol.HistogramSample` | Histogram snapshot of cluster interaction latency. Nil means no data. |
+
+### Type: Histogram
+
+```go
+type Histogram struct { /* unexported fields */ }
+
+func NewHistogram(buckets []float64) *Histogram
+func (h *Histogram) Observe(v float64)
+func (h *Histogram) Drain() *protocol.HistogramSample
+```
+
+A thread-safe histogram that accumulates observations locally and can be drained into a portable snapshot for sending to the Launcher. Use `NewHistogram(ClusterInteractionLatencyBuckets)` to create one with the correct bucket boundaries.
+
+- `Observe` records a single observation (e.g., a latency measurement in seconds). Safe for concurrent use.
+- `Drain` collects all accumulated observations since the last drain, resets the histogram, and returns a portable snapshot. Returns nil if no observations have been recorded.
+
+### Variable: ClusterInteractionLatencyBuckets
+
+```go
+var ClusterInteractionLatencyBuckets = []float64{
+	0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0, 10.0, 30.0,
+}
+```
+
+The histogram bucket upper bounds (in seconds) for cluster interaction latency. These must match the Launcher's bucket boundaries so histogram data can be replayed correctly.
+
 ### Type: ResponseWriter
 
 ```go
@@ -373,12 +453,18 @@ Closes the stream. Must be called when streaming is complete.
 
 ```go
 type Runtime struct {
-	// contains filtered or unexported fields
+	MaxMessageSize  int
+	MetricsInterval time.Duration
 }
 ```
 
 The Runtime handles the request/response protocol and dispatches to plugin methods.
 
+| Field | Type | Description |
+|-------|------|-------------|
+| `MaxMessageSize` | `int` | Upper limit on message size for requests and responses. |
+| `MetricsInterval` | `time.Duration` | Interval between periodic metrics reports. Zero disables. Typically set from `DefaultOptions.MetricsInterval`. |
+
 #### Function: NewRuntime
 
 ```go
@@ -413,6 +499,7 @@ type DefaultOptions struct {
 	Debug             bool
 	JobExpiry         time.Duration
 	HeartbeatInterval time.Duration
+	MetricsInterval   time.Duration
 	LauncherConfig    string
 	PluginName        string
 	ScratchPath       string
docs/ARCHITECTURE.md

Lines changed: 22 additions & 0 deletions
@@ -353,6 +353,28 @@ type Error struct {
 - Better than string parsing
 - Follows Launcher API specification
 
+### Metrics collection
+
+The SDK supports periodic metrics reporting to the Launcher (API v3.7.0+). Unlike all other protocol messages, the plugin initiates metrics — it sends `MetricsResponse` messages on a timer without any corresponding request.
+
+```
+Bootstrap completes → Start metrics goroutine → Every N seconds:
+  Collect uptime + plugin metrics → Serialize → Send via response channel
+```
+
+**Data flow:**
+
+```
+Plugin (local accumulator) → MetricsResponse (JSON/IPC) → Launcher (Prometheus registry)
+  Observe()                    Drain + serialize           ObserveMultiple()
+```
+
+The plugin uses a local prometheus histogram as a cache, accumulating observations on the hot path (e.g., timing each Slurm command). On each metrics tick, the framework drains the histogram (collecting and resetting it) and sends the snapshot to the Launcher, which replays the data into its own Prometheus registry.
+
+**Why push-based?** The Launcher-plugin IPC channel has no QoS. Requesting metrics on demand could delay time-sensitive messages (job status updates, control operations). Push-based metrics use the existing response channel and are inherently non-blocking from the Launcher's perspective.
+
+**Why swap-on-drain?** The Go prometheus client does not expose a `Reset()` method on individual histograms. The SDK works around this by atomically swapping the current histogram for a fresh one on each drain, then collecting from the old instance.
+
 ## Job cache design
 
 ### Storage and startup

docs/GUIDE.md

Lines changed: 83 additions & 0 deletions
@@ -1045,6 +1045,89 @@ func (p *MyPlugin) ReloadConfig(ctx context.Context) error {
 }
 ```
 
+### Plugin metrics
+
+The Launcher collects periodic metrics from plugins and exposes them on its Prometheus `/metrics` endpoint. All plugins automatically report `uptimeSeconds`. Plugins that interact with external schedulers can report additional metrics by implementing the `MetricsPlugin` interface.
+
+The Launcher passes `--plugin-metrics-interval-seconds <N>` at startup (default: 60, 0 to disable). The SDK handles the timer and IPC automatically.
+
+#### Reporting cluster interaction latency
+
+If your plugin runs CLI commands or makes API calls to a scheduler, you can measure their latency and report it as a histogram. A cluster interaction is any individual call to the external scheduler — a CLI command invocation, an HTTP/gRPC API request, or an SDK method call. Measure the wall-clock duration of the external call itself, from invocation to response.
+
+**What to measure:**
+
+- Time every external scheduler call: job submission, control operations (stop/kill/cancel), status queries, output retrieval, resource usage queries, etc.
+- For batch operations (e.g., a single `squeue` call that returns status for many jobs), record one observation for the entire call, not one per job.
+- Measure only the external call duration. Don't include internal cache lookups, response serialization, or in-process logic.
+
+**Setup:**
+
+```go
+type MyPlugin struct {
+	latency *launcher.Histogram
+}
+
+func NewMyPlugin() *MyPlugin {
+	return &MyPlugin{
+		latency: launcher.NewHistogram(launcher.ClusterInteractionLatencyBuckets),
+	}
+}
+
+// Metrics implements launcher.MetricsPlugin.
+func (p *MyPlugin) Metrics(ctx context.Context) launcher.PluginMetrics {
+	return launcher.PluginMetrics{
+		ClusterInteractionLatency: p.latency.Drain(),
+	}
+}
+```
+
+Then record latency wherever your plugin calls the scheduler:
+
+```go
+func (p *MyPlugin) SubmitJob(ctx context.Context, w launcher.ResponseWriter, user string, job *api.Job) {
+	start := time.Now()
+	result, err := runSchedulerCommand(ctx, "submit", job)
+	p.latency.Observe(time.Since(start).Seconds())
+	// ... handle result ...
+}
+
+func (p *MyPlugin) ControlJob(ctx context.Context, w launcher.ResponseWriter, user string, id api.JobID, op api.JobOperation) {
+	start := time.Now()
+	err := runSchedulerCommand(ctx, string(op), id)
+	p.latency.Observe(time.Since(start).Seconds())
+	// ... handle result ...
+}
+
+func (p *MyPlugin) pollStatuses(ctx context.Context) {
+	// Batch status query — one observation for the entire call
+	start := time.Now()
+	statuses, err := runSchedulerCommand(ctx, "status", "--all")
+	p.latency.Observe(time.Since(start).Seconds())
+	// ... update cache ...
+}
+```
+
+The `Histogram` type is thread-safe. Call `Observe` from any goroutine. The framework calls `Drain` on each metrics tick, which collects all accumulated observations and resets the histogram.
+
+#### Wiring metrics into the runtime
+
+Pass the metrics interval from `DefaultOptions` to the `Runtime`:
+
+```go
+opts := &launcher.DefaultOptions{}
+launcher.MustLoadOptions(opts, "myplugin")
+
+rt := launcher.NewRuntime(lgr, plugin)
+rt.MetricsInterval = opts.MetricsInterval
+```
+
+#### How it works
+
+The plugin accumulates metrics locally (using a prometheus histogram as a cache). On each metrics tick, the framework drains the accumulated data and sends a `MetricsResponse` (message type 203) to the Launcher over the IPC channel. The Launcher replays the histogram data into its own Prometheus registry, which API clients can then query.
+
+This design avoids adding request/response overhead to the Launcher-plugin connection for metrics collection. The team rejected the on-demand alternative because there is no QoS on the IPC channel and metrics requests could delay more important messages.
+
 ### User profiles
 
 System administrators may want to set default or maximum values for certain features on a per-user or per-group basis. For example, different groups of users could have different memory limits or CPU counts.
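
To make the drain semantics in the GUIDE hunk above concrete, here is a minimal stand-alone illustration of the swap-on-drain pattern using a hand-rolled bucket counter instead of the prometheus client. The `histogram` and `sample` types are simplified stand-ins for `launcher.Histogram` and `protocol.HistogramSample`, not the real implementation:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// sample mirrors the shape of the snapshot the SDK sends (names illustrative).
type sample struct {
	Buckets []uint64
	Sum     float64
}

// histogram demonstrates swap-on-drain: Drain swaps in fresh counters under
// the lock and reads the old ones afterwards, so Observe never waits on
// serialization work.
type histogram struct {
	mu     sync.Mutex
	bounds []float64 // bucket upper bounds, ascending
	counts []uint64
	sum    float64
}

func newHistogram(bounds []float64) *histogram {
	return &histogram{bounds: bounds, counts: make([]uint64, len(bounds))}
}

func (h *histogram) Observe(v float64) {
	h.mu.Lock()
	defer h.mu.Unlock()
	// First bucket whose upper bound is >= v; larger values land in the
	// last bucket here (a real histogram may use a +Inf overflow bucket).
	i := sort.SearchFloat64s(h.bounds, v)
	if i == len(h.bounds) {
		i = len(h.bounds) - 1
	}
	h.counts[i]++
	h.sum += v
}

func (h *histogram) Drain() *sample {
	h.mu.Lock()
	old, oldSum := h.counts, h.sum
	h.counts = make([]uint64, len(h.bounds)) // the swap: fresh counters go live
	h.sum = 0
	h.mu.Unlock()

	var n uint64
	for _, c := range old {
		n += c
	}
	if n == 0 {
		return nil // nothing observed since the last drain
	}
	return &sample{Buckets: old, Sum: oldSum}
}

func main() {
	h := newHistogram([]float64{0.01, 0.1, 1.0})
	h.Observe(0.05)
	h.Observe(0.2)
	fmt.Println(h.Drain().Buckets) // [0 1 1]
	fmt.Println(h.Drain() == nil)  // true: the counters were reset
}
```

Each drain returns everything since the previous drain exactly once, which is why the Launcher can replay the non-cumulative bucket counts into its own registry without double-counting.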

examples/inmemory/main.go

Lines changed: 5 additions & 3 deletions
@@ -376,10 +376,12 @@ func main() {
 		nextID: 0,
 	}
 
-	// Create the runtime and start handling requests
-	// This blocks until the context is cancelled (e.g., Ctrl+C)
+	// Create the runtime and start handling requests.
+	// This blocks until the context is cancelled (e.g., Ctrl+C).
 	lgr.Info("Plugin ready to accept requests")
-	if err := launcher.NewRuntime(lgr, plugin).Run(ctx); err != nil {
+	rt := launcher.NewRuntime(lgr, plugin)
+	rt.MetricsInterval = options.MetricsInterval
+	if err := rt.Run(ctx); err != nil {
 		lgr.Error("Plugin runtime error", "error", err)
 		os.Exit(1)
 	}
