Skip to content

Commit a178ef5

Browse files
yasithdevclaude
andcommitted
Add TCP log stream broadcaster for real-time log streaming
- New internal/logstream package: captures Go log output and broadcasts to connected TCP clients via a listener on a random port - DevTunnelCreate now accepts extra ports (log port) for forwarding - Workflow engine exposes LogPort template variable - tunnel.devtunnel_create action passes log_port param and captures it Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7eaf209 commit a178ef5

File tree

4 files changed

+129
-2
lines changed

4 files changed

+129
-2
lines changed

internal/logstream/logstream.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package logstream
2+
3+
import (
4+
"io"
5+
"log"
6+
"net"
7+
"sync"
8+
)
9+
10+
// Broadcaster captures log output and fans it out to connected TCP clients.
11+
// It implements io.Writer so it can be installed via log.SetOutput.
12+
type Broadcaster struct {
13+
mu sync.RWMutex
14+
clients map[net.Conn]struct{}
15+
dest io.Writer // original destination (os.Stderr)
16+
}
17+
18+
// New creates a Broadcaster that tees to dest and any connected TCP clients.
19+
func New(dest io.Writer) *Broadcaster {
20+
return &Broadcaster{
21+
clients: make(map[net.Conn]struct{}),
22+
dest: dest,
23+
}
24+
}
25+
26+
// Write implements io.Writer. Every log line is written to the original
27+
// destination AND broadcast to all connected clients.
28+
func (b *Broadcaster) Write(p []byte) (int, error) {
29+
n, err := b.dest.Write(p)
30+
31+
b.mu.RLock()
32+
for c := range b.clients {
33+
_, writeErr := c.Write(p)
34+
if writeErr != nil {
35+
// Mark for removal but don't modify map during iteration
36+
go b.remove(c)
37+
}
38+
}
39+
b.mu.RUnlock()
40+
41+
return n, err
42+
}
43+
44+
func (b *Broadcaster) remove(c net.Conn) {
45+
b.mu.Lock()
46+
delete(b.clients, c)
47+
b.mu.Unlock()
48+
c.Close()
49+
}
50+
51+
// ListenAndServe starts a TCP listener on addr and accepts clients that
52+
// receive the log stream. Returns the listener so the caller can retrieve
53+
// the bound port.
54+
func (b *Broadcaster) ListenAndServe(addr string) (net.Listener, error) {
55+
ln, err := net.Listen("tcp", addr)
56+
if err != nil {
57+
return nil, err
58+
}
59+
60+
go func() {
61+
for {
62+
conn, err := ln.Accept()
63+
if err != nil {
64+
return // listener closed
65+
}
66+
b.mu.Lock()
67+
b.clients[conn] = struct{}{}
68+
b.mu.Unlock()
69+
log.Printf("[logstream] client connected from %s", conn.RemoteAddr())
70+
71+
// Drain reads so we detect client disconnect
72+
go func(c net.Conn) {
73+
buf := make([]byte, 256)
74+
for {
75+
if _, err := c.Read(buf); err != nil {
76+
b.remove(c)
77+
return
78+
}
79+
}
80+
}(conn)
81+
}
82+
}()
83+
84+
return ln, nil
85+
}
86+
87+
// Install redirects the standard logger to this broadcaster.
88+
func (b *Broadcaster) Install() {
89+
log.SetOutput(b)
90+
}
91+
92+
// Close disconnects all clients.
93+
func (b *Broadcaster) Close() {
94+
b.mu.Lock()
95+
for c := range b.clients {
96+
c.Close()
97+
delete(b.clients, c)
98+
}
99+
b.mu.Unlock()
100+
}

internal/workflow/actions.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ func actionDevTunnelCreate(params map[string]any) (*ActionResult, error) {
3939
}
4040
serverPort := toInt(params["server_port"])
4141
sshPort := toInt(params["ssh_port"])
42+
logPort := toInt(params["log_port"])
4243

43-
conn, err := tunnel.DevTunnelCreate(tunnelName, expiration, authToken, serverPort, sshPort)
44+
conn, err := tunnel.DevTunnelCreate(tunnelName, expiration, authToken, serverPort, sshPort, logPort)
4445
if err != nil {
4546
return nil, err
4647
}
@@ -51,6 +52,7 @@ func actionDevTunnelCreate(params map[string]any) (*ActionResult, error) {
5152
"connection_url": conn.ConnectionURL,
5253
"token": conn.Token,
5354
"ssh_port": sshPort,
55+
"log_port": logPort,
5456
}
5557
return &result, nil
5658
}

main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"syscall"
1717
"time"
1818

19+
"github.com/cyber-shuttle/linkspan/internal/logstream"
1920
pm "github.com/cyber-shuttle/linkspan/internal/process"
2021
"github.com/cyber-shuttle/linkspan/internal/workflow"
2122
jupyter "github.com/cyber-shuttle/linkspan/subsystems/jupyter"
@@ -40,6 +41,11 @@ var (
4041

4142
func main() {
4243

44+
// Install log broadcaster so connected clients receive log output in
45+
// real time. Must happen before any log.* calls.
46+
logBroadcaster := logstream.New(os.Stderr)
47+
logBroadcaster.Install()
48+
4349
// parse CLI flags
4450
tunnelAPI := flag.String("tunnel-api", "devtunnels", "tunnel API provider name (e.g. devtunnels)")
4551
tunnelEnable := flag.Bool("tunnel-enable", false, "enable tunnel startup")
@@ -221,6 +227,14 @@ func main() {
221227
vscode.StartSSHServerForVSCodeConnection(sshSessionID, sshAddr)
222228
log.Printf("SSH server listening on %s", sshAddr)
223229

230+
// Start log stream TCP listener so clients can connect for real-time logs.
231+
logListener, err := logBroadcaster.ListenAndServe("0.0.0.0:0")
232+
if err != nil {
233+
log.Fatalf("failed to start log stream listener: %v", err)
234+
}
235+
logPort := logListener.Addr().(*net.TCPAddr).Port
236+
log.Printf("log stream listening on 0.0.0.0:%d", logPort)
237+
224238
// Run workflow if specified. Use "-" to read from stdin.
225239
if *workflowFile != "" {
226240
var wf *workflow.WorkflowConfig
@@ -237,6 +251,7 @@ func main() {
237251
"Timestamp": time.Now().Unix(),
238252
"ServerPort": serverPort,
239253
"SshPort": sshPort,
254+
"LogPort": logPort,
240255
"ServerHost": serverHost,
241256
"TunnelAuthToken": *tunnelAuthToken,
242257
"LocalTunnelID": os.Getenv("CS_LOCAL_TUNNEL_ID"),

subsystems/tunnel/devtunnel.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
// DevTunnelCreate creates a tunnel, starts hosting the relay, and forwards the
1212
// given serverPort so the client can communicate with linkspan immediately.
1313
// Additional ports (e.g. SSH) can be added later via DevTunnelForward.
14-
func DevTunnelCreate(tunnelName string, expiration string, authToken string, serverPort int, sshPort int) (DevTunnelConnection, error) {
14+
func DevTunnelCreate(tunnelName string, expiration string, authToken string, serverPort int, sshPort int, extraPorts ...int) (DevTunnelConnection, error) {
1515
if err := InitSDK(authToken); err != nil {
1616
return DevTunnelConnection{}, fmt.Errorf("devtunnel create: init SDK: %w", err)
1717
}
@@ -51,6 +51,16 @@ func DevTunnelCreate(tunnelName string, expiration string, authToken string, ser
5151
info.Ports = append(info.Ports, sshPort)
5252
}
5353

54+
// 2c. Register any extra ports (e.g. log stream).
55+
for _, p := range extraPorts {
56+
if p > 0 {
57+
if err := SDKAddPort(ctx, tunnelName, p); err != nil {
58+
return DevTunnelConnection{}, fmt.Errorf("devtunnel create: add extra port %d to %q: %w", p, tunnelName, err)
59+
}
60+
info.Ports = append(info.Ports, p)
61+
}
62+
}
63+
5464
// 3. Obtain host token and start the relay.
5565
// Ports are already registered via SDK above — the CLI doesn't need -p flags
5666
// (which would require manage scope the host token doesn't have).

0 commit comments

Comments
 (0)