Skip to content

Commit c6f25b6

Browse files
Split workspace network server into separate services
1 parent b7ae0b2 commit c6f25b6

File tree

13 files changed

+781
-507
lines changed

13 files changed

+781
-507
lines changed

cmd/agent/git_credentials.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/loft-sh/devpod/cmd/flags"
1616
workspaced "github.com/loft-sh/devpod/pkg/daemon/workspace"
17+
"github.com/loft-sh/devpod/pkg/daemon/workspace/network"
1718
"github.com/loft-sh/devpod/pkg/gitcredentials"
1819
devpodhttp "github.com/loft-sh/devpod/pkg/http"
1920
"github.com/loft-sh/log"
@@ -78,15 +79,15 @@ func (cmd *GitCredentialsCmd) Run(ctx context.Context, args []string, log log.Lo
7879
}
7980

8081
func getCredentialsFromWorkspaceServer(credentials *gitcredentials.GitCredentials) *gitcredentials.GitCredentials {
81-
if _, err := os.Stat(filepath.Join(workspaced.RootDir, workspaced.RunnerProxySocket)); err != nil {
82+
if _, err := os.Stat(filepath.Join(workspaced.RootDir, network.RunnerProxySocket)); err != nil {
8283
// workspace server is not running
8384
return nil
8485
}
8586

8687
httpClient := &http.Client{
8788
Transport: &http.Transport{
8889
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
89-
return net.Dial("unix", filepath.Join(workspaced.RootDir, workspaced.RunnerProxySocket))
90+
return net.Dial("unix", filepath.Join(workspaced.RootDir, network.RunnerProxySocket))
9091
},
9192
},
9293
}

pkg/credentials/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/loft-sh/devpod/pkg/agent/tunnel"
1717
locald "github.com/loft-sh/devpod/pkg/daemon/platform"
1818
workspaced "github.com/loft-sh/devpod/pkg/daemon/workspace"
19+
network "github.com/loft-sh/devpod/pkg/daemon/workspace/network"
1920
devpodlog "github.com/loft-sh/devpod/pkg/log"
2021
"github.com/loft-sh/devpod/pkg/ts"
2122
"github.com/loft-sh/log"
@@ -159,7 +160,7 @@ func handleGitCredentialsOverTSNet(ctx context.Context, writer http.ResponseWrit
159160

160161
log.Infof("Received git credentials post data: %s", string(bodyBytes))
161162
// Set up HTTP transport that uses our network socket.
162-
socketPath := filepath.Join(workspaced.RootDir, workspaced.TSNetProxySocket)
163+
socketPath := filepath.Join(workspaced.RootDir, network.TSNetProxySocket)
163164
transport := &http.Transport{
164165
Dial: func(network, addr string) (net.Conn, error) {
165166
return net.Dial("unix", socketPath)

pkg/daemon/workspace/network.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"sync"
99

10+
"github.com/loft-sh/devpod/pkg/daemon/workspace/network"
1011
"github.com/loft-sh/devpod/pkg/platform/client"
1112
"github.com/loft-sh/devpod/pkg/ts"
1213
"github.com/loft-sh/log"
@@ -30,7 +31,7 @@ func RunNetworkServer(ctx context.Context, d *Daemon, errChan chan<- error, wg *
3031
errChan <- fmt.Errorf("failed to refresh client: %w", err)
3132
return
3233
}
33-
tsServer := NewWorkspaceServer(&WorkspaceServerConfig{
34+
networkServer := network.NewWorkspaceServer(&network.WorkspaceServerConfig{
3435
AccessKey: d.Config.Platform.AccessKey,
3536
PlatformHost: ts.RemoveProtocol(d.Config.Platform.PlatformHost),
3637
WorkspaceHost: d.Config.Platform.WorkspaceHost,
@@ -40,7 +41,7 @@ func RunNetworkServer(ctx context.Context, d *Daemon, errChan chan<- error, wg *
4041
logger.Infof(format, args...)
4142
},
4243
}, logger)
43-
if err := tsServer.Start(ctx); err != nil {
44+
if err := networkServer.Start(ctx); err != nil {
4445
errChan <- fmt.Errorf("network server: %w", err)
4546
}
4647
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package network
2+
3+
import "sync"
4+
5+
// ConnTracker is a simple connection counter used by several services.
6+
type ConnTracker struct {
7+
mu sync.Mutex
8+
count int
9+
}
10+
11+
func (c *ConnTracker) Add() {
12+
c.mu.Lock()
13+
defer c.mu.Unlock()
14+
c.count++
15+
}
16+
17+
func (c *ConnTracker) Remove() {
18+
c.mu.Lock()
19+
defer c.mu.Unlock()
20+
c.count--
21+
}
22+
23+
func (c *ConnTracker) Count() int {
24+
c.mu.Lock()
25+
defer c.mu.Unlock()
26+
return c.count
27+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package network
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
"github.com/loft-sh/log"
10+
"tailscale.com/client/tailscale"
11+
"tailscale.com/tsnet"
12+
)
13+
14+
// HeartbeatService sends periodic heartbeats when there are active connections.
15+
type HeartbeatService struct {
16+
tsServer *tsnet.Server
17+
lc *tailscale.LocalClient
18+
config *WorkspaceServerConfig
19+
projectName string
20+
workspaceName string
21+
log log.Logger
22+
tracker *ConnTracker
23+
}
24+
25+
// NewHeartbeatService creates a new HeartbeatService.
26+
func NewHeartbeatService(config *WorkspaceServerConfig, tsServer *tsnet.Server, lc *tailscale.LocalClient, projectName, workspaceName string, tracker *ConnTracker, log log.Logger) *HeartbeatService {
27+
return &HeartbeatService{
28+
tsServer: tsServer,
29+
lc: lc,
30+
config: config,
31+
projectName: projectName,
32+
workspaceName: workspaceName,
33+
log: log,
34+
tracker: tracker,
35+
}
36+
}
37+
38+
// Start begins the heartbeat loop.
39+
func (s *HeartbeatService) Start(ctx context.Context) {
40+
transport := &http.Transport{DialContext: s.tsServer.Dial}
41+
client := &http.Client{Transport: transport, Timeout: 10 * time.Second}
42+
ticker := time.NewTicker(10 * time.Second)
43+
defer ticker.Stop()
44+
for {
45+
select {
46+
case <-ctx.Done():
47+
return
48+
case <-ticker.C:
49+
if s.tracker.Count() > 0 {
50+
if err := s.sendHeartbeat(ctx, client); err != nil {
51+
s.log.Errorf("HeartbeatService: failed to send heartbeat: %v", err)
52+
}
53+
}
54+
}
55+
}
56+
}
57+
58+
func (s *HeartbeatService) sendHeartbeat(ctx context.Context, client *http.Client) error {
59+
hbCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
60+
defer cancel()
61+
discoveredRunner, err := discoverRunner(hbCtx, s.lc, s.log)
62+
if err != nil {
63+
return fmt.Errorf("failed to discover runner: %w", err)
64+
}
65+
heartbeatURL := fmt.Sprintf("http://%s.ts.loft/devpod/%s/%s/heartbeat", discoveredRunner, s.projectName, s.workspaceName)
66+
s.log.Infof("HeartbeatService: sending heartbeat to %s, active connections: %d", heartbeatURL, s.tracker.Count())
67+
req, err := http.NewRequestWithContext(hbCtx, "GET", heartbeatURL, nil)
68+
if err != nil {
69+
return fmt.Errorf("failed to create request for %s: %w", heartbeatURL, err)
70+
}
71+
req.Header.Set("Authorization", "Bearer "+s.config.AccessKey)
72+
resp, err := client.Do(req)
73+
if err != nil {
74+
return fmt.Errorf("request to %s failed: %w", heartbeatURL, err)
75+
}
76+
defer resp.Body.Close()
77+
if resp.StatusCode != http.StatusOK {
78+
return fmt.Errorf("received response from %s - Status: %d", heartbeatURL, resp.StatusCode)
79+
}
80+
s.log.Infof("HeartbeatService: received response from %s - Status: %d", heartbeatURL, resp.StatusCode)
81+
return nil
82+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package network
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"os"
7+
"path/filepath"
8+
"time"
9+
10+
"github.com/loft-sh/devpod/pkg/ts"
11+
"github.com/loft-sh/log"
12+
"tailscale.com/client/tailscale"
13+
"tailscale.com/types/netmap"
14+
)
15+
16+
// NetmapWatcherService watches the Tailscale netmap and writes it to a file.
17+
type NetmapWatcherService struct {
18+
rootDir string
19+
lc *tailscale.LocalClient
20+
log log.Logger
21+
}
22+
23+
// NewNetmapWatcherService creates a new NetmapWatcherService.
24+
func NewNetmapWatcherService(rootDir string, lc *tailscale.LocalClient, log log.Logger) *NetmapWatcherService {
25+
return &NetmapWatcherService{
26+
rootDir: rootDir,
27+
lc: lc,
28+
log: log,
29+
}
30+
}
31+
32+
// Start begins watching the netmap.
33+
func (s *NetmapWatcherService) Start(ctx context.Context) {
34+
go func() {
35+
lastUpdate := time.Now()
36+
if err := ts.WatchNetmap(ctx, s.lc, func(netMap *netmap.NetworkMap) {
37+
if time.Since(lastUpdate) < netMapCooldown {
38+
return
39+
}
40+
lastUpdate = time.Now()
41+
nm, err := json.Marshal(netMap)
42+
if err != nil {
43+
s.log.Errorf("NetmapWatcherService: failed to marshal netmap: %v", err)
44+
} else {
45+
_ = os.WriteFile(filepath.Join(s.rootDir, "netmap.json"), nm, 0644)
46+
}
47+
}); err != nil {
48+
s.log.Errorf("NetmapWatcherService: failed to watch netmap: %v", err)
49+
}
50+
}()
51+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package network
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"io"
8+
"net"
9+
"net/http"
10+
"os"
11+
"strings"
12+
13+
"github.com/loft-sh/log"
14+
"tailscale.com/tsnet"
15+
)
16+
17+
// NetworkProxyService listens on a Unix socket and proxies requests to TSNet peers.
18+
type NetworkProxyService struct {
19+
listener net.Listener
20+
tsServer *tsnet.Server
21+
log log.Logger
22+
}
23+
24+
// NewNetworkProxyService creates a new NetworkProxyService.
25+
func NewNetworkProxyService(socketPath string, tsServer *tsnet.Server, log log.Logger) (*NetworkProxyService, error) {
26+
_ = os.Remove(socketPath)
27+
l, err := net.Listen("unix", socketPath)
28+
if err != nil {
29+
return nil, fmt.Errorf("failed to listen on socket %s: %w", socketPath, err)
30+
}
31+
if err := os.Chmod(socketPath, 0777); err != nil {
32+
log.Errorf("failed to set socket permissions on %s: %v", socketPath, err)
33+
}
34+
log.Infof("TSProxyServer: network proxy listening on socket %s", socketPath)
35+
return &NetworkProxyService{
36+
listener: l,
37+
tsServer: tsServer,
38+
log: log,
39+
}, nil
40+
}
41+
42+
// Start begins accepting connections on the TS proxy socket.
43+
func (s *NetworkProxyService) Start(ctx context.Context) {
44+
go func() {
45+
defer s.listener.Close()
46+
for {
47+
conn, err := s.listener.Accept()
48+
if err != nil {
49+
select {
50+
case <-ctx.Done():
51+
s.log.Infof("TSProxyServer: listener shutting down (context cancelled)")
52+
return
53+
default:
54+
s.log.Errorf("TSProxyServer: error accepting connection: %v", err)
55+
continue
56+
}
57+
}
58+
go s.handleConnection(ctx, conn)
59+
}
60+
}()
61+
}
62+
63+
func (s *NetworkProxyService) handleConnection(ctx context.Context, conn net.Conn) {
64+
defer conn.Close()
65+
reader := bufio.NewReader(conn)
66+
req, err := http.ReadRequest(reader)
67+
if err != nil {
68+
s.log.Errorf("TSProxyServer: failed to read HTTP request: %v", err)
69+
return
70+
}
71+
target := req.Host
72+
if target == "" {
73+
s.log.Errorf("TSProxyServer: HTTP request does not contain a Host header")
74+
return
75+
}
76+
if !strings.Contains(target, ":") {
77+
target = target + ":80"
78+
}
79+
s.log.Infof("TSProxyServer: proxying request to target %s", target)
80+
tsConn, err := s.tsServer.Dial(ctx, "tcp", target)
81+
if err != nil {
82+
s.log.Errorf("TSProxyServer: error dialing target %s: %v", target, err)
83+
return
84+
}
85+
defer tsConn.Close()
86+
if err := req.Write(tsConn); err != nil {
87+
s.log.Errorf("TSProxyServer: error forwarding request to target %s: %v", target, err)
88+
return
89+
}
90+
if _, err := io.Copy(conn, tsConn); err != nil {
91+
s.log.Errorf("TSProxyServer: error forwarding response from target: %v", err)
92+
}
93+
}
94+
95+
// Stop stops the TSProxyServer by closing its listener.
96+
func (s *NetworkProxyService) Stop() {
97+
if s.listener != nil {
98+
s.listener.Close()
99+
}
100+
}

0 commit comments

Comments
 (0)