Skip to content

Commit 82e73db

Browse files
Merge pull request #59 from Workiva/ring_buffer
Ring buffer
2 parents 959ef5b + d2f6ec0 commit 82e73db

File tree

10 files changed

+491
-22
lines changed

10 files changed

+491
-22
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ Still pretty specific to gotable, but contains logic required to maintain graph
2020
#### Queue:
2121
Package contains both a normal and priority queue. Both implementations never block on send and grow as much as necessary. Both also only return errors if you attempt to push to a disposed queue and will not panic like sending a message on a closed channel. The priority queue also allows you to place items in priority order inside the queue. If you give a useful hint to the regular queue, it is actually faster than a channel. The priority queue is somewhat slow currently and targeted for an update to a Fibonacci heap.
2222

23+
Also included in the queue package is a MPMC threadsafe ring buffer. This is a block full/empty queue, but will return a blocked thread if the queue is disposed while a thread is blocked. This can be used to synchronize goroutines and ensure goroutines quit so objects can be GC'd. Threadsafety is acheived using only CAS operations making this queue quite fast. Benchmarks can be found in that package.
24+
2325
#### Range Tree:
2426
Useful to determine if n-dimensional points fall within an n-dimensional range. Not a typical range tree however, as we are actually using an n-dimensional sorted list of points as this proved to be simpler and faster than attempting a traditional range tree while saving space on any dimension greater than one. Inserts are typical BBST times at O(log n^d) where d is the number of dimensions.
2527

btree/palm/tree.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ func (ptree *ptree) runOperations() {
151151
writeOperations[n] = append(writeOperations[n], toPerform.writes[i])
152152
}
153153

154-
toPerform.signaler <- true
155154
ptree.runAdds(writeOperations)
155+
toPerform.signaler <- true
156156
}
157157

158158
func (ptree *ptree) recursiveSplit(n, parent, left *node, nodes *[]*node, keys *Keys) {

queue/error.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ limitations under the License.
1616

1717
package queue
1818

19-
type DisposedError struct{}
19+
import "errors"
2020

21-
func (de DisposedError) Error() string {
22-
return `Queue has been disposed.`
23-
}
21+
var disposedError = errors.New(`Queue has been disposed.`)

queue/priority_queue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (pq *PriorityQueue) Put(items ...Item) error {
107107
pq.lock.Lock()
108108
if pq.disposed {
109109
pq.lock.Unlock()
110-
return DisposedError{}
110+
return disposedError
111111
}
112112

113113
for _, item := range items {
@@ -144,7 +144,7 @@ func (pq *PriorityQueue) Get(number int) ([]Item, error) {
144144

145145
if pq.disposed {
146146
pq.lock.Unlock()
147-
return nil, DisposedError{}
147+
return nil, disposedError
148148
}
149149

150150
var items []Item
@@ -159,7 +159,7 @@ func (pq *PriorityQueue) Get(number int) ([]Item, error) {
159159
pq.disposeLock.Lock()
160160
if pq.disposed {
161161
pq.disposeLock.Unlock()
162-
return nil, DisposedError{}
162+
return nil, disposedError
163163
}
164164
pq.disposeLock.Unlock()
165165

queue/priority_queue_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,18 +193,18 @@ func TestEmptyPriorityGetWithDispose(t *testing.T) {
193193

194194
wg.Wait()
195195

196-
assert.IsType(t, DisposedError{}, err)
196+
assert.IsType(t, disposedError, err)
197197
}
198198

199199
func TestPriorityGetPutDisposed(t *testing.T) {
200200
q := NewPriorityQueue(1)
201201
q.Dispose()
202202

203203
_, err := q.Get(1)
204-
assert.IsType(t, DisposedError{}, err)
204+
assert.IsType(t, disposedError, err)
205205

206206
err = q.Put(mockItem(1))
207-
assert.IsType(t, DisposedError{}, err)
207+
assert.IsType(t, disposedError, err)
208208
}
209209

210210
func BenchmarkPriorityQueue(b *testing.B) {

queue/queue.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,31 @@ as opposed to panicking as with channels. Queues will grow with unbounded
2424
behavior as opposed to channels which can be buffered but will pause
2525
while a thread attempts to put to a full channel.
2626
27-
TODO: Unify the two types of queue to the same interface.
28-
TODO: Implement an even faster lockless circular buffer.
27+
Recently added is a lockless ring buffer using the same basic C design as
28+
found here:
29+
30+
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
31+
32+
Modified for use with Go with the addition of some dispose semantics providing
33+
the capability to release blocked threads. This works for both puts
34+
and gets, either will return an error if they are blocked and the buffer
35+
is disposed. This could serve as a signal to kill a goroutine. All threadsafety
36+
is acheived using CAS operations, making this buffer pretty quick.
37+
38+
Benchmarks:
39+
BenchmarkPriorityQueue-8 2000000 782 ns/op
40+
BenchmarkQueue-8 2000000 671 ns/op
41+
BenchmarkChannel-8 1000000 2083 ns/op
42+
BenchmarkQueuePut-8 20000 84299 ns/op
43+
BenchmarkQueueGet-8 20000 80753 ns/op
44+
BenchmarkExecuteInParallel-8 20000 68891 ns/op
45+
BenchmarkRBLifeCycle-8 10000000 177 ns/op
46+
BenchmarkRBPut-8 30000000 58.1 ns/op
47+
BenchmarkRBGet-8 50000000 26.8 ns/op
48+
49+
TODO: We really need a Fibonacci heap for the priority queue.
50+
TODO: Unify the types of queue to the same interface.
2951
*/
30-
3152
package queue
3253

3354
import (
@@ -128,7 +149,7 @@ func (q *Queue) Put(items ...interface{}) error {
128149

129150
if q.disposed {
130151
q.lock.Unlock()
131-
return DisposedError{}
152+
return disposedError
132153
}
133154

134155
q.items = append(q.items, items...)
@@ -163,7 +184,7 @@ func (q *Queue) Get(number int64) ([]interface{}, error) {
163184

164185
if q.disposed {
165186
q.lock.Unlock()
166-
return nil, DisposedError{}
187+
return nil, disposedError
167188
}
168189

169190
var items []interface{}
@@ -177,7 +198,7 @@ func (q *Queue) Get(number int64) ([]interface{}, error) {
177198
sema.wg.Wait()
178199
// we are now inside the put's lock
179200
if q.disposed {
180-
return nil, DisposedError{}
201+
return nil, disposedError
181202
}
182203
items = q.items.get(number)
183204
sema.response.Done()
@@ -201,7 +222,7 @@ func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, e
201222

202223
if q.disposed {
203224
q.lock.Unlock()
204-
return nil, DisposedError{}
225+
return nil, disposedError
205226
}
206227

207228
result := q.items.getUntil(checker)

queue/queue.prof

30.4 KB
Binary file not shown.

queue/queue_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func TestEmptyGetWithDispose(t *testing.T) {
189189

190190
wg.Wait()
191191

192-
assert.IsType(t, DisposedError{}, err)
192+
assert.IsType(t, disposedError, err)
193193
}
194194

195195
func TestGetPutDisposed(t *testing.T) {
@@ -198,10 +198,10 @@ func TestGetPutDisposed(t *testing.T) {
198198
q.Dispose()
199199

200200
_, err := q.Get(1)
201-
assert.IsType(t, DisposedError{}, err)
201+
assert.IsType(t, disposedError, err)
202202

203203
err = q.Put(`a`)
204-
assert.IsType(t, DisposedError{}, err)
204+
assert.IsType(t, disposedError, err)
205205
}
206206

207207
func BenchmarkQueue(b *testing.B) {
@@ -289,7 +289,7 @@ func TestTakeUntilOnDisposedQueue(t *testing.T) {
289289
})
290290

291291
assert.Nil(t, result)
292-
assert.IsType(t, DisposedError{}, err)
292+
assert.IsType(t, disposedError, err)
293293
}
294294

295295
func TestExecuteInParallel(t *testing.T) {

queue/ring.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
Copyright 2014 Workiva, LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package queue
17+
18+
import (
19+
"runtime"
20+
"sync/atomic"
21+
)
22+
23+
// roundUp takes a uint64 greater than 0 and rounds it up to the next
24+
// power of 2.
25+
func roundUp(v uint64) uint64 {
26+
v--
27+
v |= v >> 1
28+
v |= v >> 2
29+
v |= v >> 4
30+
v |= v >> 8
31+
v |= v >> 16
32+
v |= v >> 32
33+
v++
34+
return v
35+
}
36+
37+
type node struct {
38+
position uint64
39+
data interface{}
40+
}
41+
42+
type nodes []*node
43+
44+
// RingBuffer is a MPMC buffer that achieves threadsafety with CAS operations
45+
// only. A put on full or get on empty call will block until an item
46+
// is put or retrieved. Calling Dispose on the RingBuffer will unblock
47+
// any blocked threads with an error. This buffer is similar to the buffer
48+
// described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
49+
// with some minor additions.
50+
type RingBuffer struct {
51+
nodes nodes
52+
queue, dequeue, mask, disposed uint64
53+
}
54+
55+
func (rb *RingBuffer) init(size uint64) {
56+
size = roundUp(size)
57+
rb.nodes = make(nodes, size)
58+
for i := uint64(0); i < size; i++ {
59+
rb.nodes[i] = &node{position: i}
60+
}
61+
rb.mask = size - 1 // so we don't have to do this with every put/get operation
62+
}
63+
64+
// Put adds the provided item to the queue. If the queue is full, this
65+
// call will block until an item is added to the queue or Dispose is called
66+
// on the queue. An error will be returned if the queue is disposed.
67+
func (rb *RingBuffer) Put(item interface{}) error {
68+
var n *node
69+
pos := atomic.LoadUint64(&rb.queue)
70+
L:
71+
for {
72+
if atomic.LoadUint64(&rb.disposed) == 1 {
73+
return disposedError
74+
}
75+
76+
n = rb.nodes[pos&rb.mask]
77+
seq := atomic.LoadUint64(&n.position)
78+
switch dif := seq - pos; {
79+
case dif == 0:
80+
if atomic.CompareAndSwapUint64(&rb.queue, pos, pos+1) {
81+
break L
82+
}
83+
case dif < 0:
84+
panic(`Ring buffer in a compromised state during a put operation.`)
85+
default:
86+
pos = atomic.LoadUint64(&rb.queue)
87+
}
88+
runtime.Gosched() // free up the cpu before the next iteration
89+
}
90+
91+
n.data = item
92+
atomic.StoreUint64(&n.position, pos+1)
93+
return nil
94+
}
95+
96+
// Get will return the next item in the queue. This call will block
97+
// if the queue is empty. This call will unblock when an item is added
98+
// to the queue or Dispose is called on the queue. An error will be returned
99+
// if the queue is disposed.
100+
func (rb *RingBuffer) Get() (interface{}, error) {
101+
var n *node
102+
pos := atomic.LoadUint64(&rb.dequeue)
103+
L:
104+
for {
105+
if atomic.LoadUint64(&rb.disposed) == 1 {
106+
return nil, disposedError
107+
}
108+
109+
n = rb.nodes[pos&rb.mask]
110+
seq := atomic.LoadUint64(&n.position)
111+
switch dif := seq - (pos + 1); {
112+
case dif == 0:
113+
if atomic.CompareAndSwapUint64(&rb.dequeue, pos, pos+1) {
114+
break L
115+
}
116+
case dif < 0:
117+
panic(`Ring buffer in compromised state during a get operation.`)
118+
default:
119+
pos = atomic.LoadUint64(&rb.dequeue)
120+
}
121+
runtime.Gosched() // free up cpu before next iteration
122+
}
123+
data := n.data
124+
n.data = nil
125+
atomic.StoreUint64(&n.position, pos+rb.mask+1)
126+
return data, nil
127+
}
128+
129+
// Len returns the number of items in the queue.
130+
func (rb *RingBuffer) Len() uint64 {
131+
return atomic.LoadUint64(&rb.queue) - atomic.LoadUint64(&rb.dequeue)
132+
}
133+
134+
// Cap returns the capacity of this ring buffer.
135+
func (rb *RingBuffer) Cap() uint64 {
136+
return uint64(len(rb.nodes))
137+
}
138+
139+
// Dispose will dispose of this queue and free any blocked threads
140+
// in the Put and/or Get methods. Calling those methods on a disposed
141+
// queue will return an error.
142+
func (rb *RingBuffer) Dispose() {
143+
atomic.CompareAndSwapUint64(&rb.disposed, 0, 1)
144+
}
145+
146+
// IsDisposed will return a bool indicating if this queue has been
147+
// disposed.
148+
func (rb *RingBuffer) IsDisposed() bool {
149+
return atomic.LoadUint64(&rb.disposed) == 1
150+
}
151+
152+
// NewRingBuffer will allocate, initialize, and return a ring buffer
153+
// with the specified size.
154+
func NewRingBuffer(size uint64) *RingBuffer {
155+
rb := &RingBuffer{}
156+
rb.init(size)
157+
return rb
158+
}

0 commit comments

Comments
 (0)