Commit 6bcd9c5

feat(interactive): Introducing sharding mode to Interactive (#4410)
Introduces a new configuration option, `sharding_mode`, to Interactive. `sharding_mode` can be set to `exclusive` or `cooperative`:

- exclusive: One shard (shard_id = shard_num - 1) is reserved for processing admin requests only.
- cooperative: All shards process both admin requests and query requests.

To specify this configuration, add the following to interactive_config.yaml:

```yaml
http_service:
  sharding_mode: exclusive
```

By default, `sharding_mode` is set to `exclusive` to prevent the admin service from being blocked by long-running queries. Users can switch the mode via the config file.

For the reason why this mode is introduced, see #4409.

This commit also adds CI and fixes a bug.
1 parent ebdf9d1 commit 6bcd9c5

14 files changed

+648
-111
lines changed

docs/flex/interactive/configuration.md

Lines changed: 22 additions & 0 deletions
@@ -82,6 +82,27 @@ compiler:
   query_timeout: 20000 # query timeout in milliseconds, default 20000
 ```
 
+#### Sharded Service
+
+The core query engine of Interactive is built on [hiactor](https://github.com/alibaba/hiactor), which is based on [Seastar](https://github.com/scylladb/seastar). Seastar uses a shared-nothing SMP architecture, in which each core runs autonomously without sharing memory, data structures, or CPU resources. Each Seastar core is commonly referred to as a shard.
+
+By leveraging the future-promise API and a cooperative micro-task scheduler, the sharded service significantly boosts performance and throughput. However, this setup has a drawback: an incoming request may be delayed even when some shards are idle, because the shard scheduling algorithm may route it to a busy shard. This is a problem for Interactive, which typically hosts two services: `QueryService` and `AdminService`. Crucially, `AdminService` must remain responsive even when `QueryService` is under heavy load.
+
+As discussed in [discussion-4409](https://github.com/alibaba/GraphScope/discussions/4409), one potential solution is to allocate different shards to different kinds of requests. This approach covers two scenarios:
+
+- **Routine Scenario**: Users may run both complex and simple queries, so one shard is dedicated exclusively to admin requests. Since this shard does not process queries, overall query performance may decline.
+
+- **Performance-Critical Scenario**: Users aim for peak performance from Interactive. All shards process query requests and also handle admin requests, so requests may occasionally be delayed.
+
+By default, Interactive is configured for the routine scenario:
+
+```yaml
+http_service:
+  sharding_mode: exclusive # In exclusive mode, a shard is exclusively reserved for admin requests. In cooperative mode, both query and admin requests can be processed by any shard.
+```
+
+By changing to `sharding_mode: cooperative`, you can use all of the available computational power for the QueryService.
+
 
 ##### Available Configurations
 
@@ -99,6 +120,7 @@ In this following table, we use the `.` notation to represent the hierarchy with
 | compiler.planner.rules.FilterIntoJoinRule | N/A | A native Calcite optimization rule that pushes filter conditions to the Join participants before performing the join | 0.0.1 |
 | compiler.planner.rules.NotMatchToAntiJoinRule | N/A | An optimization rule that transforms a "not exist" pattern into an anti-join operation | 0.0.1 |
 | compiler.query_timeout | 3000000 | The maximum time for compiler to wait engine's reply, in `ms` | 0.0.3 |
+| http_service.sharding_mode | exclusive | The sharding mode for the HTTP service. In exclusive mode, one shard is reserved exclusively for admin requests. In cooperative mode, both query requests and admin requests can be served by any shard. | 0.5 |
 
 #### TODOs
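
To make the two modes described in the documentation change concrete, the following sketch computes which shard ids would accept query and admin requests for a given `shard_num`. It only restates the rule from the text (in exclusive mode the last shard, `shard_num - 1`, is reserved for admin requests). `ShardLayout` and `layout_for` are hypothetical names used for illustration and are not part of Interactive.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical illustration types; not taken from the Interactive code base.
enum class ShardingMode { EXCLUSIVE, COOPERATIVE };

struct ShardLayout {
  std::vector<uint32_t> query_shards;  // shards that accept query requests
  std::vector<uint32_t> admin_shards;  // shards that accept admin requests
};

// Assumes shard_num >= 1, and shard_num >= 2 when mode is EXCLUSIVE.
inline ShardLayout layout_for(ShardingMode mode, uint32_t shard_num) {
  assert(shard_num >= 1);
  assert(mode == ShardingMode::COOPERATIVE || shard_num >= 2);
  ShardLayout layout;
  if (mode == ShardingMode::EXCLUSIVE) {
    // Shards 0 .. shard_num - 2 serve queries; the last shard serves admin.
    for (uint32_t i = 0; i + 1 < shard_num; ++i) {
      layout.query_shards.push_back(i);
    }
    layout.admin_shards.push_back(shard_num - 1);
  } else {
    // Every shard may serve both kinds of request.
    for (uint32_t i = 0; i < shard_num; ++i) {
      layout.query_shards.push_back(i);
      layout.admin_shards.push_back(i);
    }
  }
  return layout;
}
```

With four shards, exclusive mode leaves three shards for queries, which is the performance cost the routine scenario accepts in exchange for a responsive admin service.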

flex/bin/rt_server.cc

Lines changed: 3 additions & 1 deletion
@@ -38,7 +38,8 @@ int main(int argc, char** argv) {
       "data directory path")(
       "warmup,w", bpo::value<bool>()->default_value(false),
       "warmup graph data")("memory-level,m",
-                           bpo::value<int>()->default_value(1));
+                           bpo::value<int>()->default_value(1))(
+      "sharding-mode", bpo::value<std::string>()->default_value("cooperative"));
   google::InitGoogleLogging(argv[0]);
   FLAGS_logtostderr = true;
 
@@ -99,6 +100,7 @@ int main(int argc, char** argv) {
   service_config.query_port = http_port;
   service_config.start_admin_service = false;
   service_config.start_compiler = false;
+  service_config.set_sharding_mode(vm["sharding-mode"].as<std::string>());
   server::GraphDBService::get().init(service_config);
   server::GraphDBService::get().run_and_wait_for_exit();
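
`rt_server` passes the raw `--sharding-mode` string straight to `set_sharding_mode`, which aborts via `LOG(FATAL)` on an unknown value. A caller that prefers to report a usage error could validate the string first. The sketch below is one way to do that under stated assumptions: `parse_sharding_mode` and `sharding_mode_name` are hypothetical helpers, and the enum is redeclared locally so the snippet stands alone.

```cpp
#include <optional>
#include <string>

// Local stand-in for ServiceConfig::ShardingMode so the sketch is standalone.
enum class ShardingMode { EXCLUSIVE, COOPERATIVE };

// Hypothetical helper: returns std::nullopt instead of aborting when the
// string is neither "exclusive" nor "cooperative".
inline std::optional<ShardingMode> parse_sharding_mode(
    const std::string& mode) {
  if (mode == "exclusive") {
    return ShardingMode::EXCLUSIVE;
  }
  if (mode == "cooperative") {
    return ShardingMode::COOPERATIVE;
  }
  return std::nullopt;
}

// Hypothetical inverse, handy for log messages.
inline const char* sharding_mode_name(ShardingMode mode) {
  return mode == ShardingMode::EXCLUSIVE ? "exclusive" : "cooperative";
}
```

The comparison is case-sensitive, matching the string checks in the commit.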

flex/engines/http_server/actor/admin_actor.act.cc

Lines changed: 8 additions & 1 deletion
@@ -335,7 +335,14 @@ seastar::future<admin_query_result> admin_actor::run_create_graph(
     query_param&& query_param) {
   LOG(INFO) << "Creating Graph: " << query_param.content;
 
-  auto request = gs::CreateGraphMetaRequest::FromJson(query_param.content);
+  gs::Result<std::string> preprocess_schema_str =
+      gs::preprocess_and_check_schema_json_string(query_param.content);
+  if (!preprocess_schema_str.ok()) {
+    return seastar::make_ready_future<admin_query_result>(
+        gs::Result<seastar::sstring>(preprocess_schema_str.status()));
+  }
+  auto request =
+      gs::CreateGraphMetaRequest::FromJson(preprocess_schema_str.value());
   if (!request.ok()) {
     LOG(ERROR) << "Fail to parse graph meta: "
                << request.status().error_message();

flex/engines/http_server/graph_db_service.cc

Lines changed: 8 additions & 3 deletions
@@ -47,7 +47,8 @@ ServiceConfig::ServiceConfig()
       enable_bolt(false),
       metadata_store_type_(gs::MetadataStoreType::kLocalFile),
       log_level(DEFAULT_LOG_LEVEL),
-      verbose_level(DEFAULT_VERBOSE_LEVEL) {}
+      verbose_level(DEFAULT_VERBOSE_LEVEL),
+      sharding_mode(DEFAULT_SHARDING_MODE) {}
 
 const std::string GraphDBService::DEFAULT_GRAPH_NAME = "modern_graph";
 const std::string GraphDBService::DEFAULT_INTERACTIVE_HOME = "/opt/flex/";
@@ -113,10 +114,14 @@ void GraphDBService::init(const ServiceConfig& config) {
   actor_sys_ = std::make_unique<actor_system>(
       config.shard_num, config.dpdk_mode, config.enable_thread_resource_pool,
       config.external_thread_num, [this]() { set_exit_state(); });
+  // NOTE that in sharding mode EXCLUSIVE, the last shard is reserved for admin
+  // requests.
   query_hdl_ = std::make_unique<graph_db_http_handler>(
-      config.query_port, config.shard_num, config.enable_adhoc_handler);
+      config.query_port, config.shard_num, config.get_cooperative_shard_num(),
+      config.enable_adhoc_handler);
   if (config.start_admin_service) {
-    admin_hdl_ = std::make_unique<admin_http_handler>(config.admin_port);
+    admin_hdl_ = std::make_unique<admin_http_handler>(
+        config.admin_port, config.get_exclusive_shard_id());
   }
 
   initialized_.store(true);
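
The handlers now receive `get_cooperative_shard_num()` and `get_exclusive_shard_id()`, but how they pick a destination shard is not shown in this diff. As a rough illustration only, the sketch below assumes simple round-robin dispatch: queries rotate over the first `cooperative_shard_num` shards, and admin requests go to the exclusive shard when one exists (id >= 0) or rotate over all shards otherwise. `ShardPicker` is a hypothetical class, not the actual handler logic.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical dispatcher. The round-robin policy is an assumption made for
// illustration; the real handlers' scheduling is not part of this diff.
class ShardPicker {
 public:
  ShardPicker(uint32_t shard_num, uint32_t cooperative_shard_num,
              int32_t exclusive_shard_id)
      : shard_num_(shard_num),
        query_shards_(cooperative_shard_num),
        exclusive_shard_id_(exclusive_shard_id) {
    assert(shard_num_ >= 1);
    assert(query_shards_ >= 1 && query_shards_ <= shard_num_);
    assert(exclusive_shard_id_ < static_cast<int32_t>(shard_num_));
  }

  // Queries rotate over shards [0, cooperative_shard_num).
  uint32_t next_query_shard() {
    uint32_t id = query_cursor_;
    query_cursor_ = (query_cursor_ + 1) % query_shards_;
    return id;
  }

  // Admin requests use the reserved shard if there is one; otherwise they
  // rotate over every shard.
  uint32_t next_admin_shard() {
    if (exclusive_shard_id_ >= 0) {
      return static_cast<uint32_t>(exclusive_shard_id_);
    }
    uint32_t id = admin_cursor_;
    admin_cursor_ = (admin_cursor_ + 1) % shard_num_;
    return id;
  }

 private:
  uint32_t shard_num_;
  uint32_t query_shards_;
  int32_t exclusive_shard_id_;
  uint32_t query_cursor_ = 0;
  uint32_t admin_cursor_ = 0;
};
```

Under this assumed policy, a picker built with `(4, 3, 3)` never sends a query to shard 3, so a long-running query cannot occupy the shard that serves admin requests.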

flex/engines/http_server/graph_db_service.h

Lines changed: 45 additions & 0 deletions
@@ -36,6 +36,7 @@ namespace server {
 /* Stored service configuration, read from interactive_config.yaml
  */
 struct ServiceConfig {
+  enum class ShardingMode { EXCLUSIVE, COOPERATIVE };
   static constexpr const uint32_t DEFAULT_SHARD_NUM = 1;
   static constexpr const uint32_t DEFAULT_QUERY_PORT = 10000;
   static constexpr const uint32_t DEFAULT_ADMIN_PORT = 7777;
@@ -44,6 +45,8 @@ struct ServiceConfig {
   static constexpr const uint32_t DEFAULT_VERBOSE_LEVEL = 0;
   static constexpr const uint32_t DEFAULT_LOG_LEVEL =
       0;  // 0 = INFO, 1 = WARNING, 2 = ERROR, 3 = FATAL
+  static constexpr const ShardingMode DEFAULT_SHARDING_MODE =
+      ShardingMode::EXCLUSIVE;
 
   // Those has default value
   uint32_t bolt_port;
@@ -67,12 +70,40 @@ struct ServiceConfig {
   // If we found GLOG_v in the environment, we will at the first place.
   int log_level;
   int verbose_level;
+  ShardingMode sharding_mode;  // exclusive or cooperative. With exclusive mode,
+                               // we will reserve one shard for only processing
+                               // admin requests, and the other shards for
+                               // processing query requests. With cooperative
+                               // mode, all shards will process both admin and
+                               // query requests. With only one shard available,
+                               // the sharding mode must be cooperative.
 
   // Those has not default value
   std::string default_graph;
   std::string engine_config_path;  // used for codegen.
 
   ServiceConfig();
+
+  void set_sharding_mode(const std::string& mode) {
+    VLOG(10) << "Set sharding mode: " << mode;
+    if (mode == "exclusive") {
+      sharding_mode = ShardingMode::EXCLUSIVE;
+    } else if (mode == "cooperative") {
+      sharding_mode = ShardingMode::COOPERATIVE;
+    } else {
+      LOG(FATAL) << "Invalid sharding mode: " << mode;
+    }
+  }
+
+  int32_t get_exclusive_shard_id() const {
+    return sharding_mode == ShardingMode::EXCLUSIVE ? shard_num - 1 : -1;
+  }
+
+  int32_t get_cooperative_shard_num() const {
+    return sharding_mode == ShardingMode::EXCLUSIVE
+               ? std::max((int32_t) shard_num - 1, 1)
+               : shard_num;  // shard_num >= 1
+  }
 };
 
 class GraphDBService {
@@ -241,6 +272,20 @@ struct convert<server::ServiceConfig> {
         LOG(INFO) << "admin_port not found, use default value "
                   << service_config.admin_port;
       }
+      if (http_service_node["sharding_mode"]) {
+        auto sharding_mode =
+            http_service_node["sharding_mode"].as<std::string>();
+        if (sharding_mode != "exclusive" && sharding_mode != "cooperative") {
+          LOG(ERROR) << "Unsupported sharding mode: " << sharding_mode;
+          return false;
+        }
+        if (sharding_mode == "exclusive" && service_config.shard_num == 1) {
+          LOG(ERROR) << "exclusive sharding mode requires at least 2 shards";
+          return false;
+        }
+        service_config.set_sharding_mode(sharding_mode);
+        VLOG(1) << "sharding_mode: " << sharding_mode;
+      }
     } else {
       LOG(ERROR) << "Fail to find http_service configuration";
       return false;
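
The interplay between the two getters and the YAML validation can be checked in isolation. The sketch below restates the getters as free functions using the same expressions as the diff, and mirrors the two YAML checks in a function that returns an error message instead of logging. The free-function names and `check_sharding_config` are hypothetical and exist only for this illustration.

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>
#include <string>

// Local stand-in for ServiceConfig::ShardingMode so the sketch is standalone.
enum class ShardingMode { EXCLUSIVE, COOPERATIVE };

// Same expression as ServiceConfig::get_exclusive_shard_id() in the diff.
inline int32_t exclusive_shard_id(ShardingMode mode, uint32_t shard_num) {
  return mode == ShardingMode::EXCLUSIVE
             ? static_cast<int32_t>(shard_num) - 1
             : -1;
}

// Same expression as ServiceConfig::get_cooperative_shard_num() in the diff.
inline int32_t cooperative_shard_num(ShardingMode mode, uint32_t shard_num) {
  return mode == ShardingMode::EXCLUSIVE
             ? std::max(static_cast<int32_t>(shard_num) - 1, 1)
             : static_cast<int32_t>(shard_num);
}

// Mirrors the two checks in the YAML converter. Returns an error message, or
// std::nullopt when the combination is accepted.
inline std::optional<std::string> check_sharding_config(
    const std::string& mode, uint32_t shard_num) {
  if (mode != "exclusive" && mode != "cooperative") {
    return "Unsupported sharding mode: " + mode;
  }
  if (mode == "exclusive" && shard_num == 1) {
    return std::string("exclusive sharding mode requires at least 2 shards");
  }
  return std::nullopt;
}
```

With `shard_num == 1` in exclusive mode, the getters yield shard id 0 for admin and one query shard, so both kinds of request would share shard 0. That is the case the YAML check rejects. The `rt_server` path in this commit calls `set_sharding_mode` directly without that check, although its default there is `cooperative`.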
