Skip to content

Commit bbc809a

Browse files
committed
feat(server): integrate string_map into rdb code to support save/load
Signed-off-by: Roman Gershman <[email protected]>
1 parent 69b00b9 commit bbc809a

File tree

9 files changed

+92
-125
lines changed

9 files changed

+92
-125
lines changed

src/core/compact_object.cc

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -548,11 +548,7 @@ void CompactObj::ImportRObj(robj* o) {
548548
enc = GetFlag(FLAGS_use_set2) ? kEncodingStrMap2 : kEncodingStrMap;
549549
}
550550
} else if (o->type == OBJ_HASH) {
551-
if (o->encoding == OBJ_ENCODING_HT) {
552-
enc = kEncodingStrMap2;
553-
} else {
554-
enc = kEncodingListPack;
555-
}
551+
LOG(FATAL) << "Should not reach";
556552
}
557553
u_.r_obj.Init(type, enc, o->ptr);
558554
if (o->refcount == 1)
@@ -564,15 +560,14 @@ robj* CompactObj::AsRObj() const {
564560
CHECK_EQ(ROBJ_TAG, taglen_);
565561

566562
robj* res = &tl.tmp_robj;
563+
unsigned enc = u_.r_obj.encoding();
567564
res->type = u_.r_obj.type();
568565

569-
if (res->type == OBJ_SET) {
566+
if (res->type == OBJ_SET || res->type == OBJ_HASH) {
570567
LOG(DFATAL) << "Should not call AsRObj for type " << res->type;
571568
}
572569

573-
if (res->type == OBJ_HASH) {
574-
LOG(DFATAL) << "Should not call AsRObj for type " << res->type;
575-
}
570+
res->encoding = enc;
576571
res->lru = 0; // u_.r_obj.unneeded;
577572
res->ptr = u_.r_obj.inner_obj();
578573

src/core/compact_object_test.cc

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -229,23 +229,6 @@ TEST_F(CompactObjectTest, IntSet) {
229229
EXPECT_GT(cobj_.MallocUsed(), 0);
230230
}
231231

232-
TEST_F(CompactObjectTest, HSet) {
233-
robj* src = createHashObject();
234-
cobj_.ImportRObj(src);
235-
236-
EXPECT_EQ(OBJ_HASH, cobj_.ObjType());
237-
EXPECT_EQ(kEncodingListPack, cobj_.Encoding());
238-
239-
robj* os = cobj_.AsRObj();
240-
241-
sds key1 = sdsnew("key1");
242-
sds val1 = sdsnew("val1");
243-
244-
// returns 0 on insert.
245-
EXPECT_EQ(0, hashTypeSet(os, key1, val1, HASH_SET_TAKE_FIELD | HASH_SET_TAKE_VALUE));
246-
cobj_.SyncRObj();
247-
}
248-
249232
TEST_F(CompactObjectTest, ZSet) {
250233
// unrelated, checking that sds static encoding works.
251234
// it is used in zset special strings.

src/core/dense_set.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,7 @@ class DenseSet {
249249
// Returns null if obj was added.
250250
void* AddOrFind(void* obj, bool has_ttl);
251251

252-
void* FindInternal(const void* obj, uint32_t cookie) const {
253-
DensePtr* ptr = const_cast<DenseSet*>(this)->Find(obj, BucketId(obj, cookie), cookie).second;
254-
return ptr ? ptr->GetObject() : nullptr;
255-
}
256-
252+
void* FindInternal(const void* obj, uint32_t cookie) const;
257253
void* PopInternal();
258254

259255
// Note this does not free any dynamic allocations done by derived classes, that a DensePtr
@@ -331,4 +327,12 @@ class DenseSet {
331327
uint32_t time_now_ = 0;
332328
};
333329

330+
inline void* DenseSet::FindInternal(const void* obj, uint32_t cookie) const {
331+
if (entries_.empty())
332+
return nullptr;
333+
334+
DensePtr* ptr = const_cast<DenseSet*>(this)->Find(obj, BucketId(obj, cookie), cookie).second;
335+
return ptr ? ptr->GetObject() : nullptr;
336+
}
337+
334338
} // namespace dfly

src/core/string_map_test.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,8 @@ TEST_F(StringMapTest, Basic) {
9292
EXPECT_STREQ("baraaaaaaaaaaaa2", it->second);
9393
}
9494

95+
TEST_F(StringMapTest, EmptyFind) {
96+
sm_->Find("bar");
97+
}
98+
9599
} // namespace dfly

src/server/hset_family.cc

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -117,31 +117,6 @@ pair<uint8_t*, bool> LpInsert(uint8_t* lp, string_view field, string_view val, b
117117
return make_pair(lp, !updated);
118118
}
119119

120-
StringMap* ConvertToStrMap(uint8_t* lp) {
121-
StringMap* sm = new StringMap(CompactObj::memory_resource());
122-
size_t lplen = lpLength(lp);
123-
if (lplen == 0)
124-
return sm;
125-
126-
sm->Reserve(lplen / 2);
127-
128-
uint8_t* lp_elem = lpFirst(lp);
129-
uint8_t intbuf[LP_INTBUF_SIZE];
130-
131-
DCHECK(lp_elem); // empty containers are not allowed.
132-
133-
do {
134-
string_view key = LpGetView(lp_elem, intbuf);
135-
lp_elem = lpNext(lp, lp_elem); // switch to value
136-
DCHECK(lp_elem);
137-
string_view value = LpGetView(lp_elem, intbuf);
138-
lp_elem = lpNext(lp, lp_elem); // switch to next key
139-
CHECK(sm->AddOrUpdate(key, value)); // must be unique
140-
} while (lp_elem);
141-
142-
return sm;
143-
}
144-
145120
size_t HMapLength(const CompactObj& co) {
146121
void* ptr = co.RObjPtr();
147122
if (co.Encoding() == kEncodingStrMap2) {
@@ -217,7 +192,7 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
217192

218193
if (lpb >= kMaxListPackLen) {
219194
stats->listpack_blob_cnt--;
220-
StringMap* sm = ConvertToStrMap(lp);
195+
StringMap* sm = HSetFamily::ConvertToStrMap(lp);
221196
pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
222197
}
223198
}
@@ -651,7 +626,7 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
651626

652627
if (!IsGoodForListpack(values, lp)) {
653628
stats->listpack_blob_cnt--;
654-
StringMap* sm = ConvertToStrMap(lp);
629+
StringMap* sm = HSetFamily::ConvertToStrMap(lp);
655630
pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
656631
lp = nullptr;
657632
}
@@ -1052,4 +1027,28 @@ uint32_t HSetFamily::MaxListPackLen() {
10521027
return kMaxListPackLen;
10531028
}
10541029

1030+
StringMap* HSetFamily::ConvertToStrMap(uint8_t* lp) {
1031+
StringMap* sm = new StringMap(CompactObj::memory_resource());
1032+
size_t lplen = lpLength(lp);
1033+
if (lplen == 0)
1034+
return sm;
1035+
1036+
sm->Reserve(lplen / 2);
1037+
1038+
uint8_t* lp_elem = lpFirst(lp);
1039+
uint8_t intbuf[LP_INTBUF_SIZE];
1040+
1041+
DCHECK(lp_elem); // empty containers are not allowed.
1042+
1043+
do {
1044+
string_view key = LpGetView(lp_elem, intbuf);
1045+
lp_elem = lpNext(lp, lp_elem); // switch to value
1046+
DCHECK(lp_elem);
1047+
string_view value = LpGetView(lp_elem, intbuf);
1048+
lp_elem = lpNext(lp, lp_elem); // switch to next key
1049+
CHECK(sm->AddOrUpdate(key, value)); // must be unique
1050+
} while (lp_elem);
1051+
1052+
return sm;
1053+
}
10551054
} // namespace dfly

src/server/hset_family.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ namespace dfly {
1313

1414
class ConnectionContext;
1515
class CommandRegistry;
16+
class StringMap;
17+
1618
using facade::OpResult;
1719
using facade::OpStatus;
1820

@@ -21,8 +23,10 @@ class HSetFamily {
2123
static void Register(CommandRegistry* registry);
2224
static uint32_t MaxListPackLen();
2325

24-
private:
26+
// Does not free lp.
27+
static StringMap* ConvertToStrMap(uint8_t* lp);
2528

29+
private:
2630
static void HDel(CmdArgList args, ConnectionContext* cntx);
2731
static void HLen(CmdArgList args, ConnectionContext* cntx);
2832
static void HExists(CmdArgList args, ConnectionContext* cntx);

src/server/rdb_load.cc

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ extern "C" {
2424
#include "base/endian.h"
2525
#include "base/flags.h"
2626
#include "base/logging.h"
27+
#include "core/string_map.h"
2728
#include "core/string_set.h"
2829
#include "server/engine_shard_set.h"
2930
#include "server/error.h"
@@ -534,8 +535,6 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
534535
}
535536
}
536537

537-
robj* res = nullptr;
538-
539538
if (keep_lp) {
540539
uint8_t* lp = lpNew(lp_size);
541540

@@ -554,44 +553,30 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
554553
}
555554

556555
lp = lpShrinkToFit(lp);
557-
res = createObject(OBJ_HASH, lp);
558-
res->encoding = OBJ_ENCODING_LISTPACK;
556+
pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
559557
} else {
560-
dict* hmap = dictCreate(&hashDictType);
561-
562-
auto cleanup = absl::MakeCleanup([&] { dictRelease(hmap); });
563-
564-
if (len > DICT_HT_INITIAL_SIZE) {
565-
if (dictTryExpand(hmap, len) != DICT_OK) {
566-
LOG(ERROR) << "OOM in dictTryExpand " << len;
567-
ec_ = RdbError(errc::out_of_memory);
568-
return;
569-
}
570-
}
558+
StringMap* string_map = new StringMap;
571559

560+
auto cleanup = absl::MakeCleanup([&] { delete string_map; });
561+
string_map->Reserve(len);
572562
for (size_t i = 0; i < len; ++i) {
573-
sds key = ToSds(ltrace->arr[i * 2].rdb_var);
574-
sds val = ToSds(ltrace->arr[i * 2 + 1].rdb_var);
563+
// ToSV may reference an internal buffer, therefore we can use only before the
564+
// next call to ToSV. To workaround, I copy the key to string.
565+
string key(ToSV(ltrace->arr[i * 2].rdb_var));
566+
string_view val = ToSV(ltrace->arr[i * 2 + 1].rdb_var);
575567

576-
if (!key || !val)
568+
if (ec_)
577569
return;
578570

579-
/* Add pair to hash table */
580-
int ret = dictAdd(hmap, key, val);
581-
if (ret == DICT_ERR) {
571+
if (!string_map->AddOrSkip(key, val)) {
582572
LOG(ERROR) << "Duplicate hash fields detected";
583573
ec_ = RdbError(errc::rdb_file_corrupted);
584574
return;
585575
}
586576
}
587-
588-
res = createObject(OBJ_HASH, hmap);
589-
res->encoding = OBJ_ENCODING_HT;
577+
pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map);
590578
std::move(cleanup).Cancel();
591579
}
592-
593-
DCHECK(res);
594-
pv_->ImportRObj(res);
595580
}
596581

597582
void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
@@ -871,13 +856,15 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
871856
return;
872857
}
873858

874-
res = createObject(OBJ_HASH, lp);
875-
res->encoding = OBJ_ENCODING_LISTPACK;
876-
877-
if (lpBytes(lp) > HSetFamily::MaxListPackLen())
878-
hashTypeConvert(res, OBJ_ENCODING_HT);
879-
else
880-
res->ptr = lpShrinkToFit((uint8_t*)res->ptr);
859+
if (lpBytes(lp) > HSetFamily::MaxListPackLen()) {
860+
StringMap* sm = HSetFamily::ConvertToStrMap(lp);
861+
lpFree(lp);
862+
pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
863+
} else {
864+
lp = lpShrinkToFit(lp);
865+
pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
866+
}
867+
return;
881868
} else if (rdb_type_ == RDB_TYPE_ZSET_ZIPLIST) {
882869
unsigned char* lp = lpNew(blob.size());
883870
if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)blob.data(), blob.size(), &lp)) {

0 commit comments

Comments
 (0)