Skip to content

Commit 2598f97

Browse files
authored
feat: add support for BLMPOP command (#5370)
* feat: add support for `BLMPOP` command This commit adds support for the `BLMPOP` command, which is the blocking variant of the existing `LMPOP` command. This commit resolves GitHub issue #3876 --------- Signed-off-by: Eric <[email protected]>
1 parent 6bde91e commit 2598f97

File tree

4 files changed

+148
-1
lines changed

4 files changed

+148
-1
lines changed

src/server/list_family.cc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,47 @@ void ListFamily::LMPop(CmdArgList args, const CommandContext& cmd_cntx) {
10081008
}
10091009
}
10101010

1011+
void ListFamily::BLMPop(CmdArgList args, const CommandContext& cmd_cntx) {
1012+
auto* response_builder = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
1013+
1014+
CmdArgParser parser{args};
1015+
float timeout = parser.Next<float>();
1016+
if (auto err = parser.Error(); err)
1017+
return response_builder->SendError(err->MakeReply());
1018+
1019+
if (timeout < 0)
1020+
return response_builder->SendError("timeout is negative");
1021+
1022+
parser.Skip(parser.Next<size_t>()); // Skip numkeys and keys
1023+
ListDir dir = parser.MapNext("LEFT", ListDir::LEFT, "RIGHT", ListDir::RIGHT);
1024+
1025+
size_t pop_count = 1;
1026+
if (parser.Check("COUNT"))
1027+
pop_count = parser.Next<size_t>();
1028+
1029+
if (!parser.Finalize())
1030+
return response_builder->SendError(parser.Error()->MakeReply());
1031+
1032+
OpResult<StringVec> result;
1033+
auto cb = [&](Transaction* t, EngineShard* shard, string_view key) {
1034+
result = OpPop(t->GetOpArgs(shard), key, dir, pop_count, true, false);
1035+
return result.status();
1036+
};
1037+
1038+
ConnectionContext* conn_cntx = cmd_cntx.conn_cntx;
1039+
OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking(
1040+
cmd_cntx.tx, OBJ_LIST, std::move(cb), unsigned(timeout * 1000), &conn_cntx->blocked,
1041+
&conn_cntx->paused);
1042+
1043+
if (popped_key.ok()) {
1044+
response_builder->StartArray(2);
1045+
response_builder->SendBulkString(*popped_key);
1046+
response_builder->SendBulkStrArr(*result);
1047+
} else {
1048+
response_builder->SendNull();
1049+
}
1050+
}
1051+
10111052
void ListFamily::LPush(CmdArgList args, const CommandContext& cmd_cntx) {
10121053
return PushGeneric(ListDir::LEFT, false, args, cmd_cntx.tx, cmd_cntx.rb);
10131054
}
@@ -1271,6 +1312,7 @@ constexpr uint32_t kLPush = WRITE | LIST | FAST;
12711312
constexpr uint32_t kLPushX = WRITE | LIST | FAST;
12721313
constexpr uint32_t kLPop = WRITE | LIST | FAST;
12731314
constexpr uint32_t kLMPop = WRITE | LIST | FAST;
1315+
constexpr uint32_t kBLMPop = WRITE | LIST | SLOW | BLOCKING;
12741316
constexpr uint32_t kRPush = WRITE | LIST | FAST;
12751317
constexpr uint32_t kRPushX = WRITE | LIST | FAST;
12761318
constexpr uint32_t kRPop = WRITE | LIST | FAST;
@@ -1297,6 +1339,8 @@ void ListFamily::Register(CommandRegistry* registry) {
12971339
<< CI{"LPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, acl::kLPushX}.HFUNC(LPushX)
12981340
<< CI{"LPOP", CO::WRITE | CO::FAST, -2, 1, 1, acl::kLPop}.HFUNC(LPop)
12991341
<< CI{"LMPOP", CO::WRITE | CO::SLOW | CO::VARIADIC_KEYS, -4, 2, 2, acl::kLMPop}.HFUNC(LMPop)
1342+
<< CI{"BLMPOP", CO::WRITE | CO::SLOW | CO::VARIADIC_KEYS, -5, 3, 3, acl::kBLMPop}.HFUNC(
1343+
BLMPop)
13001344
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, acl::kRPush}.HFUNC(RPush)
13011345
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, acl::kRPushX}.HFUNC(RPushX)
13021346
<< CI{"RPOP", CO::WRITE | CO::FAST, -2, 1, 1, acl::kRPop}.HFUNC(RPop)

src/server/list_family.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class ListFamily {
3434
static void BLPop(CmdArgList args, const CommandContext& cmd_cntx);
3535
static void BRPop(CmdArgList args, const CommandContext& cmd_cntx);
3636
static void LMPop(CmdArgList args, const CommandContext& cmd_cntx);
37+
static void BLMPop(CmdArgList args, const CommandContext& cmd_cntx);
3738
static void LLen(CmdArgList args, const CommandContext& cmd_cntx);
3839
static void LPos(CmdArgList args, const CommandContext& cmd_cntx);
3940
static void LIndex(CmdArgList args, const CommandContext& cmd_cntx);

src/server/list_family_test.cc

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,108 @@ TEST_F(ListFamilyTest, Expire) {
8585
EXPECT_THAT(resp, IntArg(1));
8686
}
8787

88+
TEST_F(ListFamilyTest, BLMPopNonblocking) {
89+
auto resp = Run({"lpush", kKey1, "1", "2", "3", "4"});
90+
EXPECT_THAT(resp, IntArg(4));
91+
92+
resp = Run({"blmpop", "0.01", "2", kKey2, kKey1, "LEFT"});
93+
EXPECT_THAT(resp, RespElementsAre(kKey1, RespElementsAre("4")));
94+
95+
resp = Run({"blmpop", "0.01", "2", kKey2, kKey1, "RIGHT", "COUNT", "2"});
96+
EXPECT_THAT(resp, RespElementsAre(kKey1, RespElementsAre("1", "2")));
97+
98+
// If the count exceeds the size of the key's values (but the key is non-empty) then return all of
99+
// the key's values
100+
resp = Run({"blmpop", "0.01", "1", kKey1, "RIGHT", "COUNT", "10"});
101+
EXPECT_THAT(resp, RespElementsAre(kKey1, RespElementsAre("3")));
102+
}
103+
104+
TEST_F(ListFamilyTest, BLMPopInvalidSyntax) {
105+
// Not enough arguments
106+
auto resp = Run({"blmpop", "0.1", "1", kKey1});
107+
EXPECT_THAT(resp, ErrArg("wrong number of arguments"));
108+
109+
// Timeout is not a float
110+
resp = Run({"blmpop", "foo", "1", kKey1, "LEFT", "COUNT", "1"});
111+
EXPECT_THAT(resp, ErrArg("value is not a valid float"));
112+
113+
// Negative timeout
114+
resp = Run({"blmpop", "-0.01", "1", kKey1, "LEFT", "COUNT", "1"});
115+
EXPECT_THAT(resp, ErrArg("timeout is negative"));
116+
117+
// Zero keys
118+
resp = Run({"blmpop", "0.01", "0", "LEFT", "COUNT", "1"});
119+
EXPECT_THAT(resp, ErrArg("at least 1 input key is needed"));
120+
121+
// Number of keys is not uint
122+
resp = Run({"blmpop", "0.01", "aa", kKey1, "LEFT"});
123+
EXPECT_THAT(resp, ErrArg("value is not an integer or out of range"));
124+
125+
// Missing LEFT/RIGHT
126+
resp = Run({"blmpop", "0.01", "1", kKey1, "COUNT", "1"});
127+
EXPECT_THAT(resp, ErrArg("syntax error"));
128+
129+
// Wrong number of keys
130+
resp = Run({"blmpop", "0.01", "1", kKey1, kKey2, "LEFT"});
131+
EXPECT_THAT(resp, ErrArg("syntax error"));
132+
133+
// COUNT without number
134+
resp = Run({"blmpop", "0.01", "1", kKey1, "LEFT", "COUNT"});
135+
EXPECT_THAT(resp, ErrArg("syntax error"));
136+
137+
// COUNT is not uint
138+
resp = Run({"blmpop", "0.01", "1", kKey1, "LEFT", "COUNT", "boo"});
139+
EXPECT_THAT(resp, ErrArg("value is not an integer or out of range"));
140+
141+
// Too many arguments
142+
resp = Run({"blmpop", "0.01", "1", "c", "LEFT", "COUNT", "2", "foo"});
143+
EXPECT_THAT(resp, ErrArg("syntax error"));
144+
}
145+
146+
TEST_F(ListFamilyTest, BLMPopBlocking) {
147+
// attempting to pop from empty key results in blocking and returns
148+
// null if no values are pushed to the key.
149+
RespExpr resp;
150+
auto fb0 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
151+
resp = Run({"blmpop", "0.1", "1", kKey1, "LEFT"});
152+
});
153+
ThisFiber::SleepFor(50us);
154+
ASSERT_TRUE(IsLocked(0, kKey1));
155+
156+
fb0.Join();
157+
ASSERT_FALSE(IsLocked(0, kKey1));
158+
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
159+
160+
// BLMPOP should not block if there is a non-empty key available
161+
resp = Run({"lpush", kKey1, "0"});
162+
EXPECT_THAT(resp, IntArg(1));
163+
164+
auto fb1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] {
165+
resp = Run({"blmpop", "0.1", "1", kKey1, "LEFT"});
166+
});
167+
ThisFiber::SleepFor(50us);
168+
// shouldn't need to lock the key just pop immediately
169+
ASSERT_FALSE(IsLocked(0, kKey1));
170+
fb1.Join();
171+
172+
// should block until a key is available and then immediately unblock
173+
auto fb2 = pp_->at(2)->LaunchFiber(Launch::dispatch, [&] {
174+
resp = Run({"blmpop", "0.1", "1", kKey1, "LEFT"});
175+
});
176+
ThisFiber::SleepFor(50us);
177+
178+
// key should be locked while waiting
179+
ASSERT_TRUE(IsLocked(0, kKey1));
180+
181+
auto push_resp = Run({"lpush", kKey1, "1"});
182+
EXPECT_THAT(push_resp, IntArg(1));
183+
184+
// key should be unlocked after being inserted to
185+
fb2.Join();
186+
ASSERT_FALSE(IsLocked(0, kKey1));
187+
EXPECT_THAT(resp, RespElementsAre(kKey1, RespElementsAre("1")));
188+
}
189+
88190
TEST_F(ListFamilyTest, BLPopUnblocking) {
89191
auto resp = Run({"lpush", kKey1, "1"});
90192
EXPECT_THAT(resp, IntArg(1));

src/server/transaction.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1588,7 +1588,7 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
15881588
bonus = 0; // Z<xxx>STORE <key> commands
15891589

15901590
unsigned num_keys_index;
1591-
if (absl::StartsWith(name, "EVAL"))
1591+
if (absl::StartsWith(name, "EVAL") || name == "BLMPOP")
15921592
num_keys_index = 1;
15931593
else
15941594
num_keys_index = bonus ? *bonus + 1 : 0;

0 commit comments

Comments
 (0)