Skip to content

Commit 0ca9659

Browse files
authored
Compaction without tombstone (#33)
1 parent d5fa566 commit 0ca9659

File tree

5 files changed

+127
-17
lines changed

5 files changed

+127
-17
lines changed

simpledb/compaction.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ package simpledb
22

33
import (
44
"errors"
5-
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
6-
"github.com/thomasjungblut/go-sstables/simpledb/proto"
7-
"github.com/thomasjungblut/go-sstables/skiplist"
8-
"github.com/thomasjungblut/go-sstables/sstables"
95
"log"
106
"os"
117
"path/filepath"
128
"sort"
139
"strings"
1410
"time"
11+
12+
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
13+
"github.com/thomasjungblut/go-sstables/simpledb/proto"
14+
"github.com/thomasjungblut/go-sstables/skiplist"
15+
"github.com/thomasjungblut/go-sstables/sstables"
1516
)
1617

1718
func backgroundCompaction(db *DB) {
@@ -115,7 +116,10 @@ func executeCompaction(db *DB) (compactionMetadata *proto.CompactionMetadata, er
115116
}
116117
}()
117118

118-
// TODO(thomas): this includes tombstones, do we really need to keep them?
119+
// A tombstone must be kept when an older sstable may still hold a value for the same key.
120+
// SS1(KEY1=toto) SS2(KEY2=deleted): SS1 has no KEY2, so the tombstone for KEY2 can be removed from SS2.
121+
// SS1(KEY2=toto) SS2(KEY2=deleted): SS1 still holds KEY2, so the tombstone for KEY2 can't be removed from SS2.
122+
119123
err = sstables.NewSSTableMerger(db.cmp).MergeCompact(iterators, writer, sstables.ScanReduceLatestWins)
120124
if err != nil {
121125
return nil, err

simpledb/compaction_test.go

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@ package simpledb
22

33
import (
44
"fmt"
5+
"os"
6+
"path/filepath"
7+
"testing"
8+
59
"github.com/stretchr/testify/assert"
10+
"github.com/thomasjungblut/go-sstables/memstore"
11+
"github.com/thomasjungblut/go-sstables/sstables"
612
"github.com/thomasjungblut/go-sstables/sstables/proto"
7-
"testing"
813
)
914

1015
func TestExecCompactionLessFilesThanExpected(t *testing.T) {
@@ -47,3 +52,93 @@ func TestExecCompactionSameContent(t *testing.T) {
4752
// for cleanups
4853
assert.Nil(t, db.sstableManager.currentReader.Close())
4954
}
55+
56+
// writeSSTableWithDataInDatabaseFolder is a test helper that writes an sstable
// into the sub-folder p of db.basePath. The table is filled from a memstore
// holding 1000 records whose key and value are both the decimal string of i,
// for i in [0, 1000). Failures are reported through assert on t.
func writeSSTableWithDataInDatabaseFolder(t *testing.T, db *DB, p string) {
57+
fakeTablePath := filepath.Join(db.basePath, p)
58+
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
59+
mStore := memstore.NewMemStore()
60+
for i := 0; i < 1000; i++ {
61+
assert.Nil(t, mStore.Add([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i))))
62+
}
63+
// write the memstore content to fakeTablePath, using the database's comparator (db.cmp)
assert.Nil(t, mStore.Flush(
64+
sstables.WriteBasePath(fakeTablePath),
65+
sstables.WithKeyComparator(db.cmp),
66+
))
67+
}
68+
69+
// writeSSTableWithTombstoneInDatabaseFolder is a test helper that writes an
// sstable into the sub-folder p of db.basePath. The memstore it flushes holds
// only tombstones, one for each key that is the decimal string of i for i in
// [500, 800). Failures are reported through assert on t.
func writeSSTableWithTombstoneInDatabaseFolder(t *testing.T, db *DB, p string) {
70+
fakeTablePath := filepath.Join(db.basePath, p)
71+
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
72+
mStore := memstore.NewMemStore()
73+
74+
// tombstone every key in [500, 800), i.e. 300 keys
75+
for i := 500; i < 800; i++ {
76+
assert.Nil(t, mStore.Tombstone([]byte(fmt.Sprintf("%d", i))))
77+
}
78+
// presumably FlushWithTombstones persists the tombstones as records instead of
// dropping them; the callers' 1300-record assertions rely on that - TODO confirm
assert.Nil(t, mStore.FlushWithTombstones(
79+
sstables.WriteBasePath(fakeTablePath),
80+
sstables.WithKeyComparator(db.cmp),
81+
))
82+
}
83+
84+
// TestExecCompactionWithTombstone compacts a data table (42, keys 0..999)
// together with a newer tombstone-only table (43, keys 500..799). It expects
// the tombstoned keys to be gone afterwards: Get("512") yields ErrNotFound and
// the compacted table holds 700 records.
func TestExecCompactionWithTombstone(t *testing.T) {
85+
// NOTE(review): the folder name looks copied from TestExecCompactionSameContent;
// confirm the tests cannot collide on disk
db := newOpenedSimpleDB(t, "simpledb_compactionSameContent")
86+
defer cleanDatabaseFolder(t, db)
87+
// we'll close the database to mock some internals directly, yes it's very hacky
88+
closeDatabase(t, db)
89+
db.closed = false
90+
db.compactionThreshold = 0
91+
92+
writeSSTableWithDataInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
93+
// the second table holds only tombstones (keys 500..799), so compaction should shrink the data
94+
writeSSTableWithTombstoneInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 43))
95+
assert.Nil(t, db.reconstructSSTables())
96+
// 1000 live records in table 42 + 300 tombstones in table 43
97+
assert.Equal(t, 1300, int(db.sstableManager.currentSSTable().MetaData().GetNumRecords()))
98+
99+
compactionMeta, err := executeCompaction(db)
100+
// NOTE(review): assert does not stop the test; if err is non-nil and
// compactionMeta is nil, the field accesses below will panic
assert.Nil(t, err)
101+
assert.Equal(t, "sstable_000000000000042", compactionMeta.ReplacementPath)
102+
assert.Equal(t, []string{"sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)
103+
// NOTE(review): looks like leftover debug output
fmt.Print(compactionMeta)
104+
err = db.sstableManager.reflectCompactionResult(compactionMeta)
105+
assert.NoError(t, err)
106+
// key 512 was tombstoned by table 43, so it must not be readable anymore
v, err := db.Get("512")
107+
assert.ErrorIs(t, err, ErrNotFound)
108+
assert.Equal(t, "", v)
109+
// for cleanups
110+
assert.Nil(t, db.sstableManager.currentReader.Close())
111+
112+
// check size of compacted sstable: 1000 records minus the 300 tombstoned keys
// NOTE(review): metadata is read after currentReader.Close() - presumably still valid, confirm
113+
assert.Equal(t, 700, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
114+
}
115+
// TestExecCompactionWithTombstoneRewriten compacts a tombstone-only table
// (42, keys 500..799) together with a newer data table (43, keys 0..999). It
// expects the newer values to win over the older tombstones: Get("512")
// returns "512" and the compacted table holds all 1000 records.
func TestExecCompactionWithTombstoneRewriten(t *testing.T) {
116+
// NOTE(review): the folder name looks copied from TestExecCompactionSameContent;
// confirm the tests cannot collide on disk
db := newOpenedSimpleDB(t, "simpledb_compactionSameContent")
117+
defer cleanDatabaseFolder(t, db)
118+
// we'll close the database to mock some internals directly, yes it's very hacky
119+
closeDatabase(t, db)
120+
db.closed = false
121+
db.compactionThreshold = 0
122+
123+
writeSSTableWithTombstoneInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
124+
// the tombstones are overwritten by the data written in the later table
125+
writeSSTableWithDataInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 43))
126+
assert.Nil(t, db.reconstructSSTables())
// 300 tombstones in table 42 + 1000 live records in table 43
127+
assert.Equal(t, 1300, int(db.sstableManager.currentSSTable().MetaData().GetNumRecords()))
128+
129+
compactionMeta, err := executeCompaction(db)
130+
// NOTE(review): assert does not stop the test; if err is non-nil and
// compactionMeta is nil, the field accesses below will panic
assert.Nil(t, err)
131+
assert.Equal(t, "sstable_000000000000042", compactionMeta.ReplacementPath)
132+
assert.Equal(t, []string{"sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)
133+
// NOTE(review): looks like leftover debug output
fmt.Print(compactionMeta)
134+
err = db.sstableManager.reflectCompactionResult(compactionMeta)
135+
assert.NoError(t, err)
136+
// key 512 was rewritten by table 43 after its tombstone, so it must be readable
v, err := db.Get("512")
137+
assert.NoError(t, err)
138+
assert.Equal(t, "512", v)
139+
// for cleanups
140+
assert.Nil(t, db.sstableManager.currentReader.Close())
141+
142+
// check size of compacted sstable: every one of the 1000 keys has a live value
// NOTE(review): metadata is read after currentReader.Close() - presumably still valid, confirm
143+
assert.Equal(t, 1000, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
144+
}

simpledb/db.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ type DatabaseI interface {
5151
}
5252

5353
// compactionAction describes the set of sstables selected for a compaction run.
type compactionAction struct {
54-
pathsToCompact []string
55-
totalRecords uint64
54+
pathsToCompact []string // base paths of the selected sstables
55+
totalRecords uint64 // sum of NumRecords over the selected sstables
56+
canRemoveTombstone bool // true only if the compaction starts from the first sstable; otherwise tombstones cannot be removed
5657
}
5758

5859
type memStoreFlushAction struct {

simpledb/sstable_manager.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ package simpledb
22

33
import (
44
"fmt"
5-
"github.com/thomasjungblut/go-sstables/simpledb/proto"
6-
"github.com/thomasjungblut/go-sstables/skiplist"
7-
"github.com/thomasjungblut/go-sstables/sstables"
8-
"golang.org/x/exp/slices"
95
"os"
106
"path/filepath"
117
"sort"
128
"sync"
9+
10+
"github.com/thomasjungblut/go-sstables/simpledb/proto"
11+
"github.com/thomasjungblut/go-sstables/skiplist"
12+
"github.com/thomasjungblut/go-sstables/sstables"
13+
"golang.org/x/exp/slices"
1314
)
1415

1516
type SSTableManager struct {
@@ -117,21 +118,26 @@ func (s *SSTableManager) candidateTablesForCompaction(compactionMaxSizeBytes uin
117118
defer s.managerLock.RUnlock()
118119

119120
numRecords := uint64(0)
121+
canRemoveTombstone := false
120122
var paths []string
121123
for i := 0; i < len(s.allSSTableReaders); i++ {
122124
reader := s.allSSTableReaders[i]
123125
// avoid the EmptySStableReader (or empty files) and only include small enough SSTables
124126
if reader.MetaData().NumRecords > 0 && reader.MetaData().TotalBytes < compactionMaxSizeBytes {
125127
paths = append(paths, reader.BasePath())
126128
numRecords += reader.MetaData().NumRecords
129+
if i == 0 {
130+
canRemoveTombstone = true
131+
}
127132
}
128133
}
129134

130135
sort.Strings(paths)
131136

132137
return compactionAction{
133-
pathsToCompact: paths,
134-
totalRecords: numRecords,
138+
pathsToCompact: paths,
139+
totalRecords: numRecords,
140+
canRemoveTombstone: canRemoveTombstone,
135141
}
136142
}
137143

sstables/super_sstable_reader.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package sstables
22

33
import (
44
"errors"
5+
"strings"
6+
57
"github.com/thomasjungblut/go-sstables/skiplist"
68
"github.com/thomasjungblut/go-sstables/sstables/proto"
7-
"strings"
89
)
910

1011
// SuperSSTableReader unifies several sstables under one single reader with the same interface.
@@ -108,8 +109,11 @@ func ScanReduceLatestWins(key []byte, values [][]byte, context []int) ([]byte, [
108109
maxCtxIndex = i
109110
}
110111
}
111-
112-
return key, values[maxCtxIndex]
112+
val := values[maxCtxIndex]
113+
if len(val) == 0 {
114+
return nil, nil
115+
}
116+
return key, val
113117
}
114118

115119
func (s SuperSSTableReader) Close() (err error) {

0 commit comments

Comments
 (0)