Skip to content

Commit 96e3fbd

Browse files
committed
Restores segments on start up
1 parent d04c24c commit 96e3fbd

File tree

6 files changed

+158
-2
lines changed

6 files changed

+158
-2
lines changed

internal/queue/queue.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ func NewQueue(config *config.Config) (*Queue, error) {
1717
return &Queue{segments: segments}, nil
1818
}
1919

20+
func RestoreQueue(config *config.Config) (*Queue, error) {
21+
segments, err := storage.RestoreSegments(config, storage.NewIndex())
22+
if err != nil {
23+
return nil, err
24+
}
25+
return &Queue{segments: segments}, nil
26+
}
27+
2028
func (q *Queue) Enqueue(data []byte) (int, error) {
2129
return q.segments.Append(data)
2230
}

internal/storage/segment.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,17 @@ func NewSegment(id int, config *config.Config) (*Segment, error) {
2929
return &Segment{store: store, id: id, mu: &sync.RWMutex{}}, nil
3030
}
3131

32+
// RestoreSegment restores a segment from the given ID and configuration.
33+
func RestoreSegment(id int, config *config.Config) (*Segment, error) {
34+
filePath := fmt.Sprintf("%s/segment-%d", config.SegmentsRoot(), id)
35+
store, err := RestoreStore(filePath)
36+
if err != nil {
37+
return nil, err
38+
}
39+
40+
return &Segment{store: store, id: id, mu: &sync.RWMutex{}}, nil
41+
}
42+
3243
// Append appends data to the segment.
3344
// It locks the segment for writing to ensure thread safety.
3445
// It returns the offset of the appended data or an error if the operation fails.

internal/storage/segments.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ package storage
33
import (
44
"ashishkujoy/queue/internal/config"
55
"fmt"
6+
"os"
7+
"slices"
8+
"strconv"
9+
"strings"
610
"sync"
711
)
812

@@ -35,6 +39,77 @@ func NewSegments(config *config.Config, index *Index) (*Segments, error) {
3539
}, nil
3640
}
3741

42+
// RestoreSegments restores segments from the given configuration and index.
43+
func RestoreSegments(c *config.Config, index *Index) (*Segments, error) {
44+
segmentIds, err := getSegmentIds(c.SegmentsRoot())
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
closedSegments, err2 := restoreSegmentsById(c, segmentIds)
50+
if err2 != nil {
51+
return nil, err2
52+
}
53+
54+
activeSegmentId, activeSegment, err := createActiveSegment(c, segmentIds, err)
55+
if err != nil {
56+
return nil, err
57+
}
58+
return &Segments{
59+
config: c,
60+
active: activeSegment,
61+
id: activeSegmentId,
62+
index: index,
63+
closedSegments: closedSegments,
64+
mu: &sync.Mutex{},
65+
}, nil
66+
}
67+
68+
func createActiveSegment(c *config.Config, segmentIds []int, err error) (int, *Segment, error) {
69+
activeSegmentId := 0
70+
if len(segmentIds) != 0 {
71+
activeSegmentId = segmentIds[len(segmentIds)-1]
72+
}
73+
activeSegment, err := NewSegment(activeSegmentId+1, c)
74+
if err != nil {
75+
return 0, nil, err
76+
}
77+
return activeSegmentId + 1, activeSegment, nil
78+
}
79+
80+
func restoreSegmentsById(c *config.Config, segmentIds []int) ([]*Segment, error) {
81+
var closedSegments []*Segment
82+
for _, segmentId := range segmentIds {
83+
segment, err := RestoreSegment(segmentId, c)
84+
if err != nil {
85+
return nil, err
86+
}
87+
closedSegments = append(closedSegments, segment)
88+
}
89+
return closedSegments, nil
90+
}
91+
92+
func getSegmentIds(segmentsRoot string) ([]int, error) {
93+
entries, err := os.ReadDir(segmentsRoot)
94+
if err != nil {
95+
return nil, err
96+
}
97+
var segmentIds []int
98+
for _, entry := range entries {
99+
if !strings.HasPrefix(entry.Name(), "segment-") {
100+
continue
101+
}
102+
segmentIdStr := strings.TrimPrefix(entry.Name(), "segment-")
103+
id, err := strconv.Atoi(segmentIdStr)
104+
if err != nil {
105+
return nil, err
106+
}
107+
segmentIds = append(segmentIds, id)
108+
}
109+
slices.Sort(segmentIds)
110+
return segmentIds, nil
111+
}
112+
38113
// Append appends data to the active segment.
39114
// If the active segment is full, it rolls over to a new segment.
40115
func (s *Segments) Append(data []byte) (int, error) {

internal/storage/segments_test.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,10 @@ func TestSegmentRollOver(t *testing.T) {
7474
}
7575

7676
func TestReadFromARolledOverSegment(t *testing.T) {
77-
config := config.NewConfig(createTempDir("TestReadFromARolledOverSegment"), "tmp", 10, time.Second)
77+
cfg := config.NewConfig(createTempDir("TestReadFromARolledOverSegment"), "tmp", 10, time.Second)
7878
defer removeTempDir("TestReadFromARolledOverSegment")
7979
index := NewIndex()
80-
segments, err := NewSegments(config, index)
80+
segments, err := NewSegments(cfg, index)
8181
assert.NoError(t, err)
8282

8383
messageId1, _ := segments.Append([]byte("Hello Segments"))
@@ -90,3 +90,26 @@ func TestReadFromARolledOverSegment(t *testing.T) {
9090
assert.Equal(t, []byte("Another Hello Segments"), data2)
9191
assert.Equal(t, []byte("Hello Segments"), data1)
9292
}
93+
94+
func TestRestoreASegment(t *testing.T) {
95+
cfg := config.NewConfig(createTempDir("TestRestoreASegment"), "tmp", 10, time.Second)
96+
defer removeTempDir("TestRestoreASegment")
97+
index := NewIndex()
98+
segments, err := NewSegments(cfg, index)
99+
assert.NoError(t, err)
100+
101+
messageId1, _ := segments.Append([]byte("Hello Segments"))
102+
messageId2, _ := segments.Append([]byte("Another Hello Segments"))
103+
messageId3, _ := segments.Append([]byte("Yet Another Hello Segments"))
104+
_ = segments.Close()
105+
restoreSegments, err := RestoreSegments(cfg, index)
106+
assert.NoError(t, err)
107+
assert.Equal(t, 3, len(restoreSegments.closedSegments))
108+
109+
data2, _ := restoreSegments.Read(messageId2)
110+
data1, _ := restoreSegments.Read(messageId1)
111+
data3, _ := restoreSegments.Read(messageId3)
112+
assert.Equal(t, []byte("Another Hello Segments"), data2)
113+
assert.Equal(t, []byte("Hello Segments"), data1)
114+
assert.Equal(t, []byte("Yet Another Hello Segments"), data3)
115+
}

internal/storage/store.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,19 @@ func NewStore(filePath string) (*Store, error) {
3838
}, nil
3939
}
4040

41+
// RestoreStore restores a store from a file at the given filePath.
42+
// It opens the file for reading only
43+
// TODO: Add error handling of writing to a restored store.(closed segment)
44+
func RestoreStore(filePath string) (*Store, error) {
45+
reader, err := os.OpenFile(filePath, os.O_RDONLY, 0644)
46+
if err != nil {
47+
return nil, err
48+
}
49+
return &Store{
50+
reader: reader,
51+
}, nil
52+
}
53+
4154
func (s *Store) Append(data []byte) (int, error) {
4255
currentOffset := s.offset
4356
sizeBuff := make([]byte, 4)

internal/storage/store_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,29 @@ func TestReadFromANewStoreAfterClose(t *testing.T) {
6565
assert.Equal(t, data1, []byte("Hello World"))
6666
assert.Equal(t, data2, []byte("Another Hello World"))
6767
}
68+
69+
func TestRestoreStore(t *testing.T) {
70+
filePath := fmt.Sprintf("%s/%s", os.TempDir(), "TestReadFromANewStoreAfterClose")
71+
defer os.Remove(filePath)
72+
store, err := NewStore(filePath)
73+
defer store.Close()
74+
75+
assert.NoError(t, err)
76+
77+
offset1, _ := store.Append([]byte("Hello World"))
78+
offset2, _ := store.Append([]byte("Another Hello World"))
79+
err = store.Close()
80+
assert.NoError(t, err)
81+
82+
restoreStore, err := RestoreStore(filePath)
83+
assert.Nil(t, restoreStore.writer)
84+
assert.NoError(t, err)
85+
86+
data1, err := restoreStore.Read(offset1)
87+
assert.NoError(t, err)
88+
assert.Equal(t, data1, []byte("Hello World"))
89+
data2, err := restoreStore.Read(offset2)
90+
assert.NoError(t, err)
91+
assert.Equal(t, data2, []byte("Another Hello World"))
92+
assert.Equal(t, data1, []byte("Hello World"))
93+
}

0 commit comments

Comments
 (0)