Skip to content

Commit 3471fc8

Browse files
authored
Clean up blocking (#5836)
1 parent 530c71f commit 3471fc8

File tree

6 files changed

+38
-69
lines changed

6 files changed

+38
-69
lines changed

src/server/blocking_controller.cc

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,10 @@ struct WatchItem {
3131

3232
struct BlockingController::WatchQueue {
3333
deque<WatchItem> items;
34-
TxId notify_txid = UINT64_MAX;
3534

3635
// Updated by both coordinator and shard threads but at different times.
3736
enum State { SUSPENDED, ACTIVE } state = SUSPENDED;
3837

39-
void Suspend() {
40-
state = SUSPENDED;
41-
notify_txid = UINT64_MAX;
42-
}
43-
4438
auto Find(Transaction* tx) const {
4539
return find_if(items.begin(), items.end(),
4640
[tx](const WatchItem& wi) { return wi.get() == tx; });
@@ -49,7 +43,8 @@ struct BlockingController::WatchQueue {
4943

5044
// Watch state per db.
5145
struct BlockingController::DbWatchTable {
52-
WatchQueueMap queue_map;
46+
// Watch queues per key
47+
absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>> queue_map;
5348

5449
// awakened keys point to blocked keys that can potentially be unblocked.
5550
absl::flat_hash_set<std::string> awakened_keys;
@@ -163,32 +158,16 @@ void BlockingController::NotifyPending() {
163158
continue;
164159

165160
context.db_index = index;
166-
DbWatchTable& wt = *dbit->second;
167-
for (const auto& key : wt.awakened_keys) {
168-
string_view sv_key = key;
169-
DVLOG(1) << "Processing awakened key " << sv_key;
170-
auto w_it = wt.queue_map.find(sv_key);
171-
if (w_it == wt.queue_map.end()) {
172-
// This should not happen because we remove keys from awakened_keys every type we remove
173-
// the entry from queue_map. TODO: to make it a CHECK after Dec 2024
174-
LOG(ERROR) << "Internal error: Key " << sv_key
175-
<< " was not found in the watch queue, wt.awakened_keys len is "
176-
<< wt.awakened_keys.size() << " wt.queue_map len is " << wt.queue_map.size();
177-
for (const auto& item : wt.awakened_keys) {
178-
LOG(ERROR) << "Awakened key: " << item;
179-
}
180-
181-
continue;
182-
}
183-
161+
DbWatchTable& wt = *dbit->second; // pointer stability due to node_hash_map
162+
for (string_view key : wt.awakened_keys) {
163+
DVLOG(1) << "Processing awakened key " << key;
164+
auto w_it = wt.queue_map.find(key);
184165
CHECK(w_it != wt.queue_map.end());
185-
DVLOG(1) << "Notify WQ: [" << owner_->shard_id() << "] " << key;
166+
186167
WatchQueue* wq = w_it->second.get();
187-
NotifyWatchQueue(sv_key, wq, context);
188-
if (wq->items.empty()) {
189-
// we erase awakened_keys right after this loop finishes running.
168+
NotifyWatchQueue(key, wq, context);
169+
if (wq->items.empty())
190170
wt.queue_map.erase(w_it);
191-
}
192171
}
193172
wt.awakened_keys.clear();
194173

@@ -202,16 +181,15 @@ void BlockingController::NotifyPending() {
202181
void BlockingController::AddWatched(Keys watch_keys, KeyReadyChecker krc, Transaction* trans) {
203182
auto [dbit, added] = watched_dbs_.emplace(trans->GetDbIndex(), nullptr);
204183
if (added) {
205-
dbit->second.reset(new DbWatchTable);
184+
dbit->second = make_unique<DbWatchTable>();
206185
}
207186

208187
DbWatchTable& wt = *dbit->second;
209188

210189
for (auto key : watch_keys) {
211190
auto [res, inserted] = wt.queue_map.emplace(key, nullptr);
212-
if (inserted) {
213-
res->second.reset(new WatchQueue);
214-
}
191+
if (inserted)
192+
res->second = make_unique<WatchQueue>();
215193

216194
if (!res->second->items.empty()) {
217195
Transaction* last = res->second->items.back().get();
@@ -227,7 +205,7 @@ void BlockingController::AddWatched(Keys watch_keys, KeyReadyChecker krc, Transa
227205
}
228206

229207
// Called from commands like lpush.
230-
void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) {
208+
void BlockingController::Awaken(DbIndex db_index, string_view db_key) {
231209
auto it = watched_dbs_.find(db_index);
232210
if (it == watched_dbs_.end())
233211
return;
@@ -236,8 +214,7 @@ void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) {
236214
DCHECK(!wt.queue_map.empty());
237215

238216
if (wt.AddAwakeEvent(db_key)) {
239-
VLOG(1) << "AwakeWatched: db(" << db_index << ") " << db_key;
240-
217+
VLOG(1) << "Touch: db(" << db_index << ") " << db_key;
241218
awakened_indices_.insert(db_index);
242219
}
243220
}
@@ -263,7 +240,6 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueue* wq,
263240
wq->state = WatchQueue::ACTIVE;
264241
// We deliberately keep the notified transaction in the queue to know which queue
265242
// must handled when this transaction finished.
266-
wq->notify_txid = owner_->committed_txid();
267243
awakened_transactions_.insert(head);
268244
break;
269245
}

src/server/blocking_controller.h

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ namespace dfly {
1717
class Transaction;
1818
class Namespace;
1919

20+
// Used for tracking keys of blocking transactions and properly notifying them.
21+
// First, keys are marked as watched and associated with an owner transaction. A mutating
22+
// transaction marks them as touched, and once it concludes, the watching transactions are notified.
2023
class BlockingController {
2124
public:
2225
explicit BlockingController(EngineShard* owner, Namespace* ns);
@@ -32,20 +35,17 @@ class BlockingController {
3235
return awakened_transactions_;
3336
}
3437

35-
// Removes transaction from watching these keys.
36-
void RemovedWatched(Keys keys, Transaction* tx);
38+
// Associate given keys with transaction, checked via the krc checker
39+
void AddWatched(Keys watch_keys, KeyReadyChecker krc, Transaction* me);
3740

38-
// go over potential wakened keys, verify them and activate watch queues.
39-
void NotifyPending();
41+
// Remove transaction from watching these keys
42+
void RemovedWatched(Keys keys, Transaction* tx);
4043

41-
// Blocking API
42-
// TODO: consider moving all watched functions to
43-
// EngineShard with separate per db map.
44-
//! AddWatched adds a transaction to the blocking queue.
45-
void AddWatched(Keys watch_keys, KeyReadyChecker krc, Transaction* me);
44+
// Mark given key as awakened. Called by commands mutating this key.
45+
void Awaken(DbIndex db_index, std::string_view key);
4646

47-
// Called from operations that create keys like lpush, rename etc.
48-
void AwakeWatched(DbIndex db_index, std::string_view db_key);
47+
// Notify transactions of awakened keys
48+
void NotifyPending();
4949

5050
// Used in tests and debugging functions.
5151
size_t NumWatched(DbIndex db_indx) const;
@@ -55,24 +55,17 @@ class BlockingController {
5555
struct WatchQueue;
5656
struct DbWatchTable;
5757

58-
using WatchQueueMap = absl::flat_hash_map<std::string, std::unique_ptr<WatchQueue>>;
59-
6058
void NotifyWatchQueue(std::string_view key, WatchQueue* wqm, const DbContext& context);
6159

62-
// void NotifyConvergence(Transaction* tx);
63-
6460
EngineShard* owner_;
6561
Namespace* ns_;
6662

67-
absl::flat_hash_map<DbIndex, std::unique_ptr<DbWatchTable>> watched_dbs_;
68-
69-
// serves as a temporary queue that aggregates all the possible awakened dbs.
70-
// flushed by RunStep().
71-
absl::flat_hash_set<DbIndex> awakened_indices_;
63+
// TODO: check if unique_ptr indirection is required
64+
absl::flat_hash_map<DbIndex, std::unique_ptr<DbWatchTable>> watched_dbs_; // watched keys
65+
absl::flat_hash_set<DbIndex> awakened_indices_; // watched_dbs_ with awakened keys
7266

73-
// tracks currently notified and awaked transactions.
74-
// There can be multiple transactions like this because a transaction
75-
// could awaken arbitrary number of keys.
67+
// Transactions that got awakened with NotifySuspended
68+
// TODO: Used only for one DCHECK
7669
absl::flat_hash_set<Transaction*> awakened_transactions_;
7770
};
7871
} // namespace dfly

src/server/generic_family.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) {
507507
<< "Unexpected override for key " << dest_key_ << " " << dest_found_;
508508
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
509509
if (bc) {
510-
bc->AwakeWatched(t->GetDbIndex(), dest_key_);
510+
bc->Awaken(t->GetDbIndex(), dest_key_);
511511
}
512512

513513
if (shard->journal()) {
@@ -886,7 +886,7 @@ OpStatus OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) {
886886

887887
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
888888
if (add_res.it->second.ObjType() == OBJ_LIST && bc) {
889-
bc->AwakeWatched(target_db, key);
889+
bc->Awaken(target_db, key);
890890
}
891891

892892
return OpStatus::OK;
@@ -956,7 +956,7 @@ OpResult<void> OpRen(const OpArgs& op_args, string_view from_key, string_view to
956956

957957
auto bc = op_args.db_cntx.ns->GetBlockingController(es->shard_id());
958958
if (!is_prior_list && to_res.it->second.ObjType() == OBJ_LIST && bc) {
959-
bc->AwakeWatched(op_args.db_cntx.db_index, to_key);
959+
bc->Awaken(op_args.db_cntx.db_index, to_key);
960960
}
961961
return OpStatus::OK;
962962
}

src/server/list_family.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, strin
177177
dest_res.it->second.InitRobj(OBJ_LIST, kEncodingQL2, destql_v2);
178178
auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
179179
if (blocking_controller) {
180-
blocking_controller->AwakeWatched(op_args.db_cntx.db_index, dest);
180+
blocking_controller->Awaken(op_args.db_cntx.db_index, dest);
181181
}
182182
} else {
183183
destql_v2 = GetQLV2(dest_res.it->second);
@@ -257,7 +257,7 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
257257
if (res.is_new) {
258258
auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(es->shard_id());
259259
if (blocking_controller) {
260-
blocking_controller->AwakeWatched(op_args.db_cntx.db_index, key);
260+
blocking_controller->Awaken(op_args.db_cntx.db_index, key);
261261
}
262262
}
263263

src/server/stream_family.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
765765

766766
auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
767767
if (blocking_controller) {
768-
blocking_controller->AwakeWatched(op_args.db_cntx.db_index, key);
768+
blocking_controller->Awaken(op_args.db_cntx.db_index, key);
769769
}
770770

771771
return result_id;
@@ -1439,7 +1439,7 @@ OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gnam
14391439
// Awake readers blocked on this group
14401440
auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
14411441
if (blocking_controller) {
1442-
blocking_controller->AwakeWatched(op_args.db_cntx.db_index, key);
1442+
blocking_controller->Awaken(op_args.db_cntx.db_index, key);
14431443
}
14441444

14451445
return OpStatus::OK;

src/server/zset_family.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ OpResult<DbSlice::ItAndUpdater> PrepareZEntry(const ZSetFamily::ZParams& zparams
182182

183183
auto* blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
184184
if (add_res.is_new && blocking_controller) {
185-
blocking_controller->AwakeWatched(op_args.db_cntx.db_index, key);
185+
blocking_controller->Awaken(op_args.db_cntx.db_index, key);
186186
}
187187

188188
return DbSlice::ItAndUpdater{add_res.it, add_res.exp_it, std::move(add_res.post_updater)};

0 commit comments

Comments
 (0)