Skip to content

Commit ca7f679

Browse files
committed
driver(external): change grpc transport to unix sockets
Signed-off-by: Ansuman Sahoo <[email protected]>
1 parent 8027b9c commit ca7f679

File tree

16 files changed

+321
-237
lines changed

16 files changed

+321
-237
lines changed

pkg/driver/driver.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,9 @@ type Driver interface {
8484
}
8585

8686
type Info struct {
87-
DriverName string `json:"driverName"`
88-
CanRunGUI bool `json:"canRunGui,omitempty"`
89-
VsockPort int `json:"vsockPort"`
90-
VirtioPort string `json:"virtioPort"`
87+
DriverName string `json:"driverName"`
88+
CanRunGUI bool `json:"canRunGui,omitempty"`
89+
VsockPort int `json:"vsockPort"`
90+
VirtioPort string `json:"virtioPort"`
91+
InstanceDir string `json:"instanceDir,omitempty"`
9192
}

pkg/driver/external/client/client.go

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ package client
55

66
import (
77
"context"
8-
"io"
9-
"math"
108
"net"
119

1210
pb "github.com/lima-vm/lima/pkg/driver/external"
@@ -16,22 +14,19 @@ import (
1614
)
1715

1816
type DriverClient struct {
19-
Stdin io.WriteCloser
20-
Stdout io.ReadCloser
21-
Conn *grpc.ClientConn
22-
DriverSvc pb.DriverClient
23-
logger *logrus.Logger
17+
socketPath string
18+
Conn *grpc.ClientConn
19+
DriverSvc pb.DriverClient
20+
logger *logrus.Logger
2421
}
2522

26-
func NewDriverClient(stdin io.WriteCloser, stdout io.ReadCloser, logger *logrus.Logger) (*DriverClient, error) {
27-
pipeConn := newPipeConn(stdin, stdout)
23+
func NewDriverClient(socketPath string, logger *logrus.Logger) (*DriverClient, error) {
24+
// pipeConn := newPipeConn(stdin, stdout)
2825
opts := []grpc.DialOption{
29-
grpc.WithDefaultCallOptions(
30-
grpc.MaxCallRecvMsgSize(math.MaxInt64),
31-
grpc.MaxCallSendMsgSize(math.MaxInt64),
32-
),
26+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(16 << 20)),
27+
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(16 << 20)),
3328
grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
34-
return pipeConn, nil
29+
return net.Dial("unix", socketPath)
3530
}),
3631
grpc.WithTransportCredentials(insecure.NewCredentials()),
3732
}
@@ -44,7 +39,7 @@ func NewDriverClient(stdin io.WriteCloser, stdout io.ReadCloser, logger *logrus.
4439
// -> ERRO[2025-06-04T21:32:54+05:30] Failed to set config: rpc error: code =
4540
// Unavailable desc = name resolver error: produced zero addresses
4641

47-
conn, err := grpc.Dial("pipe", opts...)
42+
conn, err := grpc.Dial("unix://"+socketPath, opts...)
4843
if err != nil {
4944
logger.Errorf("failed to dial gRPC driver client connection: %v", err)
5045
return nil, err
@@ -53,10 +48,9 @@ func NewDriverClient(stdin io.WriteCloser, stdout io.ReadCloser, logger *logrus.
5348
driverSvc := pb.NewDriverClient(conn)
5449

5550
return &DriverClient{
56-
Stdin: stdin,
57-
Stdout: stdout,
58-
Conn: conn,
59-
DriverSvc: driverSvc,
60-
logger: logger,
51+
socketPath: socketPath,
52+
Conn: conn,
53+
DriverSvc: driverSvc,
54+
logger: logger,
6155
}, nil
6256
}

pkg/driver/external/client/grpchijack/hijack.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ package grpchijack
5050
// return n, nil
5151
// }
5252

53-
// msg := new(pb.BytesMessage)
53+
// msg := new(pb.NetConn)
5454
// msg, err := c.stream.Recv()
5555
// if err != nil {
5656
// c.closed = true
@@ -77,7 +77,7 @@ package grpchijack
7777
// return 0, errors.New("connection closed")
7878
// }
7979

80-
// err := c.stream.Send(&pb.BytesMessage{Data: b})
80+
// err := c.stream.Send(&pb.NetConn{Data: b})
8181
// if err != nil {
8282
// c.closed = true
8383
// return 0, fmt.Errorf("stream send error: %w", err)
@@ -103,7 +103,7 @@ package grpchijack
103103

104104
// c.readMu.Lock()
105105
// for {
106-
// m := new(pb.BytesMessage)
106+
// m := new(pb.)
107107
// m.Data = c.readBuf
108108
// err := c.stream.RecvMsg(m)
109109
// if err != nil {

pkg/driver/external/client/methods.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"net"
1111
"time"
1212

13+
// "google.golang.org/protobuf/proto"
14+
1315
"google.golang.org/protobuf/types/known/emptypb"
1416

1517
"github.com/lima-vm/lima/pkg/driver"

pkg/driver/external/client/pipe.go

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,60 +3,60 @@
33

44
package client
55

6-
import (
7-
"io"
8-
"net"
9-
"os"
10-
"time"
11-
)
12-
13-
type PipeConn struct {
14-
Reader io.Reader
15-
Writer io.Writer
16-
Closer io.Closer
17-
}
18-
19-
func newPipeConn(writer io.WriteCloser, reader io.ReadCloser) net.Conn {
20-
return &PipeConn{
21-
Reader: reader,
22-
Writer: writer,
23-
Closer: os.Stdout,
24-
}
25-
}
26-
27-
func (p *PipeConn) Read(b []byte) (n int, err error) {
28-
return p.Reader.Read(b)
29-
}
30-
31-
func (p *PipeConn) Write(b []byte) (n int, err error) {
32-
return p.Writer.Write(b)
33-
}
34-
35-
func (p *PipeConn) Close() error {
36-
return p.Closer.Close()
37-
}
38-
39-
func (p *PipeConn) LocalAddr() net.Addr {
40-
return pipeAddr{}
41-
}
42-
43-
func (p *PipeConn) RemoteAddr() net.Addr {
44-
return pipeAddr{}
45-
}
46-
47-
func (p *PipeConn) SetDeadline(t time.Time) error {
48-
return nil
49-
}
50-
51-
func (p *PipeConn) SetReadDeadline(t time.Time) error {
52-
return nil
53-
}
54-
55-
func (p *PipeConn) SetWriteDeadline(t time.Time) error {
56-
return nil
57-
}
58-
59-
type pipeAddr struct{}
60-
61-
func (pipeAddr) Network() string { return "pipe" }
62-
func (pipeAddr) String() string { return "pipe" }
6+
// import (
7+
// "io"
8+
// "net"
9+
// "os"
10+
// "time"
11+
// )
12+
13+
// type PipeConn struct {
14+
// Reader io.Reader
15+
// Writer io.Writer
16+
// Closer io.Closer
17+
// }
18+
19+
// func newPipeConn(writer io.WriteCloser, reader io.ReadCloser) net.Conn {
20+
// return &PipeConn{
21+
// Reader: reader,
22+
// Writer: writer,
23+
// Closer: os.Stdout,
24+
// }
25+
// }
26+
27+
// func (p *PipeConn) Read(b []byte) (n int, err error) {
28+
// return p.Reader.Read(b)
29+
// }
30+
31+
// func (p *PipeConn) Write(b []byte) (n int, err error) {
32+
// return p.Writer.Write(b)
33+
// }
34+
35+
// func (p *PipeConn) Close() error {
36+
// return p.Closer.Close()
37+
// }
38+
39+
// func (p *PipeConn) LocalAddr() net.Addr {
40+
// return pipeAddr{}
41+
// }
42+
43+
// func (p *PipeConn) RemoteAddr() net.Addr {
44+
// return pipeAddr{}
45+
// }
46+
47+
// func (p *PipeConn) SetDeadline(t time.Time) error {
48+
// return nil
49+
// }
50+
51+
// func (p *PipeConn) SetReadDeadline(t time.Time) error {
52+
// return nil
53+
// }
54+
55+
// func (p *PipeConn) SetWriteDeadline(t time.Time) error {
56+
// return nil
57+
// }
58+
59+
// type pipeAddr struct{}
60+
61+
// func (pipeAddr) Network() string { return "pipe" }
62+
// func (pipeAddr) String() string { return "pipe" }

pkg/driver/external/driver.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ service Driver {
3131
rpc GetInfo(google.protobuf.Empty) returns (InfoResponse);
3232
}
3333

34+
// message NetConn {
35+
// bytes data = 1;
36+
// }
37+
3438
message InfoResponse{
3539
bytes info_json = 1;
3640
}

pkg/driver/external/server/methods.go

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,20 @@ package server
66
import (
77
"context"
88
"encoding/json"
9+
"net"
910
"path/filepath"
1011

1112
"google.golang.org/grpc/codes"
1213
"google.golang.org/grpc/status"
14+
15+
// "google.golang.org/protobuf/proto"
1316
"google.golang.org/protobuf/types/known/emptypb"
1417

18+
"github.com/lima-vm/lima/pkg/bicopy"
1519
pb "github.com/lima-vm/lima/pkg/driver/external"
1620
"github.com/lima-vm/lima/pkg/store"
1721
"github.com/lima-vm/lima/pkg/store/filenames"
22+
"github.com/sirupsen/logrus"
1823
)
1924

2025
func (s *DriverServer) Start(empty *emptypb.Empty, stream pb.Driver_StartServer) error {
@@ -72,11 +77,68 @@ func (s *DriverServer) GuestAgentConn(ctx context.Context, empty *emptypb.Empty)
7277
return &emptypb.Empty{}, status.Errorf(codes.Internal, "failed to establish guest agent connection: %v", err)
7378
}
7479

75-
go HandleProxyConnection(conn, filepath.Join("/Users/ansumansahoo/.lima/default/", filenames.GuestAgentSock))
80+
unixSocketPath := filepath.Join(s.driver.GetInfo().InstanceDir, filenames.GuestAgentSock)
81+
var d net.Dialer
82+
unixConn, err := d.DialContext(ctx, "unix", unixSocketPath)
83+
if err != nil {
84+
logrus.Errorf("Failed to connect to unix socket %s: %v", unixSocketPath, err)
85+
return nil, status.Errorf(codes.Internal, "failed to connect to unix socket %s: %v", unixSocketPath, err)
86+
}
87+
88+
go bicopy.Bicopy(conn, unixConn, nil)
89+
7690
s.logger.Debug("GuestAgentConn succeeded, connection handled")
7791
return &emptypb.Empty{}, nil
7892
}
7993

94+
// func (s *DriverServer) GuestAgentConn(stream pb.Driver_GuestAgentConnServer) error {
95+
// s.logger.Debug("Received GuestAgentConn request")
96+
// conn, err := s.driver.GuestAgentConn(stream.Context())
97+
// if err != nil {
98+
// s.logger.Errorf("GuestAgentConn failed: %v", err)
99+
// return err
100+
// }
101+
102+
// s.logger.Debug("GuestAgentConn succeeded")
103+
104+
// go func() {
105+
// for {
106+
// msg, err := stream.Recv()
107+
// if err != nil {
108+
// return
109+
// }
110+
// s.logger.Debugf("Received message from stream: %d bytes", len(msg.Data))
111+
// if len(msg.Data) > 0 {
112+
// _, err = conn.Write(msg.Data)
113+
// if err != nil {
114+
// s.logger.Errorf("Error writing to connection: %v", err)
115+
// conn.Close()
116+
// return
117+
// }
118+
// }
119+
// }
120+
// }()
121+
122+
// buf := make([]byte, 32*1<<10)
123+
// for {
124+
// n, err := conn.Read(buf)
125+
// if err != nil {
126+
// if errors.Is(err, io.EOF) {
127+
// s.logger.Debugf("Connection closed by guest agent %v", err)
128+
// return nil
129+
// }
130+
// return status.Errorf(codes.Internal, "error reading: %v", err)
131+
// }
132+
// s.logger.Debugf("Sending %d bytes to stream", n)
133+
134+
// msg := &pb.NetConn{Data: buf[:n]}
135+
// if err := stream.Send(msg); err != nil {
136+
// s.logger.Errorf("Failed to send message to stream: %v", err)
137+
// return err
138+
// }
139+
// }
140+
// }
141+
80142
func (s *DriverServer) GetInfo(ctx context.Context, empty *emptypb.Empty) (*pb.InfoResponse, error) {
81143
s.logger.Debug("Received GetInfo request")
82144
info := s.driver.GetInfo()

0 commit comments

Comments
 (0)