Skip to content

Commit 329bc74

Browse files
committed
Implement workflow engine with YAML support and SSH session management
1 parent 09886e9 commit 329bc74

File tree

9 files changed

+437
-4
lines changed

9 files changed

+437
-4
lines changed

examples/cs-bridge-workflow.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
name: "cs-bridge-hpc-setup"
2+
3+
steps:
4+
- action: "vscode.create_session"
5+
name: "Start SSH server"
6+
params:
7+
password: "test"
8+
outputs:
9+
bind_port: "ssh_port"
10+
11+
- action: "tunnel.devtunnel_create"
12+
name: "Create devtunnel"
13+
params:
14+
tunnel_name: "linkspan-tunnel"
15+
expiration: "1d"
16+
ports:
17+
- "{{.ssh_port}}"
18+
19+
- action: "tunnel.devtunnel_host"
20+
name: "Host devtunnel"
21+
params:
22+
tunnel_name: "linkspan-tunnel"
23+
create_token: true
24+
25+
- action: "shell.exec"
26+
name: "Clone repo"
27+
params:
28+
command: "git clone https://github.com/org/repo.git /workspace"

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/fatedier/frp v0.67.0
99
github.com/gliderlabs/ssh v0.3.8
1010
github.com/gorilla/mux v1.8.1
11+
gopkg.in/yaml.v3 v3.0.1
1112
)
1213

1314
require (

internal/workflow/actions.go

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package workflow
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"os/exec"
7+
"strings"
8+
9+
tunnel "github.com/cyber-shuttle/linkspan/subsystems/tunnel"
10+
vscode "github.com/cyber-shuttle/linkspan/subsystems/vscode"
11+
utils "github.com/cyber-shuttle/linkspan/utils"
12+
)
13+
14+
// registerBuiltinActions populates a Registry with all built-in action wrappers.
15+
func registerBuiltinActions(r *Registry) {
16+
r.Register("vscode.create_session", actionVSCodeCreateSession)
17+
r.Register("tunnel.devtunnel_create", actionDevTunnelCreate)
18+
r.Register("tunnel.devtunnel_host", actionDevTunnelHost)
19+
r.Register("tunnel.devtunnel_delete", actionDevTunnelDelete)
20+
r.Register("tunnel.frp_proxy_create", actionFrpProxyCreate)
21+
r.Register("shell.exec", actionShellExec)
22+
}
23+
24+
// --- vscode.create_session ---
25+
26+
func actionVSCodeCreateSession(params map[string]any) (*ActionResult, error) {
27+
password, _ := params["password"].(string)
28+
29+
port, err := utils.GetAvailablePort()
30+
if err != nil {
31+
return nil, fmt.Errorf("get available port: %w", err)
32+
}
33+
34+
sessionID := fmt.Sprintf("wf-%d", port)
35+
addr := fmt.Sprintf(":%d", port)
36+
37+
vscode.StartSSHServerForVSCodeConnection(sessionID, addr, password)
38+
39+
result := ActionResult{
40+
"session_id": sessionID,
41+
"bind_port": port,
42+
}
43+
return &result, nil
44+
}
45+
46+
// --- tunnel.devtunnel_create ---
47+
48+
func actionDevTunnelCreate(params map[string]any) (*ActionResult, error) {
49+
tunnelName, _ := params["tunnel_name"].(string)
50+
expiration, _ := params["expiration"].(string)
51+
if expiration == "" {
52+
expiration = "1d"
53+
}
54+
55+
ports := toIntSlice(params["ports"])
56+
57+
info, err := tunnel.DevTunnelCreate(tunnelName, expiration, ports)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
result := ActionResult{
63+
"tunnel_id": info.TunnelID,
64+
"tunnel_name": info.TunnelName,
65+
}
66+
return &result, nil
67+
}
68+
69+
// --- tunnel.devtunnel_host ---
70+
71+
func actionDevTunnelHost(params map[string]any) (*ActionResult, error) {
72+
tunnelName, _ := params["tunnel_name"].(string)
73+
createToken, _ := params["create_token"].(bool)
74+
75+
cmdID, conn, err := tunnel.DevTunnelHost(tunnelName, createToken)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
result := ActionResult{
81+
"command_id": cmdID,
82+
"connection_url": conn.ConnectionURL,
83+
"token": conn.Token,
84+
}
85+
return &result, nil
86+
}
87+
88+
// --- tunnel.devtunnel_delete ---
89+
90+
func actionDevTunnelDelete(params map[string]any) (*ActionResult, error) {
91+
tunnelName, _ := params["tunnel_name"].(string)
92+
if err := tunnel.DevTunnelDelete(tunnelName); err != nil {
93+
return nil, err
94+
}
95+
return &ActionResult{}, nil
96+
}
97+
98+
// --- tunnel.frp_proxy_create ---
99+
100+
func actionFrpProxyCreate(params map[string]any) (*ActionResult, error) {
101+
tunnelName, _ := params["tunnel_name"].(string)
102+
port := toInt(params["port"])
103+
tunnelType, _ := params["tunnel_type"].(string)
104+
tunnelSecret, _ := params["tunnel_secret"].(string)
105+
discoveryHost, _ := params["discovery_host"].(string)
106+
discoveryPort := toInt(params["discovery_port"])
107+
discoveryToken, _ := params["discovery_token"].(string)
108+
109+
info, err := tunnel.FrpTunnelProxyCreate(
110+
tunnelName, port, tunnelType, tunnelSecret,
111+
discoveryHost, discoveryPort, discoveryToken,
112+
)
113+
if err != nil {
114+
return nil, err
115+
}
116+
117+
result := ActionResult{
118+
"tunnel_name": info.TunnelName,
119+
"tunnel_type": info.TunnelType,
120+
}
121+
return &result, nil
122+
}
123+
124+
// --- shell.exec ---
125+
126+
func actionShellExec(params map[string]any) (*ActionResult, error) {
127+
command, _ := params["command"].(string)
128+
if command == "" {
129+
return nil, fmt.Errorf("shell.exec: command is required")
130+
}
131+
132+
parts := strings.Fields(command)
133+
cmd := exec.Command(parts[0], parts[1:]...)
134+
135+
output, err := cmd.CombinedOutput()
136+
log.Printf("shell.exec: %s\n%s", command, string(output))
137+
if err != nil {
138+
return nil, fmt.Errorf("shell.exec %q: %w\n%s", command, err, string(output))
139+
}
140+
141+
result := ActionResult{
142+
"output": strings.TrimSpace(string(output)),
143+
}
144+
return &result, nil
145+
}
146+
147+
// --- helpers ---
148+
149+
// toIntSlice converts a param value to []int, handling YAML-decoded []any.
150+
func toIntSlice(v any) []int {
151+
switch val := v.(type) {
152+
case []any:
153+
out := make([]int, 0, len(val))
154+
for _, elem := range val {
155+
out = append(out, toInt(elem))
156+
}
157+
return out
158+
case []int:
159+
return val
160+
default:
161+
return nil
162+
}
163+
}
164+
165+
// toInt converts a param value to int, handling YAML's default float64/int types.
166+
func toInt(v any) int {
167+
switch val := v.(type) {
168+
case int:
169+
return val
170+
case float64:
171+
return int(val)
172+
case string:
173+
// Support template-resolved numeric strings.
174+
var n int
175+
fmt.Sscanf(val, "%d", &n)
176+
return n
177+
default:
178+
return 0
179+
}
180+
}

internal/workflow/registry.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package workflow
2+
3+
import "fmt"
4+
5+
// Registry maps action names (e.g. "vscode.create_session") to handler functions.
6+
type Registry struct {
7+
actions map[string]ActionFunc
8+
}
9+
10+
// NewRegistry creates an empty Registry.
11+
func NewRegistry() *Registry {
12+
return &Registry{actions: make(map[string]ActionFunc)}
13+
}
14+
15+
// Register adds an action handler. Panics on duplicate names to catch
16+
// programming errors early.
17+
func (r *Registry) Register(name string, fn ActionFunc) {
18+
if _, exists := r.actions[name]; exists {
19+
panic(fmt.Sprintf("workflow: duplicate action registered: %s", name))
20+
}
21+
r.actions[name] = fn
22+
}
23+
24+
// Get returns the handler for the given action name, or nil if not found.
25+
func (r *Registry) Get(name string) ActionFunc {
26+
return r.actions[name]
27+
}
28+
29+
// DefaultRegistry returns a Registry pre-loaded with all built-in actions.
30+
func DefaultRegistry() *Registry {
31+
r := NewRegistry()
32+
registerBuiltinActions(r)
33+
return r
34+
}

0 commit comments

Comments
 (0)