Skip to content

Commit 6d9c325

Browse files
authored
feat: add CSS option for FT.CREATE (#6057)
* feat: add CSS option for FT.CREATE
1 parent 978961f commit 6d9c325

File tree

6 files changed

+182
-3
lines changed

6 files changed

+182
-3
lines changed

src/server/cluster/cluster_config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class ClusterConfig {
3232
bool IsMySlot(SlotId id) const;
3333
bool IsMySlot(std::string_view key) const;
3434

35+
const std::string& MyId() const {
36+
return my_id_;
37+
}
38+
3539
// Returns the master configured for `id`.
3640
ClusterNodeInfo GetMasterNodeForSlot(SlotId id) const;
3741

src/server/cluster/coordinator.cc

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#include "server/cluster/coordinator.h"
6+
7+
#include "base/logging.h"
8+
#include "facade/redis_parser.h"
9+
#include "facade/socket_utils.h"
10+
#include "server/cluster/cluster_config.h"
11+
12+
using namespace std;
13+
using namespace facade;
14+
15+
namespace dfly::cluster {
16+
17+
class Coordinator::CrossShardClient : private ProtocolClient {
18+
public:
19+
CrossShardClient(std::string host, uint16_t port) : ProtocolClient(std::move(host), port) {
20+
}
21+
22+
using ProtocolClient::CloseSocket;
23+
~CrossShardClient() {
24+
CloseSocket();
25+
}
26+
27+
void Init() {
28+
VLOG(1) << "Resolving host DNS to " << server().Description();
29+
if (error_code ec = ResolveHostDns(); ec) {
30+
LOG(WARNING) << "Could not resolve host DNS to " << server().Description() << ": "
31+
<< ec.message();
32+
exec_st_.ReportError(GenericError(ec, "Could not resolve host dns."));
33+
return;
34+
}
35+
VLOG(1) << "Start coordinator connection to " << server().Description();
36+
auto timeout = 3000ms; // TODO add flag;
37+
if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
38+
LOG(WARNING) << "Couldn't connect to " << server().Description() << ": " << ec.message()
39+
<< ", socket state: " << GetSocketInfo(Sock()->native_handle());
40+
exec_st_.ReportError(GenericError(ec, "Couldn't connect to source."));
41+
return;
42+
}
43+
44+
ResetParser(RedisParser::Mode::CLIENT);
45+
}
46+
47+
void Cancel() {
48+
ShutdownSocket();
49+
}
50+
51+
void SendCommand(std::string_view cmd) {
52+
if (auto ec = SendCommandAndReadResponse(cmd); ec) {
53+
LOG(WARNING) << "Coordinator could not send command to : " << server().Description() << ": "
54+
<< ec.message() << ", socket state: " << GetSocketInfo(Sock()->native_handle());
55+
exec_st_.ReportError(GenericError(ec, "Could not send command."));
56+
}
57+
// add response processing
58+
}
59+
};
60+
61+
Coordinator& Coordinator::Current() {
62+
static Coordinator instance;
63+
return instance;
64+
}
65+
66+
void Coordinator::DispatchAll(std::string_view command) {
67+
auto cluster_config = ClusterConfig::Current();
68+
if (!cluster_config) {
69+
VLOG(2) << "No cluster config found for coordinator plan creation.";
70+
return;
71+
}
72+
VLOG(2) << "Dispatching command to all shards: " << command;
73+
auto shards_config = cluster_config->GetConfig();
74+
75+
std::vector<std::unique_ptr<CrossShardClient>> clients;
76+
for (const auto& shard : shards_config) {
77+
if (shard.master.id == cluster_config->MyId()) {
78+
continue;
79+
}
80+
clients.emplace_back(std::make_unique<CrossShardClient>(shard.master.ip, shard.master.port));
81+
clients.back()->Init();
82+
clients.back()->SendCommand(std::string(command));
83+
}
84+
}
85+
86+
} // namespace dfly::cluster

src/server/cluster/coordinator.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
#pragma once
5+
6+
#include "server/cluster/cluster_defs.h"
7+
#include "server/protocol_client.h"
8+
9+
namespace dfly::cluster {
10+
11+
// Coordinator needs to create and manage connections between nodes in the cluster for cross shard
12+
// commands. All cross-shard commands are dispatched through the Coordinator.
13+
// It can be used to exeute commands on all shards or specific shards.
14+
class Coordinator {
15+
public:
16+
static Coordinator& Current();
17+
void DispatchAll(std::string_view command);
18+
19+
private:
20+
Coordinator() = default;
21+
class CrossShardClient;
22+
};
23+
24+
} // namespace dfly::cluster

src/server/search/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ if (NOT WITH_SEARCH)
44
return()
55
endif()
66

7-
add_library(dfly_search_server aggregator.cc doc_accessors.cc doc_index.cc search_family.cc index_join.cc)
7+
add_library(dfly_search_server aggregator.cc doc_accessors.cc doc_index.cc search_family.cc index_join.cc
8+
../cluster/coordinator.cc)
89
target_link_libraries(dfly_search_server dfly_transaction dragonfly_lib dfly_facade redis_lib jsonpath TRDP::jsoncons)
910

1011

src/server/search/search_family.cc

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <absl/strings/ascii.h>
1010
#include <absl/strings/match.h>
1111
#include <absl/strings/str_format.h>
12+
#include <absl/strings/str_join.h>
1213
#include <absl/strings/str_split.h>
1314
#include <absl/strings/string_view.h>
1415

@@ -24,6 +25,8 @@
2425
#include "facade/error.h"
2526
#include "facade/reply_builder.h"
2627
#include "server/acl/acl_commands_def.h"
28+
#include "server/cluster/cluster_config.h"
29+
#include "server/cluster/coordinator.h"
2730
#include "server/command_registry.h"
2831
#include "server/config_registry.h"
2932
#include "server/conn_context.h"
@@ -316,7 +319,7 @@ ParseResult<bool> ParseSchema(CmdArgParser* parser, DocIndex* index) {
316319
#pragma GCC diagnostic pop
317320
#endif
318321

319-
ParseResult<DocIndex> ParseCreateParams(CmdArgParser* parser) {
322+
ParseResult<DocIndex> CreateDocIndex(CmdArgParser* parser) {
320323
DocIndex index{};
321324

322325
while (parser->HasNext()) {
@@ -1047,7 +1050,9 @@ void SearchFamily::FtCreate(CmdArgList args, const CommandContext& cmd_cntx) {
10471050
CmdArgParser parser{args};
10481051
string_view idx_name = parser.Next();
10491052

1050-
auto parsed_index = ParseCreateParams(&parser);
1053+
bool is_cross_shard = parser.Check("CSS");
1054+
1055+
auto parsed_index = CreateDocIndex(&parser);
10511056
if (SendErrorIfOccurred(parsed_index, &parser, builder)) {
10521057
return;
10531058
}
@@ -1069,6 +1074,15 @@ void SearchFamily::FtCreate(CmdArgList args, const CommandContext& cmd_cntx) {
10691074
return builder->SendError("Index already exists");
10701075
}
10711076

1077+
if (!is_cross_shard && IsClusterEnabled()) {
1078+
std::string args_str = absl::StrJoin(args.subspan(1), " ");
1079+
std::string cmd = absl::StrCat("FT.CREATE ", idx_name, " CSS ", args_str);
1080+
1081+
// TODO add processing of the reply to make sure index was created successfully on all shards,
1082+
// and prevent simultaneous creation of the same index.
1083+
cluster::Coordinator::Current().DispatchAll(cmd);
1084+
}
1085+
10721086
auto idx_ptr = make_shared<DocIndex>(std::move(parsed_index).value());
10731087
cmd_cntx.tx->Execute(
10741088
[idx_name, idx_ptr](auto* tx, auto* es) {
@@ -1283,6 +1297,8 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
12831297
string_view index_name = parser.Next();
12841298
string_view query_str = parser.Next();
12851299

1300+
bool is_cross_shard = parser.Check("CSS");
1301+
12861302
auto* builder = cmd_cntx.rb;
12871303
auto params = ParseSearchParams(&parser);
12881304
if (SendErrorIfOccurred(params, &parser, builder))
@@ -1295,6 +1311,13 @@ void SearchFamily::FtSearch(CmdArgList args, const CommandContext& cmd_cntx) {
12951311
absl::StrCat("Query string is too long, max length is ", max_query_bytes, " bytes"));
12961312
}
12971313

1314+
if (!is_cross_shard && IsClusterEnabled()) {
1315+
std::string args_str = absl::StrJoin(args.subspan(2), " ");
1316+
std::string cmd = absl::StrCat("FT.SEARCH ", index_name, " ", query_str, " CSS ", args_str);
1317+
1318+
cluster::Coordinator::Current().DispatchAll(cmd);
1319+
}
1320+
12981321
search::SearchAlgorithm search_algo;
12991322
if (!search_algo.Init(query_str, &params->query_params, &params->optional_filters))
13001323
return builder->SendError("Query syntax error");

tests/dragonfly/cluster_test.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,14 @@ async def wait_for_status(admin_client, node_id, status, timeout=10):
181181
assert len(states) != 0 and all(state[2] in status for state in states), states
182182

183183

184+
async def wait_for_ft_index_creation(client, idx_name, timeout=5):
185+
get_status = lambda: client.execute_command("FT.INFO", idx_name)
186+
187+
async for states, breaker in tick_timer(get_status, timeout=timeout):
188+
with breaker:
189+
assert len(states) != 0, states
190+
191+
184192
async def wait_for_error(admin_client, node_id, error, timeout=10):
185193
get_status = lambda: admin_client.execute_command(
186194
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
@@ -3442,3 +3450,36 @@ async def test_replica_takeover_moved(
34423450
await m1.client.execute_command(f"replicaof localhost {r1.instance.port}")
34433451
await check_all_replicas_finished([m1.client], r1.client)
34443452
assert await m1.client.execute_command("GET newk") == "foo"
3453+
3454+
3455+
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
3456+
async def test_SearchRequestDistribution(df_factory: DflyInstanceFactory):
3457+
"""
3458+
Create cluster of 3 nodes.
3459+
Send FT.CREATE to first node and check that index was created on all nodes.
3460+
"""
3461+
3462+
instances = [
3463+
df_factory.create(port=next(next_port), admin_port=next(next_port), vmodule="coordinator=2")
3464+
for i in range(3)
3465+
]
3466+
3467+
df_factory.start_all(instances)
3468+
3469+
nodes = [(await create_node_info(instance)) for instance in instances]
3470+
nodes[0].slots = [(0, 5259)]
3471+
nodes[1].slots = [(5260, 10519)]
3472+
nodes[2].slots = [(10520, 16383)]
3473+
3474+
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
3475+
3476+
assert (
3477+
await nodes[0].client.execute_command(
3478+
"FT.CREATE", "idx", "ON", "HASH", "SCHEMA", "title", "TEXT"
3479+
)
3480+
== "OK"
3481+
)
3482+
3483+
await asyncio.sleep(3)
3484+
for node in nodes:
3485+
await wait_for_ft_index_creation(node.client, "idx")

0 commit comments

Comments
 (0)