Skip to content

Commit d2da7a6

Browse files
Added some documentation.
1 parent 7aba396 commit d2da7a6

File tree

3 files changed

+50
-11
lines changed

3 files changed

+50
-11
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ Still pretty specific to gotable, but contains logic required to maintain graph
2020
#### Queue:
2121
Package contains both a normal and priority queue. Both implementations never block on send and grow as much as necessary. Both also only return errors if you attempt to push to a disposed queue and will not panic like sending a message on a closed channel. The priority queue also allows you to place items in priority order inside the queue. If you give a useful hint to the regular queue, it is actually faster than a channel. The priority queue is somewhat slow currently and targeted for an update to a Fibonacci heap.
2222

23+
Also included in the queue package is a MPMC threadsafe ring buffer. This is a block full/empty queue, but will return a blocked thread if the queue is disposed while a thread is blocked. This can be used to synchronize goroutines and ensure goroutines quit so objects can be GC'd. Threadsafety is acheived using only CAS operations making this queue quite fast. Benchmarks can be found in that package.
24+
2325
#### Range Tree:
2426
Useful to determine if n-dimensional points fall within an n-dimensional range. Not a typical range tree however, as we are actually using an n-dimensional sorted list of points as this proved to be simpler and faster than attempting a traditional range tree while saving space on any dimension greater than one. Inserts are typical BBST times at O(log n^d) where d is the number of dimensions.
2527

queue/queue.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,31 @@ as opposed to panicking as with channels. Queues will grow with unbounded
2424
behavior as opposed to channels which can be buffered but will pause
2525
while a thread attempts to put to a full channel.
2626
27-
TODO: Unify the two types of queue to the same interface.
28-
TODO: Implement an even faster lockless circular buffer.
27+
Recently added is a lockless ring buffer using the same basic C design as
28+
found here:
29+
30+
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
31+
32+
Modified for use with Go with the addition of some dispose semantics providing
33+
the capability to release blocked threads. This works for both puts
34+
and gets, either will return an error if they are blocked and the buffer
35+
is disposed. This could serve as a signal to kill a goroutine. All threadsafety
36+
is acheived using CAS operations, making this buffer pretty quick.
37+
38+
Benchmarks:
39+
BenchmarkPriorityQueue-8 2000000 782 ns/op
40+
BenchmarkQueue-8 2000000 671 ns/op
41+
BenchmarkChannel-8 1000000 2083 ns/op
42+
BenchmarkQueuePut-8 20000 84299 ns/op
43+
BenchmarkQueueGet-8 20000 80753 ns/op
44+
BenchmarkExecuteInParallel-8 20000 68891 ns/op
45+
BenchmarkRBLifeCycle-8 10000000 177 ns/op
46+
BenchmarkRBPut-8 30000000 58.1 ns/op
47+
BenchmarkRBGet-8 50000000 26.8 ns/op
48+
49+
TODO: We really need a Fibonacci heap for the priority queue.
50+
TODO: Unify the types of queue to the same interface.
2951
*/
30-
3152
package queue
3253

3354
import (

queue/ring.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,10 @@ limitations under the License.
1616
package queue
1717

1818
import (
19-
"errors"
20-
"log"
2119
"runtime"
2220
"sync/atomic"
2321
)
2422

25-
func init() {
26-
log.Printf(`I HATE THIS.`)
27-
}
28-
2923
// roundUp takes a uint64 greater than 0 and rounds it up to the next
3024
// power of 2.
3125
func roundUp(v uint64) uint64 {
@@ -47,6 +41,12 @@ type node struct {
4741

4842
type nodes []*node
4943

44+
// RingBuffer is a MPMC buffer that achieves threadsafety with CAS operations
45+
// only. A put on full or get on empty call will block until an item
46+
// is put or retrieved. Calling Dispose on the RingBuffer will unblock
47+
// any blocked threads with an error. This buffer is similar to the buffer
48+
// described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
49+
// with some minor additions.
5050
type RingBuffer struct {
5151
nodes nodes
5252
queue, dequeue, mask, disposed uint64
@@ -61,6 +61,9 @@ func (rb *RingBuffer) init(size uint64) {
6161
rb.mask = size - 1 // so we don't have to do this with every put/get operation
6262
}
6363

64+
// Put adds the provided item to the queue. If the queue is full, this
65+
// call will block until an item is added to the queue or Dispose is called
66+
// on the queue. An error will be returned if the queue is disposed.
6467
func (rb *RingBuffer) Put(item interface{}) error {
6568
var n *node
6669
pos := atomic.LoadUint64(&rb.queue)
@@ -78,7 +81,7 @@ L:
7881
break L
7982
}
8083
case dif < 0:
81-
return errors.New(`Full.`)
84+
panic(`Ring buffer in a compromised state during a put operation.`)
8285
default:
8386
pos = atomic.LoadUint64(&rb.queue)
8487
}
@@ -90,6 +93,10 @@ L:
9093
return nil
9194
}
9295

96+
// Get will return the next item in the queue. This call will block
97+
// if the queue is empty. This call will unblock when an item is added
98+
// to the queue or Dispose is called on the queue. An error will be returned
99+
// if the queue is disposed.
93100
func (rb *RingBuffer) Get() (interface{}, error) {
94101
var n *node
95102
pos := atomic.LoadUint64(&rb.dequeue)
@@ -107,7 +114,7 @@ L:
107114
break L
108115
}
109116
case dif < 0:
110-
return nil, errors.New(`Queue empty.`)
117+
panic(`Ring buffer in compromised state during a get operation.`)
111118
default:
112119
pos = atomic.LoadUint64(&rb.dequeue)
113120
}
@@ -119,22 +126,31 @@ L:
119126
return data, nil
120127
}
121128

129+
// Len returns the number of items in the queue.
122130
func (rb *RingBuffer) Len() uint64 {
123131
return atomic.LoadUint64(&rb.queue) - atomic.LoadUint64(&rb.dequeue)
124132
}
125133

134+
// Cap returns the capacity of this ring buffer.
126135
func (rb *RingBuffer) Cap() uint64 {
127136
return uint64(len(rb.nodes))
128137
}
129138

139+
// Dispose will dispose of this queue and free any blocked threads
140+
// in the Put and/or Get methods. Calling those methods on a disposed
141+
// queue will return an error.
130142
func (rb *RingBuffer) Dispose() {
131143
atomic.CompareAndSwapUint64(&rb.disposed, 0, 1)
132144
}
133145

146+
// IsDisposed will return a bool indicating if this queue has been
147+
// disposed.
134148
func (rb *RingBuffer) IsDisposed() bool {
135149
return atomic.LoadUint64(&rb.disposed) == 1
136150
}
137151

152+
// NewRingBuffer will allocate, initialize, and return a ring buffer
153+
// with the specified size.
138154
func NewRingBuffer(size uint64) *RingBuffer {
139155
rb := &RingBuffer{}
140156
rb.init(size)

0 commit comments

Comments
 (0)