Skip to content

Commit e695801

Browse files
authored
feat: add ftsearch result parsing for css (#6090)
* feat: add ftsearch result parsing for css
1 parent 1e14f80 commit e695801

File tree

4 files changed

+101
-14
lines changed

4 files changed

+101
-14
lines changed

src/server/cluster/coordinator.cc

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,18 @@ class Coordinator::CrossShardClient : private ProtocolClient {
4848
ShutdownSocket();
4949
}
5050

51-
void SendCommand(std::string_view cmd) {
52-
if (auto ec = SendCommandAndReadResponse(cmd); ec) {
51+
void SendCommand(std::string_view cmd, const RespCB& cb) {
52+
if (auto ec = ProtocolClient::SendCommand(cmd); ec) {
5353
LOG(WARNING) << "Coordinator could not send command to : " << server().Description() << ": "
5454
<< ec.message() << ", socket state: " << GetSocketInfo(Sock()->native_handle());
5555
exec_st_.ReportError(GenericError(ec, "Could not send command."));
5656
}
57+
auto timeout = 30000; // TODO add flag;
58+
if (auto resp = ReadRespReply(timeout); !resp) {
59+
LOG(WARNING) << "Error reading response from " << server().Description() << ": "
60+
<< resp.error() << ", socket state: " + GetSocketInfo(Sock()->native_handle());
61+
}
62+
cb(LastResponseArgs());
5763
// add response processing
5864
}
5965
};
@@ -63,7 +69,7 @@ Coordinator& Coordinator::Current() {
6369
return instance;
6470
}
6571

66-
void Coordinator::DispatchAll(std::string_view command) {
72+
void Coordinator::DispatchAll(std::string_view command, RespCB cb) {
6773
auto cluster_config = ClusterConfig::Current();
6874
if (!cluster_config) {
6975
VLOG(2) << "No cluster config found for coordinator plan creation.";
@@ -79,7 +85,7 @@ void Coordinator::DispatchAll(std::string_view command) {
7985
}
8086
clients.emplace_back(std::make_unique<CrossShardClient>(shard.master.ip, shard.master.port));
8187
clients.back()->Init();
82-
clients.back()->SendCommand(std::string(command));
88+
clients.back()->SendCommand(std::string(command), cb);
8389
}
8490
}
8591

src/server/cluster/coordinator.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ namespace dfly::cluster {
1313
// It can be used to execute commands on all shards or specific shards.
1414
class Coordinator {
1515
public:
16+
using RespCB = std::function<void(const facade::RespVec&)>;
17+
1618
static Coordinator& Current();
17-
void DispatchAll(std::string_view command);
19+
void DispatchAll(std::string_view command, RespCB cb);
1820

1921
private:
2022
Coordinator() = default;

src/server/search/search_family.cc

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
#include "src/core/overloaded.h"
3939

4040
ABSL_FLAG(bool, search_reject_legacy_field, true, "FT.AGGREGATE: Reject legacy field names.");
41+
ABSL_FLAG(bool, cluster_search, false,
42+
"Enable search commands for cross-shard search. turned off by default for safety.");
4143

4244
ABSL_FLAG(size_t, MAXSEARCHRESULTS, 1000000, "Maximum number of results from ft.search command");
4345

@@ -1074,13 +1076,13 @@ void SearchFamily::FtCreate(CmdArgList args, const CommandContext& cmd_cntx) {
10741076
return builder->SendError("Index already exists");
10751077
}
10761078

1077-
if (!is_cross_shard && IsClusterEnabled()) {
1079+
if (absl::GetFlag(FLAGS_cluster_search) && !is_cross_shard && IsClusterEnabled()) {
10781080
std::string args_str = absl::StrJoin(args.subspan(1), " ");
10791081
std::string cmd = absl::StrCat("FT.CREATE ", idx_name, " CSS ", args_str);
10801082

10811083
// TODO add processing of the reply to make sure index was created successfully on all shards,
10821084
// and prevent simultaneous creation of the same index.
1083-
cluster::Coordinator::Current().DispatchAll(cmd);
1085+
cluster::Coordinator::Current().DispatchAll(cmd, [](const facade::RespVec&) {});
10841086
}
10851087

10861088
auto idx_ptr = make_shared<DocIndex>(std::move(parsed_index).value());
@@ -1292,6 +1294,66 @@ void SearchFamily::FtList(CmdArgList args, const CommandContext& cmd_cntx) {
12921294
rb->SendBulkStrArr(names);
12931295
}
12941296

1297+
// Runs FT.SEARCH for index `idx` on the other cluster shards through the
// coordinator and converts every shard reply into a SearchResult.
//
// The parser expects each reply to be laid out as
//   [total_hits, key_1, [field, value, ...], key_2, [field, value, ...], ...]
// where keys, field names and field values are strings.
//
// Returns one SearchResult per shard whose reply was parsed completely. A reply
// that is empty, is an error, or is malformed is logged and skipped as a whole,
// so the returned vector never contains partially filled results.
//
// NOTE(review): `query` and `args_str` are spliced into the command text
// unquoted; a query containing spaces may be split into several arguments by
// the receiving side - confirm how the coordinator serializes the command.
static vector<SearchResult> FtSearchCSS(std::string_view idx, std::string_view query,
                                        std::string_view args_str) {
  using RespType = facade::RespExpr::Type;

  vector<SearchResult> results;
  std::string cmd = absl::StrCat("FT.SEARCH ", idx, " ", query, " CSS ", args_str);

  // TODO for now we suppose that callback is called synchronously. If not, we need to add
  // synchronization here for results vector modification.
  cluster::Coordinator::Current().DispatchAll(cmd, [&](const facade::RespVec& res) {
    VLOG(3) << "FT.SEARCH CSS reply: " << res;

    if (res.empty()) {
      LOG(ERROR) << "FT.SEARCH CSS reply is empty";
      return;
    }
    if (res[0].type == RespType::ERROR) {
      LOG(WARNING) << "FT.SEARCH CSS reply error: " << res[0].GetView();
      return;
    }

    const auto size = res[0].GetInt();
    if (!size.has_value()) {
      LOG(ERROR) << "FT.SEARCH CSS reply unexpected type: " << static_cast<int>(res[0].type);
      return;
    }

    // Parse into a local result first, so that a malformed reply does not leave
    // a half-filled entry in `results`.
    SearchResult shard_result;
    shard_result.total_hits = *size;

    for (size_t i = 2; i < res.size(); i += 2) {
      const auto& key_expr = res[i - 1];
      const auto& data_expr = res[i];

      // GetString() is only valid for string expressions, so check the key type first.
      if (key_expr.type != RespType::STRING) {
        LOG(ERROR) << "FT.SEARCH CSS reply unexpected type for document key: "
                   << static_cast<int>(key_expr.type);
        return;
      }
      if (data_expr.type != RespType::ARRAY) {
        LOG(ERROR) << "FT.SEARCH CSS reply unexpected type for document data: "
                   << static_cast<int>(data_expr.type);
        return;
      }

      const auto& fields = data_expr.GetVec();
      if (fields.size() % 2 != 0) {
        LOG(ERROR) << "FT.SEARCH CSS reply unexpected number of elements for document data: "
                   << fields.size();
        return;
      }

      auto& search_doc = shard_result.docs.emplace_back();
      search_doc.key = key_expr.GetString();

      for (size_t j = 0; j < fields.size(); j += 2) {
        const auto& name_expr = fields[j];
        const auto& value_expr = fields[j + 1];

        if (name_expr.type != RespType::STRING) {
          LOG(ERROR) << "FT.SEARCH CSS reply unexpected type for document data: "
                     << static_cast<int>(name_expr.type);
          return;
        }
        if (value_expr.type != RespType::STRING) {
          // Report the type of the value element, which is the one that failed the check.
          LOG(ERROR) << "FT.SEARCH CSS reply unexpected type for document data: "
                     << static_cast<int>(value_expr.type);
          return;
        }
        search_doc.values.emplace(name_expr.GetString(), value_expr.GetString());
      }
    }

    results.push_back(std::move(shard_result));
  });
  return results;
}
1356+
12951357
void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
12961358
CmdArgParser parser{args};
12971359
string_view index_name = parser.Next();
@@ -1311,11 +1373,11 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
13111373
absl::StrCat("Query string is too long, max length is ", max_query_bytes, " bytes"));
13121374
}
13131375

1314-
if (!is_cross_shard && IsClusterEnabled()) {
1376+
vector<SearchResult> css_docs;
1377+
if (absl::GetFlag(FLAGS_cluster_search) && !is_cross_shard && IsClusterEnabled()) {
13151378
std::string args_str = absl::StrJoin(args.subspan(2), " ");
1316-
std::string cmd = absl::StrCat("FT.SEARCH ", index_name, " ", query_str, " CSS ", args_str);
13171379

1318-
cluster::Coordinator::Current().DispatchAll(cmd);
1380+
css_docs = FtSearchCSS(index_name, query_str, args_str);
13191381
}
13201382

13211383
search::SearchAlgorithm search_algo;
@@ -1342,6 +1404,10 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
13421404
return builder->SendError(*res.error);
13431405
}
13441406

1407+
// TODO add merging of CSS results with local results (SORT, LIMIT, etc)
1408+
docs.insert(docs.end(), std::make_move_iterator(css_docs.begin()),
1409+
std::make_move_iterator(css_docs.end()));
1410+
13451411
SearchReply(*params, search_algo.GetKnnScoreSortOption(), absl::MakeSpan(docs), builder);
13461412
}
13471413

tests/dragonfly/cluster_test.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3452,15 +3452,19 @@ async def test_replica_takeover_moved(
34523452
assert await m1.client.execute_command("GET newk") == "foo"
34533453

34543454

3455-
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
3455+
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cluster_search": "yes"})
34563456
async def test_SearchRequestDistribution(df_factory: DflyInstanceFactory):
34573457
"""
34583458
Create cluster of 3 nodes.
34593459
Send FT.CREATE to first node and check that index was created on all nodes.
34603460
"""
34613461

34623462
instances = [
3463-
df_factory.create(port=next(next_port), admin_port=next(next_port), vmodule="coordinator=2")
3463+
df_factory.create(
3464+
port=next(next_port),
3465+
admin_port=next(next_port),
3466+
vmodule="coordinator=2,search_family=3",
3467+
)
34643468
for i in range(3)
34653469
]
34663470

@@ -3480,10 +3484,19 @@ async def test_SearchRequestDistribution(df_factory: DflyInstanceFactory):
34803484
== "OK"
34813485
)
34823486

3483-
await asyncio.sleep(3)
34843487
for node in nodes:
34853488
await wait_for_ft_index_creation(node.client, "idx")
34863489

3490+
cclient = instances[0].cluster_client()
3491+
3492+
for i in range(0, 10):
3493+
assert await cclient.execute_command("HSET", f"s{i}", "title", f"test {i}") == 1
3494+
3495+
res = await nodes[0].client.execute_command("FT.SEARCH", "idx", "@title:test", "text")
3496+
assert res[0] == 10
3497+
for i in range(0, 10):
3498+
assert f"s{i}" in res
3499+
34873500

34883501
async def verify_keys_match_number_of_index_docs(client, expected_num_keys):
34893502
# Get number of docs in index
@@ -3499,7 +3512,7 @@ async def verify_keys_match_number_of_index_docs(client, expected_num_keys):
34993512
assert keyspace_keys == expected_num_keys
35003513

35013514

3502-
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
3515+
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cluster_search": "yes"})
35033516
async def test_remove_docs_on_cluster_migration(df_factory):
35043517
instances = [
35053518
df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)

0 commit comments

Comments
 (0)