Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,51 @@ func (pq *PriorityQueue) DequeueByPriority(priority uint8) (*PriorityItem, error
return item, nil
}

func (pq *PriorityQueue) DequeueByID(priority uint8, id uint64) (*PriorityItem, error) {
pq.Lock()
defer pq.Unlock()

// Check if queue is closed.
if !pq.isOpen {
return nil, ErrDBClosed
}

item := &PriorityItem{ID: id, Priority: priority, Key: pq.generateKey(priority, id)}

var err error
if item.Value, err = pq.db.Get(item.Key, nil); err != nil {
return nil, err
}

if err = pq.db.Delete(item.Key, nil); err != nil {
return nil, err
}

// Increment head position if this happens to be the first one
if id == pq.levels[priority].head+1 {
pq.levels[priority].head++
}

return item, nil

}

// DequeueItems dequeue's a slice of items inside a transaction so either
// they are all deleted, or none are deleted
func (pq *PriorityQueue) DequeueItems(items []*PriorityItem) error {
trans, err := pq.db.OpenTransaction()
if err != nil {
return err
}
defer trans.Discard()
for _, item := range items {
if err := trans.Delete(item.Key, nil); err != nil {
return err
}
}
return trans.Commit()
}

// Peek returns the next item in the priority queue without removing it.
func (pq *PriorityQueue) Peek() (*PriorityItem, error) {
pq.RLock()
Expand Down Expand Up @@ -479,7 +524,20 @@ func (pq *PriorityQueue) getNextItem() (*PriorityItem, error) {
}

// Try to get the next item in the current priority level.
return pq.getItemByPriorityID(pq.curLevel, pq.levels[pq.curLevel].head+1)
// Items may have been removed from inside the queue so if we get an error
// increment head until we get something valid
for {
result, err := pq.getItemByPriorityID(pq.curLevel, pq.levels[pq.curLevel].head+1)
if err == nil {
return result, nil
}
// If we've iterated the whole way through... we have to give up
if pq.levels[pq.curLevel].head > pq.levels[pq.curLevel].tail {
return result, err
}
// This item was deleted, move on to the next one!
pq.levels[pq.curLevel].head++
}
}

// getItemByID returns an item, if found, for the given ID.
Expand Down
129 changes: 129 additions & 0 deletions priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package goque

import (
"fmt"
"io/ioutil"
"math"
"os"
"reflect"
"testing"
"time"
)
Expand Down Expand Up @@ -1056,6 +1058,133 @@ func TestPriorityQueueOutOfBounds(t *testing.T) {
}
}

func TestPriorityQueuePopMidStream(t *testing.T) {
type Vector struct {
Data string
}
tmpdir, err := ioutil.TempDir("", "testdb")
if err != nil {
t.Error(err)
}

defer os.RemoveAll(tmpdir)

queue, err := OpenPriorityQueue(tmpdir, ASC)
if err != nil {
t.Error(err)
}

item1 := Vector{Data: "1"}
item2 := Vector{Data: "2"}
item3 := Vector{Data: "3"}

if _, err = queue.EnqueueObject(1, item1); err != nil {
t.Error(err)
}
saved2, err := queue.EnqueueObject(1, item2)
if err != nil {
t.Error(err)
}
if _, err = queue.EnqueueObject(1, item3); err != nil {
t.Error(err)
}

result2, err := queue.DequeueByID(saved2.Priority, saved2.ID)
if err != nil {
t.Error(err)
}
result1, err := queue.Dequeue()
if err != nil {
t.Error(err)
}
result3, err := queue.Dequeue()
if err != nil {
t.Error(err)
}

var resultItem1, resultItem2, resultItem3 Vector
result1.ToObject(&resultItem1)
result2.ToObject(&resultItem2)
result3.ToObject(&resultItem3)

if resultItem1.Data != item1.Data {
t.Error(resultItem1, " != ", item1.Data)
}
if resultItem2.Data != item2.Data {
t.Error(resultItem1, " != ", item1.Data)
}
if resultItem3.Data != item3.Data {
t.Error(resultItem1, " != ", item1.Data)
}
}

func TestDequeueItems(t *testing.T) {
type Vector struct {
Data string
}
tmpdir, err := ioutil.TempDir("", "testdb")
if err != nil {
t.Error(err)
}

defer os.RemoveAll(tmpdir)

queue, err := OpenPriorityQueue(tmpdir, ASC)
if err != nil {
t.Error(err)
}

item1 := Vector{Data: "1"}
item2 := Vector{Data: "2"}
item3 := Vector{Data: "3"}

saved1, err := queue.EnqueueObject(1, item1)
if err != nil {
t.Error(err)
}
saved2, err := queue.EnqueueObject(1, item2)
if err != nil {
t.Error(err)
}
saved3, err := queue.EnqueueObject(1, item3)
if err != nil {
t.Error(err)
}

err = queue.DequeueItems([]*PriorityItem{saved1, saved3})
if err != nil {
t.Error(err)
}

actual, err := queue.Dequeue()

if !reflect.DeepEqual(actual, saved2) {
t.Errorf("got %+v, expected %+v", actual, saved2)
}
}

func TestDequeueItemsEmpty(t *testing.T) {
type Vector struct {
Data string
}
tmpdir, err := ioutil.TempDir("", "testdb*")
if err != nil {
t.Error(err)
}

defer os.RemoveAll(tmpdir)

queue, err := OpenPriorityQueue(tmpdir, ASC)
if err != nil {
t.Error(err)
}

err = queue.DequeueItems(nil)
if err != nil {
t.Error(err)
}
}

func BenchmarkPriorityQueueEnqueue(b *testing.B) {
// Open test database
file := fmt.Sprintf("test_db_%d", time.Now().UnixNano())
Expand Down