Skip to content

Commit 3bb8fec

Browse files
jwhitedraggi
authored andcommitted
conn, device, tun: implement vectorized I/O plumbing
Accept packet vectors for reading and writing in the tun.Device and conn.Bind interfaces, so that the internal plumbing between these interfaces now passes a vector of packets. Vectors move untouched between these interfaces, i.e. if 128 packets are received from conn.Bind.Read(), 128 packets are passed to tun.Device.Write(). There is no internal buffering. Currently, existing implementations are only adjusted to have vectors of length one. Subsequent patches will improve that. Also, as a related fixup, use the unix and windows packages rather than the syscall package when possible. Co-authored-by: James Tucker <[email protected]> Signed-off-by: James Tucker <[email protected]> Signed-off-by: Jordan Whited <[email protected]> Signed-off-by: Jason A. Donenfeld <[email protected]>
1 parent 2163620 commit 3bb8fec

25 files changed

+1026
-494
lines changed

conn/bind_linux.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@ func (bind *LinuxSocketBind) SetMark(value uint32) error {
193193
return nil
194194
}
195195

196+
func (bind *LinuxSocketBind) BatchSize() int {
197+
return 1
198+
}
199+
196200
func (bind *LinuxSocketBind) Close() error {
197201
// Take a readlock to shut down the sockets...
198202
bind.mu.RLock()
@@ -223,29 +227,39 @@ func (bind *LinuxSocketBind) Close() error {
223227
return err2
224228
}
225229

226-
func (bind *LinuxSocketBind) receiveIPv4(buf []byte) (int, Endpoint, error) {
230+
func (bind *LinuxSocketBind) receiveIPv4(buffs [][]byte, sizes []int, eps []Endpoint) (int, error) {
227231
bind.mu.RLock()
228232
defer bind.mu.RUnlock()
229233
if bind.sock4 == -1 {
230-
return 0, nil, net.ErrClosed
234+
return 0, net.ErrClosed
231235
}
232236
var end LinuxSocketEndpoint
233-
n, err := receive4(bind.sock4, buf, &end)
234-
return n, &end, err
237+
n, err := receive4(bind.sock4, buffs[0], &end)
238+
if err != nil {
239+
return 0, err
240+
}
241+
eps[0] = &end
242+
sizes[0] = n
243+
return 1, nil
235244
}
236245

237-
func (bind *LinuxSocketBind) receiveIPv6(buf []byte) (int, Endpoint, error) {
246+
func (bind *LinuxSocketBind) receiveIPv6(buffs [][]byte, sizes []int, eps []Endpoint) (int, error) {
238247
bind.mu.RLock()
239248
defer bind.mu.RUnlock()
240249
if bind.sock6 == -1 {
241-
return 0, nil, net.ErrClosed
250+
return 0, net.ErrClosed
242251
}
243252
var end LinuxSocketEndpoint
244-
n, err := receive6(bind.sock6, buf, &end)
245-
return n, &end, err
253+
n, err := receive6(bind.sock6, buffs[0], &end)
254+
if err != nil {
255+
return 0, err
256+
}
257+
eps[0] = &end
258+
sizes[0] = n
259+
return 1, nil
246260
}
247261

248-
func (bind *LinuxSocketBind) Send(buff []byte, end Endpoint) error {
262+
func (bind *LinuxSocketBind) Send(buffs [][]byte, end Endpoint) error {
249263
nend, ok := end.(*LinuxSocketEndpoint)
250264
if !ok {
251265
return ErrWrongEndpointType
@@ -256,13 +270,24 @@ func (bind *LinuxSocketBind) Send(buff []byte, end Endpoint) error {
256270
if bind.sock4 == -1 {
257271
return net.ErrClosed
258272
}
259-
return send4(bind.sock4, nend, buff)
273+
for _, buff := range buffs {
274+
err := send4(bind.sock4, nend, buff)
275+
if err != nil {
276+
return err
277+
}
278+
}
260279
} else {
261280
if bind.sock6 == -1 {
262281
return net.ErrClosed
263282
}
264-
return send6(bind.sock6, nend, buff)
283+
for _, buff := range buffs {
284+
err := send6(bind.sock6, nend, buff)
285+
if err != nil {
286+
return err
287+
}
288+
}
265289
}
290+
return nil
266291
}
267292

268293
func (end *LinuxSocketEndpoint) SrcIP() netip.Addr {

conn/bind_std.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ again:
128128
return fns, uint16(port), nil
129129
}
130130

131+
func (bind *StdNetBind) BatchSize() int {
132+
return 1
133+
}
134+
131135
func (bind *StdNetBind) Close() error {
132136
bind.mu.Lock()
133137
defer bind.mu.Unlock()
@@ -150,20 +154,30 @@ func (bind *StdNetBind) Close() error {
150154
}
151155

152156
func (*StdNetBind) makeReceiveIPv4(conn *net.UDPConn) ReceiveFunc {
153-
return func(buff []byte) (int, Endpoint, error) {
154-
n, endpoint, err := conn.ReadFromUDPAddrPort(buff)
155-
return n, asEndpoint(endpoint), err
157+
return func(buffs [][]byte, sizes []int, eps []Endpoint) (n int, err error) {
158+
size, endpoint, err := conn.ReadFromUDPAddrPort(buffs[0])
159+
if err == nil {
160+
sizes[0] = size
161+
eps[0] = asEndpoint(endpoint)
162+
return 1, nil
163+
}
164+
return 0, err
156165
}
157166
}
158167

159168
func (*StdNetBind) makeReceiveIPv6(conn *net.UDPConn) ReceiveFunc {
160-
return func(buff []byte) (int, Endpoint, error) {
161-
n, endpoint, err := conn.ReadFromUDPAddrPort(buff)
162-
return n, asEndpoint(endpoint), err
169+
return func(buffs [][]byte, sizes []int, eps []Endpoint) (n int, err error) {
170+
size, endpoint, err := conn.ReadFromUDPAddrPort(buffs[0])
171+
if err == nil {
172+
sizes[0] = size
173+
eps[0] = asEndpoint(endpoint)
174+
return 1, nil
175+
}
176+
return 0, err
163177
}
164178
}
165179

166-
func (bind *StdNetBind) Send(buff []byte, endpoint Endpoint) error {
180+
func (bind *StdNetBind) Send(buffs [][]byte, endpoint Endpoint) error {
167181
var err error
168182
nend, ok := endpoint.(StdNetEndpoint)
169183
if !ok {
@@ -186,8 +200,13 @@ func (bind *StdNetBind) Send(buff []byte, endpoint Endpoint) error {
186200
if conn == nil {
187201
return syscall.EAFNOSUPPORT
188202
}
189-
_, err = conn.WriteToUDPAddrPort(buff, addrPort)
190-
return err
203+
for _, buff := range buffs {
204+
_, err = conn.WriteToUDPAddrPort(buff, addrPort)
205+
if err != nil {
206+
return err
207+
}
208+
}
209+
return nil
191210
}
192211

193212
// endpointPool contains a re-usable set of mapping from netip.AddrPort to Endpoint.

conn/bind_windows.go

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,11 @@ func (bind *WinRingBind) Close() error {
321321
return nil
322322
}
323323

324+
func (bind *WinRingBind) BatchSize() int {
325+
// TODO: implement batching in and out of the ring
326+
return 1
327+
}
328+
324329
func (bind *WinRingBind) SetMark(mark uint32) error {
325330
return nil
326331
}
@@ -409,16 +414,22 @@ retry:
409414
return n, &ep, nil
410415
}
411416

412-
func (bind *WinRingBind) receiveIPv4(buf []byte) (int, Endpoint, error) {
417+
func (bind *WinRingBind) receiveIPv4(buffs [][]byte, sizes []int, eps []Endpoint) (int, error) {
413418
bind.mu.RLock()
414419
defer bind.mu.RUnlock()
415-
return bind.v4.Receive(buf, &bind.isOpen)
420+
n, ep, err := bind.v4.Receive(buffs[0], &bind.isOpen)
421+
sizes[0] = n
422+
eps[0] = ep
423+
return 1, err
416424
}
417425

418-
func (bind *WinRingBind) receiveIPv6(buf []byte) (int, Endpoint, error) {
426+
func (bind *WinRingBind) receiveIPv6(buffs [][]byte, sizes []int, eps []Endpoint) (int, error) {
419427
bind.mu.RLock()
420428
defer bind.mu.RUnlock()
421-
return bind.v6.Receive(buf, &bind.isOpen)
429+
n, ep, err := bind.v6.Receive(buffs[0], &bind.isOpen)
430+
sizes[0] = n
431+
eps[0] = ep
432+
return 1, err
422433
}
423434

424435
func (bind *afWinRingBind) Send(buf []byte, nend *WinRingEndpoint, isOpen *atomic.Uint32) error {
@@ -473,32 +484,38 @@ func (bind *afWinRingBind) Send(buf []byte, nend *WinRingEndpoint, isOpen *atomi
473484
return winrio.SendEx(bind.rq, dataBuffer, 1, nil, addressBuffer, nil, nil, 0, 0)
474485
}
475486

476-
func (bind *WinRingBind) Send(buf []byte, endpoint Endpoint) error {
487+
func (bind *WinRingBind) Send(buffs [][]byte, endpoint Endpoint) error {
477488
nend, ok := endpoint.(*WinRingEndpoint)
478489
if !ok {
479490
return ErrWrongEndpointType
480491
}
481492
bind.mu.RLock()
482493
defer bind.mu.RUnlock()
483-
switch nend.family {
484-
case windows.AF_INET:
485-
if bind.v4.blackhole {
486-
return nil
487-
}
488-
return bind.v4.Send(buf, nend, &bind.isOpen)
489-
case windows.AF_INET6:
490-
if bind.v6.blackhole {
491-
return nil
494+
for _, buf := range buffs {
495+
switch nend.family {
496+
case windows.AF_INET:
497+
if bind.v4.blackhole {
498+
continue
499+
}
500+
if err := bind.v4.Send(buf, nend, &bind.isOpen); err != nil {
501+
return err
502+
}
503+
case windows.AF_INET6:
504+
if bind.v6.blackhole {
505+
continue
506+
}
507+
if err := bind.v6.Send(buf, nend, &bind.isOpen); err != nil {
508+
return err
509+
}
492510
}
493-
return bind.v6.Send(buf, nend, &bind.isOpen)
494511
}
495512
return nil
496513
}
497514

498-
func (bind *StdNetBind) BindSocketToInterface4(interfaceIndex uint32, blackhole bool) error {
499-
bind.mu.Lock()
500-
defer bind.mu.Unlock()
501-
sysconn, err := bind.ipv4.SyscallConn()
515+
func (s *StdNetBind) BindSocketToInterface4(interfaceIndex uint32, blackhole bool) error {
516+
s.mu.Lock()
517+
defer s.mu.Unlock()
518+
sysconn, err := s.ipv4.SyscallConn()
502519
if err != nil {
503520
return err
504521
}
@@ -511,14 +528,14 @@ func (bind *StdNetBind) BindSocketToInterface4(interfaceIndex uint32, blackhole
511528
if err != nil {
512529
return err
513530
}
514-
bind.blackhole4 = blackhole
531+
s.blackhole4 = blackhole
515532
return nil
516533
}
517534

518-
func (bind *StdNetBind) BindSocketToInterface6(interfaceIndex uint32, blackhole bool) error {
519-
bind.mu.Lock()
520-
defer bind.mu.Unlock()
521-
sysconn, err := bind.ipv6.SyscallConn()
535+
func (s *StdNetBind) BindSocketToInterface6(interfaceIndex uint32, blackhole bool) error {
536+
s.mu.Lock()
537+
defer s.mu.Unlock()
538+
sysconn, err := s.ipv6.SyscallConn()
522539
if err != nil {
523540
return err
524541
}
@@ -531,7 +548,7 @@ func (bind *StdNetBind) BindSocketToInterface6(interfaceIndex uint32, blackhole
531548
if err != nil {
532549
return err
533550
}
534-
bind.blackhole6 = blackhole
551+
s.blackhole6 = blackhole
535552
return nil
536553
}
537554

conn/bindtest/bindtest.go

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -89,32 +89,39 @@ func (c *ChannelBind) Close() error {
8989
return nil
9090
}
9191

92+
func (c *ChannelBind) BatchSize() int { return 1 }
93+
9294
func (c *ChannelBind) SetMark(mark uint32) error { return nil }
9395

9496
func (c *ChannelBind) makeReceiveFunc(ch chan []byte) conn.ReceiveFunc {
95-
return func(b []byte) (n int, ep conn.Endpoint, err error) {
97+
return func(buffs [][]byte, sizes []int, eps []conn.Endpoint) (n int, err error) {
9698
select {
9799
case <-c.closeSignal:
98-
return 0, nil, net.ErrClosed
100+
return 0, net.ErrClosed
99101
case rx := <-ch:
100-
return copy(b, rx), c.target6, nil
102+
copied := copy(buffs[0], rx)
103+
sizes[0] = copied
104+
eps[0] = c.target6
105+
return 1, nil
101106
}
102107
}
103108
}
104109

105-
func (c *ChannelBind) Send(b []byte, ep conn.Endpoint) error {
106-
select {
107-
case <-c.closeSignal:
108-
return net.ErrClosed
109-
default:
110-
bc := make([]byte, len(b))
111-
copy(bc, b)
112-
if ep.(ChannelEndpoint) == c.target4 {
113-
*c.tx4 <- bc
114-
} else if ep.(ChannelEndpoint) == c.target6 {
115-
*c.tx6 <- bc
116-
} else {
117-
return os.ErrInvalid
110+
func (c *ChannelBind) Send(buffs [][]byte, ep conn.Endpoint) error {
111+
for _, b := range buffs {
112+
select {
113+
case <-c.closeSignal:
114+
return net.ErrClosed
115+
default:
116+
bc := make([]byte, len(b))
117+
copy(bc, b)
118+
if ep.(ChannelEndpoint) == c.target4 {
119+
*c.tx4 <- bc
120+
} else if ep.(ChannelEndpoint) == c.target6 {
121+
*c.tx6 <- bc
122+
} else {
123+
return os.ErrInvalid
124+
}
118125
}
119126
}
120127
return nil

conn/conn.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,17 @@ import (
1515
"strings"
1616
)
1717

18-
// A ReceiveFunc receives a single inbound packet from the network.
19-
// It writes the data into b. n is the length of the packet.
20-
// ep is the remote endpoint.
21-
type ReceiveFunc func(b []byte) (n int, ep Endpoint, err error)
18+
const (
19+
DefaultBatchSize = 1 // maximum number of packets handled per read and write
20+
)
21+
22+
// A ReceiveFunc receives at least one packet from the network and writes them
23+
// into packets. On a successful read it returns the number of elements of
24+
// sizes, packets, and endpoints that should be evaluated. Some elements of
25+
// sizes may be zero, and callers should ignore them. Callers must pass a sizes
26+
// and eps slice with a length greater than or equal to the length of packets.
27+
// These lengths must not exceed the length of the associated Bind.BatchSize().
28+
type ReceiveFunc func(packets [][]byte, sizes []int, eps []Endpoint) (n int, err error)
2229

2330
// A Bind listens on a port for both IPv6 and IPv4 UDP traffic.
2431
//
@@ -38,11 +45,16 @@ type Bind interface {
3845
// This mark is passed to the kernel as the socket option SO_MARK.
3946
SetMark(mark uint32) error
4047

41-
// Send writes a packet b to address ep.
42-
Send(b []byte, ep Endpoint) error
48+
// Send writes one or more packets in buffs to address ep. The length of
49+
// buffs must not exceed BatchSize().
50+
Send(buffs [][]byte, ep Endpoint) error
4351

4452
// ParseEndpoint creates a new endpoint from a string.
4553
ParseEndpoint(s string) (Endpoint, error)
54+
55+
// BatchSize is the number of buffers expected to be passed to
56+
// the ReceiveFuncs, and the maximum expected to be passed to SendBatch.
57+
BatchSize() int
4658
}
4759

4860
// BindSocketToInterface is implemented by Bind objects that support being

conn/conn_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/* SPDX-License-Identifier: MIT
2+
*
3+
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
4+
*/
5+
6+
package conn
7+
8+
import (
9+
"testing"
10+
)
11+
12+
func TestPrettyName(t *testing.T) {
13+
var (
14+
recvFunc ReceiveFunc = func(buffs [][]byte, sizes []int, eps []Endpoint) (n int, err error) { return }
15+
)
16+
17+
const want = "TestPrettyName"
18+
19+
t.Run("ReceiveFunc.PrettyName", func(t *testing.T) {
20+
if got := recvFunc.PrettyName(); got != want {
21+
t.Errorf("PrettyName() = %v, want %v", got, want)
22+
}
23+
})
24+
}

0 commit comments

Comments
 (0)