Skip to content

Commit 93bf862

Browse files
grpc reverse proxy
1 parent 2f71d8c commit 93bf862

File tree

16 files changed

+883
-172
lines changed

16 files changed

+883
-172
lines changed

cmd/agent/container/credentials_server.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ import (
1313
"github.com/loft-sh/devpod/pkg/agent/tunnel"
1414
"github.com/loft-sh/devpod/pkg/agent/tunnelserver"
1515
"github.com/loft-sh/devpod/pkg/credentials"
16+
locald "github.com/loft-sh/devpod/pkg/daemon/local"
1617
"github.com/loft-sh/devpod/pkg/dockercredentials"
1718
"github.com/loft-sh/devpod/pkg/gitcredentials"
1819
"github.com/loft-sh/devpod/pkg/gitsshsigning"
1920
"github.com/loft-sh/devpod/pkg/netstat"
2021
portpkg "github.com/loft-sh/devpod/pkg/port"
2122
"github.com/loft-sh/log"
23+
"github.com/sirupsen/logrus"
2224
"github.com/spf13/cobra"
2325
)
2426

@@ -69,10 +71,23 @@ func NewCredentialsServerCmd(flags *flags.GlobalFlags) *cobra.Command {
6971

7072
// Run runs the command logic
7173
func (cmd *CredentialsServerCmd) Run(ctx context.Context, port int) error {
74+
var tunnelClient tunnel.TunnelClient
75+
var err error
76+
fileLogger := log.NewFileLogger("/tmp/credentials_server_cmd.log", logrus.DebugLevel)
7277
// create a grpc client
73-
tunnelClient, err := tunnelserver.NewTunnelClient(os.Stdin, os.Stdout, true, ExitCodeIO)
74-
if err != nil {
75-
return fmt.Errorf("error creating tunnel client: %w", err)
78+
// if we have client address, lets use the http client
79+
if cmd.Client != "" {
80+
// address := ts.EnsureURL(cmd.Client, locald.LocalCredentialsServerPort)
81+
tunnelClient, err = tunnelserver.NewHTTPTunnelClient(cmd.Client, fmt.Sprintf("%d", locald.LocalCredentialsServerPort), fileLogger)
82+
if err != nil {
83+
return fmt.Errorf("error creating tunnel client: %w", err)
84+
}
85+
} else {
86+
// otherwise we fallback to stdio client
87+
tunnelClient, err = tunnelserver.NewTunnelClient(os.Stdin, os.Stdout, true, ExitCodeIO)
88+
if err != nil {
89+
return fmt.Errorf("error creating tunnel client: %w", err)
90+
}
7691
}
7792

7893
// this message serves as a ping to the client

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ require (
4141
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
4242
github.com/moby/buildkit v0.20.1
4343
github.com/moby/term v0.5.2
44+
github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9
4445
github.com/onsi/ginkgo/v2 v2.21.0
4546
github.com/onsi/gomega v1.35.1
4647
github.com/otiai10/copy v1.7.0

go.sum

Lines changed: 80 additions & 4 deletions
Large diffs are not rendered by default.

pkg/agent/tunnelserver/client.go

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@ package tunnelserver
22

33
import (
44
"context"
5+
"fmt"
56
"io"
67
"net"
78

89
"github.com/loft-sh/devpod/pkg/agent/tunnel"
910
"github.com/loft-sh/devpod/pkg/daemon/workspace/network"
1011
"github.com/loft-sh/devpod/pkg/stdio"
12+
"github.com/loft-sh/log"
1113
"google.golang.org/grpc"
1214
"google.golang.org/grpc/credentials/insecure"
15+
"google.golang.org/grpc/metadata"
1316
"google.golang.org/grpc/resolver"
1417
)
1518

@@ -36,23 +39,62 @@ func NewTunnelClient(reader io.Reader, writer io.WriteCloser, exitOnClose bool,
3639
return c, nil
3740
}
3841

39-
// NewTunnelClient creates a gRPC tunnel client that connects via the Unix domain socket,
40-
// using the shared dialer from the network package.
41-
func NewHTTPTunnelClient(_ io.Reader, _ io.WriteCloser, _ bool, _ int) (tunnel.TunnelClient, error) {
42-
// After moving from deprecated grpc.Dial to grpc.NewClient we need to setup resolver first
43-
// https://github.com/grpc/grpc-go/issues/1786#issuecomment-2119088770
42+
// NewHTTPTunnelClient creates a new gRPC client that connects via the network proxy.
43+
func NewHTTPTunnelClient(targetHost string, targetPort string, log log.Logger) (tunnel.TunnelClient, error) {
4444
resolver.SetDefaultScheme("passthrough")
45+
log.Infof("Starting tunnel client targeting %s:%s via proxy", targetHost, targetPort)
4546

46-
// Set up a connection to the server.
47-
conn, err := grpc.NewClient("",
48-
grpc.WithTransportCredentials(insecure.NewCredentials()),
49-
grpc.WithContextDialer(network.GetContextDialer()),
47+
// Create a unary interceptor to attach the target metadata.
48+
unaryInterceptor := func(
49+
ctx context.Context,
50+
method string,
51+
req, reply interface{},
52+
cc *grpc.ClientConn,
53+
invoker grpc.UnaryInvoker,
54+
opts ...grpc.CallOption,
55+
) error {
56+
md := metadata.New(map[string]string{
57+
"x-target-host": targetHost,
58+
"x-target-port": targetPort,
59+
})
60+
// Create a new outgoing context with the metadata attached.
61+
ctx = metadata.NewOutgoingContext(ctx, md)
62+
log.Debugf("Unary interceptor adding metadata: host=%s, port=%s", targetHost, targetPort)
63+
return invoker(ctx, method, req, reply, cc, opts...)
64+
}
65+
66+
streamInterceptor := func(
67+
ctx context.Context,
68+
desc *grpc.StreamDesc,
69+
cc *grpc.ClientConn,
70+
method string,
71+
streamer grpc.Streamer,
72+
opts ...grpc.CallOption,
73+
) (grpc.ClientStream, error) {
74+
md := metadata.New(map[string]string{
75+
"x-target-host": targetHost,
76+
"x-target-port": targetPort,
77+
})
78+
// Create a new outgoing context with the metadata attached.
79+
ctx = metadata.NewOutgoingContext(ctx, md)
80+
log.Debugf("Stream interceptor adding metadata: host=%s, port=%s", targetHost, targetPort)
81+
return streamer(ctx, desc, cc, method, opts...)
82+
}
83+
84+
target := "passthrough:///proxy-socket-target"
85+
86+
conn, err := grpc.NewClient(target,
87+
grpc.WithTransportCredentials(insecure.NewCredentials()), // Connect to proxy socket without TLS
88+
grpc.WithContextDialer(network.GetContextDialer()), // Use our custom dialer
89+
grpc.WithUnaryInterceptor(unaryInterceptor), // Add metadata for unary calls
90+
grpc.WithStreamInterceptor(streamInterceptor), // Add metadata for streaming calls
5091
)
5192
if err != nil {
52-
return nil, err
93+
log.Errorf("Failed to create gRPC client connection via proxy: %v", err)
94+
return nil, fmt.Errorf("failed to create gRPC client via proxy: %w", err)
5395
}
5496

97+
log.Infof("Successfully connected tunnel client via proxy socket")
5598
c := tunnel.NewTunnelClient(conn)
56-
5799
return c, nil
58100
}

pkg/agent/tunnelserver/tunnelserver.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/base64"
77
"encoding/json"
88
"fmt"
9+
"io"
910
"net"
1011
"os"
1112
"path/filepath"
@@ -23,13 +24,29 @@ import (
2324
"github.com/loft-sh/devpod/pkg/netstat"
2425
"github.com/loft-sh/devpod/pkg/platform"
2526
provider2 "github.com/loft-sh/devpod/pkg/provider"
27+
"github.com/loft-sh/devpod/pkg/stdio"
2628
"github.com/loft-sh/log"
2729
"github.com/moby/patternmatcher/ignorefile"
2830
perrors "github.com/pkg/errors"
2931
"google.golang.org/grpc"
3032
"google.golang.org/grpc/reflection"
3133
)
3234

35+
// GetListener returns correct listener for services server - either stdio or tcp
36+
func GetListener(client string, reader io.Reader, writer io.WriteCloser, exitOnClose bool, log log.Logger) (net.Listener, error) {
37+
if client == "" {
38+
log.Info("GetListener - returning stdio listener")
39+
return stdio.NewStdioListener(reader, writer, exitOnClose), nil
40+
}
41+
log.Info("GetListener - returning tcp listener")
42+
listener, err := net.Listen("tcp", ":4795") // FIXME
43+
if err != nil {
44+
return nil, err
45+
}
46+
47+
return listener, nil
48+
}
49+
3350
func RunServicesServer(ctx context.Context, lis net.Listener, allowGitCredentials, allowDockerCredentials bool, forwarder netstat.Forwarder, workspace *provider2.Workspace, log log.Logger, options ...Option) error {
3451
opts := append(options, []Option{
3552
WithForwarder(forwarder),

pkg/daemon/local/credentials_proxy.go

Lines changed: 85 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -4,123 +4,128 @@ import (
44
"context"
55
"fmt"
66
"net"
7-
"net/http"
8-
"net/http/httputil"
9-
"net/url"
107
"time"
118

129
"github.com/loft-sh/log"
10+
"github.com/mwitkow/grpc-proxy/proxy"
11+
"google.golang.org/grpc"
12+
"google.golang.org/grpc/codes"
13+
"google.golang.org/grpc/credentials/insecure"
14+
"google.golang.org/grpc/metadata"
15+
"google.golang.org/grpc/status"
1316
"tailscale.com/tsnet"
1417
)
1518

1619
const (
17-
// Listen on this port via tsnet.
18-
LocalCredentialsServerPort = 9999 // FIXME - use random prot
19-
// Target server: local gRPC server running on port 5555.
20-
TargetServer = "http://localhost:5555" // FIXME - get port from request
20+
LocalCredentialsServerPort int = 9999
21+
DefaultTargetHost string = "localhost"
2122
)
2223

23-
// LocalCredentialsServerProxy acts as a reverse proxy that blindly forwards
24-
// all incoming traffic to the local gRPC server on port 5555.
2524
type LocalCredentialsServerProxy struct {
26-
log log.Logger
27-
tsServer *tsnet.Server
28-
29-
ln net.Listener
30-
srv *http.Server
25+
log log.Logger
26+
tsServer *tsnet.Server
27+
grpcServer *grpc.Server
28+
ln net.Listener
3129
}
3230

33-
// NewLocalCredentialsServerProxy initializes a new LocalCredentialsServerProxy.
34-
func NewLocalCredentialsServerProxy(tsServer *tsnet.Server, log log.Logger) (*LocalCredentialsServerProxy, error) {
31+
func NewLocalCredentialsServerProxy(tsServer *tsnet.Server, logger log.Logger) (*LocalCredentialsServerProxy, error) {
32+
logger.Infof("NewLocalCredentialsServerProxy: initializing local reverse proxy")
33+
if tsServer == nil {
34+
return nil, fmt.Errorf("tsnet.Server cannot be nil")
35+
}
3536
return &LocalCredentialsServerProxy{
36-
log: log,
37+
log: logger,
3738
tsServer: tsServer,
3839
}, nil
3940
}
4041

41-
// Listen creates the tsnet listener and HTTP server,
42-
// and registers a catch-all handler that acts as the reverse proxy.
4342
func (s *LocalCredentialsServerProxy) Listen(ctx context.Context) error {
44-
s.log.Info("Starting reverse proxy for local gRPC server")
43+
s.log.Infof("LocalCredentialsServerProxy: Starting reverse proxy on tsnet port %d", LocalCredentialsServerPort)
4544

46-
// Create a tsnet listener.
47-
ln, err := s.tsServer.Listen("tcp", fmt.Sprintf(":%d", LocalCredentialsServerPort))
45+
listenAddr := fmt.Sprintf(":%d", LocalCredentialsServerPort)
46+
ln, err := s.tsServer.Listen("tcp", listenAddr)
4847
if err != nil {
49-
s.log.Infof("Failed to listen on tsnet port %d: %v", LocalCredentialsServerPort, err)
50-
return fmt.Errorf("failed to listen on tsnet port %d: %w", LocalCredentialsServerPort, err)
48+
s.log.Errorf("LocalCredentialsServerProxy: Failed to listen on tsnet %s: %v", listenAddr, err)
49+
return fmt.Errorf("failed to listen on tsnet %s: %w", listenAddr, err)
5150
}
5251
s.ln = ln
5352

54-
mux := http.NewServeMux()
55-
mux.HandleFunc("/", s.handleReverseProxy)
53+
s.log.Infof("LocalCredentialsServerProxy: tsnet listener started on %s", ln.Addr().String())
5654

57-
// Create the HTTP server.
58-
s.srv = &http.Server{
59-
Handler: mux,
60-
}
55+
director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
56+
md, ok := metadata.FromIncomingContext(ctx)
57+
if !ok {
58+
return nil, nil, status.Errorf(codes.InvalidArgument, "missing metadata")
59+
}
6160

62-
go func() {
63-
<-ctx.Done()
64-
s.log.Info("Context canceled, shutting down reverse proxy")
65-
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
66-
defer cancel()
67-
if err := s.srv.Shutdown(shutdownCtx); err != nil {
68-
s.log.Errorf("Error shutting down reverse proxy: %v", err)
61+
// Get the target port from metadata. Host is always localhost.
62+
targetPorts := md.Get("x-target-port")
63+
if len(targetPorts) == 0 {
64+
s.log.Error("LocalCredentialsServerProxy: Director missing x-target-port metadata")
65+
return nil, nil, status.Errorf(codes.InvalidArgument, "missing x-target-port metadata")
6966
}
70-
}()
67+
// targetPort := targetPorts[0]
68+
targetPort := "4795" // FIXME
7169

72-
s.log.Infof("Reverse proxy listening on tsnet port %d", LocalCredentialsServerPort)
73-
err = s.srv.Serve(ln)
74-
if err != nil && err != http.ErrServerClosed {
75-
s.log.Errorf("Reverse proxy error: %v", err)
76-
return err
77-
}
70+
targetAddr := net.JoinHostPort(DefaultTargetHost, targetPort)
7871

79-
return nil
80-
}
72+
s.log.Infof("[LocalCredentialsServerProxy] [gRPC] Proxying call %q to target %s", fullMethodName, targetAddr)
8173

82-
// handleReverseProxy forwards every request to the target gRPC server.
83-
func (s *LocalCredentialsServerProxy) handleReverseProxy(w http.ResponseWriter, r *http.Request) {
84-
s.log.Infof("Forwarding request %s %s to target server", r.Method, r.URL.String())
74+
conn, err := grpc.DialContext(ctx, targetAddr,
75+
grpc.WithTransportCredentials(insecure.NewCredentials()),
76+
grpc.WithCodec(proxy.Codec()), // Use proxy codec for transparency
77+
)
78+
if err != nil {
79+
s.log.Errorf("[LocalCredentialsServerProxy] [gRPC] Failed to dial local target backend %s: %v", targetAddr, err)
80+
return nil, nil, status.Errorf(codes.Internal, "failed to dial local target backend: %v", err)
81+
}
8582

86-
// Parse the target URL.
87-
targetURL, err := url.Parse(TargetServer)
88-
if err != nil {
89-
s.log.Errorf("Error parsing target URL %s: %v", TargetServer, err)
90-
http.Error(w, "Bad Gateway", http.StatusBadGateway)
91-
return
83+
return ctx, conn, nil
9284
}
9385

94-
// Create the reverse proxy.
95-
proxy := httputil.NewSingleHostReverseProxy(targetURL)
86+
// Create the gRPC server using the transparent handler.
87+
// It will forward any unknown service call based on the director logic.
88+
s.grpcServer = grpc.NewServer(
89+
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
90+
)
9691

97-
// Customize the director to forward the Host header to the target.
98-
originalDirector := proxy.Director
99-
proxy.Director = func(req *http.Request) {
100-
originalDirector(req)
101-
req.Host = targetURL.Host
102-
}
92+
s.log.Infof("LocalCredentialsServerProxy: gRPC reverse proxy configured, starting server on %s", ln.Addr().String())
10393

104-
// Use an error handler to log any errors that occur.
105-
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
106-
s.log.Errorf("Reverse proxy error: %v", err)
107-
http.Error(w, "Bad Gateway", http.StatusBadGateway)
94+
if err := s.grpcServer.Serve(s.ln); err != nil {
95+
if err.Error() != "grpc: the server has been stopped" {
96+
s.log.Errorf("LocalCredentialsServerProxy: failed to serve: %v", err)
97+
return fmt.Errorf("gRPC server error: %w", err)
98+
} else {
99+
s.log.Infof("LocalCredentialsServerProxy: gRPC server stopped gracefully.")
100+
}
108101
}
109-
110-
// Forward the request.
111-
proxy.ServeHTTP(w, r)
102+
return nil
112103
}
113104

114-
// Close gracefully shuts down the reverse proxy.
115-
func (s *LocalCredentialsServerProxy) Close() error {
116-
s.log.Info("Closing reverse proxy")
117-
if s.srv != nil {
118-
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
119-
defer cancel()
120-
if err := s.srv.Shutdown(shutdownCtx); err != nil {
121-
s.log.Errorf("Error during reverse proxy shutdown: %v", err)
122-
return err
105+
func (s *LocalCredentialsServerProxy) Stop() {
106+
s.log.Info("LocalCredentialsServerProxy: Stopping reverse proxy...")
107+
if s.grpcServer != nil {
108+
stopped := make(chan struct{})
109+
go func() {
110+
s.grpcServer.GracefulStop()
111+
close(stopped)
112+
}()
113+
114+
select {
115+
case <-time.After(10 * time.Second):
116+
s.log.Warnf("LocalCredentialsServerProxy: Graceful shutdown timed out after 10 seconds, forcing stop.")
117+
s.grpcServer.Stop()
118+
case <-stopped:
119+
s.log.Infof("LocalCredentialsServerProxy: gRPC server stopped gracefully.")
123120
}
124121
}
125-
return nil
122+
123+
if s.ln != nil {
124+
if err := s.ln.Close(); err != nil {
125+
s.log.Errorf("LocalCredentialsServerProxy: Error closing listener: %v", err)
126+
} else {
127+
s.log.Infof("LocalCredentialsServerProxy: Listener closed.")
128+
}
129+
}
130+
s.log.Info("LocalCredentialsServerProxy: Reverse proxy stopped.")
126131
}

0 commit comments

Comments
 (0)