Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/blob/blob_file_cache.cc",
"db/blob/blob_file_garbage.cc",
"db/blob/blob_file_meta.cc",
"db/blob/blob_file_partition_manager.cc",
"db/blob/blob_file_reader.cc",
"db/blob/blob_garbage_meter.cc",
"db/blob/blob_log_format.cc",
"db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc",
"db/blob/blob_source.cc",
"db/blob/blob_write_batch_transformer.cc",
"db/blob/prefetch_buffer_collection.cc",
"db/builder.cc",
"db/c.cc",
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ set(SOURCES
db/blob/blob_file_addition.cc
db/blob/blob_file_builder.cc
db/blob/blob_file_cache.cc
db/blob/blob_file_partition_manager.cc
db/blob/blob_file_garbage.cc
db/blob/blob_file_meta.cc
db/blob/blob_file_reader.cc
Expand All @@ -715,6 +716,7 @@ set(SOURCES
db/blob/blob_log_sequential_reader.cc
db/blob/blob_log_writer.cc
db/blob/blob_source.cc
db/blob/blob_write_batch_transformer.cc
db/blob/prefetch_buffer_collection.cc
db/builder.cc
db/c.cc
Expand Down
25 changes: 14 additions & 11 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ void ArenaWrappedDBIter::Init(
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t version_number,
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem) {
bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem,
BlobFilePartitionManager* blob_partition_mgr) {
read_options_ = read_options;
if (!CheckFSFeatureSupport(env->GetFileSystem().get(),
FSSupportedOps::kAsyncIO)) {
read_options_.async_io = false;
}
read_options_.total_order_seek |= ioptions.prefix_seek_opt_in_only;

db_iter_ = DBIter::NewIter(
env, read_options_, ioptions, mutable_cf_options,
ioptions.user_comparator, /*internal_iter=*/nullptr, version, sequence,
read_callback, active_mem, cfh, expose_blob_index, &arena_);
db_iter_ =
DBIter::NewIter(env, read_options_, ioptions, mutable_cf_options,
ioptions.user_comparator, /*internal_iter=*/nullptr,
version, sequence, read_callback, active_mem, cfh,
expose_blob_index, &arena_, blob_partition_mgr);

sv_number_ = version_number;
allow_refresh_ = allow_refresh;
Expand Down Expand Up @@ -254,13 +256,14 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, const SequenceNumber& sequence,
ReadCallback* read_callback, DBImpl* db_impl, bool expose_blob_index,
bool allow_refresh, bool allow_mark_memtable_for_flush) {
bool allow_refresh, bool allow_mark_memtable_for_flush,
BlobFilePartitionManager* blob_partition_mgr) {
ArenaWrappedDBIter* db_iter = new ArenaWrappedDBIter();
db_iter->Init(env, read_options, cfh->cfd()->ioptions(),
sv->mutable_cf_options, sv->current, sequence,
sv->version_number, read_callback, cfh, expose_blob_index,
allow_refresh,
allow_mark_memtable_for_flush ? sv->mem : nullptr);
db_iter->Init(
env, read_options, cfh->cfd()->ioptions(), sv->mutable_cf_options,
sv->current, sequence, sv->version_number, read_callback, cfh,
expose_blob_index, allow_refresh,
allow_mark_memtable_for_flush ? sv->mem : nullptr, blob_partition_mgr);
if (cfh != nullptr && allow_refresh) {
db_iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index);
}
Expand Down
8 changes: 6 additions & 2 deletions db/arena_wrapped_db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ class ArenaWrappedDBIter : public Iterator {

// FIXME: we could just pass SV in for mutable cf option, version and version
// number, but this is used by SstFileReader which does not have a SV.
// `blob_partition_mgr` is passed through to DBIter so reads can resolve
// direct-write blob indexes before the blob file is registered in Version.
void Init(Env* env, const ReadOptions& read_options,
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t version_number,
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
bool expose_blob_index, bool allow_refresh,
ReadOnlyMemTable* active_mem);
ReadOnlyMemTable* active_mem,
BlobFilePartitionManager* blob_partition_mgr = nullptr);

// Store some parameters so we can refresh the iterator at a later point
// with these same params
Expand Down Expand Up @@ -144,5 +147,6 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
SuperVersion* sv, const SequenceNumber& sequence,
ReadCallback* read_callback, DBImpl* db_impl, bool expose_blob_index,
bool allow_refresh, bool allow_mark_memtable_for_flush);
bool allow_refresh, bool allow_mark_memtable_for_flush,
BlobFilePartitionManager* blob_partition_mgr = nullptr);
} // namespace ROCKSDB_NAMESPACE
121 changes: 118 additions & 3 deletions db/blob/blob_file_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>

#include "db/blob/blob_file_reader.h"
#include "logging/logging.h"
#include "options/cf_options.h"
#include "rocksdb/cache.h"
#include "rocksdb/slice.h"
Expand Down Expand Up @@ -38,7 +39,8 @@

Status BlobFileCache::GetBlobFileReader(
const ReadOptions& read_options, uint64_t blob_file_number,
CacheHandleGuard<BlobFileReader>* blob_file_reader) {
CacheHandleGuard<BlobFileReader>* blob_file_reader,
bool allow_footer_skip_retry) {
assert(blob_file_reader);
assert(blob_file_reader->IsEmpty());

Expand Down Expand Up @@ -73,10 +75,22 @@

{
assert(file_options_);
const Status s = BlobFileReader::Create(
Status s = BlobFileReader::Create(
*immutable_options_, read_options, *file_options_, column_family_id_,
blob_file_read_hist_, blob_file_number, io_tracer_, &reader);
blob_file_read_hist_, blob_file_number, io_tracer_,
/*skip_footer_validation=*/false, &reader);
if (!s.ok() && s.IsCorruption() && allow_footer_skip_retry) {
reader.reset();
s = BlobFileReader::Create(
*immutable_options_, read_options, *file_options_, column_family_id_,
blob_file_read_hist_, blob_file_number, io_tracer_,
/*skip_footer_validation=*/true, &reader);
}
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_options_->logger,
"BlobFileCache open failed for blob file %" PRIu64
" in CF %u: %s",
blob_file_number, column_family_id_, s.ToString().c_str());
RecordTick(statistics, NO_FILE_ERRORS);
return s;
}
Expand All @@ -99,12 +113,113 @@
return Status::OK();
}

Status BlobFileCache::OpenBlobFileReaderUncached(
const ReadOptions& read_options, uint64_t blob_file_number,
std::unique_ptr<BlobFileReader>* blob_file_reader,
bool allow_footer_skip_retry) {
assert(blob_file_reader);
assert(!*blob_file_reader);

Statistics* const statistics = immutable_options_->stats;
RecordTick(statistics, NO_FILE_OPENS);

Status s = BlobFileReader::Create(
*immutable_options_, read_options, *file_options_, column_family_id_,
blob_file_read_hist_, blob_file_number, io_tracer_,
/*skip_footer_validation=*/false, blob_file_reader);
if (!s.ok() && s.IsCorruption() && allow_footer_skip_retry) {
blob_file_reader->reset();
s = BlobFileReader::Create(
*immutable_options_, read_options, *file_options_, column_family_id_,
blob_file_read_hist_, blob_file_number, io_tracer_,
/*skip_footer_validation=*/true, blob_file_reader);
}
if (!s.ok()) {
RecordTick(statistics, NO_FILE_ERRORS);
}
return s;
}

Status BlobFileCache::InsertBlobFileReader(
uint64_t blob_file_number,
std::unique_ptr<BlobFileReader>* blob_file_reader,
CacheHandleGuard<BlobFileReader>* cached_blob_file_reader) {
assert(blob_file_reader);
assert(*blob_file_reader);
assert(cached_blob_file_reader);
assert(cached_blob_file_reader->IsEmpty());

const Slice key = GetSliceForKey(&blob_file_number);
MutexLock lock(&mutex_.Get(key));

TypedHandle* handle = cache_.Lookup(key);
if (handle) {
*cached_blob_file_reader = cache_.Guard(handle);
blob_file_reader->reset();
return Status::OK();
}

constexpr size_t charge = 1;
Status s = cache_.Insert(key, blob_file_reader->get(), charge, &handle);
if (!s.ok()) {
RecordTick(immutable_options_->stats, NO_FILE_ERRORS);
return s;
}

blob_file_reader->release();

Check warning on line 169 in db/blob/blob_file_cache.cc

View workflow job for this annotation

GitHub Actions / clang-tidy

the value returned by this function should not be disregarded; neglecting it may lead to errors [bugprone-unused-return-value]
*cached_blob_file_reader = cache_.Guard(handle);
return Status::OK();
}

Status BlobFileCache::RefreshBlobFileReader(
uint64_t blob_file_number,
std::unique_ptr<BlobFileReader>* blob_file_reader,
CacheHandleGuard<BlobFileReader>* cached_blob_file_reader) {
assert(blob_file_reader);
assert(*blob_file_reader);
assert(cached_blob_file_reader);
assert(cached_blob_file_reader->IsEmpty());

const Slice key = GetSliceForKey(&blob_file_number);
MutexLock lock(&mutex_.Get(key));

TypedHandle* handle = cache_.Lookup(key);
if (handle) {
BlobFileReader* const cached_reader = cache_.Value(handle);
assert(cached_reader != nullptr);

// Active direct-write blob files can grow between refresh attempts. Keep
// whichever reader observed the larger on-disk size so an older refresh
// cannot overwrite a newer one that another thread already installed.
if (cached_reader->GetFileSize() >= (*blob_file_reader)->GetFileSize()) {
*cached_blob_file_reader = cache_.Guard(handle);
blob_file_reader->reset();
return Status::OK();
}

cache_.Release(handle);
cache_.get()->Erase(key);
}

constexpr size_t charge = 1;
Status s = cache_.Insert(key, blob_file_reader->get(), charge, &handle);
if (!s.ok()) {
RecordTick(immutable_options_->stats, NO_FILE_ERRORS);
return s;
}

blob_file_reader->release();

Check warning on line 211 in db/blob/blob_file_cache.cc

View workflow job for this annotation

GitHub Actions / clang-tidy

the value returned by this function should not be disregarded; neglecting it may lead to errors [bugprone-unused-return-value]
*cached_blob_file_reader = cache_.Guard(handle);
return Status::OK();
}

void BlobFileCache::Evict(uint64_t blob_file_number) {
// NOTE: sharing same Cache with table_cache
const Slice key = GetSliceForKey(&blob_file_number);

assert(cache_);

MutexLock lock(&mutex_.Get(key));
cache_.get()->Erase(key);
}

Expand Down
28 changes: 27 additions & 1 deletion db/blob/blob_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,35 @@ class BlobFileCache {
BlobFileCache(const BlobFileCache&) = delete;
BlobFileCache& operator=(const BlobFileCache&) = delete;

// Returns a cached reader for `blob_file_number`, opening and caching it on
// miss. If `allow_footer_skip_retry` is true, a footer-validation corruption
// retries once without requiring a footer.
Status GetBlobFileReader(const ReadOptions& read_options,
uint64_t blob_file_number,
CacheHandleGuard<BlobFileReader>* blob_file_reader);
CacheHandleGuard<BlobFileReader>* blob_file_reader,
bool allow_footer_skip_retry = false);

// Opens a blob file reader without inserting it into the cache.
Status OpenBlobFileReaderUncached(
const ReadOptions& read_options, uint64_t blob_file_number,
std::unique_ptr<BlobFileReader>* blob_file_reader,
bool allow_footer_skip_retry = false);

// Inserts a freshly opened uncached reader unless another thread already
// cached the same blob file.
Status InsertBlobFileReader(
uint64_t blob_file_number,
std::unique_ptr<BlobFileReader>* blob_file_reader,
CacheHandleGuard<BlobFileReader>* cached_blob_file_reader);

// Installs a refresh-opened reader into the cache. If another thread has
// already cached a reader opened on at least as large a file size, keep that
// reader instead so a racing refresh cannot reintroduce an older active-file
// view after the blob file grows.
Status RefreshBlobFileReader(
uint64_t blob_file_number,
std::unique_ptr<BlobFileReader>* blob_file_reader,
CacheHandleGuard<BlobFileReader>* cached_blob_file_reader);

// Called when a blob file is obsolete to ensure it is removed from the cache
// to avoid effectively leaking the open file and assicated memory
Expand Down
Loading
Loading