Skip to content

Commit 0d7816a

Browse files
yasithdev and claude committed
Separate devtunnel lifecycle into create/host/forward phases
Tunnel creation no longer registers ports — hosting starts the relay without forwarding, and ports are added individually via DevTunnelForward, which restarts the host CLI with -p flags. This fixes the SSH banner timeout caused by the missing -p flag on `devtunnel host`.

Other changes:
- Remove dead code: conda stubs, FRP visitor stub, kernel conda helper
- Implement CloseDevTunnel API (was a no-op stub)
- Fix KillAll zombie leak by waiting on done channel after Kill
- Add SDKAddPort for registering ports on existing tunnels
- Reorder workflow: create → host → create_session → forward

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent edeb476 commit 0d7816a

File tree

16 files changed

+272
-106
lines changed

16 files changed

+272
-106
lines changed

examples/cs-bridge-workflow.yaml

Lines changed: 15 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -1,27 +1,32 @@
11
name: "cs-bridge-hpc-setup"
22

33
steps:
4-
- action: "vscode.create_session"
5-
name: "Start SSH server"
6-
outputs:
7-
bind_port: "ssh_port"
8-
94
- action: "tunnel.devtunnel_create"
105
name: "Create devtunnel"
116
params:
127
tunnel_name: "linkspan-tunnel"
138
expiration: "1d"
149
auth_token: "{{.TunnelAuthToken}}"
15-
ports:
16-
- "{{.ssh_port}}"
10+
outputs:
11+
tunnel_id: "tunnel_id"
1712

1813
- action: "tunnel.devtunnel_host"
1914
name: "Host devtunnel"
2015
params:
2116
tunnel_name: "linkspan-tunnel"
2217
auth_token: "{{.TunnelAuthToken}}"
18+
outputs:
19+
connection_url: "tunnel_url"
20+
token: "tunnel_token"
2321

24-
- action: "shell.exec"
25-
name: "Clone repo"
22+
- action: "vscode.create_session"
23+
name: "Start SSH server"
24+
outputs:
25+
bind_port: "ssh_port"
26+
27+
- action: "tunnel.devtunnel_forward"
28+
name: "Forward SSH port"
2629
params:
27-
command: "git clone https://github.com/org/repo.git /workspace"
30+
tunnel_name: "linkspan-tunnel"
31+
auth_token: "{{.TunnelAuthToken}}"
32+
port: "{{.ssh_port}}"

internal/process/process_manager.go

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -176,14 +176,33 @@ func (pm *ProcessManager) Wait(id string) error {
176176

177177

178178

179-
// KillAll forcefully kills all managed processes.
179+
// KillAll forcefully kills all managed processes, waiting up to 2 seconds for
180+
// each to exit so the OS can reap them (avoiding zombies).
180181
func (pm *ProcessManager) KillAll() {
181182
pm.mu.Lock()
182-
defer pm.mu.Unlock()
183+
// Snapshot the map so we can release the lock before waiting.
184+
snapshot := make(map[string]*ManagedProcess, len(pm.procs))
183185
for id, mp := range pm.procs {
186+
snapshot[id] = mp
187+
}
188+
pm.mu.Unlock()
189+
190+
for id, mp := range snapshot {
184191
if mp.Cmd.Process != nil {
185192
_ = mp.Cmd.Process.Kill()
193+
194+
// Wait for the process to be reaped via the done channel (which
195+
// is closed by the background goroutine in Start after cmd.Wait
196+
// completes). Use a short timeout so KillAll doesn't hang if a
197+
// process refuses to exit.
198+
select {
199+
case <-mp.done:
200+
case <-time.After(2 * time.Second):
201+
}
186202
}
203+
204+
pm.mu.Lock()
187205
delete(pm.procs, id)
206+
pm.mu.Unlock()
188207
}
189208
}

internal/workflow/actions.go

Lines changed: 25 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ func registerBuiltinActions(r *Registry) {
1616
r.Register("vscode.create_session", actionVSCodeCreateSession)
1717
r.Register("tunnel.devtunnel_create", actionDevTunnelCreate)
1818
r.Register("tunnel.devtunnel_host", actionDevTunnelHost)
19+
r.Register("tunnel.devtunnel_forward", actionDevTunnelForward)
1920
r.Register("tunnel.devtunnel_delete", actionDevTunnelDelete)
2021
r.Register("tunnel.devtunnel_connect", actionDevTunnelConnect)
2122
r.Register("tunnel.frp_proxy_create", actionFrpProxyCreate)
@@ -55,9 +56,7 @@ func actionDevTunnelCreate(params map[string]any) (*ActionResult, error) {
5556
return nil, fmt.Errorf("tunnel.devtunnel_create: auth_token is required")
5657
}
5758

58-
ports := toIntSlice(params["ports"])
59-
60-
info, err := tunnel.DevTunnelCreate(tunnelName, expiration, ports, authToken)
59+
info, err := tunnel.DevTunnelCreate(tunnelName, expiration, authToken)
6160
if err != nil {
6261
return nil, err
6362
}
@@ -91,6 +90,29 @@ func actionDevTunnelHost(params map[string]any) (*ActionResult, error) {
9190
return &result, nil
9291
}
9392

93+
// --- tunnel.devtunnel_forward ---
94+
95+
func actionDevTunnelForward(params map[string]any) (*ActionResult, error) {
96+
tunnelName, _ := params["tunnel_name"].(string)
97+
if tunnelName == "" {
98+
return nil, fmt.Errorf("tunnel.devtunnel_forward: tunnel_name is required")
99+
}
100+
authToken, _ := params["auth_token"].(string)
101+
if authToken == "" {
102+
return nil, fmt.Errorf("tunnel.devtunnel_forward: auth_token is required")
103+
}
104+
port := toInt(params["port"])
105+
if port == 0 {
106+
return nil, fmt.Errorf("tunnel.devtunnel_forward: port is required")
107+
}
108+
109+
if err := tunnel.DevTunnelForward(tunnelName, port, authToken); err != nil {
110+
return nil, err
111+
}
112+
113+
return &ActionResult{"port": port}, nil
114+
}
115+
94116
// --- tunnel.devtunnel_delete ---
95117

96118
func actionDevTunnelDelete(params map[string]any) (*ActionResult, error) {

main.go

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -178,12 +178,25 @@ func main() {
178178
go func() {
179179
tunnelName := fmt.Sprintf("aget-tunnel-%d", time.Now().UnixNano())
180180

181+
// cleanupAttempt kills any host CLI process and removes the tunnel
182+
// from the manager so a timed-out or failed attempt doesn't leak.
183+
cleanupAttempt := func() {
184+
info, err := tunnel.GlobalDevTunnelManager.Find(tunnelName)
185+
if err != nil {
186+
return // not registered yet, nothing to clean up
187+
}
188+
if info.HostCmdID != "" {
189+
_ = pm.GlobalProcessManager.Kill(info.HostCmdID)
190+
}
191+
tunnel.GlobalDevTunnelManager.Remove(tunnelName)
192+
}
193+
181194
for attempt := 1; attempt <= *tunnelRetries; attempt++ {
182195
log.Printf("devtunnel: attempt %d/%d to create tunnel %s", attempt, *tunnelRetries, tunnelName)
183196

184197
ch := make(chan error, 1)
185198
go func() {
186-
_, err := tunnel.DevTunnelCreate(tunnelName, "1d", []int{serverPort}, authToken)
199+
_, err := tunnel.DevTunnelCreate(tunnelName, "1d", authToken)
187200
if err != nil {
188201
ch <- err
189202
return
@@ -193,6 +206,10 @@ func main() {
193206
ch <- err
194207
return
195208
}
209+
if err := tunnel.DevTunnelForward(tunnelName, serverPort, authToken); err != nil {
210+
ch <- err
211+
return
212+
}
196213

197214
log.Printf("Connect to agent using the URL: %s", tunnelConnection.ConnectionURL)
198215
log.Printf("DevTunnel ID: %s", tunnelConnection.DevTunnelInfo.TunnelID)
@@ -209,10 +226,12 @@ func main() {
209226
return
210227
}
211228
log.Printf("devtunnel: attempt %d failed: %v", attempt, err)
229+
cleanupAttempt()
212230
case <-attemptCtx.Done():
213231
log.Printf("devtunnel: attempt %d timed out after %s", attempt, tunnelAttemptTimeout.String())
232+
cancel()
233+
cleanupAttempt()
214234
}
215-
cancel()
216235

217236
if attempt < *tunnelRetries {
218237
time.Sleep(*tunnelRetryDelay)

subsystems/env/conda/conda.go

Lines changed: 0 additions & 6 deletions
This file was deleted.

subsystems/env/conda/conda_test.go

Lines changed: 0 additions & 17 deletions
This file was deleted.

subsystems/jupyter/api.go

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -71,7 +71,10 @@ func ListKernels(w http.ResponseWriter, r *http.Request) {
7171
func ProvisionKernel(w http.ResponseWriter, r *http.Request) {
7272
// placeholder: parse request body to create kernel
7373
provisionReq := KernelProvisionRequest{}
74-
_ = json.NewDecoder(r.Body).Decode(&provisionReq)
74+
if err := json.NewDecoder(r.Body).Decode(&provisionReq); err != nil {
75+
utils.RespondJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON: " + err.Error()})
76+
return
77+
}
7578
_ = r.Body.Close()
7679

7780
if provisionReq.KernelName == "" {
@@ -114,7 +117,10 @@ func ProvisionKernel(w http.ResponseWriter, r *http.Request) {
114117
func ShutdownKernel(w http.ResponseWriter, r *http.Request) {
115118
// placeholder: shutdown by id
116119
shutdownReq := KernelShutdownRequest{}
117-
_ = json.NewDecoder(r.Body).Decode(&shutdownReq)
120+
if err := json.NewDecoder(r.Body).Decode(&shutdownReq); err != nil {
121+
utils.RespondJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON: " + err.Error()})
122+
return
123+
}
118124
_ = r.Body.Close()
119125

120126
err := stopKernel(shutdownReq.KernelID)
@@ -133,7 +139,10 @@ func ShutdownKernel(w http.ResponseWriter, r *http.Request) {
133139
func DeleteKernel(w http.ResponseWriter, r *http.Request) {
134140
// placeholder: delete by id
135141
deleteReq := KernelShutdownRequest{}
136-
_ = json.NewDecoder(r.Body).Decode(&deleteReq)
142+
if err := json.NewDecoder(r.Body).Decode(&deleteReq); err != nil {
143+
utils.RespondJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON: " + err.Error()})
144+
return
145+
}
137146
_ = r.Body.Close()
138147

139148
err := stopKernel(deleteReq.KernelID)

subsystems/jupyter/kernel.go

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -88,10 +88,6 @@ func getKernelConnectionFile(kernelInternalID string) (string, error) {
8888
return "", fmt.Errorf("connection file not found in kernel output: %v", err)
8989
}
9090

91-
func startKernelWithCondaEnv(kernelName string) (string, int, error) {
92-
return "", -1, fmt.Errorf("Not implemented")
93-
}
94-
9591
func getKernelStatus(kernelInternalID string) (string, error) {
9692
// placeholder: get the Jupyter kernel process status
9793
info, err := pm.GlobalProcessManager.GetInfo(kernelInternalID)

subsystems/tunnel/api.go

Lines changed: 39 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"net/http"
66

7+
pm "github.com/cyber-shuttle/linkspan/internal/process"
78
utils "github.com/cyber-shuttle/linkspan/utils"
89
"github.com/gorilla/mux"
910
)
@@ -20,7 +21,6 @@ type Tunnel struct {
2021
type DevTunnelCreateRequest struct {
2122
TunnelName string `json:"tunnelName"`
2223
Expiration string `json:"expiration"`
23-
Ports []int `json:"ports"`
2424
// AuthToken is the Microsoft Entra ID (Azure AD) bearer token used to
2525
// authenticate against the Dev Tunnels service. It is required for all
2626
// devtunnel operations.
@@ -74,15 +74,18 @@ func ListDevTunnels(w http.ResponseWriter, r *http.Request) {
7474
// It creates the tunnel via the SDK and immediately starts hosting it via the CLI.
7575
func CreateDevTunnel(w http.ResponseWriter, r *http.Request) {
7676
var req DevTunnelCreateRequest
77-
_ = json.NewDecoder(r.Body).Decode(&req)
77+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
78+
utils.RespondJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON: " + err.Error()})
79+
return
80+
}
7881
_ = r.Body.Close()
7982

8083
if req.AuthToken == "" {
8184
utils.RespondJSON(w, http.StatusBadRequest, map[string]string{"error": "authToken is required"})
8285
return
8386
}
8487

85-
info, err := DevTunnelCreate(req.TunnelName, req.Expiration, req.Ports, req.AuthToken)
88+
info, err := DevTunnelCreate(req.TunnelName, req.Expiration, req.AuthToken)
8689
if err != nil {
8790
utils.RespondJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
8891
return
@@ -101,15 +104,46 @@ func CreateDevTunnel(w http.ResponseWriter, r *http.Request) {
101104
})
102105
}
103106

104-
// CloseDevTunnel handles DELETE /tunnels/devtunnel/{id} (stub).
107+
// CloseDevTunnel handles DELETE /tunnels/devtunnel/{id}.
108+
// It kills the host CLI process, deletes the tunnel via the SDK, and removes
109+
// it from the in-memory manager.
105110
func CloseDevTunnel(w http.ResponseWriter, r *http.Request) {
111+
vars := mux.Vars(r)
112+
tunnelName := vars["id"]
113+
if tunnelName == "" {
114+
utils.RespondJSON(w, http.StatusBadRequest, map[string]string{"error": "tunnel name required"})
115+
return
116+
}
117+
118+
info, err := GlobalDevTunnelManager.Find(tunnelName)
119+
if err != nil {
120+
utils.RespondJSON(w, http.StatusNotFound, map[string]string{"error": err.Error()})
121+
return
122+
}
123+
124+
// Kill the host CLI process if one is running.
125+
if info.HostCmdID != "" {
126+
_ = pm.GlobalProcessManager.Kill(info.HostCmdID)
127+
}
128+
129+
// Delete the tunnel on the service.
130+
if err := DevTunnelDelete(tunnelName, info.AuthToken); err != nil {
131+
utils.RespondJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
132+
return
133+
}
134+
135+
GlobalDevTunnelManager.Remove(tunnelName)
136+
106137
utils.RespondJSON(w, http.StatusNoContent, nil)
107138
}
108139

109140
// CreateFrpTunnelProxy handles POST /tunnels/frp.
110141
func CreateFrpTunnelProxy(w http.ResponseWriter, r *http.Request) {
111142
var req FrpTunnelProxyCreateRequest
112-
_ = json.NewDecoder(r.Body).Decode(&req)
143+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
144+
utils.RespondJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid JSON: " + err.Error()})
145+
return
146+
}
113147
_ = r.Body.Close()
114148

115149
info, err := FrpTunnelProxyCreate(
@@ -136,11 +170,6 @@ func ListFrpTunnels(w http.ResponseWriter, r *http.Request) {
136170
utils.RespondJSON(w, http.StatusOK, FrpTunnelListResponse{FrpTunnelInfos: ts})
137171
}
138172

139-
// GetFrpTunnelStatus handles GET /tunnels/frp/{id}/status (stub).
140-
func GetFrpTunnelStatus(w http.ResponseWriter, r *http.Request) {
141-
utils.RespondJSON(w, http.StatusOK, map[string]string{"status": "unknown"})
142-
}
143-
144173
// TerminateFrpTunnel handles DELETE /tunnels/frp/{id}.
145174
func TerminateFrpTunnel(w http.ResponseWriter, r *http.Request) {
146175
vars := mux.Vars(r)

0 commit comments

Comments
 (0)