Skip to content

Commit 3fcf72e

Browse files
committed
Increases segment size
1 parent b54572a commit 3fcf72e

File tree

3 files changed

+28
-8
lines changed

3 files changed

+28
-8
lines changed

cmd/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ func main() {
1111
conf := config.NewConfig(
1212
"data/segments",
1313
"data/metadata",
14-
1024*1024,
14+
1024*1024*10,
1515
time.Second*2,
1616
)
1717
server, err := netinternal.NewQueueServer(conf, ":50051")

internal/consumer/consumer_index.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"ashishkujoy/queue/internal/config"
66
"encoding/binary"
77
"fmt"
8+
"io/fs"
89
"os"
910
"sort"
1011
"strconv"
@@ -235,6 +236,28 @@ func (ci *ConsumerIndex) Close() error {
235236
return ci.Persist()
236237
}
237238

239+
func removeOldIndexFiles(config *config.Config, currentIndexFile string) error {
240+
entries, err := os.ReadDir(config.MetadataPath)
241+
if err != nil {
242+
return err
243+
}
244+
245+
consumerIndexes := internal.Filter(entries, func(entry fs.DirEntry) bool {
246+
return strings.Contains(entry.Name(), "consumer_index_") && !strings.Contains(currentIndexFile, entry.Name())
247+
})
248+
249+
for _, entry := range consumerIndexes {
250+
dir, _ := os.Getwd()
251+
252+
fmt.Printf("Current working dir: %s\n", dir)
253+
err := os.Remove(fmt.Sprintf("%s/%s", config.MetadataPath, entry.Name()))
254+
if err != nil {
255+
return err
256+
}
257+
}
258+
return nil
259+
}
260+
238261
func (ci *ConsumerIndex) Persist() error {
239262
snapshot := ci.CreateSnapshot()
240263
indexFile, err := createIndexFile(ci.config)
@@ -247,10 +270,7 @@ func (ci *ConsumerIndex) Persist() error {
247270
return err
248271
}
249272
ci.writer = indexFile
250-
files, err := getSortedIndexFiles(ci.config)
251-
for _, file := range files[1:] {
252-
_ = file.Close()
253-
}
273+
err = removeOldIndexFiles(ci.config, indexFile.Name())
254274
if err != nil {
255275
return err
256276
}

internal/consumer/consumer_index_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ func TestReadAndWriteIndex(t *testing.T) {
3737
assert.Equal(t, -1, index.ReadIndex(4))
3838
}
3939

40-
func TestReadFromARestoredIndex(t *testing.T) {
41-
metadataDir, err := CreateMetadataDir("2")
40+
func TestReadFromARestoredConsumerIndex(t *testing.T) {
41+
metadataDir, err := CreateMetadataDir("TestReadFromARestoredConsumerIndex")
4242
assert.NoError(t, err)
4343
defer os.RemoveAll(metadataDir)
4444

45-
cfg := config.NewConfig("/tmp", metadataDir, 1234, time.Second*100)
45+
cfg := config.NewConfig("/tmp", metadataDir, 1234, time.Second*1000)
4646
index, err := NewConsumerIndex(cfg)
4747
assert.NoError(t, err)
4848

0 commit comments

Comments
 (0)