Skip to content

Commit b54572a

Browse files
committed
Schedules consumer index persistence based on the configuration
1 parent f016e40 commit b54572a

File tree

3 files changed

+21
-13
lines changed

3 files changed

+21
-13
lines changed

cmd/server/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ import (
88
)
99

1010
func main() {
11-
conf := config.NewConfig("data/segments", "data/metadata", 1024*1024, time.Second*100)
11+
conf := config.NewConfig(
12+
"data/segments",
13+
"data/metadata",
14+
1024*1024,
15+
time.Second*2,
16+
)
1217
server, err := netinternal.NewQueueServer(conf, ":50051")
1318
if err != nil {
1419
log.Fatalf("Failed to create server: %v", err)

internal/consumer/consumer_index.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,18 @@ func NewConsumerIndex(config *config.Config) (*ConsumerIndex, error) {
4141
}
4242

4343
func (ci *ConsumerIndex) schedulePersist() {
44+
fmt.Printf("Starting consumer index persistence with interval: %s\n", ci.config.ConsumerIndexSyncInterval())
4445
ticker := time.NewTicker(ci.config.ConsumerIndexSyncInterval())
4546
go func() {
4647
for {
4748
select {
4849
case <-ticker.C:
49-
ci.mu.Lock()
50+
fmt.Printf("Starting consumer index persistence\n")
5051
err := ci.Persist()
51-
ci.mu.Unlock()
5252
if err != nil {
5353
fmt.Printf("Error persisting consumer index: %v\n", err)
5454
}
55+
fmt.Printf("Done with consumer index persistence\n")
5556
}
5657
}
5758
}()
@@ -141,12 +142,14 @@ func RestoreConsumerIndex(config *config.Config) (*ConsumerIndex, error) {
141142
return nil, err
142143
}
143144

144-
return &ConsumerIndex{
145+
c := &ConsumerIndex{
145146
writer: lastIndexFile,
146147
indexes: indexes,
147148
mu: &sync.RWMutex{},
148149
config: config,
149-
}, nil
150+
}
151+
go c.schedulePersist()
152+
return c, nil
150153
}
151154

152155
// createIndexFile creates a new index file for the consumer.

internal/consumer/consumer_index_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ func TestReadAndWriteIndex(t *testing.T) {
2323
assert.NoError(t, err)
2424
defer os.RemoveAll(metadataDir)
2525

26-
config := config.NewConfig("/tmp", metadataDir, 1234, time.Second*100)
27-
index, err := NewConsumerIndex(config)
26+
cfg := config.NewConfig("/tmp", metadataDir, 1234, time.Second*100)
27+
index, err := NewConsumerIndex(cfg)
2828
assert.NoError(t, err)
2929

3030
index.WriteIndex(1, 10)
@@ -66,26 +66,26 @@ func TestRestoreIndexUsesTheLatestSnapshot(t *testing.T) {
6666
assert.NoError(t, err)
6767
defer os.RemoveAll(metadataDir)
6868

69-
config := config.NewConfig("/tmp", metadataDir, 1234, time.Second*100)
69+
cfg := config.NewConfig("/tmp", metadataDir, 1234, time.Second*100)
7070

71-
index1, err := NewConsumerIndex(config)
71+
index1, err := NewConsumerIndex(cfg)
7272
assert.NoError(t, err)
7373

7474
index1.WriteIndex(11, 10)
7575
index1.WriteIndex(12, 20)
7676
index1.WriteIndex(13, 30)
7777

78-
index1.Close()
78+
assert.NoError(t, index1.Close())
7979

80-
index2, err := RestoreConsumerIndex(config)
80+
index2, err := RestoreConsumerIndex(cfg)
8181
assert.NoError(t, err)
8282

8383
index2.WriteIndex(12, 200)
8484
index2.WriteIndex(14, 1300)
8585

86-
index2.Close()
86+
assert.NoError(t, index2.Close())
8787

88-
restoredIndex, err := RestoreConsumerIndex(config)
88+
restoredIndex, err := RestoreConsumerIndex(cfg)
8989
assert.NoError(t, err)
9090

9191
assert.Equal(t, 10, restoredIndex.ReadIndex(11))

0 commit comments

Comments
 (0)