Skip to content

Commit 9f7e609

Browse files
author
robaho
committed
recycle the market data output buffers
1 parent 19238ce commit 9f7e609

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

internal/exchange/marketdata.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ var udpCon *net.UDPConn
2929
var pUdpCon *ipv4.PacketConn
3030
var subMutex sync.Mutex
3131
var subscriptions []chan *Book
32+
var buffers = &SPSC{}
3233

3334
type MarketEvent struct {
3435
book *Book
@@ -94,8 +95,15 @@ func GetBook(symbol string) *Book {
9495
}
9596

9697
func newBuffer() *bytes.Buffer {
97-
9898
placeholder := make([]byte, 8)
99+
100+
p := buffers.get()
101+
102+
if p != nil {
103+
p.Write(placeholder) // leave room for packet number
104+
return p
105+
}
106+
99107
buf := new(bytes.Buffer)
100108
buf.Grow(protocol.MaxMsgSize)
101109
buf.Write(placeholder) // leave room for packet number
@@ -323,7 +331,8 @@ func rememberPacket(packetNumber uint64, data []byte) {
323331
defer history.Unlock()
324332

325333
if history.packets.Len() > 10000 {
326-
history.packets.Remove(history.packets.Front())
334+
p := history.packets.Remove(history.packets.Front()).(*Packet)
335+
buffers.put(bytes.NewBuffer(p.data[:0]))
327336
}
328337

329338
packet := Packet{packetNumber, data}

internal/exchange/spsc.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package exchange
2+
3+
import (
4+
"bytes"
5+
"sync/atomic"
6+
"unsafe"
7+
)
8+
9+
type node struct {
10+
data *bytes.Buffer
11+
next *node
12+
}
13+
14+
// SPSC is a lock-free queue for []byte (single producer, single consumer), used to recycle marketdata multicast packets
15+
type SPSC struct {
16+
head *node
17+
}
18+
19+
func (q *SPSC) put(data *bytes.Buffer) {
20+
n := &node{}
21+
n.data = data
22+
for {
23+
n.next = (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head))))
24+
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head)), unsafe.Pointer(n.next), unsafe.Pointer(n)) {
25+
return
26+
}
27+
}
28+
}
29+
30+
func (q *SPSC) get() *bytes.Buffer {
31+
for {
32+
head := (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head))))
33+
34+
if head == nil {
35+
return nil
36+
}
37+
38+
next := head.next
39+
40+
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head)), unsafe.Pointer(head), unsafe.Pointer(next)) {
41+
return head.data
42+
}
43+
}
44+
}

0 commit comments

Comments
 (0)