Skip to content

Commit 10169ca

Browse files
committed
Adds queue service
1 parent bcd0002 commit 10169ca

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package queueinternal
2+
3+
import (
4+
"ashishkujoy/queue/internal/config"
5+
"os"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestEnqueue(t *testing.T) {
12+
segmentPath := createTempDir("testEnqueue/segments")
13+
defer os.RemoveAll(segmentPath)
14+
metaDataPath := createTempDir("testEnqueue/metadata")
15+
defer os.RemoveAll(metaDataPath)
16+
config := config.NewConfigWithMetadataPath(segmentPath, metaDataPath, 1024)
17+
18+
queueService, err := NewQueueService(config)
19+
assert.NoError(t, err)
20+
21+
queueService.Enqueue([]byte("Hello World"))
22+
queueService.Enqueue([]byte("Hello World 1"))
23+
queueService.Enqueue([]byte("Hello World 2"))
24+
queueService.Enqueue([]byte("Hello World 3"))
25+
26+
data, _ := queueService.Dequeue(1)
27+
assert.Equal(t, []byte("Hello World"), data)
28+
29+
data, _ = queueService.Dequeue(1)
30+
assert.Equal(t, []byte("Hello World 1"), data)
31+
32+
data, _ = queueService.Dequeue(1)
33+
assert.Equal(t, []byte("Hello World 2"), data)
34+
35+
data, _ = queueService.Dequeue(2)
36+
assert.Equal(t, []byte("Hello World"), data)
37+
38+
data, _ = queueService.Dequeue(1)
39+
assert.Equal(t, []byte("Hello World 3"), data)
40+
}

internal/queue/queueService.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package queueinternal
2+
3+
import (
4+
"ashishkujoy/queue/internal/config"
5+
"ashishkujoy/queue/internal/consumer"
6+
)
7+
8+
type QueueService struct {
9+
queue *Queue
10+
consumerIndex *consumer.ConsumerIndex
11+
}
12+
13+
func NewQueueService(config *config.Config) (*QueueService, error) {
14+
queue, err := NewQueue(config)
15+
if err != nil {
16+
return nil, err
17+
}
18+
consumerIndex, err := consumer.NewConsumerIndex(config)
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
return &QueueService{
24+
queue: queue,
25+
consumerIndex: consumerIndex,
26+
}, nil
27+
}
28+
29+
func (qs *QueueService) Enqueue(data []byte) error {
30+
_, err := qs.queue.Enqueue(data)
31+
return err
32+
}
33+
34+
func (qs *QueueService) Dequeue(consumerId int) ([]byte, error) {
35+
index := qs.consumerIndex.ReadIndex(consumerId)
36+
data, err := qs.queue.Dequeue(index + 1)
37+
if err != nil {
38+
return nil, err
39+
}
40+
qs.consumerIndex.WriteIndex(consumerId, index+1)
41+
return data, nil
42+
}

0 commit comments

Comments
 (0)