Skip to content

Commit 12ec188

Browse files
committed
Persist elements indexes
1 parent 96e3fbd commit 12ec188

File tree

9 files changed

+158
-25
lines changed

9 files changed

+158
-25
lines changed

internal/config/base_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ func (c *Config) ConsumerIndexSyncInterval() time.Duration {
2121
return c.consumerIndexSyncInterval
2222
}
2323

24+
func (c *Config) IndexFilePath() string {
25+
return c.MetadataPath + "/index"
26+
}
27+
2428
func NewConfig(
2529
segmentsRoot string,
2630
metadataPath string,

internal/queue/queue.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,24 @@ type Queue struct {
99
segments *storage.Segments
1010
}
1111

12-
func NewQueue(config *config.Config) (*Queue, error) {
13-
segments, err := storage.NewSegments(config, storage.NewIndex())
12+
func NewQueue(cfg *config.Config) (*Queue, error) {
13+
index, err := storage.NewIndex(cfg)
14+
if err != nil {
15+
return nil, err
16+
}
17+
segments, err := storage.NewSegments(cfg, index)
1418
if err != nil {
1519
return nil, err
1620
}
1721
return &Queue{segments: segments}, nil
1822
}
1923

20-
func RestoreQueue(config *config.Config) (*Queue, error) {
21-
segments, err := storage.RestoreSegments(config, storage.NewIndex())
24+
func RestoreQueue(cfg *config.Config) (*Queue, error) {
25+
index, err := storage.RestoreIndex(cfg)
26+
if err != nil {
27+
return nil, err
28+
}
29+
segments, err := storage.RestoreSegments(cfg, index)
2230
if err != nil {
2331
return nil, err
2432
}

internal/queue/queue_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type QueueService struct {
1111
}
1212

1313
func NewQueueService(config *config.Config) (*QueueService, error) {
14-
queue, err := NewQueue(config)
14+
queue, err := RestoreQueue(config)
1515
if err != nil {
1616
return nil, err
1717
}

internal/queue/queue_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ func removeTempDir(suffix string) {
2323
func TestEnqueueSingleElement(t *testing.T) {
2424
cfg := config.NewConfig(
2525
createTempDir("TestEnqueueSingleElement"),
26-
"/tmp/metadata",
26+
createTempDir("metadata"),
2727
1000,
2828
time.Second)
2929
defer removeTempDir("TestEnqueueSingleElement")
30+
defer removeTempDir("metadata")
3031
queue, err := NewQueue(cfg)
3132
assert.NoError(t, err)
3233
defer queue.Close()

internal/storage/index.go

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,35 @@
11
package storage
22

3+
import (
4+
"ashishkujoy/queue/internal/config"
5+
"encoding/binary"
6+
"sync"
7+
)
8+
39
type MessageEntry struct {
410
segmentId int
511
offset int
12+
elementId int
13+
}
14+
15+
func (m *MessageEntry) Encode() []byte {
16+
data := make([]byte, 24)
17+
offset := 0
18+
binary.BigEndian.PutUint64(data[offset:offset+8], uint64(m.segmentId))
19+
offset += 8
20+
binary.BigEndian.PutUint64(data[offset:offset+8], uint64(m.offset))
21+
offset += 8
22+
binary.BigEndian.PutUint64(data[offset:offset+8], uint64(m.offset))
23+
return data
24+
}
25+
26+
func (m *MessageEntry) Decode(data []byte) {
27+
offset := 0
28+
m.segmentId = int(binary.BigEndian.Uint64(data[offset : offset+8]))
29+
offset += 8
30+
m.offset = int(binary.BigEndian.Uint64(data[offset : offset+8]))
31+
offset += 8
32+
m.elementId = int(binary.BigEndian.Uint64(data[offset : offset+8]))
633
}
734

835
func NewMessageEntry(segmentId int, offset int) MessageEntry {
@@ -12,20 +39,68 @@ func NewMessageEntry(segmentId int, offset int) MessageEntry {
1239
type Index struct {
1340
entries map[int]MessageEntry
1441
elementId int
42+
store *Store
43+
mu *sync.Mutex
1544
}
1645

17-
func NewIndex() *Index {
46+
func NewIndex(cfg *config.Config) (*Index, error) {
47+
store, err := NewStore(cfg.IndexFilePath())
48+
if err != nil {
49+
return nil, err
50+
}
1851
return &Index{
1952
entries: make(map[int]MessageEntry),
53+
store: store,
2054
elementId: 0,
55+
mu: &sync.Mutex{},
56+
}, nil
57+
}
58+
59+
func RestoreIndex(cfg *config.Config) (*Index, error) {
60+
store, err := NewStore(cfg.IndexFilePath())
61+
if err != nil {
62+
return nil, err
63+
}
64+
entries, elementId, err := restoreEntries(store)
65+
if err != nil {
66+
return nil, err
67+
}
68+
return &Index{
69+
entries: entries,
70+
store: store,
71+
elementId: elementId,
72+
mu: &sync.Mutex{},
73+
}, nil
74+
}
75+
76+
func restoreEntries(store *Store) (map[int]MessageEntry, int, error) {
77+
entriesBytes, err := store.readAllEntries()
78+
entries := make(map[int]MessageEntry)
79+
if err != nil {
80+
return nil, 0, err
2181
}
82+
var elementId = 0
83+
for _, entry := range entriesBytes {
84+
messageEntry := MessageEntry{}
85+
messageEntry.Decode(entry)
86+
entries[messageEntry.elementId] = messageEntry
87+
elementId = messageEntry.elementId
88+
}
89+
return entries, elementId, nil
2290
}
2391

24-
func (i *Index) Append(messageEntry MessageEntry) int {
92+
func (i *Index) Append(messageEntry MessageEntry) (int, error) {
2593
currentElementId := i.elementId
94+
messageEntry.elementId = currentElementId
2695
i.entries[currentElementId] = messageEntry
2796
i.elementId++
28-
return currentElementId
97+
i.mu.Lock()
98+
defer i.mu.Unlock()
99+
_, err := i.store.Append(messageEntry.Encode())
100+
if err != nil {
101+
return 0, err
102+
}
103+
return currentElementId, nil
29104
}
30105

31106
func (i *Index) GetOffset(elementId int) (MessageEntry, bool) {

internal/storage/segments.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,7 @@ func (s *Segments) Append(data []byte) (int, error) {
122122
if err != nil {
123123
return 0, err
124124
}
125-
messageId := s.index.Append(NewMessageEntry(s.active.id, offset))
126-
return messageId, nil
125+
return s.index.Append(NewMessageEntry(s.active.id, offset))
127126
}
128127

129128
// Read reads data from the segment with the given message ID.

internal/storage/segments_test.go

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,16 @@ func removeTempDir(suffix string) {
2121
}
2222

2323
func TestAppend(t *testing.T) {
24-
cfg := config.NewConfig(createTempDir("SegmentTestAppend"), "tmp", 1000, time.Second)
24+
cfg := config.NewConfig(
25+
createTempDir("SegmentTestAppend"),
26+
createTempDir("metadata"),
27+
1000,
28+
time.Second,
29+
)
2530
defer removeTempDir("SegmentTestAppend")
26-
index := NewIndex()
31+
defer removeTempDir("metadata")
32+
index, err := NewIndex(cfg)
33+
assert.NoError(t, err)
2734
segments, err := NewSegments(cfg, index)
2835

2936
assert.NoError(t, err)
@@ -37,10 +44,16 @@ func TestAppend(t *testing.T) {
3744
}
3845

3946
func TestAppendMultipleEntry(t *testing.T) {
40-
config := config.NewConfig(createTempDir("SegmentTestAppendMultipleEntry"), "tmp", 1000, time.Second)
47+
cfg := config.NewConfig(
48+
createTempDir("SegmentTestAppendMultipleEntry"),
49+
createTempDir("metadata"),
50+
1000,
51+
time.Second,
52+
)
4153
defer removeTempDir("SegmentTestAppendMultipleEntry")
42-
index := NewIndex()
43-
segments, err := NewSegments(config, index)
54+
defer removeTempDir("metadata")
55+
index, _ := NewIndex(cfg)
56+
segments, err := NewSegments(cfg, index)
4457
assert.NoError(t, err)
4558

4659
messageId1, err := segments.Append([]byte("Hello Segments"))
@@ -59,10 +72,15 @@ func TestAppendMultipleEntry(t *testing.T) {
5972
}
6073

6174
func TestSegmentRollOver(t *testing.T) {
62-
config := config.NewConfig(createTempDir("TestSegmentRollOver1"), "tmp", 20, time.Second)
75+
cfg := config.NewConfig(
76+
createTempDir("TestSegmentRollOver1"),
77+
createTempDir("metadata"),
78+
20,
79+
time.Second,
80+
)
6381
defer removeTempDir("TestSegmentRollOver1")
64-
index := NewIndex()
65-
segments, err := NewSegments(config, index)
82+
index, _ := NewIndex(cfg)
83+
segments, err := NewSegments(cfg, index)
6684
assert.NoError(t, err)
6785

6886
segments.Append([]byte("Hello Segments"))
@@ -74,9 +92,14 @@ func TestSegmentRollOver(t *testing.T) {
7492
}
7593

7694
func TestReadFromARolledOverSegment(t *testing.T) {
77-
cfg := config.NewConfig(createTempDir("TestReadFromARolledOverSegment"), "tmp", 10, time.Second)
95+
cfg := config.NewConfig(
96+
createTempDir("TestReadFromARolledOverSegment"),
97+
createTempDir("metadata"),
98+
10,
99+
time.Second,
100+
)
78101
defer removeTempDir("TestReadFromARolledOverSegment")
79-
index := NewIndex()
102+
index, _ := NewIndex(cfg)
80103
segments, err := NewSegments(cfg, index)
81104
assert.NoError(t, err)
82105

@@ -92,9 +115,14 @@ func TestReadFromARolledOverSegment(t *testing.T) {
92115
}
93116

94117
func TestRestoreASegment(t *testing.T) {
95-
cfg := config.NewConfig(createTempDir("TestRestoreASegment"), "tmp", 10, time.Second)
118+
cfg := config.NewConfig(
119+
createTempDir("TestRestoreASegment"),
120+
createTempDir("metadata"),
121+
10,
122+
time.Second,
123+
)
96124
defer removeTempDir("TestRestoreASegment")
97-
index := NewIndex()
125+
index, _ := NewIndex(cfg)
98126
segments, err := NewSegments(cfg, index)
99127
assert.NoError(t, err)
100128

internal/storage/store.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,19 @@ func NewStore(filePath string) (*Store, error) {
4040

4141
// RestoreStore restores a store from a file at the given filePath.
4242
// It opens the file for reading only
43-
// TODO: Add error handling of writing to a restored store.(closed segment)
4443
func RestoreStore(filePath string) (*Store, error) {
44+
writer, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
45+
if err != nil {
46+
return nil, err
47+
}
48+
4549
reader, err := os.OpenFile(filePath, os.O_RDONLY, 0644)
4650
if err != nil {
4751
return nil, err
4852
}
4953
return &Store{
5054
reader: reader,
55+
writer: writer,
5156
}, nil
5257
}
5358

@@ -108,3 +113,17 @@ func (s *Store) Close() error {
108113
func (s *Store) Size() int {
109114
return s.offset
110115
}
116+
117+
func (s *Store) readAllEntries() ([][]byte, error) {
118+
var entries [][]byte
119+
offset := 0
120+
for {
121+
entry, err := s.Read(offset)
122+
if err != nil {
123+
break
124+
}
125+
entries = append(entries, entry)
126+
offset += len(entry) + 4
127+
}
128+
return entries, nil
129+
}

internal/storage/store_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ func TestRestoreStore(t *testing.T) {
8080
assert.NoError(t, err)
8181

8282
restoreStore, err := RestoreStore(filePath)
83-
assert.Nil(t, restoreStore.writer)
8483
assert.NoError(t, err)
8584

8685
data1, err := restoreStore.Read(offset1)

0 commit comments

Comments
 (0)