Skip to content

Commit 5fe29d9

Browse files
committed
drpc poc
1 parent bb45dcb commit 5fe29d9

File tree

12 files changed

+1318
-1027
lines changed

12 files changed

+1318
-1027
lines changed

Makefile

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,16 @@ generate-wire: $(WIRE)
122122
@echo "Generating wire code..."
123123
cd ./cmd/api && $(WIRE)
124124

125-
# Generate gRPC code from proto
126-
generate-grpc:
127-
@echo "Generating gRPC code from proto..."
128-
protoc --go_out=. --go_opt=paths=source_relative \
129-
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
125+
# Generate DRPC code from proto
126+
# Note: PATH ordering ensures we use GOPATH/bin versions of protoc-gen-go and protoc-gen-go-drpc
127+
generate-drpc:
128+
@echo "Generating DRPC code from proto..."
129+
PATH=$(shell go env GOPATH)/bin:$$PATH protoc --go_out=. --go_opt=paths=source_relative \
130+
--go-drpc_out=. --go-drpc_opt=paths=source_relative \
130131
lib/guest/guest.proto
131132

132133
# Generate all code
133-
generate-all: oapi-generate generate-vmm-client generate-wire generate-grpc
134+
generate-all: oapi-generate generate-vmm-client generate-wire generate-drpc
134135

135136
# Check if CH binaries exist, download if missing
136137
.PHONY: ensure-ch-binaries

cmd/api/api/cp.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func (s *ApiService) handleCopyTo(ctx context.Context, ws *websocket.Conn, inst
230230
return 0, fmt.Errorf("get grpc connection: %w", err)
231231
}
232232

233-
client := guest.NewGuestServiceClient(grpcConn)
233+
client := guest.NewDRPCGuestServiceClient(grpcConn)
234234
stream, err := client.CopyToGuest(ctx)
235235
if err != nil {
236236
return 0, fmt.Errorf("start copy stream: %w", err)
@@ -340,7 +340,7 @@ func (s *ApiService) handleCopyFrom(ctx context.Context, ws *websocket.Conn, ins
340340
return 0, fmt.Errorf("get grpc connection: %w", err)
341341
}
342342

343-
client := guest.NewGuestServiceClient(grpcConn)
343+
client := guest.NewDRPCGuestServiceClient(grpcConn)
344344
stream, err := client.CopyFromGuest(ctx, &guest.CopyFromGuestRequest{
345345
Path: req.GuestPath,
346346
FollowLinks: req.FollowLinks,

cmd/api/api/instances.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ func (s *ApiService) StatInstancePath(ctx context.Context, request oapi.StatInst
479479
}, nil
480480
}
481481

482-
client := guest.NewGuestServiceClient(grpcConn)
482+
client := guest.NewDRPCGuestServiceClient(grpcConn)
483483
followLinks := false
484484
if request.Params.FollowLinks != nil {
485485
followLinks = *request.Params.FollowLinks

go.mod

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ require (
1212
github.com/ghodss/yaml v1.0.0
1313
github.com/go-chi/chi/v5 v5.2.3
1414
github.com/golang-jwt/jwt/v5 v5.3.0
15-
github.com/golang/protobuf v1.5.4
1615
github.com/google/go-containerregistry v0.20.6
1716
github.com/google/wire v0.7.0
1817
github.com/gorilla/websocket v1.5.3
@@ -43,8 +42,9 @@ require (
4342
go.opentelemetry.io/otel/trace v1.38.0
4443
golang.org/x/sync v0.17.0
4544
golang.org/x/sys v0.38.0
46-
google.golang.org/grpc v1.77.0
45+
google.golang.org/protobuf v1.36.10
4746
gvisor.dev/gvisor v0.0.0-20251125014920-fc40e232ff54
47+
storj.io/drpc v0.0.34
4848
)
4949

5050
require (
@@ -102,6 +102,7 @@ require (
102102
github.com/vbatts/tar-split v0.12.1 // indirect
103103
github.com/vishvananda/netns v0.0.5 // indirect
104104
github.com/woodsbury/decimal128 v1.3.0 // indirect
105+
github.com/zeebo/errs v1.2.2 // indirect
105106
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
106107
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
107108
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
@@ -114,7 +115,7 @@ require (
114115
golang.org/x/tools v0.37.0 // indirect
115116
google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 // indirect
116117
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect
117-
google.golang.org/protobuf v1.36.10 // indirect
118+
google.golang.org/grpc v1.77.0 // indirect
118119
gopkg.in/yaml.v2 v2.4.0 // indirect
119120
gopkg.in/yaml.v3 v3.0.1 // indirect
120121
gotest.tools/v3 v3.5.2 // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,10 @@ github.com/woodsbury/decimal128 v1.3.0/go.mod h1:C5UTmyTjW3JftjUFzOVhC20BEQa2a4Z
238238
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
239239
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
240240
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
241+
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
242+
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
243+
github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g=
244+
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
241245
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
242246
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
243247
go.opentelemetry.io/contrib/bridges/otelslog v0.13.0 h1:bwnLpizECbPr1RrQ27waeY2SPIPeccCx/xLuoYADZ9s=
@@ -365,3 +369,5 @@ gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q=
365369
gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA=
366370
gvisor.dev/gvisor v0.0.0-20251125014920-fc40e232ff54 h1:eYMn6Z3T40m4f9vVYRcsjvX4eEv7ng7FgrZTbadSyBs=
367371
gvisor.dev/gvisor v0.0.0-20251125014920-fc40e232ff54/go.mod h1:W1ZgZ/Dh85TgSZWH67l2jKVpDE5bjIaut7rjwwOiHzQ=
372+
storj.io/drpc v0.0.34 h1:q9zlQKfJ5A7x8NQNFk8x7eKUF78FMhmAbZLnFK+og7I=
373+
storj.io/drpc v0.0.34/go.mod h1:Y9LZaa8esL1PW2IDMqJE7CFSNq7d5bQ3RI7mGPtmKMg=

lib/guest/client.go

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ import (
1717

1818
securejoin "github.com/cyphar/filepath-securejoin"
1919
"github.com/onkernel/hypeman/lib/hypervisor"
20-
"google.golang.org/grpc"
21-
"google.golang.org/grpc/credentials/insecure"
20+
"storj.io/drpc/drpcconn"
2221
)
2322

2423
const (
@@ -51,25 +50,31 @@ func IsAgentConnectionError(err error) bool {
5150
errors.As(err, &agentErr))
5251
}
5352

54-
// connPool manages reusable gRPC connections per vsock dialer key
53+
// pooledConn wraps a drpc connection with its underlying net.Conn for cleanup
54+
type pooledConn struct {
55+
drpc *drpcconn.Conn
56+
netConn net.Conn
57+
}
58+
59+
// connPool manages reusable DRPC connections per vsock dialer key
5560
// This avoids the overhead and potential issues of rapidly creating/closing connections
5661
var connPool = struct {
5762
sync.RWMutex
58-
conns map[string]*grpc.ClientConn
63+
conns map[string]*pooledConn
5964
}{
60-
conns: make(map[string]*grpc.ClientConn),
65+
conns: make(map[string]*pooledConn),
6166
}
6267

6368
// GetOrCreateConn returns an existing connection or creates a new one using a VsockDialer.
6469
// This supports multiple hypervisor types (Cloud Hypervisor, QEMU, etc.).
65-
func GetOrCreateConn(ctx context.Context, dialer hypervisor.VsockDialer) (*grpc.ClientConn, error) {
70+
func GetOrCreateConn(ctx context.Context, dialer hypervisor.VsockDialer) (*drpcconn.Conn, error) {
6671
key := dialer.Key()
6772

6873
// Try read lock first for existing connection
6974
connPool.RLock()
70-
if conn, ok := connPool.conns[key]; ok {
75+
if pc, ok := connPool.conns[key]; ok {
7176
connPool.RUnlock()
72-
return conn, nil
77+
return pc.drpc, nil
7378
}
7479
connPool.RUnlock()
7580

@@ -78,40 +83,33 @@ func GetOrCreateConn(ctx context.Context, dialer hypervisor.VsockDialer) (*grpc.
7883
defer connPool.Unlock()
7984

8085
// Double-check after acquiring write lock
81-
if conn, ok := connPool.conns[key]; ok {
82-
return conn, nil
86+
if pc, ok := connPool.conns[key]; ok {
87+
return pc.drpc, nil
8388
}
8489

8590
// Create new connection using the VsockDialer
86-
conn, err := grpc.Dial("passthrough:///vsock",
87-
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
88-
netConn, err := dialer.DialVsock(ctx, vsockGuestPort)
89-
if err != nil {
90-
return nil, &AgentConnectionError{Err: err}
91-
}
92-
return netConn, nil
93-
}),
94-
grpc.WithTransportCredentials(insecure.NewCredentials()),
95-
)
91+
netConn, err := dialer.DialVsock(ctx, vsockGuestPort)
9692
if err != nil {
97-
return nil, fmt.Errorf("create grpc connection: %w", err)
93+
return nil, &AgentConnectionError{Err: err}
9894
}
9995

100-
connPool.conns[key] = conn
101-
slog.Debug("created new gRPC connection", "key", key)
96+
// Wrap in drpc connection
97+
conn := drpcconn.New(netConn)
98+
99+
connPool.conns[key] = &pooledConn{drpc: conn, netConn: netConn}
100+
slog.Debug("created new DRPC connection", "key", key)
102101
return conn, nil
103102
}
104103

105104
// CloseConn removes a connection from the pool by key (call when VM is deleted).
106-
// We only remove from pool, not explicitly close - the connection will fail
107-
// naturally when the VM dies, and grpc will clean up.
108105
func CloseConn(dialerKey string) {
109106
connPool.Lock()
110107
defer connPool.Unlock()
111108

112-
if _, ok := connPool.conns[dialerKey]; ok {
109+
if pc, ok := connPool.conns[dialerKey]; ok {
110+
pc.drpc.Close()
113111
delete(connPool.conns, dialerKey)
114-
slog.Debug("removed gRPC connection from pool", "key", dialerKey)
112+
slog.Debug("removed DRPC connection from pool", "key", dialerKey)
115113
}
116114
}
117115

@@ -132,27 +130,27 @@ type ExecOptions struct {
132130
Timeout int32 // Execution timeout in seconds (0 = no timeout)
133131
}
134132

135-
// ExecIntoInstance executes command in instance via vsock using gRPC.
133+
// ExecIntoInstance executes command in instance via vsock using DRPC.
136134
// The dialer is a hypervisor-specific VsockDialer that knows how to connect to the guest.
137135
func ExecIntoInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts ExecOptions) (*ExitStatus, error) {
138136
start := time.Now()
139137
var bytesSent int64
140138

141-
// Get or create a reusable gRPC connection for this vsock dialer
142-
grpcConn, err := GetOrCreateConn(ctx, dialer)
139+
// Get or create a reusable DRPC connection for this vsock dialer
140+
drpcConn, err := GetOrCreateConn(ctx, dialer)
143141
if err != nil {
144-
return nil, fmt.Errorf("get grpc connection: %w", err)
142+
return nil, fmt.Errorf("get drpc connection: %w", err)
145143
}
146144
// Note: Don't close the connection - it's pooled and reused
147145

148146
// Create guest client
149-
client := NewGuestServiceClient(grpcConn)
147+
client := NewDRPCGuestServiceClient(drpcConn)
150148
stream, err := client.Exec(ctx)
151149
if err != nil {
152150
return nil, fmt.Errorf("start exec stream: %w", err)
153151
}
154152
// Ensure stream is properly closed when we're done
155-
defer stream.CloseSend()
153+
defer stream.Close()
156154

157155
// Send start request
158156
if err := stream.Send(&ExecRequest{
@@ -233,12 +231,12 @@ type CopyToInstanceOptions struct {
233231
// CopyToInstance copies a file or directory to an instance via vsock.
234232
// The dialer is a hypervisor-specific VsockDialer that knows how to connect to the guest.
235233
func CopyToInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts CopyToInstanceOptions) error {
236-
grpcConn, err := GetOrCreateConn(ctx, dialer)
234+
drpcConn, err := GetOrCreateConn(ctx, dialer)
237235
if err != nil {
238-
return fmt.Errorf("get grpc connection: %w", err)
236+
return fmt.Errorf("get drpc connection: %w", err)
239237
}
240238

241-
client := NewGuestServiceClient(grpcConn)
239+
client := NewDRPCGuestServiceClient(drpcConn)
242240

243241
// Stat the source
244242
srcInfo, err := os.Stat(opts.SrcPath)
@@ -253,7 +251,7 @@ func CopyToInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts Cop
253251
}
254252

255253
// copyFileToInstance copies a single file to the instance
256-
func copyFileToInstance(ctx context.Context, client GuestServiceClient, srcPath, dstPath string, mode fs.FileMode) error {
254+
func copyFileToInstance(ctx context.Context, client DRPCGuestServiceClient, srcPath, dstPath string, mode fs.FileMode) error {
257255
srcInfo, err := os.Stat(srcPath)
258256
if err != nil {
259257
return fmt.Errorf("stat source: %w", err)
@@ -329,7 +327,7 @@ func copyFileToInstance(ctx context.Context, client GuestServiceClient, srcPath,
329327
}
330328

331329
// copyDirToInstance copies a directory recursively to the instance
332-
func copyDirToInstance(ctx context.Context, client GuestServiceClient, srcPath, dstPath string) error {
330+
func copyDirToInstance(ctx context.Context, client DRPCGuestServiceClient, srcPath, dstPath string) error {
333331
srcPath = filepath.Clean(srcPath)
334332

335333
// First create the destination directory
@@ -444,12 +442,12 @@ type FileHandler func(header *CopyFromGuestHeader, data io.Reader) error
444442
// CopyFromInstance copies a file or directory from an instance via vsock.
445443
// The dialer is a hypervisor-specific VsockDialer that knows how to connect to the guest.
446444
func CopyFromInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts CopyFromInstanceOptions) error {
447-
grpcConn, err := GetOrCreateConn(ctx, dialer)
445+
drpcConn, err := GetOrCreateConn(ctx, dialer)
448446
if err != nil {
449-
return fmt.Errorf("get grpc connection: %w", err)
447+
return fmt.Errorf("get drpc connection: %w", err)
450448
}
451449

452-
client := NewGuestServiceClient(grpcConn)
450+
client := NewDRPCGuestServiceClient(drpcConn)
453451

454452
stream, err := client.CopyFromGuest(ctx, &CopyFromGuestRequest{
455453
Path: opts.SrcPath,

0 commit comments

Comments
 (0)