Skip to content

Commit 1ec0c40

Browse files
committed
feat(op-acceptor): add CI parallelism splitting and report-from-events mode
Add --split-total / --split-index flags for deterministic round-robin test distribution across CircleCI parallel nodes. Each node independently discovers all tests, sorts by package|function name, and runs only its assigned subset. Add --report-from-events flag that reads a merged raw_go_events.log file and generates a consolidated HTML report through the standard FileLogger pipeline, enabling a post-parallelism report consolidation job. Also consolidate duplicate CSV parsing helpers into a single parseCSV function and remove unreachable gate validation code.
1 parent 9e3b5d0 commit 1ec0c40

File tree

8 files changed

+473
-45
lines changed

8 files changed

+473
-45
lines changed

op-acceptor/config.go

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -37,48 +37,50 @@ type Config struct {
3737
FlakeShake bool // Enable flake-shake mode for test stability validation
3838
FlakeShakeIterations int // Number of times to run each test in flake-shake mode
3939
DryRun bool // If true, show what tests would be run without executing them
40+
ReportFromEvents string // Path to raw events file for report-only mode (empty = normal mode)
41+
SplitTotal int // Total split nodes for CI parallelism (0 = no splitting)
42+
SplitIndex int // This node's index (0-based) for CI parallelism
4043
Log log.Logger
4144
ExcludeGates []string // List of gate IDs whose tests should be excluded
4245
}
4346

44-
// NewConfig creates a new Config from cli context
45-
// parseGates parses a comma-separated string of gate IDs into a slice
46-
func parseGates(gateStr string) []string {
47-
if gateStr == "" {
47+
// parseCSV splits a comma-separated string into a trimmed, non-empty slice.
48+
func parseCSV(s string) []string {
49+
if s == "" {
4850
return nil
4951
}
50-
var gates []string
51-
for _, g := range strings.Split(gateStr, ",") {
52-
g = strings.TrimSpace(g)
53-
if g != "" {
54-
gates = append(gates, g)
52+
var out []string
53+
for _, v := range strings.Split(s, ",") {
54+
v = strings.TrimSpace(v)
55+
if v != "" {
56+
out = append(out, v)
5557
}
5658
}
57-
return gates
59+
return out
5860
}
5961

6062
// NewConfig creates a new Config from cli context
6163
func NewConfig(ctx *cli.Context, log log.Logger, testDir string, validatorConfig string, gate string) (*Config, error) {
62-
gates := parseGates(gate)
63-
// Parse flags
64-
if err := flags.CheckRequired(ctx); err != nil {
65-
return nil, fmt.Errorf("missing required flags: %w", err)
66-
}
67-
if testDir == "" {
68-
return nil, errors.New("test directory is required")
64+
gates := parseCSV(gate)
65+
66+
reportFromEvents := ctx.String(flags.ReportFromEvents.Name)
67+
68+
// In report-from-events mode, testdir is not required
69+
if reportFromEvents == "" {
70+
if err := flags.CheckRequired(ctx); err != nil {
71+
return nil, fmt.Errorf("missing required flags: %w", err)
72+
}
73+
if testDir == "" {
74+
return nil, errors.New("test directory is required")
75+
}
6976
}
7077

7178
// Determine if we're in gateless mode (gate not specified). Validator config is optional in gateless mode
7279
gatelessMode := len(gates) == 0
7380

74-
// In gateless mode, we don't require validator config or gate
75-
if !gatelessMode {
76-
if validatorConfig == "" {
77-
return nil, errors.New("validator configuration file is required when not in gateless mode")
78-
}
79-
if len(gates) == 0 {
80-
return nil, errors.New("gate is required when not in gateless mode")
81-
}
81+
// In gate mode, validator config is required
82+
if !gatelessMode && validatorConfig == "" {
83+
return nil, errors.New("validator configuration file is required when not in gateless mode")
8284
}
8385

8486
var absValidatorConfig string
@@ -120,7 +122,15 @@ func NewConfig(ctx *cli.Context, log log.Logger, testDir string, validatorConfig
120122

121123
devnetEnvURL := ctx.String(flags.DevnetEnvURL.Name)
122124

123-
excludeGates := parseExcludeGates(ctx.String(flags.ExcludeGates.Name))
125+
splitTotal := ctx.Int(flags.SplitTotal.Name)
126+
splitIndex := ctx.Int(flags.SplitIndex.Name)
127+
if splitTotal > 0 {
128+
if splitIndex < 0 || splitIndex >= splitTotal {
129+
return nil, fmt.Errorf("split-index must be >= 0 and < split-total (%d), got %d", splitTotal, splitIndex)
130+
}
131+
}
132+
133+
excludeGates := parseCSV(ctx.String(flags.ExcludeGates.Name))
124134

125135
// Conflict: selected gates are also excluded
126136
for _, g := range gates {
@@ -153,26 +163,11 @@ func NewConfig(ctx *cli.Context, log log.Logger, testDir string, validatorConfig
153163
FlakeShake: ctx.Bool(flags.FlakeShake.Name),
154164
FlakeShakeIterations: ctx.Int(flags.FlakeShakeIterations.Name),
155165
DryRun: ctx.Bool(flags.DryRun.Name),
166+
ReportFromEvents: ctx.String(flags.ReportFromEvents.Name),
167+
SplitTotal: splitTotal,
168+
SplitIndex: splitIndex,
156169
LogDir: logDir,
157170
Log: log,
158171
ExcludeGates: excludeGates,
159172
}, nil
160173
}
161-
162-
// parseExcludeGates determines which gates to exclude based on env/flag.
163-
164-
func parseExcludeGates(value string) []string {
165-
val := strings.TrimSpace(value)
166-
if val == "" {
167-
// No default exclusions: empty means no skip gates
168-
return nil
169-
}
170-
parts := strings.Split(val, ",")
171-
var gates []string
172-
for _, p := range parts {
173-
if s := strings.TrimSpace(p); s != "" {
174-
gates = append(gates, s)
175-
}
176-
}
177-
return gates
178-
}

op-acceptor/flags/flags.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,24 @@ var (
185185
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "EXCLUDE_GATES"),
186186
Usage: "Comma-separated list of gate IDs to blacklist globally across all modes.",
187187
}
188+
ReportFromEvents = &cli.StringFlag{
189+
Name: "report-from-events",
190+
Value: "",
191+
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "REPORT_FROM_EVENTS"),
192+
Usage: "Path to a raw_go_events.log file (or merged file). Generates an HTML report from the events without running tests.",
193+
}
194+
SplitTotal = &cli.IntFlag{
195+
Name: "split-total",
196+
Value: 0,
197+
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "SPLIT_TOTAL"),
198+
Usage: "Total number of split nodes for CI parallelism. 0 = no splitting.",
199+
}
200+
SplitIndex = &cli.IntFlag{
201+
Name: "split-index",
202+
Value: 0,
203+
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "SPLIT_INDEX"),
204+
Usage: "Index of this node (0-based) for CI parallelism.",
205+
}
188206
)
189207

190208
var requiredFlags = []cli.Flag{
@@ -212,6 +230,9 @@ var optionalFlags = []cli.Flag{
212230
FlakeShakeIterations,
213231
ExcludeGates,
214232
DryRun,
233+
ReportFromEvents,
234+
SplitTotal,
235+
SplitIndex,
215236
}
216237
var Flags []cli.Flag
217238

op-acceptor/nat.go

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,21 @@ func New(ctx context.Context, config *Config, version string, shutdownCallback f
105105
return nil, errors.New("config is required")
106106
}
107107

108+
// In report-from-events mode, we only need config + log dir.
109+
// Skip registry, runner, orchestrator, and addons setup.
110+
if config.ReportFromEvents != "" {
111+
config.Log.Info("Report-from-events mode: skipping test runner setup")
112+
return &nat{
113+
ctx: ctx,
114+
config: config,
115+
version: version,
116+
done: make(chan struct{}),
117+
shutdownCallback: shutdownCallback,
118+
networkName: "report",
119+
tracer: otel.Tracer("op-acceptor"),
120+
}, nil
121+
}
122+
108123
reg, err := registry.NewRegistry(registry.Config{
109124
Log: config.Log,
110125
ValidatorConfigFile: config.ValidatorConfig,
@@ -182,6 +197,8 @@ func New(ctx context.Context, config *Config, version string, shutdownCallback f
182197
Concurrency: config.Concurrency,
183198
ShowProgress: config.ShowProgress,
184199
ProgressInterval: config.ProgressInterval,
200+
SplitTotal: config.SplitTotal,
201+
SplitIndex: config.SplitIndex,
185202
})
186203
if err != nil {
187204
return nil, fmt.Errorf("failed to create test runner: %w", err)
@@ -307,6 +324,8 @@ func (n *nat) Start(ctx context.Context) error {
307324
"concurrency", snap.Runner.Concurrency,
308325
"show_progress", snap.Runner.ShowProgress,
309326
"progress_interval", snap.Runner.ProgressInterval,
327+
"split_total", n.config.SplitTotal,
328+
"split_index", n.config.SplitIndex,
310329
"test_log_level", snap.Logging.TestLogLevel,
311330
"output_realtime_logs", snap.Logging.OutputRealtimeLogs,
312331
"network", snap.NetworkName,
@@ -318,6 +337,18 @@ func (n *nat) Start(ctx context.Context) error {
318337
"config.ValidatorConfig", n.config.ValidatorConfig,
319338
"config.LogDir", n.config.LogDir)
320339

340+
// Report-from-events mode: generate HTML report from a raw events file, then exit.
341+
if n.config.ReportFromEvents != "" {
342+
err := n.reportFromEvents()
343+
if err != nil {
344+
return err
345+
}
346+
go func() {
347+
n.shutdownCallback(nil)
348+
}()
349+
return nil
350+
}
351+
321352
// Run tests immediately on startup (or dry-run)
322353
if n.config.DryRun {
323354
err := n.dryRun(ctx)
@@ -1011,9 +1042,39 @@ func (n *nat) dryRun(ctx context.Context) error {
10111042
gateValidators[gateID] = gateTests
10121043
}
10131044

1045+
// Apply CI split filtering if configured (mirrors runner.collectTestWork behavior)
1046+
if n.config.SplitTotal > 0 {
1047+
// Collect all work items in the same way as the runner
1048+
var allWork []runner.TestWork
1049+
for gateName, gateTests := range gateValidators {
1050+
for _, v := range gateTests {
1051+
allWork = append(allWork, runner.TestWork{
1052+
Validator: v,
1053+
GateID: gateName,
1054+
SuiteID: v.Suite,
1055+
})
1056+
}
1057+
}
1058+
filtered := runner.ApplySplitFilter(allWork, n.config.SplitTotal, n.config.SplitIndex)
1059+
1060+
// Rebuild gateValidators from the filtered work items
1061+
gateValidators = make(map[string][]types.ValidatorMetadata)
1062+
for _, w := range filtered {
1063+
gateValidators[w.GateID] = append(gateValidators[w.GateID], w.Validator)
1064+
}
1065+
n.config.Log.Info("DRY RUN: Applied CI split filter",
1066+
"splitTotal", n.config.SplitTotal,
1067+
"splitIndex", n.config.SplitIndex,
1068+
"workItems", len(filtered))
1069+
}
1070+
10141071
t := table.NewWriter()
10151072
t.SetOutputMirror(os.Stdout)
1016-
t.SetTitle("DRY RUN: Planned Test Execution (network: " + n.networkName + ")")
1073+
titleSuffix := ""
1074+
if n.config.SplitTotal > 0 {
1075+
titleSuffix = fmt.Sprintf(" [split %d/%d]", n.config.SplitIndex, n.config.SplitTotal)
1076+
}
1077+
t.SetTitle("DRY RUN: Planned Test Execution (network: " + n.networkName + ")" + titleSuffix)
10171078

10181079
headers := []interface{}{"TYPE", "ID", "TESTS", "STATUS"}
10191080
t.AppendHeader(table.Row(headers))
@@ -1100,6 +1161,52 @@ func (n *nat) dryRun(ctx context.Context) error {
11001161
return nil
11011162
}
11021163

1164+
// reportFromEvents reads a raw_go_events.log file (possibly merged from multiple
1165+
// parallel nodes) and generates a consolidated report without running any tests.
1166+
// It reuses the standard FileLogger pipeline so that all sinks (HTML, text summary,
1167+
// per-test logs, etc.) are exercised through the same code path as normal execution.
1168+
func (n *nat) reportFromEvents() error {
1169+
eventsPath := n.config.ReportFromEvents
1170+
n.config.Log.Info("Generating report from events file", "path", eventsPath)
1171+
1172+
f, err := os.Open(eventsPath)
1173+
if err != nil {
1174+
return fmt.Errorf("failed to open events file %s: %w", eventsPath, err)
1175+
}
1176+
defer f.Close()
1177+
1178+
results, err := runner.ParseMultiPackageEvents(f)
1179+
if err != nil {
1180+
return fmt.Errorf("failed to parse events: %w", err)
1181+
}
1182+
1183+
n.config.Log.Info("Parsed test results from events", "packages", len(results))
1184+
1185+
// Use the same FileLogger pipeline as normal test execution
1186+
runID := uuid.New().String()
1187+
fileLogger, err := logging.NewFileLogger(n.config.LogDir, runID, n.networkName, n.config.TargetGate)
1188+
if err != nil {
1189+
return fmt.Errorf("failed to create file logger: %w", err)
1190+
}
1191+
1192+
// Feed all parsed results through the standard sink pipeline
1193+
for _, result := range results {
1194+
if err := fileLogger.LogTestResult(result, runID); err != nil {
1195+
return fmt.Errorf("failed to log result for %s: %w", result.Metadata.Package, err)
1196+
}
1197+
}
1198+
1199+
// Finalize all sinks (generates HTML, text summary, etc.)
1200+
if err := fileLogger.Complete(runID); err != nil {
1201+
return fmt.Errorf("failed to complete report: %w", err)
1202+
}
1203+
1204+
logDir, _ := fileLogger.GetDirectoryForRunID(runID)
1205+
n.config.Log.Info("Consolidated report generated", "path", logDir)
1206+
1207+
return nil
1208+
}
1209+
11031210
// WaitForShutdown waits for all goroutines to finish
11041211
func (n *nat) WaitForShutdown(ctx context.Context) error {
11051212
timeout := time.NewTimer(time.Second * 5)

0 commit comments

Comments
 (0)