Skip to content

Commit cc7b29b

Browse files
jwhited authored and raggi committed
device: move Queue{In,Out}boundElement Mutex to container type
Queue{In,Out}boundElement locking can contribute to significant overhead via sync.Mutex.lockSlow() in some environments. These types are passed throughout the device package as elements in a slice, so move the per-element Mutex to a container around the slice. Signed-off-by: Jordan Whited <[email protected]>
1 parent ceb9a09 commit cc7b29b

File tree

6 files changed

+123
-113
lines changed

6 files changed

+123
-113
lines changed

device/channels.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ import (
1919
// call wg.Done to remove the initial reference.
2020
// When the refcount hits 0, the queue's channel is closed.
2121
type outboundQueue struct {
22-
c chan *[]*QueueOutboundElement
22+
c chan *QueueOutboundElementsContainer
2323
wg sync.WaitGroup
2424
}
2525

2626
func newOutboundQueue() *outboundQueue {
2727
q := &outboundQueue{
28-
c: make(chan *[]*QueueOutboundElement, QueueOutboundSize),
28+
c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
2929
}
3030
q.wg.Add(1)
3131
go func() {
@@ -37,13 +37,13 @@ func newOutboundQueue() *outboundQueue {
3737

3838
// A inboundQueue is similar to an outboundQueue; see those docs.
3939
type inboundQueue struct {
40-
c chan *[]*QueueInboundElement
40+
c chan *QueueInboundElementsContainer
4141
wg sync.WaitGroup
4242
}
4343

4444
func newInboundQueue() *inboundQueue {
4545
q := &inboundQueue{
46-
c: make(chan *[]*QueueInboundElement, QueueInboundSize),
46+
c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
4747
}
4848
q.wg.Add(1)
4949
go func() {
@@ -72,7 +72,7 @@ func newHandshakeQueue() *handshakeQueue {
7272
}
7373

7474
type autodrainingInboundQueue struct {
75-
c chan *[]*QueueInboundElement
75+
c chan *QueueInboundElementsContainer
7676
}
7777

7878
// newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd.
@@ -81,7 +81,7 @@ type autodrainingInboundQueue struct {
8181
// some other means, such as sending a sentinel nil values.
8282
func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
8383
q := &autodrainingInboundQueue{
84-
c: make(chan *[]*QueueInboundElement, QueueInboundSize),
84+
c: make(chan *QueueInboundElementsContainer, QueueInboundSize),
8585
}
8686
runtime.SetFinalizer(q, device.flushInboundQueue)
8787
return q
@@ -90,21 +90,21 @@ func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
9090
func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
9191
for {
9292
select {
93-
case elems := <-q.c:
94-
for _, elem := range *elems {
95-
elem.Lock()
93+
case elemsContainer := <-q.c:
94+
elemsContainer.Lock()
95+
for _, elem := range elemsContainer.elems {
9696
device.PutMessageBuffer(elem.buffer)
9797
device.PutInboundElement(elem)
9898
}
99-
device.PutInboundElementsSlice(elems)
99+
device.PutInboundElementsContainer(elemsContainer)
100100
default:
101101
return
102102
}
103103
}
104104
}
105105

106106
type autodrainingOutboundQueue struct {
107-
c chan *[]*QueueOutboundElement
107+
c chan *QueueOutboundElementsContainer
108108
}
109109

110110
// newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd.
@@ -114,7 +114,7 @@ type autodrainingOutboundQueue struct {
114114
// All sends to the channel must be best-effort, because there may be no receivers.
115115
func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
116116
q := &autodrainingOutboundQueue{
117-
c: make(chan *[]*QueueOutboundElement, QueueOutboundSize),
117+
c: make(chan *QueueOutboundElementsContainer, QueueOutboundSize),
118118
}
119119
runtime.SetFinalizer(q, device.flushOutboundQueue)
120120
return q
@@ -123,13 +123,13 @@ func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
123123
func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
124124
for {
125125
select {
126-
case elems := <-q.c:
127-
for _, elem := range *elems {
128-
elem.Lock()
126+
case elemsContainer := <-q.c:
127+
elemsContainer.Lock()
128+
for _, elem := range elemsContainer.elems {
129129
device.PutMessageBuffer(elem.buffer)
130130
device.PutOutboundElement(elem)
131131
}
132-
device.PutOutboundElementsSlice(elems)
132+
device.PutOutboundElementsContainer(elemsContainer)
133133
default:
134134
return
135135
}

device/device.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@ type Device struct {
6868
cookieChecker CookieChecker
6969

7070
pool struct {
71-
outboundElementsSlice *WaitPool
72-
inboundElementsSlice *WaitPool
73-
messageBuffers *WaitPool
74-
inboundElements *WaitPool
75-
outboundElements *WaitPool
71+
inboundElementsContainer *WaitPool
72+
outboundElementsContainer *WaitPool
73+
messageBuffers *WaitPool
74+
inboundElements *WaitPool
75+
outboundElements *WaitPool
7676
}
7777

7878
queue struct {

device/peer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ type Peer struct {
4545
}
4646

4747
queue struct {
48-
staged chan *[]*QueueOutboundElement // staged packets before a handshake is available
49-
outbound *autodrainingOutboundQueue // sequential ordering of udp transmission
50-
inbound *autodrainingInboundQueue // sequential ordering of tun writing
48+
staged chan *QueueOutboundElementsContainer // staged packets before a handshake is available
49+
outbound *autodrainingOutboundQueue // sequential ordering of udp transmission
50+
inbound *autodrainingInboundQueue // sequential ordering of tun writing
5151
}
5252

5353
cookieGenerator CookieGenerator
@@ -81,7 +81,7 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
8181
peer.device = device
8282
peer.queue.outbound = newAutodrainingOutboundQueue(device)
8383
peer.queue.inbound = newAutodrainingInboundQueue(device)
84-
peer.queue.staged = make(chan *[]*QueueOutboundElement, QueueStagedSize)
84+
peer.queue.staged = make(chan *QueueOutboundElementsContainer, QueueStagedSize)
8585

8686
// map public key
8787
_, ok := device.peers.keyMap[pk]

device/pools.go

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ func (p *WaitPool) Put(x any) {
4646
}
4747

4848
func (device *Device) PopulatePools() {
49-
device.pool.outboundElementsSlice = NewWaitPool(PreallocatedBuffersPerPool, func() any {
50-
s := make([]*QueueOutboundElement, 0, device.BatchSize())
51-
return &s
52-
})
53-
device.pool.inboundElementsSlice = NewWaitPool(PreallocatedBuffersPerPool, func() any {
49+
device.pool.inboundElementsContainer = NewWaitPool(PreallocatedBuffersPerPool, func() any {
5450
s := make([]*QueueInboundElement, 0, device.BatchSize())
55-
return &s
51+
return &QueueInboundElementsContainer{elems: s}
52+
})
53+
device.pool.outboundElementsContainer = NewWaitPool(PreallocatedBuffersPerPool, func() any {
54+
s := make([]*QueueOutboundElement, 0, device.BatchSize())
55+
return &QueueOutboundElementsContainer{elems: s}
5656
})
5757
device.pool.messageBuffers = NewWaitPool(PreallocatedBuffersPerPool, func() any {
5858
return new([MaxMessageSize]byte)
@@ -65,28 +65,32 @@ func (device *Device) PopulatePools() {
6565
})
6666
}
6767

68-
func (device *Device) GetOutboundElementsSlice() *[]*QueueOutboundElement {
69-
return device.pool.outboundElementsSlice.Get().(*[]*QueueOutboundElement)
68+
func (device *Device) GetInboundElementsContainer() *QueueInboundElementsContainer {
69+
c := device.pool.inboundElementsContainer.Get().(*QueueInboundElementsContainer)
70+
c.Mutex = sync.Mutex{}
71+
return c
7072
}
7173

72-
func (device *Device) PutOutboundElementsSlice(s *[]*QueueOutboundElement) {
73-
for i := range *s {
74-
(*s)[i] = nil
74+
func (device *Device) PutInboundElementsContainer(c *QueueInboundElementsContainer) {
75+
for i := range c.elems {
76+
c.elems[i] = nil
7577
}
76-
*s = (*s)[:0]
77-
device.pool.outboundElementsSlice.Put(s)
78+
c.elems = c.elems[:0]
79+
device.pool.inboundElementsContainer.Put(c)
7880
}
7981

80-
func (device *Device) GetInboundElementsSlice() *[]*QueueInboundElement {
81-
return device.pool.inboundElementsSlice.Get().(*[]*QueueInboundElement)
82+
func (device *Device) GetOutboundElementsContainer() *QueueOutboundElementsContainer {
83+
c := device.pool.outboundElementsContainer.Get().(*QueueOutboundElementsContainer)
84+
c.Mutex = sync.Mutex{}
85+
return c
8286
}
8387

84-
func (device *Device) PutInboundElementsSlice(s *[]*QueueInboundElement) {
85-
for i := range *s {
86-
(*s)[i] = nil
88+
func (device *Device) PutOutboundElementsContainer(c *QueueOutboundElementsContainer) {
89+
for i := range c.elems {
90+
c.elems[i] = nil
8791
}
88-
*s = (*s)[:0]
89-
device.pool.inboundElementsSlice.Put(s)
92+
c.elems = c.elems[:0]
93+
device.pool.outboundElementsContainer.Put(c)
9094
}
9195

9296
func (device *Device) GetMessageBuffer() *[MaxMessageSize]byte {

device/receive.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,18 @@ type QueueHandshakeElement struct {
2727
}
2828

2929
type QueueInboundElement struct {
30-
sync.Mutex
3130
buffer *[MaxMessageSize]byte
3231
packet []byte
3332
counter uint64
3433
keypair *Keypair
3534
endpoint conn.Endpoint
3635
}
3736

37+
type QueueInboundElementsContainer struct {
38+
sync.Mutex
39+
elems []*QueueInboundElement
40+
}
41+
3842
// clearPointers clears elem fields that contain pointers.
3943
// This makes the garbage collector's life easier and
4044
// avoids accidentally keeping other objects around unnecessarily.
@@ -87,7 +91,7 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
8791
count int
8892
endpoints = make([]conn.Endpoint, maxBatchSize)
8993
deathSpiral int
90-
elemsByPeer = make(map[*Peer]*[]*QueueInboundElement, maxBatchSize)
94+
elemsByPeer = make(map[*Peer]*QueueInboundElementsContainer, maxBatchSize)
9195
)
9296

9397
for i := range bufsArrs {
@@ -170,15 +174,14 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
170174
elem.keypair = keypair
171175
elem.endpoint = endpoints[i]
172176
elem.counter = 0
173-
elem.Mutex = sync.Mutex{}
174-
elem.Lock()
175177

176178
elemsForPeer, ok := elemsByPeer[peer]
177179
if !ok {
178-
elemsForPeer = device.GetInboundElementsSlice()
180+
elemsForPeer = device.GetInboundElementsContainer()
181+
elemsForPeer.Lock()
179182
elemsByPeer[peer] = elemsForPeer
180183
}
181-
*elemsForPeer = append(*elemsForPeer, elem)
184+
elemsForPeer.elems = append(elemsForPeer.elems, elem)
182185
bufsArrs[i] = device.GetMessageBuffer()
183186
bufs[i] = bufsArrs[i][:]
184187
continue
@@ -217,16 +220,16 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
217220
default:
218221
}
219222
}
220-
for peer, elems := range elemsByPeer {
223+
for peer, elemsContainer := range elemsByPeer {
221224
if peer.isRunning.Load() {
222-
peer.queue.inbound.c <- elems
223-
device.queue.decryption.c <- elems
225+
peer.queue.inbound.c <- elemsContainer
226+
device.queue.decryption.c <- elemsContainer
224227
} else {
225-
for _, elem := range *elems {
228+
for _, elem := range elemsContainer.elems {
226229
device.PutMessageBuffer(elem.buffer)
227230
device.PutInboundElement(elem)
228231
}
229-
device.PutInboundElementsSlice(elems)
232+
device.PutInboundElementsContainer(elemsContainer)
230233
}
231234
delete(elemsByPeer, peer)
232235
}
@@ -239,8 +242,8 @@ func (device *Device) RoutineDecryption(id int) {
239242
defer device.log.Verbosef("Routine: decryption worker %d - stopped", id)
240243
device.log.Verbosef("Routine: decryption worker %d - started", id)
241244

242-
for elems := range device.queue.decryption.c {
243-
for _, elem := range *elems {
245+
for elemsContainer := range device.queue.decryption.c {
246+
for _, elem := range elemsContainer.elems {
244247
// split message into fields
245248
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
246249
content := elem.packet[MessageTransportOffsetContent:]
@@ -259,8 +262,8 @@ func (device *Device) RoutineDecryption(id int) {
259262
if err != nil {
260263
elem.packet = nil
261264
}
262-
elem.Unlock()
263265
}
266+
elemsContainer.Unlock()
264267
}
265268
}
266269

@@ -437,12 +440,12 @@ func (peer *Peer) RoutineSequentialReceiver(maxBatchSize int) {
437440

438441
bufs := make([][]byte, 0, maxBatchSize)
439442

440-
for elems := range peer.queue.inbound.c {
441-
if elems == nil {
443+
for elemsContainer := range peer.queue.inbound.c {
444+
if elemsContainer == nil {
442445
return
443446
}
444-
for _, elem := range *elems {
445-
elem.Lock()
447+
elemsContainer.Lock()
448+
for _, elem := range elemsContainer.elems {
446449
if elem.packet == nil {
447450
// decryption failed
448451
continue
@@ -515,11 +518,11 @@ func (peer *Peer) RoutineSequentialReceiver(maxBatchSize int) {
515518
device.log.Errorf("Failed to write packets to TUN device: %v", err)
516519
}
517520
}
518-
for _, elem := range *elems {
521+
for _, elem := range elemsContainer.elems {
519522
device.PutMessageBuffer(elem.buffer)
520523
device.PutInboundElement(elem)
521524
}
522525
bufs = bufs[:0]
523-
device.PutInboundElementsSlice(elems)
526+
device.PutInboundElementsContainer(elemsContainer)
524527
}
525528
}

0 commit comments

Comments
 (0)