Skip to content

Commit 58a9bd1

Browse files
authored
chore(hset): Mutable HMapWrap functions (#5928)
1 parent 6bb5195 commit 58a9bd1

File tree

1 file changed

+93
-132
lines changed

1 file changed

+93
-132
lines changed

src/server/hset_family.cc

Lines changed: 93 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ struct HMapWrap {
6969
size_t Length() const {
7070
Overloaded ov{
7171
[](StringMap* s) { return s->UpperBoundSize(); },
72-
[](detail::ListpackWrap lw) { return lw.size(); },
72+
[](const detail::ListpackWrap& lw) { return lw.size(); },
7373
};
7474
return visit(ov, impl_);
7575
}
@@ -93,6 +93,26 @@ struct HMapWrap {
9393
return base::it::Range(visit2(cb));
9494
}
9595

96+
bool Erase(std::string_view key) {
97+
Overloaded ov{[key](StringMap* s) { return s->Erase(key); },
98+
[key](detail::ListpackWrap& lw) { return lw.Delete(key); }};
99+
return visit(ov, impl_);
100+
}
101+
102+
void AddOrUpdate(std::string_view key, std::string_view value) {
103+
Overloaded ov{[&](StringMap* sm) { sm->AddOrUpdate(key, value); },
104+
[&](detail::ListpackWrap& lw) { lw.Insert(key, value, false); }};
105+
visit(ov, impl_);
106+
}
107+
108+
void Launder(PrimeValue& pv) {
109+
Overloaded ov{
110+
[](StringMap* s) {},
111+
[&](detail::ListpackWrap& lw) { pv.SetRObjPtr(lw.GetPointer()); },
112+
};
113+
visit(ov, impl_);
114+
}
115+
96116
template <typename T> optional<T> Get() const {
97117
if (holds_alternative<T>(impl_))
98118
return get<T>(impl_);
@@ -103,30 +123,63 @@ struct HMapWrap {
103123
variant<StringMap*, detail::ListpackWrap> impl_;
104124
};
105125

126+
// Delete if length is zero
127+
void DeleteHw(HMapWrap& hw, const OpArgs& op_args, std::string_view key) {
128+
auto& db_slice = op_args.GetDbSlice();
129+
if (auto del_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_HASH); del_it) {
130+
del_it->post_updater.Run();
131+
db_slice.Del(op_args.db_cntx, del_it->it);
132+
if (op_args.shard->journal()) {
133+
RecordJournal(op_args, "DEL"sv, {key});
134+
}
135+
}
136+
}
137+
138+
auto KeyAndArgs(Transaction* t, EngineShard* es) {
139+
return std::make_pair(t->GetShardArgs(es->shard_id()).Front(), t->GetOpArgs(es));
140+
}
141+
106142
// Wrap read-only handler
107143
template <typename F> auto WrapRO(F&& f) {
108144
using RT = std::invoke_result_t<F, HMapWrap>;
109145
return [f = std::forward<F>(f)](Transaction* t, EngineShard* es) -> RT {
110-
std::string_view key = t->GetShardArgs(es->shard_id()).Front();
111-
auto op_args = t->GetOpArgs(es);
112-
auto& db_slice = op_args.GetDbSlice();
113-
114-
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH);
146+
auto [key, op_args] = KeyAndArgs(t, es);
147+
auto it_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_HASH);
115148
RETURN_ON_BAD_STATUS(it_res);
116149

117150
HMapWrap hw{(*it_res)->second, op_args.db_cntx};
118151
auto res = f(hw);
119152

120-
// Delete value if field expirations made it empty
121-
if (hw.Length() == 0) {
122-
if (auto del_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_HASH); del_it) {
123-
del_it->post_updater.Run();
124-
db_slice.Del(op_args.db_cntx, del_it->it);
125-
if (op_args.shard->journal()) {
126-
RecordJournal(op_args, "DEL"sv, {key});
127-
}
128-
}
129-
}
153+
if (hw.Length() == 0) // Expirations might have emptied it
154+
DeleteHw(hw, op_args, key);
155+
return res;
156+
};
157+
}
158+
159+
// Wrap write handler
160+
template <typename F> auto WrapW(F&& f) {
161+
using RT = std::invoke_result_t<F, HMapWrap&>;
162+
return [f = std::forward<F>(f)](Transaction* t, EngineShard* es) -> RT {
163+
auto [key, op_args] = KeyAndArgs(t, es);
164+
165+
auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_HASH);
166+
RETURN_ON_BAD_STATUS(it_res);
167+
auto& pv = it_res->it->second;
168+
169+
// Remove document before modification
170+
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv);
171+
172+
HMapWrap hw{pv, op_args.db_cntx};
173+
auto res = f(hw);
174+
hw.Launder(pv);
175+
176+
// Run post updater
177+
it_res->post_updater.Run();
178+
179+
if (hw.Length() == 0)
180+
DeleteHw(hw, op_args, key);
181+
else
182+
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
130183

131184
return res;
132185
};
@@ -203,69 +256,28 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
203256
}
204257
}
205258

206-
unsigned enc = pv.Encoding();
207-
208-
if (enc == kEncodingListPack) {
209-
detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
210-
optional<string_view> res;
211-
212-
if (!add_res.is_new) {
213-
if (auto it = lw.Find(field); it != lw.end())
214-
res = (*it).second;
215-
}
216-
217-
OpStatus status = IncrementValue(res, param);
218-
if (status != OpStatus::OK) {
219-
return status;
220-
}
221-
222-
if (holds_alternative<double>(*param)) {
223-
double new_val = get<double>(*param);
224-
char buf[128];
225-
char* str = RedisReplyBuilder::FormatDouble(new_val, buf, sizeof(buf));
226-
lw.Insert(field, str, false);
227-
} else { // integer increment
228-
int64_t new_val = get<int64_t>(*param);
229-
absl::AlphaNum an(new_val);
230-
lw.Insert(field, an.Piece(), false);
231-
}
232-
233-
pv.SetRObjPtr(lw.GetPointer());
234-
} else {
235-
DCHECK_EQ(enc, kEncodingStrMap2);
236-
StringMap* sm = GetStringMap(pv, op_args.db_cntx);
237-
238-
sds val = nullptr;
239-
if (!add_res.is_new) {
240-
auto it = sm->Find(field);
241-
if (it != sm->end()) {
242-
val = it->second;
243-
}
244-
}
245-
246-
optional<string_view> sv;
247-
if (val) {
248-
sv.emplace(val, sdslen(val));
249-
}
250-
251-
OpStatus status = IncrementValue(sv, param);
252-
if (status != OpStatus::OK) {
253-
return status;
254-
}
259+
HMapWrap hw{pv, op_args.db_cntx};
260+
optional<string_view> res;
261+
if (!add_res.is_new) {
262+
if (auto it = hw.Find(field); it)
263+
res = (*it).second;
264+
}
255265

256-
if (holds_alternative<double>(*param)) {
257-
double new_val = get<double>(*param);
266+
if (OpStatus status = IncrementValue(res, param); status != OpStatus::OK)
267+
return status;
258268

259-
char buf[128];
260-
char* str = RedisReplyBuilder::FormatDouble(new_val, buf, sizeof(buf));
261-
sm->AddOrUpdate(field, str);
262-
} else { // integer increment
263-
int64_t new_val = get<int64_t>(*param);
264-
absl::AlphaNum an(new_val);
265-
sm->AddOrUpdate(field, an.Piece());
266-
}
269+
if (holds_alternative<double>(*param)) {
270+
double new_val = get<double>(*param);
271+
char buf[128];
272+
char* str = RedisReplyBuilder::FormatDouble(new_val, buf, sizeof(buf));
273+
hw.AddOrUpdate(field, str);
274+
} else { // integer increment
275+
int64_t new_val = get<int64_t>(*param);
276+
absl::AlphaNum an(new_val);
277+
hw.AddOrUpdate(field, an.Piece());
267278
}
268279

280+
hw.Launder(pv);
269281
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
270282

271283
return OpStatus::OK;
@@ -316,58 +328,6 @@ OpResult<StringVec> OpScan(const HMapWrap& hw, uint64_t* cursor, const ScanOpts&
316328
return res;
317329
}
318330

319-
OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, CmdArgList values) {
320-
DCHECK(!values.empty());
321-
322-
auto& db_slice = op_args.GetDbSlice();
323-
auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_HASH);
324-
RETURN_ON_BAD_STATUS(it_res);
325-
326-
PrimeValue& pv = it_res->it->second;
327-
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv);
328-
329-
unsigned deleted = 0;
330-
bool key_remove = false;
331-
unsigned enc = pv.Encoding();
332-
333-
if (enc == kEncodingListPack) {
334-
detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
335-
for (string_view s : values) {
336-
if (lw.Delete(s))
337-
++deleted;
338-
}
339-
pv.SetRObjPtr(lw.GetPointer());
340-
key_remove = lw.size() == 0;
341-
} else {
342-
DCHECK_EQ(enc, kEncodingStrMap2);
343-
StringMap* sm = GetStringMap(pv, op_args.db_cntx);
344-
345-
for (auto s : values) {
346-
if (sm->Erase(s)) {
347-
++deleted;
348-
}
349-
350-
// Even if the previous Erase op did not erase anything, it can remove expired fields as a
351-
// side effect.
352-
if (sm->Empty()) {
353-
key_remove = true;
354-
break;
355-
}
356-
}
357-
}
358-
359-
it_res->post_updater.Run();
360-
361-
if (!key_remove)
362-
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
363-
364-
if (key_remove) {
365-
db_slice.Del(op_args.db_cntx, it_res->it);
366-
}
367-
368-
return deleted;
369-
}
370-
371331
OpResult<vector<OptStr>> OpHMGet(const HMapWrap& hw, CmdArgList fields) {
372332
DCHECK(!fields.empty());
373333

@@ -590,12 +550,13 @@ struct HSetReplies {
590550
} // namespace
591551

592552
void HSetFamily::HDel(CmdArgList args, const CommandContext& cmd_cntx) {
593-
string_view key = ArgS(args, 0);
594-
595-
auto cb = [&](Transaction* t, EngineShard* shard) {
596-
return OpDel(t->GetOpArgs(shard), key, args.subspan(1));
553+
auto cb = [&](HMapWrap& hw) -> OpResult<uint32_t> {
554+
unsigned deleted = 0;
555+
for (string_view s : args.subspan(1))
556+
deleted += hw.Erase(s);
557+
return deleted;
597558
};
598-
HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(cb));
559+
HSetReplies{cmd_cntx.rb}.Send(cmd_cntx.tx->ScheduleSingleHopT(WrapW(cb)));
599560
}
600561

601562
void HSetFamily::HExpire(CmdArgList args, const CommandContext& cmd_cntx) {

0 commit comments

Comments
 (0)