Skip to content

Commit 656450a

Browse files
committed
feat(op-acceptor): add timing-based bin-packing for CI test splitting
Add --split-timing-file and --split-timing-output flags to enable balanced CI splitting using a greedy LPT (Longest Processing Time first) algorithm instead of naive round-robin. When a timing hints JSON file is provided, ApplySplitFilter distributes work items across nodes to minimize makespan (slowest node duration). Without timing data, the existing round-robin fallback is preserved. After each run, timing data can be written to a file for caching and reuse on subsequent CI runs. The report-from-events mode also supports extracting timing data from merged test events.
1 parent 3c1b265 commit 656450a

File tree

6 files changed

+382
-22
lines changed

6 files changed

+382
-22
lines changed

op-acceptor/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type Config struct {
4040
ReportFromEvents string // Path to raw events file for report-only mode (empty = normal mode)
4141
SplitTotal int // Total split nodes for CI parallelism (0 = no splitting)
4242
SplitIndex int // This node's index (0-based) for CI parallelism
43+
SplitTimingFile string // Path to JSON timing hints for balanced CI splitting
44+
SplitTimingOutput string // Path to write updated timing data after test execution
4345
Log log.Logger
4446
ExcludeGates []string // List of gate IDs whose tests should be excluded
4547
}
@@ -166,6 +168,8 @@ func NewConfig(ctx *cli.Context, log log.Logger, testDir string, validatorConfig
166168
ReportFromEvents: ctx.String(flags.ReportFromEvents.Name),
167169
SplitTotal: splitTotal,
168170
SplitIndex: splitIndex,
171+
SplitTimingFile: ctx.String(flags.SplitTimingFile.Name),
172+
SplitTimingOutput: ctx.String(flags.SplitTimingOutput.Name),
169173
LogDir: logDir,
170174
Log: log,
171175
ExcludeGates: excludeGates,

op-acceptor/flags/flags.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,18 @@ var (
203203
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "SPLIT_INDEX"),
204204
Usage: "Index of this node (0-based) for CI parallelism.",
205205
}
206+
SplitTimingFile = &cli.StringFlag{
207+
Name: "split-timing-file",
208+
Value: "",
209+
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "SPLIT_TIMING_FILE"),
210+
Usage: "Path to JSON file with package timing hints for balanced CI splitting.",
211+
}
212+
SplitTimingOutput = &cli.StringFlag{
213+
Name: "split-timing-output",
214+
Value: "",
215+
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "SPLIT_TIMING_OUTPUT"),
216+
Usage: "Path to write updated timing data after test execution (for caching).",
217+
}
206218
)
207219

208220
var requiredFlags = []cli.Flag{
@@ -233,6 +245,8 @@ var optionalFlags = []cli.Flag{
233245
ReportFromEvents,
234246
SplitTotal,
235247
SplitIndex,
248+
SplitTimingFile,
249+
SplitTimingOutput,
236250
}
237251
var Flags []cli.Flag
238252

op-acceptor/nat.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ func New(ctx context.Context, config *Config, version string, shutdownCallback f
199199
ProgressInterval: config.ProgressInterval,
200200
SplitTotal: config.SplitTotal,
201201
SplitIndex: config.SplitIndex,
202+
SplitTimingFile: config.SplitTimingFile,
202203
})
203204
if err != nil {
204205
return nil, fmt.Errorf("failed to create test runner: %w", err)
@@ -550,6 +551,18 @@ func (n *nat) runTests(ctx context.Context) error {
550551
}
551552
}
552553

554+
// Write timing output if configured (for CI caching)
555+
if n.config.SplitTimingOutput != "" && !n.config.FlakeShake && n.result != nil {
556+
timingData := n.extractTimingData(n.result)
557+
if err := runner.WriteTimingFile(n.config.SplitTimingOutput, timingData); err != nil {
558+
n.config.Log.Error("Failed to write timing output", "path", n.config.SplitTimingOutput, "error", err)
559+
} else {
560+
n.config.Log.Info("Wrote timing output for CI caching",
561+
"path", n.config.SplitTimingOutput,
562+
"packages", len(timingData))
563+
}
564+
}
565+
553566
// We should have the same runID from the test run result (skip for flake-shake mode)
554567
if !n.config.FlakeShake && n.result.RunID != runID {
555568
n.config.Log.Warn("RunID from result doesn't match expected runID",
@@ -1055,7 +1068,12 @@ func (n *nat) dryRun(ctx context.Context) error {
10551068
})
10561069
}
10571070
}
1058-
filtered := runner.ApplySplitFilter(allWork, n.config.SplitTotal, n.config.SplitIndex)
1071+
timings, err := runner.LoadTimingFile(n.config.SplitTimingFile)
1072+
if err != nil {
1073+
n.config.Log.Warn("Failed to load timing file, falling back to round-robin",
1074+
"path", n.config.SplitTimingFile, "error", err)
1075+
}
1076+
filtered := runner.ApplySplitFilter(allWork, n.config.SplitTotal, n.config.SplitIndex, timings)
10591077

10601078
// Rebuild gateValidators from the filtered work items
10611079
gateValidators = make(map[string][]types.ValidatorMetadata)
@@ -1065,7 +1083,8 @@ func (n *nat) dryRun(ctx context.Context) error {
10651083
n.config.Log.Info("DRY RUN: Applied CI split filter",
10661084
"splitTotal", n.config.SplitTotal,
10671085
"splitIndex", n.config.SplitIndex,
1068-
"workItems", len(filtered))
1086+
"workItems", len(filtered),
1087+
"timingBased", len(timings) > 0)
10691088
}
10701089

10711090
t := table.NewWriter()
@@ -1204,9 +1223,52 @@ func (n *nat) reportFromEvents() error {
12041223
logDir, _ := fileLogger.GetDirectoryForRunID(runID)
12051224
n.config.Log.Info("Consolidated report generated", "path", logDir)
12061225

1226+
// Write timing output if configured (for CI caching)
1227+
if n.config.SplitTimingOutput != "" {
1228+
timingData := extractTimingDataFromParsedResults(results)
1229+
if err := runner.WriteTimingFile(n.config.SplitTimingOutput, timingData); err != nil {
1230+
n.config.Log.Error("Failed to write timing output from events", "error", err)
1231+
} else {
1232+
n.config.Log.Info("Wrote timing output from events",
1233+
"path", n.config.SplitTimingOutput,
1234+
"packages", len(timingData))
1235+
}
1236+
}
1237+
12071238
return nil
12081239
}
12091240

1241+
// extractTimingData builds a TimingKey → duration_seconds map from test results.
1242+
// This data can be cached and used for timing-based CI splitting on subsequent runs.
1243+
func (n *nat) extractTimingData(result *runner.RunnerResult) map[string]float64 {
1244+
timings := make(map[string]float64)
1245+
for gateName, gate := range result.Gates {
1246+
for _, test := range gate.Tests {
1247+
key := runner.TimingKey(gateName, test.Metadata.Package, test.Metadata.FuncName)
1248+
timings[key] = test.Duration.Seconds()
1249+
}
1250+
for _, suite := range gate.Suites {
1251+
for _, test := range suite.Tests {
1252+
key := runner.TimingKey(gateName, test.Metadata.Package, test.Metadata.FuncName)
1253+
timings[key] = test.Duration.Seconds()
1254+
}
1255+
}
1256+
}
1257+
return timings
1258+
}
1259+
1260+
// extractTimingDataFromParsedResults builds a TimingKey → duration_seconds map
1261+
// from parsed test results (used in report-from-events mode).
1262+
func extractTimingDataFromParsedResults(results []*types.TestResult) map[string]float64 {
1263+
timings := make(map[string]float64)
1264+
for _, result := range results {
1265+
// In gateless/report mode, use "gateless" as the gate ID
1266+
key := runner.TimingKey("gateless", result.Metadata.Package, result.Metadata.FuncName)
1267+
timings[key] = result.Duration.Seconds()
1268+
}
1269+
return timings
1270+
}
1271+
12101272
// WaitForShutdown waits for all goroutines to finish
12111273
func (n *nat) WaitForShutdown(ctx context.Context) error {
12121274
timeout := time.NewTimer(time.Second * 5)

op-acceptor/runner/parallel.go

Lines changed: 122 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package runner
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
7+
"os"
68
"sort"
79
"sync"
810
"time"
@@ -289,31 +291,46 @@ func (r *runner) collectTestWork() []TestWork {
289291

290292
// Apply CI split filtering if configured
291293
if r.splitTotal > 0 {
292-
workItems = ApplySplitFilter(workItems, r.splitTotal, r.splitIndex)
294+
timings, err := LoadTimingFile(r.splitTimingFile)
295+
if err != nil {
296+
r.log.Warn("Failed to load timing file, falling back to round-robin",
297+
"path", r.splitTimingFile, "error", err)
298+
}
299+
workItems = ApplySplitFilter(workItems, r.splitTotal, r.splitIndex, timings)
293300
r.log.Info("Applied CI split filter",
294301
"splitTotal", r.splitTotal,
295302
"splitIndex", r.splitIndex,
296-
"workItems", len(workItems))
303+
"workItems", len(workItems),
304+
"timingBased", len(timings) > 0)
297305
}
298306

299307
return workItems
300308
}
301309

302-
// splitKey returns a deterministic key for sorting work items during CI split filtering.
303-
// Includes GateID so that the same package appearing under different gates (via inheritance)
304-
// gets a stable, distinct position in the sort order.
// TimingKey returns the canonical lookup key for timing-based CI splitting.
// The three parts are joined as "gate|package|funcName"; because the gate is
// part of the key, the same package listed under two different gates (via
// inheritance) produces two distinct keys.
func TimingKey(gate, pkg, funcName string) string {
	const sep = "|"
	return gate + sep + pkg + sep + funcName
}
317+
318+
// splitKey returns the timing key for a TestWork item.
305319
func splitKey(w TestWork) string {
306-
return w.GateID + "|" + w.Validator.Package + "|" + w.Validator.FuncName
320+
return TimingKey(w.GateID, w.Validator.Package, w.Validator.FuncName)
307321
}
308322

309-
// ApplySplitFilter sorts work items deterministically and returns only those assigned
310-
// to the given split index. Items are distributed round-robin: item i is assigned to
311-
// node i % total.
312-
func ApplySplitFilter(items []TestWork, total, index int) []TestWork {
323+
// ApplySplitFilter distributes work items across split nodes. When timings are
324+
// provided, it uses a greedy bin-packing (LPT) algorithm for balanced splits.
325+
// Otherwise it falls back to deterministic round-robin by sorted key.
326+
func ApplySplitFilter(items []TestWork, total, index int, timings map[string]float64) []TestWork {
327+
if len(timings) > 0 {
328+
return applySplitByTiming(items, total, index, timings)
329+
}
330+
// Existing round-robin fallback
313331
sort.Slice(items, func(i, j int) bool {
314332
return splitKey(items[i]) < splitKey(items[j])
315333
})
316-
317334
var filtered []TestWork
318335
for i, item := range items {
319336
if i%total == index {
@@ -323,6 +340,100 @@ func ApplySplitFilter(items []TestWork, total, index int) []TestWork {
323340
return filtered
324341
}
325342

343+
// applySplitByTiming uses the Longest Processing Time first (LPT) greedy
344+
// bin-packing algorithm to distribute work items across nodes, minimizing
345+
// the makespan (duration of the slowest node).
346+
func applySplitByTiming(items []TestWork, total, index int, timings map[string]float64) []TestWork {
347+
defaultDuration := medianTiming(timings)
348+
349+
// Build a duration lookup for each item
350+
duration := func(w TestWork) float64 {
351+
if d, ok := timings[splitKey(w)]; ok {
352+
return d
353+
}
354+
return defaultDuration
355+
}
356+
357+
// Sort by duration descending (heaviest first -- standard LPT),
358+
// with tie-break by key for determinism.
359+
sort.Slice(items, func(i, j int) bool {
360+
di, dj := duration(items[i]), duration(items[j])
361+
if di != dj {
362+
return di > dj
363+
}
364+
return splitKey(items[i]) < splitKey(items[j])
365+
})
366+
367+
// Greedy assignment: assign each item to the node with the lowest total
368+
nodeTotals := make([]float64, total)
369+
nodeItems := make([][]TestWork, total)
370+
for _, item := range items {
371+
minNode := 0
372+
for n := 1; n < total; n++ {
373+
if nodeTotals[n] < nodeTotals[minNode] {
374+
minNode = n
375+
}
376+
}
377+
nodeItems[minNode] = append(nodeItems[minNode], item)
378+
nodeTotals[minNode] += duration(item)
379+
}
380+
381+
return nodeItems[index]
382+
}
383+
// medianTiming reports the median of the durations stored in timings. For an
// even number of entries it is the mean of the two middle values. An empty
// (or nil) map yields 60.0, the default used for unknown test durations.
func medianTiming(timings map[string]float64) float64 {
	count := len(timings)
	if count == 0 {
		return 60.0
	}

	sorted := make([]float64, 0, count)
	for _, secs := range timings {
		sorted = append(sorted, secs)
	}
	sort.Float64s(sorted)

	upper := count / 2
	if count%2 == 1 {
		return sorted[upper]
	}
	return (sorted[upper-1] + sorted[upper]) / 2
}
401+
// LoadTimingFile reads the JSON file at path and decodes it as a map from
// TimingKey to duration in seconds.
//
// An empty path or a file that does not exist is not an error: both yield a
// nil map and a nil error. Any other read failure, or content that does not
// decode into the map, is returned as a wrapped error.
func LoadTimingFile(path string) (map[string]float64, error) {
	if path == "" {
		return nil, nil
	}

	raw, readErr := os.ReadFile(path)
	switch {
	case readErr == nil:
		// File read; decode below.
	case os.IsNotExist(readErr):
		return nil, nil
	default:
		return nil, fmt.Errorf("reading timing file: %w", readErr)
	}

	var parsed map[string]float64
	if parseErr := json.Unmarshal(raw, &parsed); parseErr != nil {
		return nil, fmt.Errorf("parsing timing file: %w", parseErr)
	}
	return parsed, nil
}
421+
// WriteTimingFile encodes timings (TimingKey to duration in seconds) as
// indented JSON and writes it to path with mode 0644. An empty path is a
// no-op that returns nil. Encoding and write failures are returned wrapped.
func WriteTimingFile(path string, timings map[string]float64) error {
	if path == "" {
		return nil
	}

	encoded, marshalErr := json.MarshalIndent(timings, "", " ")
	if marshalErr != nil {
		return fmt.Errorf("marshalling timing data: %w", marshalErr)
	}

	writeErr := os.WriteFile(path, encoded, 0644)
	if writeErr != nil {
		return fmt.Errorf("writing timing file: %w", writeErr)
	}
	return nil
}
436+
326437
// initializeProgressTracking sets up data structures to concurrently
327438
// track progress for each gate and suite in the scheduled work items
328439
func (pe *ParallelExecutor) initializeProgressTracking(workItems []TestWork) {

0 commit comments

Comments
 (0)