@@ -99,6 +99,7 @@ class PrimeEvictionPolicy {
99
99
DVLOG (2 ) << " split: " << segment->SlowSize () << " /" << segment->capacity ();
100
100
}
101
101
void OnMove (PrimeTable::Cursor source, PrimeTable::Cursor dest) {
102
+ moved_items_.push_back (std::make_pair (source, dest));
102
103
}
103
104
104
105
bool CanGrow (const PrimeTable& tbl) const ;
@@ -113,8 +114,12 @@ class PrimeEvictionPolicy {
113
114
unsigned checked () const {
114
115
return checked_;
115
116
}
117
+ const DbSlice::MovedItemsVec& moved_items () {
118
+ return moved_items_;
119
+ }
116
120
117
121
private:
122
+ DbSlice::MovedItemsVec moved_items_;
118
123
DbSlice* db_slice_;
119
124
ssize_t mem_offset_;
120
125
ssize_t soft_limit_ = 0 ;
@@ -364,7 +369,15 @@ class DbSlice::PrimeBumpPolicy {
364
369
return !obj.IsSticky ();
365
370
}
366
371
void OnMove (PrimeTable::Cursor source, PrimeTable::Cursor dest) {
372
+ moved_items_.push_back (std::make_pair (source, dest));
373
+ }
374
+
375
+ const DbSlice::MovedItemsVec& moved_items () {
376
+ return moved_items_;
367
377
}
378
+
379
+ private:
380
+ DbSlice::MovedItemsVec moved_items_;
368
381
};
369
382
370
383
DbSlice::DbSlice (uint32_t index, bool cache_mode, EngineShard* owner)
@@ -708,6 +721,7 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
708
721
events_.insertion_rejections ++;
709
722
return OpStatus::OUT_OF_MEMORY;
710
723
}
724
+ CallMovedCallbacks (cntx.db_index , evp.moved_items ());
711
725
712
726
events_.mutations ++;
713
727
ssize_t table_increase = db.prime .mem_usage () - table_before;
@@ -1252,6 +1266,12 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
1252
1266
return change_cb_.emplace_back (NextVersion (), std::move (cb)).first ;
1253
1267
}
1254
1268
1269
+ uint64_t DbSlice::RegisterOnMove (MovedCallback cb) {
1270
+ ++next_moved_id_;
1271
+ moved_cb_.emplace_back (next_moved_id_, cb);
1272
+ return next_moved_id_;
1273
+ }
1274
+
1255
1275
void DbSlice::FlushChangeToEarlierCallbacks (DbIndex db_ind, Iterator it, uint64_t upper_bound) {
1256
1276
unique_lock<LocalLatch> lk (serialization_latch_);
1257
1277
@@ -1284,6 +1304,14 @@ void DbSlice::UnregisterOnChange(uint64_t id) {
1284
1304
change_cb_.erase (it);
1285
1305
}
1286
1306
1307
+ void DbSlice::UnregisterOnMoved (uint64_t id) {
1308
+ serialization_latch_.Wait ();
1309
+ auto it =
1310
+ find_if (moved_cb_.begin (), moved_cb_.end (), [id](const auto & cb) { return cb.first == id; });
1311
+ CHECK (it != moved_cb_.end ());
1312
+ moved_cb_.erase (it);
1313
+ }
1314
+
1287
1315
auto DbSlice::DeleteExpiredStep (const Context& cntx, unsigned count) -> DeleteExpiredStats {
1288
1316
auto & db = *db_arr_[cntx.db_index ];
1289
1317
DeleteExpiredStats result;
@@ -1754,6 +1782,7 @@ void DbSlice::OnCbFinishBlocking() {
1754
1782
if (bump_it != it) { // the item was bumped
1755
1783
++events_.bumpups ;
1756
1784
}
1785
+ CallMovedCallbacks (db_index, policy.moved_items ());
1757
1786
}
1758
1787
}
1759
1788
@@ -1777,4 +1806,21 @@ void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
1777
1806
}
1778
1807
}
1779
1808
1809
+ void DbSlice::CallMovedCallbacks (
1810
+ DbIndex id, const std::vector<std::pair<PrimeTable::Cursor, PrimeTable::Cursor>>& moved_items) {
1811
+ if (moved_cb_.empty ())
1812
+ return ;
1813
+
1814
+ // does not preempt, just increments the counter.
1815
+ unique_lock<LocalLatch> lk (serialization_latch_);
1816
+
1817
+ const size_t limit = moved_cb_.size ();
1818
+ auto ccb = moved_cb_.begin ();
1819
+ for (size_t i = 0 ; i < limit; ++i) {
1820
+ CHECK (ccb->second );
1821
+ ccb->second (id, moved_items);
1822
+ ++ccb;
1823
+ }
1824
+ }
1825
+
1780
1826
} // namespace dfly
0 commit comments