Skip to content

Commit 060add0

Browse files
adiholden and romange
authored
feat (server): add on move hook to dash insert new and bumpup (#5278)
* server: add on move hook to dash insert new and bumpup --------- Signed-off-by: adi_holden <[email protected]> Signed-off-by: Roman Gershman <[email protected]> Co-authored-by: Roman Gershman <[email protected]>
1 parent 87e88b3 commit 060add0

File tree

4 files changed

+142
-60
lines changed

4 files changed

+142
-60
lines changed

src/core/dash.h

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ class DashTable : public detail::DashTableBase {
8585
return true;
8686
}
8787

88+
// Move-notification hook with an empty body: both cursors are accepted and ignored.
void OnMove(Cursor source, Cursor dest) {
89+
}
90+
8891
// Split-notification hook with an empty body: the segment argument is ignored.
void RecordSplit(SegmentType* segment) {
8992
}
9093
/*
@@ -286,9 +289,13 @@ class DashTable : public detail::DashTableBase {
286289
// Returns true if an element was deleted i.e the rightmost slot was busy.
287290
bool ShiftRight(bucket_iterator it);
288291

289-
template <typename BumpPolicy> iterator BumpUp(iterator it, const BumpPolicy& bp) {
290-
SegmentIterator seg_it =
291-
segment_[it.seg_id_]->BumpUp(it.bucket_id_, it.slot_id_, DoHash(it->first), bp);
292+
// Bumps up the entry referenced by `it` inside its segment and returns an iterator to the
// position reported by the segment-level BumpUp (same segment id, new bucket index and slot).
// `bp` is taken by non-const reference because the callback below invokes bp.OnMove().
template <typename BumpPolicy> iterator BumpUp(iterator it, BumpPolicy& bp) {
293+
SegmentIterator seg_it = segment_[it.seg_id_]->BumpUp(
294+
it.bucket_id_, it.slot_id_, DoHash(it->first), bp,
295+
[&](uint32_t segment_id, detail::PhysicalBid from, detail::PhysicalBid to) {
296+
// Notify the policy about an item moving between buckets. Both cursors use the same
// segment id and the current global depth.
297+
bp.OnMove(Cursor{global_depth_, segment_id, from}, Cursor{global_depth_, segment_id, to});
298+
});
292299

293300
return iterator{this, it.seg_id_, seg_it.index, seg_it.slot};
294301
}
@@ -314,7 +321,7 @@ class DashTable : public detail::DashTableBase {
314321
InsertMode mode);
315322

316323
void IncreaseDepth(unsigned new_depth);
317-
void Split(uint32_t seg_id);
324+
template <typename EvictionPolicy> void Split(uint32_t seg_id, EvictionPolicy& ev);
318325

319326
// Segment directory contains multiple segment pointers, some of them pointing to
320327
// the same object. IterateDistinct goes over all distinct segments in the table.
@@ -786,12 +793,19 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
786793
typename SegmentType::Iterator it;
787794
bool res = true;
788795
unsigned num_buckets = target->num_buckets();
796+
797+
auto move_cb = [&](uint32_t segment_id, detail::PhysicalBid from, detail::PhysicalBid to) {
798+
// OnMove is used to notify policy about the move of items across buckets.
799+
ev.OnMove(Cursor{global_depth_, segment_id, from}, Cursor{global_depth_, segment_id, to});
800+
};
801+
789802
if (mode == InsertMode::kForceInsert) {
790-
it = target->InsertUniq(std::forward<U>(key), std::forward<V>(value), key_hash, true);
803+
it =
804+
target->InsertUniq(std::forward<U>(key), std::forward<V>(value), key_hash, true, move_cb);
791805
res = it.found();
792806
} else {
793-
std::tie(it, res) =
794-
target->Insert(std::forward<U>(key), std::forward<V>(value), key_hash, EqPred(key));
807+
std::tie(it, res) = target->Insert(std::forward<U>(key), std::forward<V>(value), key_hash,
808+
EqPred(key), move_cb);
795809
}
796810

797811
if (res) { // success
@@ -846,7 +860,7 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
846860
}
847861

848862
auto hash_fn = [this](const auto& k) { return policy_.HashFn(k); };
849-
unsigned moved = target->UnloadStash(hash_fn);
863+
unsigned moved = target->UnloadStash(hash_fn, move_cb);
850864
if (moved > 0) {
851865
stash_unloaded_ += moved;
852866
continue;
@@ -876,7 +890,7 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
876890
}
877891

878892
ev.RecordSplit(target);
879-
Split(target_seg_id);
893+
Split(target_seg_id, ev);
880894
}
881895

882896
return std::make_pair(iterator{}, false);
@@ -893,12 +907,14 @@ void DashTable<_Key, _Value, Policy>::IncreaseDepth(unsigned new_depth) {
893907
for (int i = prev_sz - 1; i >= 0; --i) {
894908
size_t offs = i * repl_cnt;
895909
std::fill(segment_.begin() + offs, segment_.begin() + offs + repl_cnt, segment_[i]);
910+
segment_[i]->set_segment_id(offs); // update segment id.
896911
}
897912
global_depth_ = new_depth;
898913
}
899914

900915
template <typename _Key, typename _Value, typename Policy>
901-
void DashTable<_Key, _Value, Policy>::Split(uint32_t seg_id) {
916+
template <typename EvictionPolicy>
917+
void DashTable<_Key, _Value, Policy>::Split(uint32_t seg_id, EvictionPolicy& ev) {
902918
SegmentType* source = segment_[seg_id];
903919

904920
uint32_t chunk_size = 1u << (global_depth_ - source->local_depth());
@@ -911,7 +927,15 @@ void DashTable<_Key, _Value, Policy>::Split(uint32_t seg_id) {
911927

912928
// remove current segment bucket count.
913929
bucket_count_ -= (source->num_buckets() + target->num_buckets());
914-
source->Split(std::move(hash_fn), target); // increases the depth.
930+
931+
source->Split(
932+
std::move(hash_fn), target,
933+
[&](uint32_t segment_from, detail::PhysicalBid from, uint32_t segment_to,
934+
detail::PhysicalBid to) {
935+
// OnMove is used to notify eviction policy about the moves across buckets/segments
936+
// during the split.
937+
ev.OnMove(Cursor{global_depth_, segment_from, from}, Cursor{global_depth_, segment_to, to});
938+
});
915939

916940
// add back the updated bucket count.
917941
bucket_count_ += (target->num_buckets() + source->num_buckets());

src/core/dash_internal.h

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -413,10 +413,12 @@ class Segment {
413413

414414
// Returns (iterator, true) if insert succeeds,
415415
// (iterator, false) for duplicate and (invalid-iterator, false) if it's full
416-
template <typename K, typename V, typename Pred>
417-
std::pair<Iterator, bool> Insert(K&& key, V&& value, Hash_t key_hash, Pred&& pred);
416+
template <typename K, typename V, typename Pred, typename OnMoveCb>
417+
std::pair<Iterator, bool> Insert(K&& key, V&& value, Hash_t key_hash, Pred&& pred,
418+
OnMoveCb&& on_move_cb);
418419

419-
template <typename HashFn> void Split(HashFn&& hfunc, Segment* dest);
420+
template <typename HashFn, typename OnMoveCb>
421+
void Split(HashFn&& hfunc, Segment* dest, OnMoveCb&& on_move_cb);
420422

421423
void Delete(const Iterator& it, Hash_t key_hash);
422424

@@ -523,8 +525,8 @@ class Segment {
523525
// otherwise chooses home bucket first.
524526
// TODO: I am actually not sure if spread optimization is helpful. Worth checking
525527
// whether we get higher occupancy rates when using it.
526-
template <typename U, typename V>
527-
Iterator InsertUniq(U&& key, V&& value, Hash_t key_hash, bool spread);
528+
template <typename U, typename V, typename OnMoveCb>
529+
Iterator InsertUniq(U&& key, V&& value, Hash_t key_hash, bool spread, OnMoveCb&& on_move_cb);
528530

529531
// capture version change in case of insert.
530532
// Returns ids of buckets whose version would cross ver_threshold upon insertion of key_hash
@@ -558,19 +560,25 @@ class Segment {
558560
}
559561

560562
// Bumps up this entry making it more "important" for the eviction policy.
561-
template <typename BumpPolicy>
562-
Iterator BumpUp(PhysicalBid bid, SlotId slot, Hash_t key_hash, const BumpPolicy& ev);
563+
template <typename BumpPolicy, typename OnMoveCb>
564+
Iterator BumpUp(PhysicalBid bid, SlotId slot, Hash_t key_hash, const BumpPolicy& ev,
565+
OnMoveCb&& cb);
563566

564567
// Tries to move stash entries back to their normal buckets (exact or neighbour).
565568
// Returns number of entries that succeeded to unload.
566569
// Important! Affects versions of the moved items and the items in the destination
567570
// buckets.
568-
template <typename HFunc> unsigned UnloadStash(HFunc&& hfunc);
571+
template <typename HFunc, typename OnMoveCb> unsigned UnloadStash(HFunc&& hfunc, OnMoveCb&& cb);
569572

570573
// Returns kBucketNum + kStashBucketNum, i.e. the regular and stash bucket counts combined.
unsigned num_buckets() const {
571574
return kBucketNum + kStashBucketNum;
572575
}
573576

577+
// Overwrites the stored segment id with new_id.
// Needed only when DashTable grows its segment table.
578+
void set_segment_id(uint32_t new_id) {
579+
segment_id_ = new_id;
580+
}
581+
574582
private:
575583
static_assert(sizeof(Iterator) == 2);
576584

@@ -1084,15 +1092,16 @@ auto Segment<Key, Value, Policy>::TryMoveFromStash(unsigned stash_id, unsigned s
10841092
}
10851093

10861094
template <typename Key, typename Value, typename Policy>
1087-
template <typename U, typename V, typename Pred>
1088-
auto Segment<Key, Value, Policy>::Insert(U&& key, V&& value, Hash_t key_hash, Pred&& pred)
1089-
-> std::pair<Iterator, bool> {
1095+
template <typename U, typename V, typename Pred, typename OnMoveCb>
1096+
auto Segment<Key, Value, Policy>::Insert(U&& key, V&& value, Hash_t key_hash, Pred&& pred,
1097+
OnMoveCb&& on_move_cb) -> std::pair<Iterator, bool> {
10901098
Iterator it = FindIt(key_hash, pred);
10911099
if (it.found()) {
10921100
return std::make_pair(it, false); /* duplicate insert*/
10931101
}
10941102

1095-
it = InsertUniq(std::forward<U>(key), std::forward<V>(value), key_hash, true);
1103+
it = InsertUniq(std::forward<U>(key), std::forward<V>(value), key_hash, true,
1104+
std::forward<OnMoveCb>(on_move_cb));
10961105

10971106
return std::make_pair(it, it.found());
10981107
}
@@ -1210,8 +1219,8 @@ void Segment<Key, Value, Policy>::Delete(const Iterator& it, Hash_t key_hash) {
12101219
// Split items from the left segment to the right during the growth phase.
12111220
// right segment will have all the items with lsb at local_depth ==1 .
12121221
template <typename Key, typename Value, typename Policy>
1213-
template <typename HFunc>
1214-
void Segment<Key, Value, Policy>::Split(HFunc&& hfn, Segment* dest_right) {
1222+
template <typename HFunc, typename MoveCb>
1223+
void Segment<Key, Value, Policy>::Split(HFunc&& hfn, Segment* dest_right, MoveCb&& on_move_cb) {
12151224
++local_depth_;
12161225
dest_right->local_depth_ = local_depth_;
12171226

@@ -1243,8 +1252,11 @@ void Segment<Key, Value, Policy>::Split(HFunc&& hfn, Segment* dest_right) {
12431252

12441253
invalid_mask |= (1u << slot);
12451254

1255+
// We pass a dummy callback because we are not interested in tracking movements in the newly
1256+
// created segment.
12461257
Iterator it = dest_right->InsertUniq(std::forward<Key_t>(bucket->key[slot]),
1247-
std::forward<Value_t>(bucket->value[slot]), hash, false);
1258+
std::forward<Value_t>(bucket->value[slot]), hash, false,
1259+
[](auto&&...) {});
12481260

12491261
// we move items residing in a regular bucket to a new segment.
12501262
// Note 1: in case we are somehow attacked with items that after the split
@@ -1266,6 +1278,7 @@ void Segment<Key, Value, Policy>::Split(HFunc&& hfn, Segment* dest_right) {
12661278
// selective bias will be able to hit our dashtable with items with the same bucket id.
12671279
assert(it.found());
12681280
update_version(*bucket, it.index);
1281+
on_move_cb(segment_id_, i, dest_right->segment_id_, it.index);
12691282
};
12701283

12711284
bucket_[i].ForEachSlot(std::move(cb));
@@ -1286,17 +1299,20 @@ void Segment<Key, Value, Policy>::Split(HFunc&& hfn, Segment* dest_right) {
12861299
Iterator it = TryMoveFromStash(i, slot, hash);
12871300
if (it.found()) {
12881301
invalid_mask |= (1u << slot);
1302+
on_move_cb(segment_id_, i, segment_id_, it.index);
12891303
}
12901304

12911305
return;
12921306
}
12931307

12941308
invalid_mask |= (1u << slot);
12951309
auto it = dest_right->InsertUniq(std::forward<Key_t>(bucket->key[slot]),
1296-
std::forward<Value_t>(bucket->value[slot]), hash, false);
1310+
std::forward<Value_t>(bucket->value[slot]), hash, false,
1311+
/* not interested in these movements */ [](auto&&...) {});
12971312
(void)it;
12981313
assert(it.index != kNanBid);
12991314
update_version(*bucket, it.index);
1315+
on_move_cb(segment_id_, i, dest_right->segment_id_, it.index);
13001316

13011317
// Remove stash reference pointing to stash bucket i.
13021318
RemoveStashReference(i, hash);
@@ -1348,9 +1364,9 @@ bool Segment<Key, Value, Policy>::CheckIfMovesToOther(bool own_items, unsigned f
13481364
}
13491365

13501366
template <typename Key, typename Value, typename Policy>
1351-
template <typename U, typename V>
1352-
auto Segment<Key, Value, Policy>::InsertUniq(U&& key, V&& value, Hash_t key_hash, bool spread)
1353-
-> Iterator {
1367+
template <typename U, typename V, typename OnMoveCb>
1368+
auto Segment<Key, Value, Policy>::InsertUniq(U&& key, V&& value, Hash_t key_hash, bool spread,
1369+
OnMoveCb&& on_move_cb) -> Iterator {
13541370
const uint8_t bid = HomeIndex(key_hash);
13551371
const uint8_t nid = NextBid(bid);
13561372

@@ -1385,13 +1401,15 @@ auto Segment<Key, Value, Policy>::InsertUniq(U&& key, V&& value, Hash_t key_hash
13851401
int displace_index = MoveToOther(true, nid, NextBid(nid));
13861402
if (displace_index >= 0) {
13871403
neighbor.Insert(displace_index, std::forward<U>(key), std::forward<V>(value), meta_hash, true);
1404+
on_move_cb(segment_id_, nid, NextBid(nid));
13881405
return Iterator{nid, uint8_t(displace_index)};
13891406
}
13901407

13911408
unsigned prev_idx = PrevBid(bid);
13921409
displace_index = MoveToOther(false, bid, prev_idx);
13931410
if (displace_index >= 0) {
13941411
target.Insert(displace_index, std::forward<U>(key), std::forward<V>(value), meta_hash, false);
1412+
on_move_cb(segment_id_, bid, prev_idx);
13951413
return Iterator{bid, uint8_t(displace_index)};
13961414
}
13971415

@@ -1597,9 +1615,9 @@ auto Segment<Key, Value, Policy>::FindValidStartingFrom(PhysicalBid bid, unsigne
15971615
}
15981616

15991617
template <typename Key, typename Value, typename Policy>
1600-
template <typename BumpPolicy>
1618+
template <typename BumpPolicy, typename OnMoveCb>
16011619
auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_hash,
1602-
const BumpPolicy& bp) -> Iterator {
1620+
const BumpPolicy& bp, OnMoveCb&& on_move_cb) -> Iterator {
16031621
auto& from = GetBucket(bid);
16041622

16051623
if (!bp.CanBump(from.key[slot])) {
@@ -1624,6 +1642,7 @@ auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha
16241642
if (Iterator it = TryMoveFromStash(stash_pos, slot, key_hash); it.found()) {
16251643
// TryMoveFromStash handles versions internally.
16261644
from.Delete(slot);
1645+
on_move_cb(segment_id_, bid, it.index);
16271646
return it;
16281647
}
16291648

@@ -1690,12 +1709,14 @@ auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha
16901709
swapb.SetStashPtr(stash_pos, swap_fp, bucket_ + next_bid);
16911710
}
16921711

1712+
on_move_cb(segment_id_, bid, swap_bid);
1713+
on_move_cb(segment_id_, swap_bid, bid);
16931714
return Iterator{swap_bid, kLastSlot};
16941715
}
16951716

16961717
template <typename Key, typename Value, typename Policy>
1697-
template <typename HFunc>
1698-
unsigned Segment<Key, Value, Policy>::UnloadStash(HFunc&& hfunc) {
1718+
template <typename HFunc, typename OnMoveCb>
1719+
unsigned Segment<Key, Value, Policy>::UnloadStash(HFunc&& hfunc, OnMoveCb&& on_move_cb) {
16991720
unsigned moved = 0;
17001721

17011722
for (unsigned i = 0; i < kStashBucketNum; ++i) {
@@ -1710,6 +1731,7 @@ unsigned Segment<Key, Value, Policy>::UnloadStash(HFunc&& hfunc) {
17101731
if (res.found()) {
17111732
++moved;
17121733
invalid_mask |= (1u << slot);
1734+
on_move_cb(segment_id_, i, res.index);
17131735
}
17141736
};
17151737

0 commit comments

Comments
 (0)