Skip to content

Commit d831fef

Browse files
jwhited authored and raggi committed
device: distribute crypto work as slice of elements
After reducing UDP stack traversal overhead via GSO and GRO, runtime.chanrecv() began to account for a high percentage (20% in one environment) of perf samples during a throughput benchmark. The individual packet channel ops with the crypto goroutines were the primary contributor to this overhead.

Updating these channels to pass vectors, which the device package already handles at its ends, reduced this overhead substantially, and improved throughput.

The iperf3 results below demonstrate the effect of this commit between two Linux computers with i5-12400 CPUs. There is roughly 13us of round trip latency between them.

The first result is with UDP GSO and GRO, and with single element channels.

Starting Test: protocol: TCP, 1 streams, 131072 byte blocks
[ ID] Interval Transfer Bitrate Retr Cwnd
[ 5] 0.00-10.00 sec 12.3 GBytes 10.6 Gbits/sec 232 3.15 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
Test Complete. Summary Results:
[ ID] Interval Transfer Bitrate Retr
[ 5] 0.00-10.00 sec 12.3 GBytes 10.6 Gbits/sec 232 sender
[ 5] 0.00-10.04 sec 12.3 GBytes 10.6 Gbits/sec receiver

The second result is with channels updated to pass a slice of elements.

Starting Test: protocol: TCP, 1 streams, 131072 byte blocks
[ ID] Interval Transfer Bitrate Retr Cwnd
[ 5] 0.00-10.00 sec 13.2 GBytes 11.3 Gbits/sec 182 3.15 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
Test Complete. Summary Results:
[ ID] Interval Transfer Bitrate Retr
[ 5] 0.00-10.00 sec 13.2 GBytes 11.3 Gbits/sec 182 sender
[ 5] 0.00-10.04 sec 13.2 GBytes 11.3 Gbits/sec receiver

Reviewed-by: Adrian Dewhurst <[email protected]>
Signed-off-by: Jordan Whited <[email protected]>
1 parent e06231b commit d831fef

File tree

3 files changed

+55
-55
lines changed

3 files changed

+55
-55
lines changed

device/channels.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,13 +19,13 @@ import (
1919
// call wg.Done to remove the initial reference.
2020
// When the refcount hits 0, the queue's channel is closed.
2121
type outboundQueue struct {
22-
c chan *QueueOutboundElement
22+
c chan *[]*QueueOutboundElement
2323
wg sync.WaitGroup
2424
}
2525

2626
func newOutboundQueue() *outboundQueue {
2727
q := &outboundQueue{
28-
c: make(chan *QueueOutboundElement, QueueOutboundSize),
28+
c: make(chan *[]*QueueOutboundElement, QueueOutboundSize),
2929
}
3030
q.wg.Add(1)
3131
go func() {
@@ -37,13 +37,13 @@ func newOutboundQueue() *outboundQueue {
3737

3838
// An inboundQueue is similar to an outboundQueue; see those docs.
3939
type inboundQueue struct {
40-
c chan *QueueInboundElement
40+
c chan *[]*QueueInboundElement
4141
wg sync.WaitGroup
4242
}
4343

4444
func newInboundQueue() *inboundQueue {
4545
q := &inboundQueue{
46-
c: make(chan *QueueInboundElement, QueueInboundSize),
46+
c: make(chan *[]*QueueInboundElement, QueueInboundSize),
4747
}
4848
q.wg.Add(1)
4949
go func() {

device/receive.go

Lines changed: 22 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -220,9 +220,7 @@ func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.Receive
220220
for peer, elems := range elemsByPeer {
221221
if peer.isRunning.Load() {
222222
peer.queue.inbound.c <- elems
223-
for _, elem := range *elems {
224-
device.queue.decryption.c <- elem
225-
}
223+
device.queue.decryption.c <- elems
226224
} else {
227225
for _, elem := range *elems {
228226
device.PutMessageBuffer(elem.buffer)
@@ -241,26 +239,28 @@ func (device *Device) RoutineDecryption(id int) {
241239
defer device.log.Verbosef("Routine: decryption worker %d - stopped", id)
242240
device.log.Verbosef("Routine: decryption worker %d - started", id)
243241

244-
for elem := range device.queue.decryption.c {
245-
// split message into fields
246-
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
247-
content := elem.packet[MessageTransportOffsetContent:]
248-
249-
// decrypt and release to consumer
250-
var err error
251-
elem.counter = binary.LittleEndian.Uint64(counter)
252-
// copy counter to nonce
253-
binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter)
254-
elem.packet, err = elem.keypair.receive.Open(
255-
content[:0],
256-
nonce[:],
257-
content,
258-
nil,
259-
)
260-
if err != nil {
261-
elem.packet = nil
242+
for elems := range device.queue.decryption.c {
243+
for _, elem := range *elems {
244+
// split message into fields
245+
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
246+
content := elem.packet[MessageTransportOffsetContent:]
247+
248+
// decrypt and release to consumer
249+
var err error
250+
elem.counter = binary.LittleEndian.Uint64(counter)
251+
// copy counter to nonce
252+
binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter)
253+
elem.packet, err = elem.keypair.receive.Open(
254+
content[:0],
255+
nonce[:],
256+
content,
257+
nil,
258+
)
259+
if err != nil {
260+
elem.packet = nil
261+
}
262+
elem.Unlock()
262263
}
263-
elem.Unlock()
264264
}
265265
}
266266

device/send.go

Lines changed: 29 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -385,9 +385,7 @@ top:
385385
// add to parallel and sequential queue
386386
if peer.isRunning.Load() {
387387
peer.queue.outbound.c <- elems
388-
for _, elem := range *elems {
389-
peer.device.queue.encryption.c <- elem
390-
}
388+
peer.device.queue.encryption.c <- elems
391389
} else {
392390
for _, elem := range *elems {
393391
peer.device.PutMessageBuffer(elem.buffer)
@@ -447,32 +445,34 @@ func (device *Device) RoutineEncryption(id int) {
447445
defer device.log.Verbosef("Routine: encryption worker %d - stopped", id)
448446
device.log.Verbosef("Routine: encryption worker %d - started", id)
449447

450-
for elem := range device.queue.encryption.c {
451-
// populate header fields
452-
header := elem.buffer[:MessageTransportHeaderSize]
453-
454-
fieldType := header[0:4]
455-
fieldReceiver := header[4:8]
456-
fieldNonce := header[8:16]
457-
458-
binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
459-
binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex)
460-
binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
461-
462-
// pad content to multiple of 16
463-
paddingSize := calculatePaddingSize(len(elem.packet), int(device.tun.mtu.Load()))
464-
elem.packet = append(elem.packet, paddingZeros[:paddingSize]...)
465-
466-
// encrypt content and release to consumer
467-
468-
binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
469-
elem.packet = elem.keypair.send.Seal(
470-
header,
471-
nonce[:],
472-
elem.packet,
473-
nil,
474-
)
475-
elem.Unlock()
448+
for elems := range device.queue.encryption.c {
449+
for _, elem := range *elems {
450+
// populate header fields
451+
header := elem.buffer[:MessageTransportHeaderSize]
452+
453+
fieldType := header[0:4]
454+
fieldReceiver := header[4:8]
455+
fieldNonce := header[8:16]
456+
457+
binary.LittleEndian.PutUint32(fieldType, MessageTransportType)
458+
binary.LittleEndian.PutUint32(fieldReceiver, elem.keypair.remoteIndex)
459+
binary.LittleEndian.PutUint64(fieldNonce, elem.nonce)
460+
461+
// pad content to multiple of 16
462+
paddingSize := calculatePaddingSize(len(elem.packet), int(device.tun.mtu.Load()))
463+
elem.packet = append(elem.packet, paddingZeros[:paddingSize]...)
464+
465+
// encrypt content and release to consumer
466+
467+
binary.LittleEndian.PutUint64(nonce[4:], elem.nonce)
468+
elem.packet = elem.keypair.send.Seal(
469+
header,
470+
nonce[:],
471+
elem.packet,
472+
nil,
473+
)
474+
elem.Unlock()
475+
}
476476
}
477477
}
478478

0 commit comments

Comments
 (0)