Skip to content
This repository was archived by the owner on Nov 4, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion plugin/pmem_scache/pmem_scache.mk
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pmem_scache_SOURCES = scache/pmem_scache.cc
pmem_scache_HEADERS = scache/pmem_scache.h scache/pmem_scache_util.h
pmem_scache_LDFLAGS = -lpmem -u pmem_scache_reg
#pmem_scache_LDFLAGS = -lpmem -ldouble-conversion -lfmtd -lglog -lthrift-core -lthriftprotocol -lthriftcpp2 -lthrifttype -lthriftmetadata -lthriftfrozen2 -lrpcmetadata -lfolly -lcachelib_allocator -lcachelib_common -lcachelib_datatype -lcachelib_shm -lcachelib_navy -u pmem_scache_reg
#pmem_scache_LDFLAGS = -lpmem -lcachelib_allocator -L/home/chenyou/code/WIP/docker_code/CacheLib/opt/cachelib/lib64 -lfolly -L/home/chenyou/code/WIP/docker_code/CacheLib/opt/cachelib/lib64 -lfmtd -lglog -ldouble-conversion -lcachelib_common -lcachelib_datatype -lcachelib_shm -lcachelib_navy -u pmem_scache_reg
pmem_scache_LDFLAGS = -lpmem -lcachelib_allocator -lcachelib_shm -lcachelib_navy -lcachelib_common -lthriftcpp2 -lthriftfrozen2 -lthriftmetadata -lthrifttype -lthriftprotocol -lasync -lwangle -lfizz -ltransport -lrpcmetadata -lthrift-core -lconcurrency -lfollybenchmark -lfolly_exception_counter -lfolly_exception_tracer -lfolly_exception_tracer_base -lfolly -lfmtd -lgflags_debug -lglog -ldouble-conversion -lboost_context -u pmem_scache_reg
87 changes: 77 additions & 10 deletions plugin/pmem_scache/scache/pmem_scache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,37 @@
namespace ROCKSDB_NAMESPACE {

static std::unordered_map<std::string, OptionTypeInfo> scache_type_info = {
{"is_kmem_dax",
{offsetof(struct PMemSecondaryCacheOptions, is_kmem_dax),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"path",
{offsetof(struct PMemSecondaryCacheOptions, path), OptionType::kString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"capacity",
{offsetof(struct PMemSecondaryCacheOptions, capacity), OptionType::kSizeT,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"ratio",
{offsetof(struct PMemSecondaryCacheOptions, ratio), OptionType::kDouble,
{"bucket_power",
{offsetof(struct PMemSecondaryCacheOptions, bucket_power), OptionType::kUInt32T,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"locks_power",
{offsetof(struct PMemSecondaryCacheOptions, locks_power), OptionType::kUInt32T,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
};
//using MemoryTierCacheConfig = typename facebook::cachelib::MemoryTierCacheConfig;
using LruAllocatorConfig = facebook::cachelib::CacheAllocatorConfig<facebook::cachelib::LruAllocator>;
using CacheKey = typename CacheLibAllocator::Key;
void PMemSecondaryCache::initialize_cache(const PMemSecondaryCacheOptions& option) {
LruAllocatorConfig cfg;
//.configureMemoryTiers(config_tier).validate(); // will throw if bad config
cfg.setCacheSize(option.capacity)
.setCacheName("MultiTierCache")
.enableCachePersistence(option.path)
.setAccessConfig(
{option.bucket_power, option.locks_power}) // assuming caching 20 million items
.configureMemoryTiers({
facebook::cachelib::MemoryTierCacheConfig::fromShm().setRatio(1),
facebook::cachelib::MemoryTierCacheConfig::fromFile(option.path + "/file1").setRatio(1)})
.validate(); // will throw if bad config
cache_lib_ = std::make_unique<CacheLibAllocator>(CacheLibAllocator::SharedMemNew, cfg);
default_pool_ = cache_lib_->addPool("default", cache_lib_->getCacheMemoryStats().cacheSize);
}

PMemSecondaryCache::PMemSecondaryCache(const PMemSecondaryCacheOptions& opt)
: opt_(opt) {
Expand Down Expand Up @@ -57,7 +74,14 @@ Status PMemSecondaryCache::Insert(const Slice& key, void* value,
return Status::IOError("failed to compress");
}

auto* entry = new CacheEntry(allocator_);
auto alloc_handle = cache_lib_->allocate(default_pool_, key.data(), output.size());
if (!alloc_handle) {
return Status::IOError("cache may fail to evict due to too many pending writes"); // cache may fail to evict due to too many pending writes
}
std::memcpy(alloc_handle->getMemory(), output.c_str(), output.size());
cache_lib_->insertOrReplace(alloc_handle);
/*
auto* entry = new CacheEntry(allocator_);
entry->data = (char*)allocator_->Allocate(output.size());
entry->size = output.size();
if (entry->data == nullptr) {
Expand All @@ -69,21 +93,58 @@ Status PMemSecondaryCache::Insert(const Slice& key, void* value,
PMEM_F_MEM_NONTEMPORAL);

s = cache_->Insert(key, entry, output.size(),
[](const Slice& /*key*/, void* val) -> void {
[](const Slice& key, void* val) -> void {
delete static_cast<CacheEntry*>(val);
});
if (!s.ok()) {
fprintf(stderr, "internal lru cache failed to insert entry\n");
}
*/
return Status::OK();
}

std::unique_ptr<SecondaryCacheResultHandle> PMemSecondaryCache::Lookup(
const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/) {
std::string key_str = key.ToString();

std::unique_ptr<SecondaryCacheResultHandle> secondary_handle;
CacheLibAllocator::Key cacheKey(key.data(), key.size());
CacheLibAllocator::ReadHandle handle = cache_lib_->find(cacheKey);
auto* item = handle.get();
if(item) {
void* value = nullptr;
size_t charge = 0;
Status s;
size_t csize = item->getSize();
const char* cdata = (const char*)item->getMemory();
UncompressionContext uncompressionContext(kSnappyCompression);
UncompressionInfo uncompressionInfo(uncompressionContext,
UncompressionDict::GetEmptyDict(),
kSnappyCompression);
size_t uncompress_size;
std::string output;
CacheAllocationPtr uncompress_ptr =
UncompressData(uncompressionInfo, cdata, csize, &uncompress_size, -1);
char* uncompress_raw = uncompress_ptr.get();

if (!uncompress_ptr) {
fprintf(stderr, "failed to decompress\n");
// TODO release cachelib handle
//cache_->Release(handle);
return secondary_handle;
}

size_t size = DecodeFixed64(uncompress_raw);
uncompress_raw += sizeof(uint64_t);
s = create_cb(uncompress_raw, size, &value, &charge);
if (s.ok()) {
// TODO null handle ?
secondary_handle.reset(
new PMemSCacheResultHandle(/*cache_.get()*/nullptr, nullptr, value, charge));
} else {
// TODO relase Cachelib handler
//cache_->Release(handle);
}
}
/*
Cache::Handle* handle = cache_->Lookup(key);
if (handle) {
void* value = nullptr;
Expand Down Expand Up @@ -119,11 +180,13 @@ std::unique_ptr<SecondaryCacheResultHandle> PMemSecondaryCache::Lookup(
cache_->Release(handle);
}
}
*/

return secondary_handle;
}

Status PMemSecondaryCache::PrepareOptions(const ConfigOptions& /*config_options*/) {
/*
if (opt_.capacity < (1L * 1024 * 1024 * 1024)) {
return Status::InvalidArgument("capacity should be larger than 1GB");
}
Expand All @@ -143,6 +206,10 @@ Status PMemSecondaryCache::PrepareOptions(const ConfigOptions& /*config_options*
}
cache_ = NewLRUCache(opt_.capacity * opt_.ratio, 6, true, 0.0, allocator_,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
*/
// TODO cachelib
initialize_cache(opt_);

return Status::OK();
}

Expand Down
31 changes: 25 additions & 6 deletions plugin/pmem_scache/scache/pmem_scache.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,36 @@
#pragma once

#include <libpmem.h>

#include <thread>

#include <cachelib/allocator/CacheAllocator.h>
#include <cachelib/allocator/CacheAllocatorConfig.h>
#include <cachelib/allocator/MemoryTierCacheConfig.h>
//#include <folly/init/Init.h>

#include "pmem_scache_util.h"

namespace ROCKSDB_NAMESPACE {

using CacheLibAllocator = facebook::cachelib::LruAllocator; // or Lru2QAllocator, or TinyLFUAllocator

struct PMemSecondaryCacheOptions {
static const char* kName() { return "PMemSecondaryCacheOptions"; }
bool is_kmem_dax = false;
std::string path;
size_t capacity = 32L * 1024 * 1024 * 1024;
double ratio = 0.85;

unsigned int bucket_power = 25;

unsigned int locks_power = 10;
};


class PMemSecondaryCache : public SecondaryCache {
public:
explicit PMemSecondaryCache(const PMemSecondaryCacheOptions& opt);
~PMemSecondaryCache() override {
cache_.reset();
//cache_.reset();
}

Status PrepareOptions(const ConfigOptions& config_options) override;
Expand All @@ -38,7 +48,11 @@ class PMemSecondaryCache : public SecondaryCache {
const Slice& key, const Cache::CreateCallback& create_cb,
bool /*wait*/) override;

void Erase(const Slice& key) override { cache_->Erase(key); }
void Erase(const Slice& key) override {
//cache_->Erase(key);
CacheLibAllocator::Key cacheKey(key.data(), key.size());
cache_lib_->remove(cacheKey);
}

void WaitAll(std::vector<SecondaryCacheResultHandle*> handles) override {
for (SecondaryCacheResultHandle* handle : handles) {
Expand All @@ -51,9 +65,14 @@ class PMemSecondaryCache : public SecondaryCache {

private:

std::shared_ptr<Cache> cache_;
void initialize_cache(const PMemSecondaryCacheOptions& option);

//std::shared_ptr<Cache> cache_;
PMemSecondaryCacheOptions opt_;
std::shared_ptr<MemoryAllocator> allocator_;
//std::shared_ptr<MemoryAllocator> allocator_;

std::unique_ptr<CacheLibAllocator> cache_lib_;
facebook::cachelib::PoolId default_pool_;
};

} // namespace ROCKSDB_NAMESPACE
2 changes: 1 addition & 1 deletion plugin/pmem_scache/scache/pmem_scache_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class PMemSCacheResultHandle : public SecondaryCacheResultHandle {
size_(size),
is_ready_(true) {}

~PMemSCacheResultHandle() override { cache_->Release(handle_); }
~PMemSCacheResultHandle() override { }

bool IsReady() override { return is_ready_; }

Expand Down