Skip to content

Commit 2d4aa10

Browse files
yasithdevclaude
andcommitted
Add workflow status API for direct linkspan monitoring
Linkspan now exposes GET /api/v1/status with real-time workflow state (running/complete/failed), current step, and all captured outputs. GET /api/v1/health provides a simple liveness check. The workflow Engine tracks execution state with mutex-protected Status struct. CS-Bridge polls /api/v1/status through the tunnel once the tunnel URL is known, falling back to SSH log parsing when unreachable. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 2c6910f commit 2d4aa10

File tree

2 files changed

+103
-6
lines changed

2 files changed

+103
-6
lines changed

internal/workflow/workflow.go

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"log"
88
"os"
9+
"sync"
910
"text/template"
1011

1112
"gopkg.in/yaml.v3"
@@ -57,18 +58,60 @@ func parse(data []byte) (*WorkflowConfig, error) {
5758
return &cfg, nil
5859
}
5960

61+
// WorkflowState represents the current execution status.
62+
type WorkflowState string
63+
64+
const (
65+
StateIdle WorkflowState = "idle"
66+
StateRunning WorkflowState = "running"
67+
StateComplete WorkflowState = "complete"
68+
StateFailed WorkflowState = "failed"
69+
)
70+
71+
// Status is a snapshot of the engine's current execution state,
72+
// safe to serialize as JSON for the /api/v1/status endpoint.
73+
type Status struct {
74+
State WorkflowState `json:"state"`
75+
CurrentStep int `json:"currentStep"`
76+
TotalSteps int `json:"totalSteps"`
77+
StepName string `json:"stepName,omitempty"`
78+
Error string `json:"error,omitempty"`
79+
Outputs map[string]any `json:"outputs"`
80+
}
81+
6082
// Engine executes a workflow using its registry and collected variables.
6183
type Engine struct {
6284
Registry *Registry
6385
Vars map[string]any
86+
87+
mu sync.Mutex
88+
status Status
6489
}
6590

6691
// NewEngine creates an Engine with the given registry and initial variables.
6792
func NewEngine(reg *Registry, vars map[string]any) *Engine {
6893
if vars == nil {
6994
vars = make(map[string]any)
7095
}
71-
return &Engine{Registry: reg, Vars: vars}
96+
return &Engine{
97+
Registry: reg,
98+
Vars: vars,
99+
status: Status{State: StateIdle, Outputs: make(map[string]any)},
100+
}
101+
}
102+
103+
// Status returns a snapshot of the current workflow execution state.
104+
func (e *Engine) Status() Status {
105+
e.mu.Lock()
106+
defer e.mu.Unlock()
107+
// Return a copy of outputs so the caller can't mutate engine state.
108+
out := make(map[string]any, len(e.status.Outputs))
109+
for k, v := range e.status.Outputs {
110+
out[k] = v
111+
}
112+
s := e.status
113+
s.Outputs = out
114+
return s
72115
}
73116

74117
// Run executes every step in the workflow sequentially.
@@ -77,41 +120,73 @@ func NewEngine(reg *Registry, vars map[string]any) *Engine {
77120
func (e *Engine) Run(wf *WorkflowConfig) error {
78121
log.Printf("workflow: starting %q (%d steps)", wf.Name, len(wf.Steps))
79122

123+
e.mu.Lock()
124+
e.status = Status{State: StateRunning, TotalSteps: len(wf.Steps), Outputs: make(map[string]any)}
125+
e.mu.Unlock()
126+
80127
for i, step := range wf.Steps {
81128
log.Printf("workflow: [%d/%d] %s (action=%s)", i+1, len(wf.Steps), step.Name, step.Action)
82129

130+
e.mu.Lock()
131+
e.status.CurrentStep = i + 1
132+
e.status.StepName = step.Name
133+
e.mu.Unlock()
134+
83135
fn := e.Registry.Get(step.Action)
84136
if fn == nil {
85-
return fmt.Errorf("workflow step %d: unknown action %q", i+1, step.Action)
137+
err := fmt.Errorf("workflow step %d: unknown action %q", i+1, step.Action)
138+
e.mu.Lock()
139+
e.status.State = StateFailed
140+
e.status.Error = err.Error()
141+
e.mu.Unlock()
142+
return err
86143
}
87144

88145
// Resolve {{.var}} templates in string params.
89146
resolved, err := e.resolveParams(step.Params)
90147
if err != nil {
91-
return fmt.Errorf("workflow step %d (%s): resolving params: %w", i+1, step.Name, err)
148+
err = fmt.Errorf("workflow step %d (%s): resolving params: %w", i+1, step.Name, err)
149+
e.mu.Lock()
150+
e.status.State = StateFailed
151+
e.status.Error = err.Error()
152+
e.mu.Unlock()
153+
return err
92154
}
93155

94156
result, err := fn(resolved)
95157
if err != nil {
96-
return fmt.Errorf("workflow step %d (%s): %w", i+1, step.Name, err)
158+
err = fmt.Errorf("workflow step %d (%s): %w", i+1, step.Name, err)
159+
e.mu.Lock()
160+
e.status.State = StateFailed
161+
e.status.Error = err.Error()
162+
e.mu.Unlock()
163+
return err
97164
}
98165

99166
// Map action outputs to workflow variables.
100167
if result != nil && step.Outputs != nil {
168+
e.mu.Lock()
101169
for field, varName := range step.Outputs {
102170
val, ok := (*result)[field]
103171
if !ok {
104172
log.Printf("workflow: warning: step %d output field %q not found in result", i+1, field)
105173
continue
106174
}
107175
e.Vars[varName] = val
176+
e.status.Outputs[varName] = val
108177
log.Printf("workflow: captured %s = %v", varName, val)
109178
}
179+
e.mu.Unlock()
110180
}
111181

112182
log.Printf("workflow: [%d/%d] %s completed", i+1, len(wf.Steps), step.Name)
113183
}
114184

185+
e.mu.Lock()
186+
e.status.State = StateComplete
187+
e.status.StepName = ""
188+
e.mu.Unlock()
189+
115190
log.Printf("workflow: %q finished successfully", wf.Name)
116191
return nil
117192
}

main.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"encoding/json"
56
"flag"
67
"fmt"
78
"log"
@@ -114,6 +115,27 @@ func main() {
114115
api.HandleFunc("/tunnels/frp", tunnel.CreateFrpTunnelProxy).Methods("POST")
115116
api.HandleFunc("/tunnels/frp/{id}", tunnel.TerminateFrpTunnel).Methods("DELETE")
116117

118+
// Health and workflow status
119+
api.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
120+
w.Header().Set("Content-Type", "application/json")
121+
w.WriteHeader(http.StatusOK)
122+
fmt.Fprintf(w, `{"status":"ok"}`)
123+
}).Methods("GET")
124+
125+
// Workflow status — set up after engine creation below.
126+
var workflowEngine *workflow.Engine
127+
api.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
128+
if workflowEngine == nil {
129+
w.Header().Set("Content-Type", "application/json")
130+
w.WriteHeader(http.StatusOK)
131+
fmt.Fprintf(w, `{"state":"idle","currentStep":0,"totalSteps":0,"outputs":{}}`)
132+
return
133+
}
134+
w.Header().Set("Content-Type", "application/json")
135+
w.WriteHeader(http.StatusOK)
136+
json.NewEncoder(w).Encode(workflowEngine.Status())
137+
}).Methods("GET")
138+
117139
// Use the configured server host and port from CLI flags.
118140
// Port 0 means "let the OS pick a free port".
119141
serverPort := *serverPortFlag
@@ -153,14 +175,14 @@ func main() {
153175
if err != nil {
154176
log.Fatalf("workflow: %v", err)
155177
}
156-
engine := workflow.NewEngine(workflow.DefaultRegistry(), map[string]any{
178+
workflowEngine = workflow.NewEngine(workflow.DefaultRegistry(), map[string]any{
157179
"Timestamp": time.Now().Unix(),
158180
"ServerPort": serverPort,
159181
"ServerHost": serverHost,
160182
"TunnelAuthToken": *tunnelAuthToken,
161183
})
162184
go func() {
163-
if err := engine.Run(wf); err != nil {
185+
if err := workflowEngine.Run(wf); err != nil {
164186
log.Printf("workflow: %v", err)
165187
}
166188
}()

0 commit comments

Comments
 (0)