Skip to content

Commit f1649a2

Browse files
committed
Add readv dispatcher for darwin
1 parent d4c1ce7 commit f1649a2

File tree

7 files changed

+103
-87
lines changed

7 files changed

+103
-87
lines changed

internal/fdbased_darwin/endpoint.go

Lines changed: 21 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -60,57 +60,14 @@ type linkDispatcher interface {
6060
release()
6161
}
6262

63-
// PacketDispatchMode are the various supported methods of receiving and
64-
// dispatching packets from the underlying FD.
65-
type PacketDispatchMode int
66-
67-
// BatchSize is the number of packets to write in each syscall. It is 47
68-
// because when GVisorGSO is in use then a single 65KB TCP segment can get
69-
// split into 46 segments of 1420 bytes and a single 216 byte segment.
70-
const BatchSize = 47
71-
72-
const (
73-
// Readv is the default dispatch mode and is the least performant of the
74-
// dispatch options but the one that is supported by all underlying FD
75-
// types.
76-
Readv PacketDispatchMode = iota
77-
// RecvMMsg enables use of recvmmsg() syscall instead of readv() to
78-
// read inbound packets. This reduces # of syscalls needed to process
79-
// packets.
80-
//
81-
// NOTE: recvmmsg() is only supported for sockets, so if the underlying
82-
// FD is not a socket then the code will still fall back to the readv()
83-
// path.
84-
RecvMMsg
85-
// PacketMMap enables use of PACKET_RX_RING to receive packets from the
86-
// NIC. PacketMMap requires that the underlying FD be an AF_PACKET. The
87-
// primary use-case for this is runsc which uses an AF_PACKET FD to
88-
// receive packets from the veth device.
89-
PacketMMap
90-
)
91-
92-
func (p PacketDispatchMode) String() string {
93-
switch p {
94-
case Readv:
95-
return "Readv"
96-
case RecvMMsg:
97-
return "RecvMMsg"
98-
case PacketMMap:
99-
return "PacketMMap"
100-
default:
101-
return fmt.Sprintf("unknown packet dispatch mode '%d'", p)
102-
}
103-
}
104-
10563
var (
10664
_ stack.LinkEndpoint = (*endpoint)(nil)
10765
_ stack.GSOEndpoint = (*endpoint)(nil)
10866
)
10967

11068
// +stateify savable
11169
type fdInfo struct {
112-
fd int
113-
isSocket bool
70+
fd int
11471
}
11572

11673
// +stateify savable
@@ -137,10 +94,6 @@ type endpoint struct {
13794
// +checklocks:mu
13895
dispatcher stack.NetworkDispatcher
13996

140-
// packetDispatchMode controls the packet dispatcher used by this
141-
// endpoint.
142-
packetDispatchMode PacketDispatchMode
143-
14497
// wg keeps track of running goroutines.
14598
wg sync.WaitGroup `state:"nosave"`
14699

@@ -168,7 +121,7 @@ type endpoint struct {
168121
mtu uint32
169122

170123
batchSize int
171-
writeMsgX bool
124+
sendMsgX bool
172125
}
173126

174127
// Options specify the details about the fd-based endpoint to be created.
@@ -201,10 +154,6 @@ type Options struct {
201154
// include CapabilityDisconnectOk.
202155
DisconnectOk bool
203156

204-
// PacketDispatchMode specifies the type of inbound dispatcher to be
205-
// used for this endpoint.
206-
PacketDispatchMode PacketDispatchMode
207-
208157
// TXChecksumOffload if true, indicates that this endpoints capability
209158
// set should include CapabilityTXChecksumOffload.
210159
TXChecksumOffload bool
@@ -225,8 +174,8 @@ type Options struct {
225174
// from each FD.
226175
ProcessorsPerChannel int
227176

228-
MultiPendingPackets bool
229-
WriteMsgX bool
177+
RecvMsgX bool
178+
SendMsgX bool
230179
}
231180

232181
// New creates a new fd-based endpoint.
@@ -266,7 +215,7 @@ func New(opts *Options) (stack.LinkEndpoint, error) {
266215
return nil, fmt.Errorf("opts.MaxSyscallHeaderBytes is negative")
267216
}
268217
var batchSize int
269-
if opts.MultiPendingPackets {
218+
if opts.RecvMsgX {
270219
batchSize = int((512*1024)/(opts.MTU)) + 1
271220
} else {
272221
batchSize = 1
@@ -278,11 +227,10 @@ func New(opts *Options) (stack.LinkEndpoint, error) {
278227
closed: opts.ClosedFunc,
279228
addr: opts.Address,
280229
hdrSize: hdrSize,
281-
packetDispatchMode: opts.PacketDispatchMode,
282230
maxSyscallHeaderBytes: uintptr(opts.MaxSyscallHeaderBytes),
283231
writevMaxIovs: rawfile.MaxIovs,
284232
batchSize: batchSize,
285-
writeMsgX: opts.WriteMsgX,
233+
sendMsgX: opts.SendMsgX,
286234
}
287235
if e.maxSyscallHeaderBytes != 0 {
288236
if max := int(e.maxSyscallHeaderBytes / rawfile.SizeofIovec); max < e.writevMaxIovs {
@@ -296,14 +244,23 @@ func New(opts *Options) (stack.LinkEndpoint, error) {
296244
return nil, fmt.Errorf("unix.SetNonblock(%v) failed: %v", fd, err)
297245
}
298246

299-
e.fds = append(e.fds, fdInfo{fd: fd, isSocket: true})
247+
e.fds = append(e.fds, fdInfo{fd: fd})
300248
if opts.ProcessorsPerChannel == 0 {
301249
opts.ProcessorsPerChannel = common.Max(1, runtime.GOMAXPROCS(0)/len(opts.FDs))
302250
}
303251

304-
inboundDispatcher, err := newRecvMMsgDispatcher(fd, e, opts)
305-
if err != nil {
306-
return nil, fmt.Errorf("createInboundDispatcher(...) = %v", err)
252+
var inboundDispatcher linkDispatcher
253+
var err error
254+
if opts.RecvMsgX {
255+
inboundDispatcher, err = newRecvMMsgDispatcher(fd, e, opts)
256+
if err != nil {
257+
return nil, fmt.Errorf("newRecvMMsgDispatcher(%d, %+v) = %v", fd, e, err)
258+
}
259+
} else {
260+
inboundDispatcher, err = newReadVDispatcher(fd, e, opts)
261+
if err != nil {
262+
return nil, fmt.Errorf("newReadVDispatcher(%d, %+v) = %v", fd, e, err)
263+
}
307264
}
308265
e.inboundDispatchers = append(e.inboundDispatchers, inboundDispatcher)
309266
}
@@ -489,7 +446,7 @@ func (e *endpoint) writePacket(pkt *stack.PacketBuffer) tcpip.Error {
489446

490447
func (e *endpoint) sendBatch(batchFDInfo fdInfo, pkts []*stack.PacketBuffer) (int, tcpip.Error) {
491448
// Degrade to writePacket if underlying fd is not a socket.
492-
if !batchFDInfo.isSocket || !e.writeMsgX {
449+
if !e.sendMsgX {
493450
var written int
494451
var err tcpip.Error
495452
for written < len(pkts) {
@@ -597,7 +554,7 @@ func (e *endpoint) sendBatch(batchFDInfo fdInfo, pkts []*stack.PacketBuffer) (in
597554
func (e *endpoint) WritePackets(pkts stack.PacketBufferList) (int, tcpip.Error) {
598555
// Preallocate to avoid repeated reallocation as we append to batch.
599556
batch := make([]*stack.PacketBuffer, 0, e.batchSize)
600-
batchFDInfo := fdInfo{fd: -1, isSocket: false}
557+
batchFDInfo := fdInfo{fd: -1}
601558
sentPackets := 0
602559
for _, pkt := range pkts.AsSlice() {
603560
if len(batch) == 0 {

internal/fdbased_darwin/packet_dispatchers.go

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,70 @@ func (b *iovecBuffer) release() {
7979
}
8080
}
8181

82+
// readVDispatcher uses readv() system call to read inbound packets and
83+
// dispatches them.
84+
//
85+
// +stateify savable
86+
type readVDispatcher struct {
87+
stopfd.StopFD
88+
// fd is the file descriptor used to send and receive packets.
89+
fd int
90+
91+
// e is the endpoint this dispatcher is attached to.
92+
e *endpoint
93+
94+
// buf is the iovec buffer that contains the packet contents.
95+
buf *iovecBuffer
96+
97+
// mgr is the processor goroutine manager.
98+
mgr *processorManager
99+
}
100+
101+
func newReadVDispatcher(fd int, e *endpoint, opts *Options) (linkDispatcher, error) {
102+
stopFD, err := stopfd.New()
103+
if err != nil {
104+
return nil, err
105+
}
106+
d := &readVDispatcher{
107+
StopFD: stopFD,
108+
fd: fd,
109+
e: e,
110+
}
111+
d.buf = newIovecBuffer(opts.MTU)
112+
d.mgr = newProcessorManager(opts, e)
113+
d.mgr.start()
114+
return d, nil
115+
}
116+
117+
func (d *readVDispatcher) release() {
118+
d.buf.release()
119+
d.mgr.close()
120+
}
121+
122+
// dispatch reads one packet from the file descriptor and dispatches it.
123+
func (d *readVDispatcher) dispatch() (bool, tcpip.Error) {
124+
n, errno := rawfile.BlockingReadvUntilStopped(d.ReadFD, d.fd, d.buf.nextIovecs())
125+
if n <= 0 || errno != 0 {
126+
return false, TranslateErrno(errno)
127+
}
128+
129+
payload := d.buf.pullBuffer(n)
130+
pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
131+
Payload: payload,
132+
})
133+
defer pkt.DecRef()
134+
135+
d.e.mu.RLock()
136+
addr := d.e.addr
137+
d.e.mu.RUnlock()
138+
if !d.e.parseInboundHeader(pkt, addr) {
139+
return false, nil
140+
}
141+
d.mgr.queuePacket(pkt, d.e.hdrSize > 0)
142+
d.mgr.wakeReady()
143+
return true, nil
144+
}
145+
82146
// recvMMsgDispatcher uses the recvmmsg system call to read inbound packets and
83147
// dispatches them.
84148
//
@@ -115,12 +179,7 @@ func newRecvMMsgDispatcher(fd int, e *endpoint, opts *Options) (linkDispatcher,
115179
if err != nil {
116180
return nil, err
117181
}
118-
var batchSize int
119-
if opts.MultiPendingPackets {
120-
batchSize = int((512*1024)/(opts.MTU)) + 1
121-
} else {
122-
batchSize = 1
123-
}
182+
batchSize := int((512*1024)/(opts.MTU)) + 1
124183
d := &recvMMsgDispatcher{
125184
StopFD: stopFD,
126185
fd: fd,

stack_mixed.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (m *Mixed) tunLoop() {
7878
return
7979
}
8080
}
81-
if darwinTUN, isDarwinTUN := m.tun.(DarwinTUN); isDarwinTUN && m.multiPendingPackets {
81+
if darwinTUN, isDarwinTUN := m.tun.(DarwinTUN); isDarwinTUN && m.recvMsgX {
8282
m.batchLoopDarwin(darwinTUN)
8383
return
8484
}

stack_system.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type System struct {
4949
interfaceFinder control.InterfaceFinder
5050
frontHeadroom int
5151
txChecksumOffload bool
52-
multiPendingPackets bool
52+
recvMsgX bool
5353
}
5454

5555
type Session struct {
@@ -75,7 +75,7 @@ func NewSystem(options StackOptions) (Stack, error) {
7575
broadcastAddr: BroadcastAddr(options.TunOptions.Inet4Address),
7676
bindInterface: options.ForwarderBindInterface,
7777
interfaceFinder: options.InterfaceFinder,
78-
multiPendingPackets: options.TunOptions.EXP_MultiPendingPackets,
78+
recvMsgX: options.TunOptions.EXP_RecvMsgX,
7979
}
8080
if len(options.TunOptions.Inet4Address) > 0 {
8181
if !HasNextAddress(options.TunOptions.Inet4Address[0], 1) {
@@ -176,7 +176,7 @@ func (s *System) tunLoop() {
176176
return
177177
}
178178
}
179-
if darwinTUN, isDarwinTUN := s.tun.(DarwinTUN); isDarwinTUN && s.multiPendingPackets {
179+
if darwinTUN, isDarwinTUN := s.tun.(DarwinTUN); isDarwinTUN && s.recvMsgX {
180180
s.batchLoopDarwin(darwinTUN)
181181
return
182182
}

tun.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,9 @@ type Options struct {
9797
// For library usages.
9898
EXP_DisableDNSHijack bool
9999

100-
EXP_MultiPendingPackets bool
101-
EXP_WriteMsgX bool
100+
// For darwin tun
101+
EXP_RecvMsgX bool
102+
EXP_SendMsgX bool
102103
}
103104

104105
func (o *Options) Inet4GatewayAddr() netip.Addr {

tun_darwin.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,14 @@ func New(options Options) (Tun, error) {
110110
unix.Close(tunFd)
111111
return nil, err
112112
}
113-
err = configure(tunFd, options.EXP_MultiPendingPackets, batchSize)
113+
err = configure(tunFd, options.EXP_RecvMsgX, batchSize)
114114
if err != nil {
115115
unix.Close(tunFd)
116116
return nil, err
117117
}
118118
} else {
119119
tunFd = options.FileDescriptor
120-
err := configure(tunFd, options.EXP_MultiPendingPackets, batchSize)
120+
err := configure(tunFd, options.EXP_RecvMsgX, batchSize)
121121
if err != nil {
122122
return nil, err
123123
}
@@ -132,7 +132,7 @@ func New(options Options) (Tun, error) {
132132
msgHdrs: make([]rawfile.MsgHdrX, batchSize),
133133
msgHdrsOutput: make([]rawfile.MsgHdrX, batchSize),
134134
stopFd: common.Must1(stopfd.New()),
135-
writeMsgX: options.EXP_WriteMsgX,
135+
writeMsgX: options.EXP_SendMsgX,
136136
}
137137
for i := 0; i < batchSize; i++ {
138138
nativeTun.iovecs[i] = newIovecBuffer(int(options.MTU))
@@ -316,12 +316,12 @@ func create(tunFd int, ifIndex int, name string, options Options) error {
316316
return nil
317317
}
318318

319-
func configure(tunFd int, multiPendingPackets bool, batchSize int) error {
319+
func configure(tunFd int, recvMsgX bool, batchSize int) error {
320320
err := unix.SetNonblock(tunFd, true)
321321
if err != nil {
322322
return os.NewSyscallError("SetNonblock", err)
323323
}
324-
if multiPendingPackets {
324+
if recvMsgX {
325325
const UTUN_OPT_MAX_PENDING_PACKETS = 16
326326
err = unix.SetsockoptInt(tunFd, 2, UTUN_OPT_MAX_PENDING_PACKETS, batchSize)
327327
if err != nil {

tun_darwin_gvisor.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,11 @@ func (t *NativeTun) WritePacket(pkt *stack.PacketBuffer) (int, error) {
3737

3838
func (t *NativeTun) NewEndpoint() (stack.LinkEndpoint, stack.NICOptions, error) {
3939
ep, err := fdbased.New(&fdbased.Options{
40-
FDs: []int{t.tunFd},
41-
MTU: t.options.MTU,
42-
RXChecksumOffload: true,
43-
PacketDispatchMode: fdbased.RecvMMsg,
44-
MultiPendingPackets: t.options.EXP_MultiPendingPackets,
45-
WriteMsgX: t.options.EXP_WriteMsgX,
40+
FDs: []int{t.tunFd},
41+
MTU: t.options.MTU,
42+
RXChecksumOffload: true,
43+
RecvMsgX: t.options.EXP_RecvMsgX,
44+
SendMsgX: t.options.EXP_SendMsgX,
4645
})
4746
if err != nil {
4847
return nil, stack.NICOptions{}, err

0 commit comments

Comments
 (0)