Skip to content

Commit d49ac31

Browse files
committed
Fix packet forwarding between vz and socket_vmnet
We used external package (tcpproxy) for proxying between unix stream and datagram sockets. This package cannot handle ENOBUFS error, expected condition on BSD based systems, and worse, it hides errors and stop forwarding packets silently when write to vz socket fails with ENOBUFS[1]. Fix the issues by replacing tcpproxy with a simpler and more direct implementation that will be easier to maintain. Fixes: - Fix error handling if write to vz datagram socket fail with ENOBUFS. We retry the write until it succeeds with a very short sleep between retries. Similar solution is used in gvisor-tap-vsock[2]. - Fix error handling if we could not read packet header or body from socket_vmnet stream socket. Previously we logged an error and continue to send corrupted packet to vz from the point of the failure. - Fix error handling if writing a packet to socket_vmnet stream socket returned after writing partial packet. Now we handle short writes and write the complete packet. Previously would break the protocol and continue to send corrupted packet from the point of the failure. - Log error if forwarding packets from vz to socket_vmnet or from socket_vmnet to vz failed. Simplification: - Use binary.Read() and binary.Write() to read and write qemu packet header. Visibility: - Make QEMUPacketConn private since it is an implementation detail of vz when using socket_vmnet. Testing: - Add a packet forwarding test covering the happy path in 10 milliseconds. [1] lima-vm/socket_vmnet#39 [2] containers/gvisor-tap-vsock#370 Signed-off-by: Nir Soffer <[email protected]>
1 parent 5a98e62 commit d49ac31

File tree

3 files changed

+245
-53
lines changed

3 files changed

+245
-53
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ require (
4747
google.golang.org/protobuf v1.35.1
4848
gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473
4949
gotest.tools/v3 v3.5.1
50-
inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file)
5150
k8s.io/api v0.31.1
5251
k8s.io/apimachinery v0.31.1
5352
k8s.io/client-go v0.31.1
@@ -127,6 +126,7 @@ require (
127126
gopkg.in/yaml.v2 v2.4.0 // indirect
128127
gopkg.in/yaml.v3 v3.0.1 // indirect
129128
gvisor.dev/gvisor v0.0.0-20231023213702-2691a8f9b1cf // indirect
129+
inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // indirect; replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file)
130130
k8s.io/klog/v2 v2.130.1 // indirect
131131
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
132132
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect

pkg/vz/network_darwin.go

Lines changed: 93 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,18 @@
33
package vz
44

55
import (
6-
"context"
76
"encoding/binary"
7+
"errors"
88
"io"
99
"net"
1010
"os"
11+
"sync"
12+
"syscall"
1113
"time"
1214

1315
"github.com/balajiv113/fd"
1416

1517
"github.com/sirupsen/logrus"
16-
"inet.af/tcpproxy" // replaced to github.com/inetaf/tcpproxy in go.mod
1718
)
1819

1920
func PassFDToUnix(unixSock string) (*os.File, error) {
@@ -40,7 +41,7 @@ func DialQemu(unixSock string) (*os.File, error) {
4041
if err != nil {
4142
return nil, err
4243
}
43-
qemuConn := &QEMUPacketConn{unixConn: unixConn}
44+
qemuConn := &qemuPacketConn{Conn: unixConn}
4445

4546
server, client, err := createSockPair()
4647
if err != nil {
@@ -50,77 +51,117 @@ func DialQemu(unixSock string) (*os.File, error) {
5051
if err != nil {
5152
return nil, err
5253
}
54+
vzConn := &packetConn{Conn: dgramConn}
5355

54-
remote := tcpproxy.DialProxy{
55-
DialContext: func(context.Context, string, string) (net.Conn, error) {
56-
return dgramConn, nil
57-
},
58-
}
59-
go remote.HandleConn(qemuConn)
56+
go forwardPackets(qemuConn, vzConn)
6057

6158
return client, nil
6259
}
6360

64-
// QEMUPacketConn converts raw network packet to a QEMU supported network packet.
65-
type QEMUPacketConn struct {
66-
unixConn net.Conn
61+
func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) {
62+
defer qemuConn.Close()
63+
defer vzConn.Close()
64+
65+
var wg sync.WaitGroup
66+
wg.Add(2)
67+
68+
go func() {
69+
defer wg.Done()
70+
if _, err := io.Copy(qemuConn, vzConn); err != nil {
71+
logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err)
72+
}
73+
}()
74+
75+
go func() {
76+
defer wg.Done()
77+
if _, err := io.Copy(vzConn, qemuConn); err != nil {
78+
logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err)
79+
}
80+
}()
81+
82+
wg.Wait()
6783
}
6884

69-
var _ net.Conn = (*QEMUPacketConn)(nil)
85+
// qemuPacketConn converts raw network packet to a QEMU supported network packet.
86+
type qemuPacketConn struct {
87+
net.Conn
88+
}
7089

71-
// Read gets rid of the QEMU header packet and returns the raw packet as response.
72-
func (v *QEMUPacketConn) Read(b []byte) (n int, err error) {
73-
header := make([]byte, 4)
74-
_, err = io.ReadFull(v.unixConn, header)
75-
if err != nil {
76-
logrus.Errorln("Failed to read header", err)
90+
// Read reads a QEMU packet and returns the contained raw packet. Returns (len,
91+
// nil) if a packet was read, and (0, err) on error. Errors means the prorocol
92+
// is broken and the socket must be closed.
93+
func (c *qemuPacketConn) Read(b []byte) (n int, err error) {
94+
var size uint32
95+
if err := binary.Read(c.Conn, binary.BigEndian, &size); err != nil {
96+
// Likely connection closed by peer.
97+
return 0, err
7798
}
7899

79-
size := binary.BigEndian.Uint32(header)
80-
reader := io.LimitReader(v.unixConn, int64(size))
100+
reader := io.LimitReader(c.Conn, int64(size))
81101
_, err = reader.Read(b)
82102
if err != nil {
83-
logrus.Errorln("Failed to read packet", err)
103+
// Likely connection closed by peer.
104+
return 0, err
84105
}
85106
return int(size), nil
86107
}
87108

88-
// Write puts QEMU header packet first and then writes the raw packet.
89-
func (v *QEMUPacketConn) Write(b []byte) (n int, err error) {
90-
header := make([]byte, 4)
91-
binary.BigEndian.PutUint32(header, uint32(len(b)))
92-
_, err = v.unixConn.Write(header)
93-
if err != nil {
94-
logrus.Errorln("Failed to write header", err)
109+
// Write writes a QEMU packet containing the raw packet. Returns (len(b), nil)
110+
// if a packet was written, and (0, err) if a packet was not fully written.
111+
// Errors means the prorocol is broken and the socket must be closed.
112+
func (c *qemuPacketConn) Write(b []byte) (int, error) {
113+
size := len(b)
114+
header := uint32(size)
115+
if err := binary.Write(c.Conn, binary.BigEndian, header); err != nil {
116+
return 0, err
95117
}
96118

97-
write, err := v.unixConn.Write(b)
98-
if err != nil {
99-
logrus.Errorln("Failed to write packet", err)
119+
start := 0
120+
for start < size {
121+
nw, err := c.Conn.Write(b[start:])
122+
if err != nil {
123+
return 0, err
124+
}
125+
start += nw
100126
}
101-
return write, nil
127+
return size, nil
102128
}
103129

104-
func (v *QEMUPacketConn) Close() error {
105-
return v.unixConn.Close()
106-
}
107-
108-
func (v *QEMUPacketConn) LocalAddr() net.Addr {
109-
return v.unixConn.LocalAddr()
110-
}
111-
112-
func (v *QEMUPacketConn) RemoteAddr() net.Addr {
113-
return v.unixConn.RemoteAddr()
114-
}
115-
116-
func (v *QEMUPacketConn) SetDeadline(t time.Time) error {
117-
return v.unixConn.SetDeadline(t)
118-
}
130+
// Testing show that retries are very rare (e.g 24 of 62,499,008 packets) and
131+
// requires 1 or 2 retries to complete the write. A 100 microseconds sleep loop
132+
// consumes about 4% CPU on M1 Pro.
133+
const writeRetryDelay = 100 * time.Microsecond
119134

120-
func (v *QEMUPacketConn) SetReadDeadline(t time.Time) error {
121-
return v.unixConn.SetReadDeadline(t)
135+
// packetConn handles ENOBUFS errors when writing to unixgram socket.
136+
type packetConn struct {
137+
net.Conn
122138
}
123139

124-
func (v *QEMUPacketConn) SetWriteDeadline(t time.Time) error {
125-
return v.unixConn.SetWriteDeadline(t)
140+
// Write writes a packet retrying on ENOBUFS errors.
141+
func (c *packetConn) Write(b []byte) (int, error) {
142+
var retries uint64
143+
for {
144+
n, err := c.Conn.Write(b)
145+
if n == 0 && err != nil && errors.Is(err, syscall.ENOBUFS) {
146+
// This is an expected condition on BSD based system. The kernel
147+
// does not support blocking until buffer space is available.
148+
// The only way to recover is to retry the call until it
149+
// succeeds, or drop the packet.
150+
// Handled in a similar way in gvisor-tap-vsock:
151+
// https://github.com/containers/gvisor-tap-vsock/issues/367
152+
time.Sleep(writeRetryDelay)
153+
retries++
154+
continue
155+
}
156+
if err != nil {
157+
return 0, err
158+
}
159+
if n < len(b) {
160+
return n, errors.New("incomplete write to unixgram socket")
161+
}
162+
if retries > 0 {
163+
logrus.Debugf("Write completed after %d retries", retries)
164+
}
165+
return n, nil
166+
}
126167
}

pkg/vz/network_darwin_test.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
//go:build darwin && !no_vz
2+
3+
package vz
4+
5+
import (
6+
"encoding/binary"
7+
"fmt"
8+
"net"
9+
"path/filepath"
10+
"testing"
11+
)
12+
13+
const vmnetMaxPacketSize = 1514
14+
const packetsCount = 1000
15+
16+
func TestDialQemu(t *testing.T) {
17+
listener, err := listenUnix(t.TempDir())
18+
if err != nil {
19+
t.Fatal(err)
20+
}
21+
defer listener.Close()
22+
t.Logf("Listening at %q", listener.Addr())
23+
24+
errc := make(chan error, 2)
25+
26+
// Start the fake vmnet server.
27+
go func() {
28+
t.Log("Fake vmnet started")
29+
errc <- serveOneClient(listener)
30+
t.Log("Fake vmnet finished")
31+
}()
32+
33+
// Connect to the fake vmnet server.
34+
client, err := DialQemu(listener.Addr().String())
35+
if err != nil {
36+
t.Fatal(err)
37+
}
38+
t.Log("Connected to fake vment server")
39+
40+
dgramConn, err := net.FileConn(client)
41+
if err != nil {
42+
t.Fatal(err)
43+
}
44+
45+
vzConn := packetConn{Conn: dgramConn}
46+
defer vzConn.Close()
47+
48+
go func() {
49+
t.Log("Sender started")
50+
buf := make([]byte, vmnetMaxPacketSize)
51+
for i := 0; i < vmnetMaxPacketSize; i++ {
52+
buf[i] = 0x55
53+
}
54+
55+
// data packet format:
56+
// 0-4 packet number
57+
// 4-1514 0x55 ...
58+
for i := 0; i < packetsCount; i++ {
59+
binary.BigEndian.PutUint32(buf, uint32(i))
60+
if _, err := vzConn.Write(buf); err != nil {
61+
errc <- err
62+
return
63+
}
64+
}
65+
t.Logf("Sent %d data packets", packetsCount)
66+
67+
// quit packet format:
68+
// 0-4: "quit"
69+
copy(buf[:4], []byte("quit"))
70+
if _, err := vzConn.Write(buf[:4]); err != nil {
71+
errc <- err
72+
return
73+
}
74+
75+
errc <- nil
76+
t.Log("Sender finished")
77+
}()
78+
79+
// Read and verify packets to the server.
80+
81+
buf := make([]byte, vmnetMaxPacketSize)
82+
83+
t.Logf("Receiving and verifying data packets...")
84+
for i := 0; i < packetsCount; i++ {
85+
n, err := vzConn.Read(buf)
86+
if err != nil {
87+
t.Fatal(err)
88+
}
89+
if n < vmnetMaxPacketSize {
90+
t.Fatalf("Expected %d bytes, got %d", vmnetMaxPacketSize, n)
91+
}
92+
93+
number := binary.BigEndian.Uint32(buf[:4])
94+
if number != uint32(i) {
95+
t.Fatalf("Expected packet %d, got packet %d", i, number)
96+
}
97+
98+
for j := 4; j < vmnetMaxPacketSize; j++ {
99+
if buf[j] != 0x55 {
100+
t.Fatalf("Expected byte 0x55 at offset %d, got 0x%02x", j, buf[j])
101+
}
102+
}
103+
}
104+
t.Logf("Recived and verified %d data packets", packetsCount)
105+
106+
for i := 0; i < 2; i++ {
107+
err := <-errc
108+
if err != nil {
109+
t.Fatal(err)
110+
}
111+
}
112+
}
113+
114+
// serveOneClient accepts one client and echo back received packets until a
115+
// "quit" packet is sent.
116+
func serveOneClient(listener *net.UnixListener) error {
117+
conn, err := listener.Accept()
118+
if err != nil {
119+
return err
120+
}
121+
qemuConn := qemuPacketConn{Conn: conn}
122+
defer qemuConn.Close()
123+
124+
buf := make([]byte, vmnetMaxPacketSize)
125+
for {
126+
nr, err := qemuConn.Read(buf)
127+
if err != nil {
128+
return err
129+
}
130+
if string(buf[:4]) == "quit" {
131+
return nil
132+
}
133+
nw, err := qemuConn.Write(buf[:nr])
134+
if err != nil {
135+
return err
136+
}
137+
if nw != nr {
138+
return fmt.Errorf("incomplete write: expected: %d, wrote: %d", nr, nw)
139+
}
140+
}
141+
}
142+
143+
// listenUnix creates and listen to unix socket under dir
144+
func listenUnix(dir string) (*net.UnixListener, error) {
145+
sock := filepath.Join(dir, "sock")
146+
addr, err := net.ResolveUnixAddr("unix", sock)
147+
if err != nil {
148+
return nil, err
149+
}
150+
return net.ListenUnix("unix", addr)
151+
}

0 commit comments

Comments
 (0)