diff --git a/plugin/pmem_scache/pmem_scache.mk b/plugin/pmem_scache/pmem_scache.mk index 79ace00..6842362 100644 --- a/plugin/pmem_scache/pmem_scache.mk +++ b/plugin/pmem_scache/pmem_scache.mk @@ -1,3 +1,5 @@ pmem_scache_SOURCES = scache/pmem_scache.cc pmem_scache_HEADERS = scache/pmem_scache.h scache/pmem_scache_util.h -pmem_scache_LDFLAGS = -lpmem -u pmem_scache_reg \ No newline at end of file +#pmem_scache_LDFLAGS = -lpmem -ldouble-conversion -lfmtd -lglog -lthrift-core -lthriftprotocol -lthriftcpp2 -lthrifttype -lthriftmetadata -lthriftfrozen2 -lrpcmetadata -lfolly -lcachelib_allocator -lcachelib_common -lcachelib_datatype -lcachelib_shm -lcachelib_navy -u pmem_scache_reg +#pmem_scache_LDFLAGS = -lpmem -lcachelib_allocator -L/home/chenyou/code/WIP/docker_code/CacheLib/opt/cachelib/lib64 -lfolly -L/home/chenyou/code/WIP/docker_code/CacheLib/opt/cachelib/lib64 -lfmtd -lglog -ldouble-conversion -lcachelib_common -lcachelib_datatype -lcachelib_shm -lcachelib_navy -u pmem_scache_reg +pmem_scache_LDFLAGS = -lpmem -lcachelib_allocator -lcachelib_shm -lcachelib_navy -lcachelib_common -lthriftcpp2 -lthriftfrozen2 -lthriftmetadata -lthrifttype -lthriftprotocol -lasync -lwangle -lfizz -ltransport -lrpcmetadata -lthrift-core -lconcurrency -lfollybenchmark -lfolly_exception_counter -lfolly_exception_tracer -lfolly_exception_tracer_base -lfolly -lfmtd -lgflags_debug -lglog -ldouble-conversion -lboost_context -u pmem_scache_reg \ No newline at end of file diff --git a/plugin/pmem_scache/scache/pmem_scache.cc b/plugin/pmem_scache/scache/pmem_scache.cc index 52e5918..850e8c2 100644 --- a/plugin/pmem_scache/scache/pmem_scache.cc +++ b/plugin/pmem_scache/scache/pmem_scache.cc @@ -10,20 +10,37 @@ namespace ROCKSDB_NAMESPACE { static std::unordered_map scache_type_info = { - {"is_kmem_dax", - {offsetof(struct PMemSecondaryCacheOptions, is_kmem_dax), - OptionType::kBoolean, OptionVerificationType::kNormal, - OptionTypeFlags::kNone}}, {"path", {offsetof(struct PMemSecondaryCacheOptions, path), OptionType::kString, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, {"capacity", {offsetof(struct PMemSecondaryCacheOptions, capacity), OptionType::kSizeT, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, - {"ratio", - {offsetof(struct PMemSecondaryCacheOptions, ratio), OptionType::kDouble, + {"bucket_power", + {offsetof(struct PMemSecondaryCacheOptions, bucket_power), OptionType::kUInt32T, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"locks_power", + {offsetof(struct PMemSecondaryCacheOptions, locks_power), OptionType::kUInt32T, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, }; +//using MemoryTierCacheConfig = typename facebook::cachelib::MemoryTierCacheConfig; +using LruAllocatorConfig = facebook::cachelib::CacheAllocatorConfig; +using CacheKey = typename CacheLibAllocator::Key; +void PMemSecondaryCache::initialize_cache(const PMemSecondaryCacheOptions& option) { + LruAllocatorConfig cfg; + //.configureMemoryTiers(config_tier).validate(); // will throw if bad config + cfg.setCacheSize(option.capacity) + .setCacheName("MultiTierCache") + .enableCachePersistence(option.path) + .setAccessConfig( + {option.bucket_power, option.locks_power}) // assuming caching 20 million items + .configureMemoryTiers({ + facebook::cachelib::MemoryTierCacheConfig::fromShm().setRatio(1), + facebook::cachelib::MemoryTierCacheConfig::fromFile(option.path + "/file1").setRatio(1)}) + .validate(); // will throw if bad config + cache_lib_ = std::make_unique(CacheLibAllocator::SharedMemNew, cfg); + default_pool_ = cache_lib_->addPool("default", cache_lib_->getCacheMemoryStats().cacheSize); +} PMemSecondaryCache::PMemSecondaryCache(const PMemSecondaryCacheOptions& opt) : opt_(opt) { @@ -57,7 +74,14 @@ Status PMemSecondaryCache::Insert(const Slice& key, void* value, return Status::IOError("failed to compress"); } - auto* entry = new CacheEntry(allocator_); + auto alloc_handle = cache_lib_->allocate(default_pool_, key.data(), output.size()); + if (!alloc_handle) { + return Status::IOError("cache may fail to evict due to too many pending writes"); // cache may fail to evict due to too many pending writes + } + std::memcpy(alloc_handle->getMemory(), output.c_str(), output.size()); + cache_lib_->insertOrReplace(alloc_handle); + /* +auto* entry = new CacheEntry(allocator_); entry->data = (char*)allocator_->Allocate(output.size()); entry->size = output.size(); if (entry->data == nullptr) { @@ -69,21 +93,58 @@ Status PMemSecondaryCache::Insert(const Slice& key, void* value, PMEM_F_MEM_NONTEMPORAL); s = cache_->Insert(key, entry, output.size(), - [](const Slice& /*key*/, void* val) -> void { + [](const Slice& key, void* val) -> void { delete static_cast(val); }); if (!s.ok()) { fprintf(stderr, "internal lru cache failed to insert entry\n"); } + */ return Status::OK(); } std::unique_ptr PMemSecondaryCache::Lookup( const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/) { - std::string key_str = key.ToString(); - std::unique_ptr secondary_handle; + CacheLibAllocator::Key cacheKey(key.data(), key.size()); + CacheLibAllocator::ReadHandle handle = cache_lib_->find(cacheKey); + auto* item = handle.get(); + if(item) { + void* value = nullptr; + size_t charge = 0; + Status s; + size_t csize = item->getSize(); + const char* cdata = (const char*)item->getMemory(); + UncompressionContext uncompressionContext(kSnappyCompression); + UncompressionInfo uncompressionInfo(uncompressionContext, + UncompressionDict::GetEmptyDict(), + kSnappyCompression); + size_t uncompress_size; + std::string output; + CacheAllocationPtr uncompress_ptr = + UncompressData(uncompressionInfo, cdata, csize, &uncompress_size, -1); + char* uncompress_raw = uncompress_ptr.get(); + + if (!uncompress_ptr) { + fprintf(stderr, "failed to decompress\n"); + // TODO release cachelib handle + //cache_->Release(handle); + return secondary_handle; + } + size_t size = DecodeFixed64(uncompress_raw); + uncompress_raw += sizeof(uint64_t); + s = create_cb(uncompress_raw, size, &value, &charge); + if (s.ok()) { + // TODO null handle ? + secondary_handle.reset( + new PMemSCacheResultHandle(/*cache_.get()*/nullptr, nullptr, value, charge)); + } else { + // TODO relase Cachelib handler + //cache_->Release(handle); + } + } +/* Cache::Handle* handle = cache_->Lookup(key); if (handle) { void* value = nullptr; @@ -119,11 +180,13 @@ std::unique_ptr PMemSecondaryCache::Lookup( cache_->Release(handle); } } + */ return secondary_handle; } Status PMemSecondaryCache::PrepareOptions(const ConfigOptions& /*config_options*/) { + /* if (opt_.capacity < (1L * 1024 * 1024 * 1024)) { return Status::InvalidArgument("capacity should be larger than 1GB"); } @@ -143,6 +206,10 @@ Status PMemSecondaryCache::PrepareOptions(const ConfigOptions& /*config_options* } cache_ = NewLRUCache(opt_.capacity * opt_.ratio, 6, true, 0.0, allocator_, kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); + */ + // TODO cachelib + initialize_cache(opt_); + return Status::OK(); } diff --git a/plugin/pmem_scache/scache/pmem_scache.h b/plugin/pmem_scache/scache/pmem_scache.h index 0fd7d80..44e9129 100644 --- a/plugin/pmem_scache/scache/pmem_scache.h +++ b/plugin/pmem_scache/scache/pmem_scache.h @@ -4,18 +4,28 @@ #pragma once #include + #include +#include +#include +#include +//#include + #include "pmem_scache_util.h" namespace ROCKSDB_NAMESPACE { +using CacheLibAllocator = facebook::cachelib::LruAllocator; // or Lru2QAllocator, or TinyLFUAllocator + struct PMemSecondaryCacheOptions { static const char* kName() { return "PMemSecondaryCacheOptions"; } - bool is_kmem_dax = false; std::string path; size_t capacity = 32L * 1024 * 1024 * 1024; - double ratio = 0.85; + + unsigned int bucket_power = 25; + + unsigned int locks_power = 10; }; @@ -23,7 +33,7 @@ class PMemSecondaryCache : public SecondaryCache { public: explicit PMemSecondaryCache(const PMemSecondaryCacheOptions& opt); ~PMemSecondaryCache() override { - cache_.reset(); + //cache_.reset(); } Status PrepareOptions(const ConfigOptions& config_options) override; @@ -38,7 +48,11 @@ class PMemSecondaryCache : public SecondaryCache { const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/) override; - void Erase(const Slice& key) override { cache_->Erase(key); } + void Erase(const Slice& key) override { + //cache_->Erase(key); + CacheLibAllocator::Key cacheKey(key.data(), key.size()); + cache_lib_->remove(cacheKey); + } void WaitAll(std::vector handles) override { for (SecondaryCacheResultHandle* handle : handles) { @@ -51,9 +65,14 @@ class PMemSecondaryCache : public SecondaryCache { private: - std::shared_ptr cache_; + void initialize_cache(const PMemSecondaryCacheOptions& option); + + //std::shared_ptr cache_; PMemSecondaryCacheOptions opt_; - std::shared_ptr allocator_; + //std::shared_ptr allocator_; + + std::unique_ptr cache_lib_; + facebook::cachelib::PoolId default_pool_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/plugin/pmem_scache/scache/pmem_scache_util.h b/plugin/pmem_scache/scache/pmem_scache_util.h index 175d2b4..b66a76d 100644 --- a/plugin/pmem_scache/scache/pmem_scache_util.h +++ b/plugin/pmem_scache/scache/pmem_scache_util.h @@ -77,7 +77,7 @@ class PMemSCacheResultHandle : public SecondaryCacheResultHandle { size_(size), is_ready_(true) {} - ~PMemSCacheResultHandle() override { cache_->Release(handle_); } + ~PMemSCacheResultHandle() override { } bool IsReady() override { return is_ready_; }