Skip to content

Commit 8dd5ea9

Browse files
committed
add: lock-free FIFO queue
1 parent 0c6fa27 commit 8dd5ea9

File tree

2 files changed

+197
-0
lines changed

2 files changed

+197
-0
lines changed

queue.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package lockfree
2+
3+
import (
4+
"sync/atomic"
5+
"unsafe"
6+
)
7+
8+
// Queue implements lock-free FIFO freelist based queue.
9+
// ref: https://dl.acm.org/citation.cfm?doid=248052.248106
10+
type Queue struct {
11+
head unsafe.Pointer
12+
tail unsafe.Pointer
13+
len uint64
14+
}
15+
16+
// NewQueue creates a new lock-free queue.
17+
func NewQueue() *Queue {
18+
head := queueitem{next: nil, v: nil} // allocate a free item
19+
return &Queue{
20+
tail: unsafe.Pointer(&head), // both head and tail points
21+
head: unsafe.Pointer(&head), // to the free item
22+
}
23+
}
24+
25+
// Enqueue puts the given value v at the tail of the queue.
26+
func (q *Queue) Enqueue(v interface{}) {
27+
item := &queueitem{next: nil, v: v} // allocate new item
28+
var last, lastnext *queueitem
29+
for {
30+
last = loadqitem(&q.tail)
31+
lastnext = loadqitem(&last.next)
32+
if loadqitem(&q.tail) == last { // are tail and next consistent?
33+
if lastnext == nil { // was tail pointing to the last node?
34+
if casqitem(&last.next, lastnext, item) { // try to link item at the end of linked list
35+
casqitem(&q.tail, last, item) // enqueue is done. try swing tail to the inserted node
36+
atomic.AddUint64(&q.len, 1)
37+
return
38+
}
39+
} else { // tail was not pointing to the last node
40+
casqitem(&q.tail, last, lastnext) // try swing tail to the next node
41+
}
42+
}
43+
}
44+
}
45+
46+
// Dequeue removes and returns the value at the head of the queue.
47+
// It returns nil if the queue is empty.
48+
func (q *Queue) Dequeue() interface{} {
49+
var first, last, firstnext *queueitem
50+
for {
51+
first = loadqitem(&q.head)
52+
last = loadqitem(&q.tail)
53+
firstnext = loadqitem(&first.next)
54+
if first == loadqitem(&q.head) { // are head, tail and next consistent?
55+
if first == last { // is queue empty?
56+
if firstnext == nil { // queue is empty, couldn't dequeue
57+
return nil
58+
}
59+
casqitem(&q.tail, last, firstnext) // tail is falling behind, try to advance it
60+
} else { // read value before cas, otherwise another dequeue might free the next node
61+
v := firstnext.v
62+
if casqitem(&q.head, first, firstnext) { // try to swing head to the next node
63+
atomic.AddUint64(&q.len, ^uint64(0))
64+
return v // queue was not empty and dequeue finished.
65+
}
66+
}
67+
}
68+
}
69+
}
70+
71+
// Length returns the length of the queue.
72+
func (q *Queue) Length() uint64 {
73+
return atomic.LoadUint64(&q.len)
74+
}
75+
76+
type queueitem struct {
77+
next unsafe.Pointer
78+
v interface{}
79+
}
80+
81+
func loadqitem(p *unsafe.Pointer) *queueitem {
82+
return (*queueitem)(atomic.LoadPointer(p))
83+
}
84+
func casqitem(p *unsafe.Pointer, old, new *queueitem) bool {
85+
return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
86+
}

queue_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package lockfree_test
2+
3+
import (
4+
"fmt"
5+
"math/rand"
6+
"sync"
7+
"sync/atomic"
8+
"testing"
9+
10+
"github.com/changkun/lockfree"
11+
)
12+
13+
func TestQueueDequeueEmpty(t *testing.T) {
14+
q := lockfree.NewQueue()
15+
if q.Dequeue() != nil {
16+
t.Fatalf("dequeue empty queue returns non-nil")
17+
}
18+
}
19+
20+
func TestQueue_Length(t *testing.T) {
21+
q := lockfree.NewQueue()
22+
if q.Length() != 0 {
23+
t.Fatalf("empty queue has non-zero length")
24+
}
25+
26+
q.Enqueue(1)
27+
if q.Length() != 1 {
28+
t.Fatalf("count of enqueue wrong, want %d, got %d.", 1, q.Length())
29+
}
30+
31+
q.Dequeue()
32+
if q.Length() != 0 {
33+
t.Fatalf("count of dequeue wrong, want %d, got %d", 0, q.Length())
34+
}
35+
}
36+
37+
func ExampleQueue() {
38+
q := lockfree.NewQueue()
39+
40+
q.Enqueue("1st item")
41+
q.Enqueue("2nd item")
42+
q.Enqueue("3rd item")
43+
44+
fmt.Println(q.Dequeue())
45+
fmt.Println(q.Dequeue())
46+
fmt.Println(q.Dequeue())
47+
48+
// Output:
49+
// 1st item
50+
// 2nd item
51+
// 3rd item
52+
}
53+
54+
type queueInterface interface {
55+
Enqueue(interface{})
56+
Dequeue() interface{}
57+
}
58+
59+
type mutexQueue struct {
60+
v []interface{}
61+
mu sync.Mutex
62+
}
63+
64+
func newMutexQueue() *mutexQueue {
65+
return &mutexQueue{v: make([]interface{}, 0)}
66+
}
67+
68+
func (q *mutexQueue) Enqueue(v interface{}) {
69+
q.mu.Lock()
70+
q.v = append(q.v, v)
71+
q.mu.Unlock()
72+
}
73+
74+
func (q *mutexQueue) Dequeue() interface{} {
75+
q.mu.Lock()
76+
if len(q.v) == 0 {
77+
q.mu.Unlock()
78+
return nil
79+
}
80+
v := q.v[0]
81+
q.v = q.v[1:]
82+
q.mu.Unlock()
83+
return v
84+
}
85+
86+
func BenchmarkQueue(b *testing.B) {
87+
length := 1 << 12
88+
inputs := make([]int, length)
89+
for i := 0; i < length; i++ {
90+
inputs = append(inputs, rand.Int())
91+
}
92+
q, mq := lockfree.NewQueue(), newMutexQueue()
93+
b.ResetTimer()
94+
95+
for _, q := range [...]queueInterface{q, mq} {
96+
b.Run(fmt.Sprintf("%T", q), func(b *testing.B) {
97+
var c int64
98+
b.RunParallel(func(pb *testing.PB) {
99+
for pb.Next() {
100+
i := int(atomic.AddInt64(&c, 1)-1) % length
101+
v := inputs[i]
102+
if v >= 0 {
103+
q.Enqueue(v)
104+
} else {
105+
q.Dequeue()
106+
}
107+
}
108+
})
109+
})
110+
}
111+
}

0 commit comments

Comments
 (0)