Skip to content
2 changes: 1 addition & 1 deletion src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ add_library(dfly_core allocation_tracker.cc bloom.cc compact_object.cc dense_set
dragonfly_core.cc extent_tree.cc huff_coder.cc
interpreter.cc glob_matcher.cc mi_memory_resource.cc qlist.cc sds_utils.cc
segment_allocator.cc score_map.cc small_string.cc sorted_map.cc task_queue.cc
tx_queue.cc string_set.cc string_map.cc top_keys.cc detail/bitpacking.cc)
tx_queue.cc string_set.cc string_map.cc top_keys.cc detail/bitpacking.cc prob/cuckoo_filter.cc)

cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules
fibers2 ${SEARCH_LIB} jsonpath OpenSSL::Crypto TRDP::dconv TRDP::lz4)
Expand Down
92 changes: 91 additions & 1 deletion src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extern "C" {
#include "redis/quicklist.h"
#include "redis/redis_aux.h"
#include "redis/stream.h"
#include "redis/tdigest.h"
#include "redis/util.h"
#include "redis/zmalloc.h" // for non-string objects.
#include "redis/zset.h"
Expand All @@ -26,6 +27,7 @@ extern "C" {
#include "core/bloom.h"
#include "core/detail/bitpacking.h"
#include "core/huff_coder.h"
#include "core/prob/cuckoo_filter.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
Expand Down Expand Up @@ -398,6 +400,12 @@ static_assert(sizeof(CompactObj) == 18);

namespace detail {

// Estimates the heap bytes held by a t-digest histogram: the histogram
// structure itself plus two arrays of `cap` doubles each.
// NOTE(review): assumes the histogram owns exactly two `cap`-sized double
// arrays (means and weights in upstream t-digest-c) - confirm against
// redis/tdigest.h.
size_t MallocUsedTDigest(const td_histogram_t* tdigest) {
  // Measure the pointed-to struct; sizeof(tdigest) would only yield the
  // width of the pointer and under-report the footprint.
  const size_t header_bytes = sizeof(*tdigest);
  const size_t array_bytes = 2 * (tdigest->cap * sizeof(double));
  return header_bytes + array_bytes;
}

size_t RobjWrapper::MallocUsed(bool slow) const {
if (!inner_obj_)
return 0;
Expand All @@ -418,6 +426,8 @@ size_t RobjWrapper::MallocUsed(bool slow) const {
return MallocUsedZSet(encoding_, inner_obj_);
case OBJ_STREAM:
return slow ? MallocUsedStream((stream*)inner_obj_) : sz_;
case OBJ_TDIGEST:
return MallocUsedTDigest((td_histogram_t*)inner_obj_);

default:
LOG(FATAL) << "Not supported " << type_;
Expand Down Expand Up @@ -477,11 +487,17 @@ size_t RobjWrapper::Size() const {
case OBJ_STREAM:
// For streams, Size means the number of malloc'ed bytes
return sz_;
case OBJ_TDIGEST:
return 0;
default:;
}
return 0;
}

inline void FreeObjTDigest(void* ptr) {
td_free((td_histogram*)ptr);
}

void RobjWrapper::Free(MemoryResource* mr) {
if (!inner_obj_)
return;
Expand Down Expand Up @@ -511,6 +527,9 @@ void RobjWrapper::Free(MemoryResource* mr) {
case OBJ_STREAM:
FreeObjStream(inner_obj_);
break;
case OBJ_TDIGEST:
FreeObjTDigest(inner_obj_);
break;
default:
LOG(FATAL) << "Unknown object type";
break;
Expand Down Expand Up @@ -603,6 +622,9 @@ bool RobjWrapper::DefragIfNeeded(float ratio) {
return do_defrag(DefragSet);
} else if (type() == OBJ_ZSET) {
return do_defrag(DefragZSet);
} else if (type() == OBJ_TDIGEST) {
// TODO implement this
return false;
}
return false;
}
Expand Down Expand Up @@ -826,6 +848,14 @@ size_t CompactObj::Size() const {
DCHECK_EQ(mask_bits_.encoding, NONE_ENC);
raw_size = u_.sbf->current_size();
break;
case TOPK_TAG:
DCHECK_EQ(mask_bits_.encoding, NONE_ENC);
raw_size = 0;
break;
case CUCKOO_FILTER_TAG:
DCHECK_EQ(mask_bits_.encoding, NONE_ENC);
raw_size = GetCuckooFilter()->NumItems();
break;
default:
LOG(DFATAL) << "Should not reach " << int(taglen_);
}
Expand Down Expand Up @@ -892,6 +922,14 @@ CompactObjType CompactObj::ObjType() const {
return OBJ_SBF;
}

if (taglen_ == TOPK_TAG) {
return OBJ_TOPK;
}

if (taglen_ == CUCKOO_FILTER_TAG) {
return OBJ_CUCKOO_FILTER;
}

LOG(FATAL) << "TBD " << int(taglen_);
return kInvalidCompactObjType;
}
Expand Down Expand Up @@ -995,11 +1033,49 @@ void CompactObj::SetSBF(uint64_t initial_capacity, double fp_prob, double grow_f
}
}

void CompactObj::SetTopK(size_t topk, size_t width, size_t depth, double decay) {
TopKeys::Options opts;
size_t total_buckets = 4;
// Heuristic
if (topk > 4) {
total_buckets = topk / 4;
}
opts.buckets = total_buckets;
opts.depth = 4;
// fingerprints = buckets * depth = topk
opts.decay_base = decay;
// We need this so we can set the key. The problem with this is upon cell reset,
// we don't set the key and a query for TopK won't return that key because we never set it.
opts.min_key_count_to_record = 0;
SetMeta(TOPK_TAG);
u_.topk = AllocateMR<TopKeys>(opts);
}

// Tags this object as CUCKOO_FILTER_TAG and stores a newly allocated
// CuckooFilter that is move-constructed from `filter`.
void CompactObj::SetCuckooFilter(prob::CuckooFilter filter) {
  SetMeta(CUCKOO_FILTER_TAG);
  prob::CuckooFilter* stored = AllocateMR<prob::CuckooFilter>(std::move(filter));
  u_.cuckoo_filter = stored;
}

// Returns the stored cuckoo filter pointer for mutation.
// The object must currently be tagged CUCKOO_FILTER_TAG (checked in debug builds).
prob::CuckooFilter* CompactObj::GetCuckooFilter() {
  // DCHECK_EQ matches the sibling getters (GetSBF/GetTopK) and reports both
  // operand values when the check fails.
  DCHECK_EQ(CUCKOO_FILTER_TAG, taglen_);
  return u_.cuckoo_filter;
}

// Returns the stored cuckoo filter pointer for read-only access.
// The object must currently be tagged CUCKOO_FILTER_TAG (checked in debug builds).
const prob::CuckooFilter* CompactObj::GetCuckooFilter() const {
  // DCHECK_EQ matches the sibling getters (GetSBF/GetTopK) and reports both
  // operand values when the check fails.
  DCHECK_EQ(CUCKOO_FILTER_TAG, taglen_);
  return u_.cuckoo_filter;
}

// Returns the SBF pointer held by this object; the object is expected to be
// tagged SBF_TAG (verified in debug builds only).
SBF* CompactObj::GetSBF() const {
  DCHECK_EQ(SBF_TAG, taglen_);
  SBF* const filter = u_.sbf;
  return filter;
}

// Returns the TopKeys pointer held by this object; the object is expected to
// be tagged TOPK_TAG (verified in debug builds only).
TopKeys* CompactObj::GetTopK() const {
  DCHECK_EQ(TOPK_TAG, taglen_);
  TopKeys* const top_keys = u_.topk;
  return top_keys;
}

void CompactObj::SetString(std::string_view str) {
CHECK(!IsExternal());
mask_bits_.encoding = NONE_ENC;
Expand Down Expand Up @@ -1090,6 +1166,9 @@ bool CompactObj::DefragIfNeeded(float ratio) {
return false;
case EXTERNAL_TAG:
return false;
case CUCKOO_FILTER_TAG:
// TODO: implement this
return false;
default:
// This is the case when the object is at inline_str
return false;
Expand All @@ -1101,7 +1180,8 @@ bool CompactObj::HasAllocated() const {
(taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr))
return false;

DCHECK(taglen_ == ROBJ_TAG || taglen_ == SMALL_TAG || taglen_ == JSON_TAG || taglen_ == SBF_TAG);
DCHECK(taglen_ == ROBJ_TAG || taglen_ == SMALL_TAG || taglen_ == JSON_TAG || taglen_ == SBF_TAG ||
taglen_ == TOPK_TAG || taglen_ == CUCKOO_FILTER_TAG);
return true;
}

Expand Down Expand Up @@ -1295,6 +1375,10 @@ void CompactObj::Free() {
}
} else if (taglen_ == SBF_TAG) {
DeleteMR<SBF>(u_.sbf);
} else if (taglen_ == TOPK_TAG) {
DeleteMR<TopKeys>(u_.topk);
} else if (taglen_ == CUCKOO_FILTER_TAG) {
DeleteMR<prob::CuckooFilter>(u_.cuckoo_filter);
} else {
LOG(FATAL) << "Unsupported tag " << int(taglen_);
}
Expand Down Expand Up @@ -1327,6 +1411,12 @@ size_t CompactObj::MallocUsed(bool slow) const {
if (taglen_ == SBF_TAG) {
return u_.sbf->MallocUsed();
}
if (taglen_ == TOPK_TAG) {
return 0;
}
if (taglen_ == CUCKOO_FILTER_TAG) {
return GetCuckooFilter()->UsedBytes();
}
LOG(DFATAL) << "should not reach";
return 0;
}
Expand Down
17 changes: 17 additions & 0 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "core/mi_memory_resource.h"
#include "core/small_string.h"
#include "core/string_or_view.h"
#include "core/top_keys.h"

namespace dfly {

Expand All @@ -27,6 +28,11 @@ constexpr unsigned kEncodingJsonFlat = 1;

class SBF;

// Forward declarations of the probabilistic-structure types in dfly::prob.
// CompactObj only names CuckooFilter in declarations and as a pointer member
// in this header.
// NOTE(review): CuckooReserveParams is not referenced in the visible part of
// this header - confirm it is still needed.
namespace prob {
class CuckooFilter;
class CuckooReserveParams;
} // namespace prob

namespace detail {

// redis objects or blobs of up to 4GB size.
Expand Down Expand Up @@ -123,6 +129,8 @@ class CompactObj {
EXTERNAL_TAG = 20,
JSON_TAG = 21,
SBF_TAG = 22,
TOPK_TAG = 23,
CUCKOO_FILTER_TAG = 24,
};

// String encoding types.
Expand Down Expand Up @@ -311,6 +319,13 @@ class CompactObj {
void SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor);
SBF* GetSBF() const;

void SetTopK(size_t topk, size_t width, size_t depth, double decay);
TopKeys* GetTopK() const;

void SetCuckooFilter(prob::CuckooFilter filter);
prob::CuckooFilter* GetCuckooFilter();
const prob::CuckooFilter* GetCuckooFilter() const;

// dest must have at least Size() bytes available
void GetString(char* dest) const;

Expand Down Expand Up @@ -479,6 +494,8 @@ class CompactObj {
// using 'packed' to reduce alignment of U to 1.
JsonWrapper json_obj __attribute__((packed));
SBF* sbf __attribute__((packed));
TopKeys* topk __attribute__((packed));
prob::CuckooFilter* cuckoo_filter __attribute__((packed));
int64_t ival __attribute__((packed));
ExternalPtr ext_ptr;

Expand Down
20 changes: 20 additions & 0 deletions src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ extern "C" {
#include "redis/intset.h"
#include "redis/redis_aux.h"
#include "redis/stream.h"
#include "redis/td_malloc.h"
#include "redis/tdigest.h"
#include "redis/zmalloc.h"
}

Expand Down Expand Up @@ -682,6 +684,24 @@ TEST_F(CompactObjectTest, HuffMan) {
}
}

// Checks that a t-digest histogram can be stored in a CompactObj as an
// OBJ_TDIGEST robj and that the t-digest allocation hooks match zmalloc.
TEST_F(CompactObjectTest, TDigst) {
// Allocators: the t-digest allocation hooks must be the zmalloc family.
ASSERT_EQ(zmalloc, __td_malloc);
ASSERT_EQ(zcalloc, __td_calloc);
ASSERT_EQ(zrealloc, __td_realloc);
ASSERT_EQ(zfree, __td_free);

// Basic usage: wrap a new histogram and verify the object reports the type
// and the exact pointer it was initialized with.
td_histogram_t* hist = td_new(10);
cobj_.InitRobj(OBJ_TDIGEST, 0, hist);
ASSERT_EQ(cobj_.GetRobjWrapper()->type(), OBJ_TDIGEST);
ASSERT_EQ(cobj_.RObjPtr(), hist);
// A freshly created histogram is expected to carry no weight.
ASSERT_EQ(0, hist->unmerged_weight);
ASSERT_EQ(0, hist->merged_weight);
// NOTE(review): 0 is presumably td_add's success code - confirm in redis/tdigest.h.
ASSERT_EQ(td_add(hist, 0.0, 1), 0);
// NOTE(review): Reset() presumably releases the histogram through the
// OBJ_TDIGEST free path, so no explicit td_free is issued here - confirm.
cobj_.Reset();
}

static void ascii_pack_naive(const char* ascii, size_t len, uint8_t* bin) {
const char* end = ascii + len;

Expand Down
Loading
Loading