Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@

# dque - a fast embedded durable queue for Go

[![Go Report Card](https://goreportcard.com/badge/github.com/joncrlsn/dque)](https://goreportcard.com/report/github.com/joncrlsn/dque)
[![GoDoc](https://godoc.org/github.com/joncrlsn/dque?status.svg)](https://godoc.org/github.com/joncrlsn/dque)
This is a fork of a [joncrlsn/dque](github.com/joncrlsn/dque) that ads Prepend functionality.

[![Go Report Card](https://goreportcard.com/badge/github.com/iseletsk/dque)](https://goreportcard.com/report/github.com/iseletsk/dque)
[![GoDoc](https://godoc.org/github.com/iseletsk/dque?status.svg)](https://godoc.org/github.com/iseletsk/dque)

dque is:

Expand Down
30 changes: 30 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,36 @@ func (q *DQue) Close() error {
return nil
}

// Prepend adds a slice of objects to the beginning of the queue
// The operation is expected to run very infrequently, and will lead to sub-optimal results overall
// It also breaks Size functionality
func (q *DQue) Prepend(objs []interface{}) error {
// This is heavy-handed but its safe
q.mutex.Lock()
defer q.mutex.Unlock()

if q.fileLock == nil {
return ErrQueueClosed
}

// For the simplicity sake, we always prepend in a new segment, no matter number of objects.
seg, err := newQueueSegment(q.fullPath, q.firstSegment.number-1, q.turbo, q.builder)
if err != nil {
return errors.Wrapf(err, "error creating new queue segment: %d.", q.firstSegment.number-1)
}
if err := q.firstSegment.close(); err != nil {
return errors.Wrapf(err, "error closing first segment file #%d.", q.firstSegment.number)
}
q.firstSegment = seg
for _, obj := range objs {
if err := q.firstSegment.add(obj); err != nil {
return errors.Wrap(err, "error prepending item to the first segment")
}
}
q.emptyCond.Broadcast()
return nil
}

// Enqueue adds an item to the end of the queue
func (q *DQue) Enqueue(obj interface{}) error {
// This is heavy-handed but its safe
Expand Down
63 changes: 63 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,69 @@ func item2Builder() interface{} {
return &item2{}
}

func itemsGen(from, to int) []interface{} {
result := make([]interface{}, to-from)
for i := 0; i < to-from; i++ {
result[i] = &item2{from + i}
}
return result
}

// Test prepend at various stages of execution
func TestQueue_PrependLoop(t *testing.T) {
testQueue_PrependLoop(t, true /* true=turbo */)
testQueue_PrependLoop(t, false /* true=turbo */)
}

func testQueue_PrependLoop(t *testing.T, turbo bool) {
qName := "test1"
if err := os.RemoveAll(qName); err != nil {
t.Fatal("Error removing queue directory", err)
}

// Create a new queue with segment size of 3
q := newQ(t, qName, turbo)

for i := 0; i < 4; i++ {
if err := q.Enqueue(&item2{i}); err != nil {
t.Fatal("Error enqueueing", err)
}
}
if err := q.Prepend(itemsGen(4, 10)); err != nil {
t.Fatal("Error prepending", err)
}
checkQueue(t, q, []int{4, 5, 6, 7, 8, 9, 0, 1})
if err := q.Prepend(itemsGen(10, 15)); err != nil {
t.Fatal("Error prepending", err)
}
checkQueue(t, q, []int{10, 11, 12, 13})
for i := 15; i < 20; i++ {
if err := q.Enqueue(&item2{i}); err != nil {
t.Fatal("Error enqueueing", err)
}
}
checkQueue(t, q, []int{14, 2, 3, 15, 16, 17, 18, 19})
if err := os.RemoveAll(qName); err != nil {
t.Fatal("Error cleaning up the queue directory", err)
}
}

func checkQueue(t *testing.T, q *dque.DQue, values []int) {
for _, i := range values {
obj, err := q.Dequeue()
if err != nil {
t.Fatal("Unable to get element", err)
}
i2, ok := obj.(*item2)
if !ok {
t.Fatalf("Invalid object type %v", obj)
}
if i2.Id != i {
t.Fatalf("Got wrong element (should be %d): %v", i, i2)
}
}
}

// Adds 1 and removes 1 in a loop to ensure that when we've filled
// up the first segment that we delete it and move on to the next segment
func TestQueue_AddRemoveLoop(t *testing.T) {
Expand Down