Skip to content

Commit 003cf93

Browse files
Implementa
1 parent b81671b commit 003cf93

File tree

3 files changed

+125
-13
lines changed

3 files changed

+125
-13
lines changed

queue/priority_queue_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,18 +193,18 @@ func TestEmptyPriorityGetWithDispose(t *testing.T) {
193193

194194
wg.Wait()
195195

196-
assert.IsType(t, DisposedError{}, err)
196+
assert.IsType(t, disposedError, err)
197197
}
198198

199199
func TestPriorityGetPutDisposed(t *testing.T) {
200200
q := NewPriorityQueue(1)
201201
q.Dispose()
202202

203203
_, err := q.Get(1)
204-
assert.IsType(t, DisposedError{}, err)
204+
assert.IsType(t, disposedError, err)
205205

206206
err = q.Put(mockItem(1))
207-
assert.IsType(t, DisposedError{}, err)
207+
assert.IsType(t, disposedError, err)
208208
}
209209

210210
func BenchmarkPriorityQueue(b *testing.B) {

queue/queue_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func TestEmptyGetWithDispose(t *testing.T) {
189189

190190
wg.Wait()
191191

192-
assert.IsType(t, DisposedError{}, err)
192+
assert.IsType(t, disposedError, err)
193193
}
194194

195195
func TestGetPutDisposed(t *testing.T) {
@@ -198,10 +198,10 @@ func TestGetPutDisposed(t *testing.T) {
198198
q.Dispose()
199199

200200
_, err := q.Get(1)
201-
assert.IsType(t, DisposedError{}, err)
201+
assert.IsType(t, disposedError, err)
202202

203203
err = q.Put(`a`)
204-
assert.IsType(t, DisposedError{}, err)
204+
assert.IsType(t, disposedError, err)
205205
}
206206

207207
func BenchmarkQueue(b *testing.B) {
@@ -289,7 +289,7 @@ func TestTakeUntilOnDisposedQueue(t *testing.T) {
289289
})
290290

291291
assert.Nil(t, result)
292-
assert.IsType(t, DisposedError{}, err)
292+
assert.IsType(t, disposedError, err)
293293
}
294294

295295
func TestExecuteInParallel(t *testing.T) {

queue/ring.go

Lines changed: 118 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,128 @@ limitations under the License.
1515
*/
1616
package queue
1717

18+
import (
19+
"errors"
20+
"log"
21+
"runtime"
22+
"sync/atomic"
23+
)
24+
25+
func init() {
26+
log.Printf(`I HATE THIS.`)
27+
}
28+
29+
// roundUp takes a uint64 greater than 0 and rounds it up to the next
30+
// power of 2.
31+
func roundUp(v uint64) uint64 {
32+
v--
33+
v |= v >> 1
34+
v |= v >> 2
35+
v |= v >> 4
36+
v |= v >> 8
37+
v |= v >> 16
38+
v |= v >> 32
39+
v++
40+
return v
41+
}
42+
43+
type node struct {
44+
position uint64
45+
data interface{}
46+
}
47+
48+
type nodes []*node
49+
1850
type RingBuffer struct {
19-
ifs []interface{}
20-
placed, retrieved uint64
21-
size int64
51+
nodes nodes
52+
queue, dequeue, mask, disposed uint64
53+
}
54+
55+
func (rb *RingBuffer) init(size uint64) {
56+
size = roundUp(size)
57+
rb.nodes = make(nodes, size)
58+
for i := uint64(0); i < size; i++ {
59+
rb.nodes[i] = &node{position: i}
60+
}
61+
rb.mask = size - 1 // so we don't have to do this with every put/get operation
2262
}
2363

24-
func (rb *RingBuffer) Put(items ...interface{}) error {
25-
if len(items) == 0 {
26-
return nil
64+
func (rb *RingBuffer) Put(item interface{}) error {
65+
var n *node
66+
pos := atomic.LoadUint64(&rb.queue)
67+
L:
68+
for {
69+
if atomic.LoadUint64(&rb.disposed) == 1 {
70+
return disposedError
71+
}
72+
73+
n = rb.nodes[pos&rb.mask]
74+
seq := atomic.LoadUint64(&n.position)
75+
switch dif := seq - pos; {
76+
case dif == 0:
77+
if atomic.CompareAndSwapUint64(&rb.queue, pos, pos+1) {
78+
break L
79+
}
80+
case dif < 0:
81+
return errors.New(`Full.`)
82+
default:
83+
pos = atomic.LoadUint64(&rb.queue)
84+
}
85+
runtime.Gosched() // free up the cpu before the next iteration
2786
}
2887

88+
n.data = item
89+
atomic.StoreUint64(&n.position, pos+1)
2990
return nil
3091
}
92+
93+
func (rb *RingBuffer) Get() (interface{}, error) {
94+
var n *node
95+
pos := atomic.LoadUint64(&rb.dequeue)
96+
L:
97+
for {
98+
if atomic.LoadUint64(&rb.disposed) == 1 {
99+
return nil, disposedError
100+
}
101+
102+
n = rb.nodes[pos&rb.mask]
103+
seq := atomic.LoadUint64(&n.position)
104+
switch dif := seq - (pos + 1); {
105+
case dif == 0:
106+
if atomic.CompareAndSwapUint64(&rb.dequeue, pos, pos+1) {
107+
break L
108+
}
109+
case dif < 0:
110+
return nil, errors.New(`Queue empty.`)
111+
default:
112+
pos = atomic.LoadUint64(&rb.dequeue)
113+
}
114+
runtime.Gosched() // free up cpu before next iteration
115+
}
116+
data := n.data
117+
n.data = nil
118+
atomic.StoreUint64(&n.position, pos+rb.mask+1)
119+
return data, nil
120+
}
121+
122+
func (rb *RingBuffer) Len() uint64 {
123+
return atomic.LoadUint64(&rb.queue) - atomic.LoadUint64(&rb.dequeue)
124+
}
125+
126+
func (rb *RingBuffer) Cap() uint64 {
127+
return uint64(len(rb.nodes))
128+
}
129+
130+
func (rb *RingBuffer) Dispose() {
131+
atomic.CompareAndSwapUint64(&rb.disposed, 0, 1)
132+
}
133+
134+
func (rb *RingBuffer) IsDisposed() bool {
135+
return atomic.LoadUint64(&rb.disposed) == 1
136+
}
137+
138+
func NewRingBuffer(size uint64) *RingBuffer {
139+
rb := &RingBuffer{}
140+
rb.init(size)
141+
return rb
142+
}

0 commit comments

Comments
 (0)