Skip to content

Commit 4118dbb

Browse files
committed
Adds queue
1 parent 7d5393e commit 4118dbb

File tree

4 files changed

+81
-0
lines changed

4 files changed

+81
-0
lines changed

internal/queue/queue.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package queueinternal
2+
3+
import "ashishkujoy/queue/internal/storage"
4+
5+
type Queue struct {
6+
segments *storage.Segments
7+
}
8+
9+
func NewQueue(config *storage.Config) (*Queue, error) {
10+
segments, err := storage.NewSegments(config, storage.NewIndex())
11+
if err != nil {
12+
return nil, err
13+
}
14+
return &Queue{segments: segments}, nil
15+
}
16+
17+
func (q *Queue) Enqueue(data []byte) (int, error) {
18+
return q.segments.Append(data)
19+
}
20+
21+
func (q *Queue) Dequeue(id int) ([]byte, error) {
22+
return q.segments.Read(id)
23+
}
24+
25+
func (q *Queue) Close() error {
26+
return q.segments.Close()
27+
}

internal/queue/queue_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package queueinternal
2+
3+
import (
4+
"ashishkujoy/queue/internal/storage"
5+
"os"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func createTempDir(suffix string) string {
12+
dir := os.TempDir() + "/" + suffix
13+
os.MkdirAll(dir, 0755)
14+
return dir
15+
}
16+
17+
func removeTempDir(suffix string) {
18+
dir := os.TempDir() + "/" + suffix
19+
os.RemoveAll(dir)
20+
}
21+
22+
func TestEnqueueSingleElement(t *testing.T) {
23+
config := storage.NewConfig(createTempDir("TestEnqueueSingleElement"), 1000)
24+
defer removeTempDir("TestEnqueueSingleElement")
25+
queue, err := NewQueue(config)
26+
assert.NoError(t, err)
27+
defer queue.Close()
28+
29+
id, err := queue.Enqueue([]byte("First Message"))
30+
assert.NoError(t, err)
31+
32+
data, err := queue.Dequeue(id)
33+
assert.NoError(t, err)
34+
assert.Equal(t, data, []byte("First Message"))
35+
}

internal/storage/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,10 @@ type Config struct {
44
segmentsRoot string
55
maxSegmentSizeInBytes int
66
}
7+
8+
func NewConfig(segmentsRoot string, maxSegmentSizeInBytes int) *Config {
9+
return &Config{
10+
segmentsRoot: segmentsRoot,
11+
maxSegmentSizeInBytes: maxSegmentSizeInBytes,
12+
}
13+
}

internal/storage/segments.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,15 @@ func (s *Segments) rollOverSegment() error {
7777
s.active = newActiveSegment
7878
return nil
7979
}
80+
81+
func (s *Segments) Close() error {
82+
if err := s.active.Close(); err != nil {
83+
return err
84+
}
85+
for _, segment := range s.closedSegments {
86+
if err := segment.Close(); err != nil {
87+
return err
88+
}
89+
}
90+
return nil
91+
}

0 commit comments

Comments
 (0)