Skip to content

Commit 7d5393e

Browse files
committed
Adds segment
1 parent a70d37b commit 7d5393e

File tree

7 files changed

+223
-48
lines changed

7 files changed

+223
-48
lines changed

internal/storage/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package storage
22

33
type Config struct {
4-
segmentsRoot string
4+
segmentsRoot string
5+
maxSegmentSizeInBytes int
56
}

internal/storage/index.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,34 @@
11
package storage
22

3+
type MessageEntry struct {
4+
segmentId int
5+
offset int
6+
}
7+
8+
func NewMessageEntry(segmentId int, offset int) MessageEntry {
9+
return MessageEntry{segmentId: segmentId, offset: offset}
10+
}
11+
312
type Index struct {
4-
entries map[int]int
13+
entries map[int]MessageEntry
514
elementId int
615
}
716

817
func NewIndex() *Index {
918
return &Index{
10-
entries: make(map[int]int),
19+
entries: make(map[int]MessageEntry),
1120
elementId: 0,
1221
}
1322
}
1423

15-
func (i *Index) Append(offset int) int {
24+
func (i *Index) Append(messageEntry MessageEntry) int {
1625
currentElementId := i.elementId
17-
i.entries[currentElementId] = offset
26+
i.entries[currentElementId] = messageEntry
1827
i.elementId++
1928
return currentElementId
2029
}
2130

22-
func (i *Index) GetOffset(elementId int) (int, bool) {
31+
func (i *Index) GetOffset(elementId int) (MessageEntry, bool) {
2332
v, ok := i.entries[elementId]
2433
return v, ok
2534
}

internal/storage/segment.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,37 +5,36 @@ import (
55
)
66

77
type Segment struct {
8+
id int
89
store *Store
9-
index *Index
1010
}
1111

12-
func NewSegment(startingIndex int, config *Config, index *Index) (*Segment, error) {
13-
filePath := fmt.Sprintf("%s/segment-%d", config.segmentsRoot, startingIndex)
12+
func (s *Segment) CloseWriter() error {
13+
return s.store.CloseWriter()
14+
}
15+
16+
func NewSegment(id int, config *Config) (*Segment, error) {
17+
filePath := fmt.Sprintf("%s/segment-%d", config.segmentsRoot, id)
1418
store, err := NewStore(filePath)
1519
if err != nil {
1620
return nil, err
1721
}
1822

19-
return &Segment{store: store, index: index}, nil
23+
return &Segment{store: store, id: id}, nil
2024
}
2125

2226
func (s *Segment) Append(data []byte) (int, error) {
23-
offset, err := s.store.Append(data)
24-
if err != nil {
25-
return 0, err
26-
}
27-
elementIndex := s.index.Append(offset)
28-
return elementIndex, nil
27+
return s.store.Append(data)
2928
}
3029

31-
func (s *Segment) Read(id int) ([]byte, error) {
32-
offset, ok := s.index.GetOffset(id)
33-
if !ok {
34-
return nil, fmt.Errorf("unknown message id: %d", id)
35-
}
30+
func (s *Segment) Read(offset int) ([]byte, error) {
3631
return s.store.Read(offset)
3732
}
3833

34+
func (s *Segment) isFull(maxSizeInBytes int) bool {
35+
return float64(s.store.Size()) >= float64(maxSizeInBytes)*0.9
36+
}
37+
3938
func (s *Segment) Close() error {
4039
return s.store.Close()
4140
}

internal/storage/segment_test.go

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,40 +12,33 @@ func TestAppendToTheSegment(t *testing.T) {
1212
config := &Config{segmentsRoot: os.TempDir()}
1313
defer os.Remove(fmt.Sprintf("%s/segment_10", os.TempDir()))
1414

15-
index := NewIndex()
16-
17-
segment, err := NewSegment(10, config, index)
15+
segment, err := NewSegment(10, config)
1816
assert.NoError(t, err)
1917
defer segment.Close()
2018

21-
id, err := segment.Append([]byte("Hello World"))
19+
_, err = segment.Append([]byte("Hello World"))
2220
assert.NoError(t, err)
23-
assert.Equal(t, id, 0)
24-
_, ok := index.entries[0]
25-
assert.True(t, ok)
2621
}
2722

2823
func TestReadFromTheSegment(t *testing.T) {
2924
config := &Config{segmentsRoot: os.TempDir()}
3025
defer os.Remove(fmt.Sprintf("%s/segment_10", os.TempDir()))
3126

32-
index := NewIndex()
33-
34-
segment, err := NewSegment(10, config, index)
27+
segment, err := NewSegment(10, config)
3528
assert.NoError(t, err)
3629
defer segment.Close()
3730

38-
id1, err := segment.Append([]byte("Hello World"))
31+
offset1, err := segment.Append([]byte("Hello World"))
3932
assert.NoError(t, err)
4033

41-
id2, err := segment.Append([]byte("Bye World"))
34+
offset2, err := segment.Append([]byte("Bye World"))
4235
assert.NoError(t, err)
4336

44-
data2, err := segment.Read(id2)
37+
data2, err := segment.Read(offset2)
4538
assert.NoError(t, err)
4639
assert.Equal(t, []byte("Bye World"), data2)
4740

48-
data1, err := segment.Read(id1)
41+
data1, err := segment.Read(offset1)
4942
assert.NoError(t, err)
5043
assert.Equal(t, []byte("Hello World"), data1)
5144
}
@@ -54,9 +47,7 @@ func TestReadNonExistingMessage(t *testing.T) {
5447
config := &Config{segmentsRoot: os.TempDir()}
5548
defer os.Remove(fmt.Sprintf("%s/segment_12", os.TempDir()))
5649

57-
index := NewIndex()
58-
59-
segment, err := NewSegment(12, config, index)
50+
segment, err := NewSegment(12, config)
6051
assert.NoError(t, err)
6152
defer segment.Close()
6253

@@ -68,22 +59,20 @@ func TestReadFromReloadedSegment(t *testing.T) {
6859
config := &Config{segmentsRoot: os.TempDir()}
6960
defer os.Remove(fmt.Sprintf("%s/segment_13", os.TempDir()))
7061

71-
index := NewIndex()
72-
73-
segment, err := NewSegment(12, config, index)
62+
segment, err := NewSegment(12, config)
7463
assert.NoError(t, err)
7564

76-
segment.Append([]byte("Hello world"))
77-
segment.Append([]byte("Hello earth"))
78-
segment.Append([]byte("Hello India"))
65+
offset1, _ := segment.Append([]byte("Hello world"))
66+
offset2, _ := segment.Append([]byte("Hello earth"))
67+
offset3, _ := segment.Append([]byte("Hello India"))
7968
segment.Close()
8069

81-
restoredSegment, err := NewSegment(12, config, index)
70+
restoredSegment, err := NewSegment(12, config)
8271
assert.NoError(t, err)
8372

84-
data1, _ := restoredSegment.Read(0)
85-
data2, _ := restoredSegment.Read(1)
86-
data3, _ := restoredSegment.Read(2)
73+
data1, _ := restoredSegment.Read(offset1)
74+
data2, _ := restoredSegment.Read(offset2)
75+
data3, _ := restoredSegment.Read(offset3)
8776

8877
assert.Equal(t, []byte("Hello world"), data1)
8978
assert.Equal(t, []byte("Hello earth"), data2)

internal/storage/segments.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package storage
2+
3+
import "fmt"
4+
5+
type Segments struct {
6+
config *Config
7+
active *Segment
8+
id int
9+
index *Index
10+
closedSegments []*Segment
11+
}
12+
13+
func NewSegments(config *Config, index *Index) (*Segments, error) {
14+
segment, err := NewSegment(0, config)
15+
if err != nil {
16+
return nil, err
17+
}
18+
19+
return &Segments{
20+
config: config,
21+
active: segment,
22+
index: index,
23+
closedSegments: make([]*Segment, 0),
24+
}, nil
25+
}
26+
27+
func (s *Segments) Append(data []byte) (int, error) {
28+
if s.active.isFull(s.config.maxSegmentSizeInBytes) {
29+
if err := s.rollOverSegment(); err != nil {
30+
return 0, err
31+
}
32+
}
33+
offset, err := s.active.Append(data)
34+
if err != nil {
35+
return 0, err
36+
}
37+
messageId := s.index.Append(NewMessageEntry(s.active.id, offset))
38+
return messageId, nil
39+
}
40+
41+
func (s *Segments) Read(messageId int) ([]byte, error) {
42+
entry, ok := s.index.GetOffset(messageId)
43+
if !ok {
44+
return nil, fmt.Errorf("unknown message id: %d", messageId)
45+
}
46+
segment, err := s.findSegment(entry.segmentId)
47+
if err != nil {
48+
return nil, err
49+
}
50+
return segment.Read(entry.offset)
51+
}
52+
53+
func (s *Segments) Flush() {
54+
s.active.store.Flush()
55+
}
56+
57+
func (s *Segments) findSegment(segmentId int) (*Segment, error) {
58+
if s.active.id == segmentId {
59+
return s.active, nil
60+
}
61+
for _, segment := range s.closedSegments {
62+
if segment.id == segmentId {
63+
return segment, nil
64+
}
65+
}
66+
return nil, fmt.Errorf("unknown segment %d", segmentId)
67+
}
68+
69+
func (s *Segments) rollOverSegment() error {
70+
s.active.CloseWriter()
71+
s.closedSegments = append(s.closedSegments, s.active)
72+
s.id++
73+
newActiveSegment, err := NewSegment(s.id, s.config)
74+
if err != nil {
75+
return err
76+
}
77+
s.active = newActiveSegment
78+
return nil
79+
}

internal/storage/segments_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package storage
2+
3+
import (
4+
"os"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func createTempDir(suffix string) string {
11+
dir := os.TempDir() + "/" + suffix
12+
os.MkdirAll(dir, 0755)
13+
return dir
14+
}
15+
16+
func removeTempDir(suffix string) {
17+
dir := os.TempDir() + "/" + suffix
18+
os.RemoveAll(dir)
19+
}
20+
21+
func TestAppend(t *testing.T) {
22+
config := &Config{maxSegmentSizeInBytes: 1000, segmentsRoot: createTempDir("SegmentTestAppend")}
23+
defer removeTempDir("SegmentTestAppend")
24+
index := NewIndex()
25+
segments, err := NewSegments(config, index)
26+
27+
assert.NoError(t, err)
28+
29+
messageId, err := segments.Append([]byte("Hello Segments"))
30+
assert.NoError(t, err)
31+
32+
data, err := segments.Read(messageId)
33+
assert.NoError(t, err)
34+
assert.Equal(t, []byte("Hello Segments"), data)
35+
}
36+
37+
func TestAppendMultipleEntry(t *testing.T) {
38+
config := &Config{maxSegmentSizeInBytes: 1000, segmentsRoot: createTempDir("SegmentTestAppendMultipleEntry")}
39+
defer removeTempDir("SegmentTestAppendMultipleEntry")
40+
index := NewIndex()
41+
segments, err := NewSegments(config, index)
42+
assert.NoError(t, err)
43+
44+
messageId1, err := segments.Append([]byte("Hello Segments"))
45+
assert.NoError(t, err)
46+
47+
messageId2, err := segments.Append([]byte("Another Hello Segments"))
48+
assert.NoError(t, err)
49+
50+
data2, err := segments.Read(messageId2)
51+
assert.NoError(t, err)
52+
assert.Equal(t, "Another Hello Segments", string(data2))
53+
54+
data1, err := segments.Read(messageId1)
55+
assert.NoError(t, err)
56+
assert.Equal(t, []byte("Hello Segments"), data1)
57+
}
58+
59+
func TestSegmentRollOver(t *testing.T) {
60+
config := &Config{maxSegmentSizeInBytes: 10, segmentsRoot: createTempDir("TestSegmentRollOver")}
61+
defer removeTempDir("TestSegmentRollOver")
62+
index := NewIndex()
63+
segments, err := NewSegments(config, index)
64+
assert.NoError(t, err)
65+
66+
segments.Append([]byte("Hello Segments"))
67+
assert.Equal(t, 0, len(segments.closedSegments))
68+
69+
segments.Append([]byte("Another Hello Segments"))
70+
assert.NoError(t, err)
71+
assert.Equal(t, 1, len(segments.closedSegments))
72+
}
73+
74+
func TestReadFromARolledOverSegment(t *testing.T) {
75+
config := &Config{maxSegmentSizeInBytes: 10, segmentsRoot: createTempDir("TestSegmentRollOver")}
76+
defer removeTempDir("TestSegmentRollOver")
77+
index := NewIndex()
78+
segments, err := NewSegments(config, index)
79+
assert.NoError(t, err)
80+
81+
messageId1, _ := segments.Append([]byte("Hello Segments"))
82+
messageId2, _ := segments.Append([]byte("Another Hello Segments"))
83+
84+
assert.Equal(t, 1, len(segments.closedSegments))
85+
86+
data2, _ := segments.Read(messageId2)
87+
data1, _ := segments.Read(messageId1)
88+
assert.Equal(t, []byte("Another Hello Segments"), data2)
89+
assert.Equal(t, []byte("Hello Segments"), data1)
90+
}

internal/storage/store.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ type Store struct {
1111
offset int
1212
}
1313

14+
func (s *Store) CloseWriter() error {
15+
return s.writer.Close()
16+
}
17+
1418
func NewStore(filePath string) (*Store, error) {
1519
writer, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
1620
if err != nil {
@@ -87,3 +91,7 @@ func (s *Store) Close() error {
8791

8892
return s.writer.Close()
8993
}
94+
95+
func (s *Store) Size() int {
96+
return s.offset
97+
}

0 commit comments

Comments
 (0)