Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ type pebbleDB struct {
func newPebbleDB(dir string) DB {
cache := pebble.NewCache(cacheSize)
defer cache.Unref()
filterCache := pebble.NewCache(cacheSize)
defer filterCache.Unref()
indexCache := pebble.NewCache(cacheSize)
defer indexCache.Unref()
opts := &pebble.Options{
Cache: cache,
FilterCache: filterCache,
IndexCache: indexCache,
Comparer: mvccComparer,
DisableWAL: disableWAL,
FormatMajorVersion: pebble.FormatNewest,
Expand Down
3 changes: 2 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,8 @@ func (d *DB) Close() error {
close(d.closedCh)

defer d.opts.Cache.Unref()

defer d.opts.FilterCache.Unref()
defer d.opts.IndexCache.Unref()
for d.mu.compact.compactingCount > 0 || d.mu.compact.flushing {
d.mu.compact.cond.Wait()
}
Expand Down
6 changes: 4 additions & 2 deletions metamorphic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,10 @@ func randomOptions(
}
}

opts.BytesPerSync = 1 << uint(rng.Intn(28)) // 1B - 256MB
opts.Cache = cache.New(1 << uint(rng.Intn(30))) // 1B - 1GB
opts.BytesPerSync = 1 << uint(rng.Intn(28)) // 1B - 256MB
opts.Cache = cache.New(1 << uint(rng.Intn(30))) // 1B - 1GB
opts.FilterCache = cache.New(1 << uint(rng.Intn(30))) // 1B - 1GB
opts.IndexCache = cache.New(1 << uint(rng.Intn(30))) // 1B - 1GB
opts.DisableWAL = rng.Intn(2) == 0
opts.FlushDelayDeleteRange = time.Millisecond * time.Duration(5*rng.Intn(245)) // 5-250ms
opts.FlushDelayRangeKey = time.Millisecond * time.Duration(5*rng.Intn(245)) // 5-250ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ func WithReason(ctx context.Context, reason Reason) context.Context { return ctx

// WithBlockType creates a context that has an associated BlockType (which ends up in
// traces created under that context).
func WithBlockType(ctx context.Context, blockType BlockType) context.Context { return ctx }
func WithBlockType(ctx context.Context, blockType BlockType) context.Context {
ctx = context.WithValue(ctx, "blockType", blockType)
return ctx
}

// WithLevel creates a context that has an associated level (which ends up in
// traces created under that context).
Expand Down
12 changes: 12 additions & 0 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
} else {
opts.Cache.Ref()
}
if opts.FilterCache == nil {
opts.FilterCache = cache.New(cacheDefaultSize)
} else {
opts.FilterCache.Ref()
}
if opts.IndexCache == nil {
opts.IndexCache = cache.New(cacheDefaultSize)
} else {
opts.IndexCache.Ref()
}

d := &DB{
cacheID: opts.Cache.NewID(),
Expand Down Expand Up @@ -195,6 +205,8 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
// the tableCache, then the tableCache will also release its
// reference to the cache.
opts.Cache.Unref()
opts.FilterCache.Unref()
opts.IndexCache.Unref()

if d.tableCache != nil {
_ = d.tableCache.close()
Expand Down
8 changes: 7 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,9 @@ type Options struct {
// Cache is used to cache uncompressed blocks from sstables.
//
// The default cache size is 8 MB.
Cache *cache.Cache
Cache *cache.Cache
FilterCache *cache.Cache
IndexCache *cache.Cache

// LoadBlockSema, if set, is used to limit the number of blocks that can be
// loaded (i.e. read from the filesystem) in parallel. Each load acquires one
Expand Down Expand Up @@ -1732,6 +1734,8 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions {
var readerOpts sstable.ReaderOptions
if o != nil {
readerOpts.Cache = o.Cache
readerOpts.FilterCache = o.FilterCache
readerOpts.IndexCache = o.IndexCache
readerOpts.LoadBlockSema = o.LoadBlockSema
readerOpts.Comparer = o.Comparer
readerOpts.Filters = o.Filters
Expand All @@ -1751,6 +1755,8 @@ func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstab
writerOpts.TableFormat = format
if o != nil {
writerOpts.Cache = o.Cache
writerOpts.FilterCache = o.FilterCache
writerOpts.IndexCache = o.IndexCache
writerOpts.Comparer = o.Comparer
if o.Merger != nil {
writerOpts.MergerName = o.Merger.Name
Expand Down
8 changes: 6 additions & 2 deletions sstable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ type ReaderOptions struct {
// Cache is used to cache uncompressed blocks from sstables.
//
// The default cache size is a zero-size cache.
Cache *cache.Cache
Cache *cache.Cache
FilterCache *cache.Cache
IndexCache *cache.Cache

// LoadBlockSema, if set, is used to limit the number of blocks that can be
// loaded (i.e. read from the filesystem) in parallel. Each load acquires one
Expand Down Expand Up @@ -187,7 +189,9 @@ type WriterOptions struct {
// Cache is used to cache uncompressed blocks from sstables.
//
// The default is a nil cache.
Cache *cache.Cache
Cache *cache.Cache
FilterCache *cache.Cache
IndexCache *cache.Cache

// Comparer defines a total ordering over the space of []byte keys: a 'less
// than' relationship. The same comparison algorithm must be used for reads
Expand Down
23 changes: 21 additions & 2 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ type Reader struct {
// Close implements DB.Close, as documented in the pebble package.
func (r *Reader) Close() error {
r.opts.Cache.Unref()
r.opts.FilterCache.Unref()
r.opts.IndexCache.Unref()

if r.readable != nil {
r.err = firstError(r.err, r.readable.Close())
Expand Down Expand Up @@ -524,7 +526,14 @@ func (r *Reader) readBlock(
stats *base.InternalIteratorStats,
bufferPool *BufferPool,
) (handle bufferHandle, _ error) {
if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
dbCache := r.opts.Cache
if ctx.Value("blockType") == objiotracing.FilterBlock {
dbCache = r.opts.FilterCache
} else if ctx.Value("blockType") == objiotracing.MetadataBlock {
dbCache = r.opts.IndexCache
}

if h := dbCache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
// Cache hit.
if readHandle != nil {
readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+blockTrailerLen))
Expand Down Expand Up @@ -641,7 +650,7 @@ func (r *Reader) readBlock(
if decompressed.buf.Valid() {
return bufferHandle{b: decompressed.buf}, nil
}
h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, decompressed.v)
h := dbCache.Set(r.cacheID, r.fileNum, bh.Offset, decompressed.v)
return bufferHandle{h: h}, nil
}

Expand Down Expand Up @@ -1114,6 +1123,16 @@ func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption
} else {
r.opts.Cache.Ref()
}
if r.opts.FilterCache == nil {
r.opts.FilterCache = cache.New(0)
} else {
r.opts.FilterCache.Ref()
}
if r.opts.IndexCache == nil {
r.opts.IndexCache = cache.New(0)
} else {
r.opts.IndexCache.Ref()
}

if f == nil {
r.err = errors.New("pebble/table: nil file")
Expand Down
2 changes: 2 additions & 0 deletions table_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,8 @@ func (c *tableCacheShard) evict(fileNum base.DiskFileNum, dbOpts *tableCacheOpts
}

dbOpts.opts.Cache.EvictFile(dbOpts.cacheID, fileNum)
dbOpts.opts.FilterCache.EvictFile(dbOpts.cacheID, fileNum)
dbOpts.opts.IndexCache.EvictFile(dbOpts.cacheID, fileNum)
}

// removeDB evicts any nodes which have a reference to the DB
Expand Down
8 changes: 5 additions & 3 deletions tool/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,11 @@ func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef)
}()

opts := sstable.ReaderOptions{
Cache: cache,
Comparer: f.opts.Comparer,
Filters: f.opts.Filters,
Cache: cache,
FilterCache: cache,
IndexCache: cache,
Comparer: f.opts.Comparer,
Filters: f.opts.Filters,
}
readable, err := sstable.NewSimpleReadable(tf)
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions tool/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,13 @@ func (s *sstableT) newReader(f vfs.File) (*sstable.Reader, error) {
if err != nil {
return nil, err
}
cache := pebble.NewCache(128 << 20 /* 128 MB */)
o := sstable.ReaderOptions{
Cache: pebble.NewCache(128 << 20 /* 128 MB */),
Comparer: s.opts.Comparer,
Filters: s.opts.Filters,
Cache: cache,
FilterCache: cache,
IndexCache: cache,
Comparer: s.opts.Comparer,
Filters: s.opts.Filters,
}
defer o.Cache.Unref()
return sstable.NewReader(readable, o, s.comparers, s.mergers,
Expand Down