Skip to content

Commit 97d9151

Browse files
committed
Periodically persists consumer records
1 parent ab14db7 commit 97d9151

File tree

10 files changed

+124
-48
lines changed

10 files changed

+124
-48
lines changed

cmd/server/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ import (
44
"ashishkujoy/queue/internal/config"
55
netinternal "ashishkujoy/queue/internal/net"
66
"log"
7+
"time"
78
)
89

910
func main() {
10-
conf := config.NewConfigWithMetadataPath("data/segments", "data/metadata", 1024*1024)
11+
conf := config.NewConfig("data/segments", "data/metadata", 1024*1024, time.Second*100)
1112
server, err := netinternal.NewQueueServer(conf, ":50051")
1213
if err != nil {
1314
log.Fatalf("Failed to create server: %v", err)

internal/arrayUtils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,11 @@ func Filter[T any](elements []T, f func(T) bool) []T {
2020

2121
return filtered
2222
}
23+
24+
func Map[I any, O any](elements []I, f func(I) O) []O {
25+
var mapped []O
26+
for _, element := range elements {
27+
mapped = append(mapped, f(element))
28+
}
29+
return mapped
30+
}

internal/config/base_config.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package config
22

3+
import "time"
4+
35
type Config struct {
4-
segmentsRoot string
5-
MetadataPath string
6-
maxSegmentSizeInBytes int
6+
segmentsRoot string
7+
MetadataPath string
8+
consumerIndexSyncInterval time.Duration
9+
maxSegmentSizeInBytes int
710
}
811

912
func (c *Config) MaxSegmentSizeInBytes() int {
@@ -14,17 +17,20 @@ func (c *Config) SegmentsRoot() string {
1417
return c.segmentsRoot
1518
}
1619

17-
func NewConfig(segmentsRoot string, maxSegmentSizeInBytes int) *Config {
18-
return &Config{
19-
segmentsRoot: segmentsRoot,
20-
maxSegmentSizeInBytes: maxSegmentSizeInBytes,
21-
}
20+
func (c *Config) ConsumerIndexSyncInterval() time.Duration {
21+
return c.consumerIndexSyncInterval
2222
}
2323

24-
func NewConfigWithMetadataPath(segmentsRoot, metadataPath string, maxSegmentSizeInBytes int) *Config {
24+
func NewConfig(
25+
segmentsRoot string,
26+
metadataPath string,
27+
maxSegmentSizeInBytes int,
28+
consumerIndexSyncInterval time.Duration,
29+
) *Config {
2530
return &Config{
26-
segmentsRoot: segmentsRoot,
27-
MetadataPath: metadataPath,
28-
maxSegmentSizeInBytes: maxSegmentSizeInBytes,
31+
segmentsRoot: segmentsRoot,
32+
maxSegmentSizeInBytes: maxSegmentSizeInBytes,
33+
MetadataPath: metadataPath,
34+
consumerIndexSyncInterval: consumerIndexSyncInterval,
2935
}
3036
}

internal/consumer/consumer_index.go

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package consumer
22

33
import (
4+
"ashishkujoy/queue/internal"
45
"ashishkujoy/queue/internal/config"
56
"encoding/binary"
67
"fmt"
@@ -30,12 +31,30 @@ func NewConsumerIndex(config *config.Config) (*ConsumerIndex, error) {
3031
return nil, err
3132
}
3233

33-
return &ConsumerIndex{
34+
consumerIndex := &ConsumerIndex{
3435
writer: writer,
3536
mu: &sync.RWMutex{},
3637
indexes: make(map[int]int),
3738
config: config,
38-
}, nil
39+
}
40+
return consumerIndex, nil
41+
}
42+
43+
func (ci *ConsumerIndex) schedulePersist() {
44+
ticker := time.NewTicker(ci.config.ConsumerIndexSyncInterval())
45+
go func() {
46+
for {
47+
select {
48+
case <-ticker.C:
49+
ci.mu.Lock()
50+
err := ci.Persist()
51+
ci.mu.Unlock()
52+
if err != nil {
53+
fmt.Printf("Error persisting consumer index: %v\n", err)
54+
}
55+
}
56+
}
57+
}()
3958
}
4059

4160
func extractTimestamp(filename string) int64 {
@@ -46,12 +65,20 @@ func extractTimestamp(filename string) int64 {
4665
}
4766

4867
func getLastIndexFile(config *config.Config) (*os.File, error) {
49-
enteries, err := os.ReadDir(config.MetadataPath)
68+
files, err := getSortedIndexFiles(config)
69+
if err != nil {
70+
return nil, err
71+
}
72+
return files[0], nil
73+
}
74+
75+
func getSortedIndexFiles(config *config.Config) ([]*os.File, error) {
76+
entries, err := os.ReadDir(config.MetadataPath)
5077
if err != nil {
5178
return nil, err
5279
}
5380
var metadataTimestamp []int64
54-
for _, entry := range enteries {
81+
for _, entry := range entries {
5582
if strings.Contains(entry.Name(), "consumer_index_") {
5683
metadataTimestamp = append(metadataTimestamp, extractTimestamp(entry.Name()))
5784
}
@@ -62,10 +89,14 @@ func getLastIndexFile(config *config.Config) (*os.File, error) {
6289
sort.Slice(metadataTimestamp, func(i, j int) bool {
6390
return metadataTimestamp[i] > metadataTimestamp[j]
6491
})
65-
lastTimestamp := metadataTimestamp[0]
66-
lastIndexFile := fmt.Sprintf("%s/consumer_index_%d", config.MetadataPath, lastTimestamp)
67-
68-
return os.OpenFile(lastIndexFile, os.O_RDWR, 0666)
92+
files := internal.Map(metadataTimestamp, func(i int64) *os.File {
93+
file, err := os.OpenFile(fmt.Sprintf("%s/consumer_index_%d", config.MetadataPath, i), os.O_RDWR, 0666)
94+
if err != nil {
95+
return nil
96+
}
97+
return file
98+
})
99+
return files, nil
69100
}
70101

71102
func restoreIndexesFromFile(file *os.File) (map[int]int, error) {
@@ -78,11 +109,17 @@ func restoreIndexesFromFile(file *os.File) (map[int]int, error) {
78109
indexes := make(map[int]int, stat.Size()/8)
79110
for i := int64(0); i < indexSize; i++ {
80111
buf := make([]byte, 4)
81-
file.ReadAt(buf, int64(offset))
112+
_, err := file.ReadAt(buf, int64(offset))
113+
if err != nil {
114+
return nil, err
115+
}
82116
offset += 4
83117
consumerId := binary.BigEndian.Uint32(buf)
84118
buf = make([]byte, 4)
85-
file.ReadAt(buf, int64(offset))
119+
_, err = file.ReadAt(buf, int64(offset))
120+
if err != nil {
121+
return nil, err
122+
}
86123
offset += 4
87124
consumerIndex := binary.BigEndian.Uint32(buf)
88125
indexes[int(consumerId)] = int(consumerIndex)
@@ -192,12 +229,27 @@ func (ci *ConsumerIndex) CreateSnapshot() []byte {
192229
}
193230

194231
func (ci *ConsumerIndex) Close() error {
232+
return ci.Persist()
233+
}
234+
235+
func (ci *ConsumerIndex) Persist() error {
195236
snapshot := ci.CreateSnapshot()
196237
indexFile, err := createIndexFile(ci.config)
197238
if err != nil {
198239
return err
199240
}
200241
_, err = indexFile.Write(snapshot)
201-
indexFile.Sync()
242+
err = indexFile.Sync()
243+
if err != nil {
244+
return err
245+
}
246+
ci.writer = indexFile
247+
files, err := getSortedIndexFiles(ci.config)
248+
for _, file := range files[1:] {
249+
_ = file.Close()
250+
}
251+
if err != nil {
252+
return err
253+
}
202254
return err
203255
}

internal/consumer/consumer_index_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"ashishkujoy/queue/internal/config"
55
"os"
66
"testing"
7+
"time"
78

89
"github.com/stretchr/testify/assert"
910
)
@@ -22,7 +23,7 @@ func TestReadAndWriteIndex(t *testing.T) {
2223
assert.NoError(t, err)
2324
defer os.RemoveAll(metadataDir)
2425

25-
config := config.NewConfigWithMetadataPath("/tmp", metadataDir, 1234)
26+
config := config.NewConfig("/tmp", metadataDir, 1234, time.Second*100)
2627
index, err := NewConsumerIndex(config)
2728
assert.NoError(t, err)
2829

@@ -41,17 +42,17 @@ func TestReadFromARestoredIndex(t *testing.T) {
4142
assert.NoError(t, err)
4243
defer os.RemoveAll(metadataDir)
4344

44-
config := config.NewConfigWithMetadataPath("/tmp", metadataDir, 1234)
45-
index, err := NewConsumerIndex(config)
45+
cfg := config.NewConfig("/tmp", metadataDir, 1234, time.Second*100)
46+
index, err := NewConsumerIndex(cfg)
4647
assert.NoError(t, err)
4748

4849
index.WriteIndex(11, 10)
4950
index.WriteIndex(12, 20)
5051
index.WriteIndex(13, 30)
5152

52-
index.Close()
53+
assert.NoError(t, index.Close())
5354

54-
restoredIndex, err := RestoreConsumerIndex(config)
55+
restoredIndex, err := RestoreConsumerIndex(cfg)
5556
assert.NoError(t, err)
5657

5758
assert.Equal(t, 10, restoredIndex.ReadIndex(11))
@@ -65,7 +66,7 @@ func TestRestoreIndexUsesTheLatestSnapshot(t *testing.T) {
6566
assert.NoError(t, err)
6667
defer os.RemoveAll(metadataDir)
6768

68-
config := config.NewConfigWithMetadataPath("/tmp", metadataDir, 1234)
69+
config := config.NewConfig("/tmp", metadataDir, 1234, time.Second*100)
6970

7071
index1, err := NewConsumerIndex(config)
7172
assert.NoError(t, err)

internal/queue/queue_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func NewQueueService(config *config.Config) (*QueueService, error) {
1515
if err != nil {
1616
return nil, err
1717
}
18-
consumerIndex, err := consumer.NewConsumerIndex(config)
18+
consumerIndex, err := consumer.RestoreConsumerIndex(config)
1919
if err != nil {
2020
return nil, err
2121
}

internal/queue/queue_service_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"ashishkujoy/queue/internal/config"
55
"os"
66
"testing"
7+
"time"
78

89
"github.com/stretchr/testify/assert"
910
)
@@ -13,7 +14,7 @@ func TestEnqueue(t *testing.T) {
1314
defer os.RemoveAll(segmentPath)
1415
metaDataPath := createTempDir("testEnqueue/metadata")
1516
defer os.RemoveAll(metaDataPath)
16-
config := config.NewConfigWithMetadataPath(segmentPath, metaDataPath, 1024)
17+
config := config.NewConfig(segmentPath, metaDataPath, 1024, time.Second)
1718

1819
queueService, err := NewQueueService(config)
1920
assert.NoError(t, err)

internal/queue/queue_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"ashishkujoy/queue/internal/config"
55
"os"
66
"testing"
7+
"time"
78

89
"github.com/stretchr/testify/assert"
910
)
@@ -20,9 +21,13 @@ func removeTempDir(suffix string) {
2021
}
2122

2223
func TestEnqueueSingleElement(t *testing.T) {
23-
config := config.NewConfig(createTempDir("TestEnqueueSingleElement"), 1000)
24+
cfg := config.NewConfig(
25+
createTempDir("TestEnqueueSingleElement"),
26+
"/tmp/metadata",
27+
1000,
28+
time.Second)
2429
defer removeTempDir("TestEnqueueSingleElement")
25-
queue, err := NewQueue(config)
30+
queue, err := NewQueue(cfg)
2631
assert.NoError(t, err)
2732
defer queue.Close()
2833

internal/storage/segment_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@ import (
44
"fmt"
55
"os"
66
"testing"
7+
"time"
78

89
"ashishkujoy/queue/internal/config"
910

1011
"github.com/stretchr/testify/assert"
1112
)
1213

1314
func TestAppendToTheSegment(t *testing.T) {
14-
config := config.NewConfig(os.TempDir(), 1024)
15+
cfg := config.NewConfig(os.TempDir(), "/tmp", 1024, time.Second*5)
1516
defer os.Remove(fmt.Sprintf("%s/segment_10", os.TempDir()))
1617

17-
segment, err := NewSegment(10, config)
18+
segment, err := NewSegment(10, cfg)
1819
assert.NoError(t, err)
1920
defer segment.Close()
2021

@@ -23,10 +24,10 @@ func TestAppendToTheSegment(t *testing.T) {
2324
}
2425

2526
func TestReadFromTheSegment(t *testing.T) {
26-
config := config.NewConfig(os.TempDir(), 1024)
27+
cfg := config.NewConfig(os.TempDir(), "/tmp", 1024, time.Second)
2728
defer os.Remove(fmt.Sprintf("%s/segment_10", os.TempDir()))
2829

29-
segment, err := NewSegment(10, config)
30+
segment, err := NewSegment(10, cfg)
3031
assert.NoError(t, err)
3132
defer segment.Close()
3233

@@ -46,10 +47,10 @@ func TestReadFromTheSegment(t *testing.T) {
4647
}
4748

4849
func TestReadNonExistingMessage(t *testing.T) {
49-
config := config.NewConfig(os.TempDir(), 1024)
50+
cfg := config.NewConfig(os.TempDir(), "/tmp", 1024, time.Second)
5051
defer os.Remove(fmt.Sprintf("%s/segment_12", os.TempDir()))
5152

52-
segment, err := NewSegment(12, config)
53+
segment, err := NewSegment(12, cfg)
5354
assert.NoError(t, err)
5455
defer segment.Close()
5556

@@ -58,18 +59,18 @@ func TestReadNonExistingMessage(t *testing.T) {
5859
}
5960

6061
func TestReadFromReloadedSegment(t *testing.T) {
61-
config := config.NewConfig(os.TempDir(), 1024)
62+
cfg := config.NewConfig(os.TempDir(), "/tmp", 1024, time.Second)
6263
defer os.Remove(fmt.Sprintf("%s/segment_13", os.TempDir()))
6364

64-
segment, err := NewSegment(12, config)
65+
segment, err := NewSegment(12, cfg)
6566
assert.NoError(t, err)
6667

6768
offset1, _ := segment.Append([]byte("Hello world"))
6869
offset2, _ := segment.Append([]byte("Hello earth"))
6970
offset3, _ := segment.Append([]byte("Hello India"))
7071
segment.Close()
7172

72-
restoredSegment, err := NewSegment(12, config)
73+
restoredSegment, err := NewSegment(12, cfg)
7374
assert.NoError(t, err)
7475

7576
data1, _ := restoredSegment.Read(offset1)

0 commit comments

Comments
 (0)