Skip to content

Commit e964b99

Browse files
shirly121zhanglei1949liulx20
authored
refactor(interactive): Support ScanEarlyStopRule for Query Optimization (#4431)
<!-- Thanks for your contribution! please review https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before opening an issue. --> ## What do these changes do? Leverage 3 rules for `Scan Early Stop` Optimization: 1. ScanEarlyStopRule <img width="939" alt="image" src="https://github.com/user-attachments/assets/f85098b2-c9e9-4c93-ac51-5dec59484bee" /> 2. ScanExpandFusionRule <img width="1133" alt="image" src="https://github.com/user-attachments/assets/0366e6df-d113-4379-afb4-ce0359534192" /> 3. TopKPushDownRule <img width="1212" alt="image" src="https://github.com/user-attachments/assets/5de53d9d-e8ca-4868-9857-d3ed7dc40b3d" /> <!-- Please give a short brief about these changes. --> ## Related issue number <!-- Are there any issues opened that will be resolved by merging this change? --> Fixes #4356 --------- Co-authored-by: xiaolei.zl <[email protected]> Co-authored-by: liulx20 <[email protected]>
1 parent 17ceda1 commit e964b99

File tree

20 files changed

+968
-70
lines changed

20 files changed

+968
-70
lines changed

docs/interactive_engine/gopt.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,4 +349,21 @@ Design of GOpt
349349
:::
350350
351351
### Detailed Introduction
352-
A comprehensive introduction to GOpt will be provided in subsequent sections. Please stay tuned for detailed updates and information.
352+
353+
#### Rules
354+
355+
Rules play a pivotal role in GOpt’s optimization framework, enabling efficient and effective query transformations. Below is an outline of some key rules implemented in GOpt:
356+
357+
**ScanEarlyStopRule**: Pushes the limit operation down to the scan node. During the scan process, the scan stops as soon as the specified limit count is reached.
358+
359+
**ScanExpandFusionRule**: This rule transforms edge expansion into edge scan wherever possible. For example, consider the following Cypher query:
360+
```cypher
361+
Match (a:PERSON)-[b:KNOWS]->(c:PERSON) Return b.name;
362+
```
363+
Although the query involves Scan and GetV steps, their results are not directly utilized by subsequent project operations. The only effectively used data is the edge data produced by the Expand operation. In such cases, we can perform a fusion operation, transforming the pattern
364+
`(a:PERSON)-[b:KNOWS]->(c:PERSON)` into a scan operation on the KNOWS edge. It is important to note that whether fusion is feasible also depends on the label dependencies between nodes and edges. If the edge label is determined strictly by the triplet (src_label, edge_label, dst_label), fusion cannot be performed. For example, consider the following query:
365+
```cypher
366+
Match (a:PERSON)-[b:LIKES]->(c:COMMENT) Return b.name;
367+
```
368+
369+
**TopKPushDownRule**: This rule pushes down topK operations to the project node and is based on Calcite's [SortProjectTransposeRule](https://calcite.apache.org/javadocAggregate/org/apache/calcite/rel/rules/SortProjectTransposeRule.html), leveraging the original rule wherever possible. However, in our more complex distributed scenario, deferring the execution of the project node can disrupt already sorted data. To address this, we modified the matching logic in `SortProjectTransposeRule`. Currently, the PushDown operation is applied only when the sort fields are empty, which means only the limit is pushed down to the project node.

flex/engines/graph_db/runtime/common/operators/retrieve/scan.cc

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ namespace gs {
1919
namespace runtime {
2020

2121
Context Scan::find_vertex_with_oid(const GraphReadInterface& graph,
22-
label_t label, const Any& oid, int alias) {
22+
label_t label, const Any& oid,
23+
int32_t alias) {
2324
SLVertexColumnBuilder builder(label);
2425
vid_t vid;
2526
if (graph.GetVertexIndex(label, oid, vid)) {
@@ -31,7 +32,7 @@ Context Scan::find_vertex_with_oid(const GraphReadInterface& graph,
3132
}
3233

3334
Context Scan::find_vertex_with_gid(const GraphReadInterface& graph,
34-
label_t label, int64_t gid, int alias) {
35+
label_t label, int64_t gid, int32_t alias) {
3536
SLVertexColumnBuilder builder(label);
3637
if (GlobalId::get_label_id(gid) == label) {
3738
builder.push_back_opt(GlobalId::get_vid(gid));
@@ -44,42 +45,6 @@ Context Scan::find_vertex_with_gid(const GraphReadInterface& graph,
4445
return ctx;
4546
}
4647

47-
Context Scan::find_vertex_with_id(const GraphReadInterface& graph,
48-
label_t label, const Any& pk, int alias,
49-
bool scan_oid) {
50-
if (scan_oid) {
51-
SLVertexColumnBuilder builder(label);
52-
vid_t vid;
53-
if (graph.GetVertexIndex(label, pk, vid)) {
54-
builder.push_back_opt(vid);
55-
}
56-
Context ctx;
57-
ctx.set(alias, builder.finish());
58-
return ctx;
59-
} else {
60-
SLVertexColumnBuilder builder(label);
61-
vid_t vid{};
62-
int64_t gid{};
63-
if (pk.type == PropertyType::kInt64) {
64-
gid = pk.AsInt64();
65-
} else if (pk.type == PropertyType::kInt32) {
66-
gid = pk.AsInt32();
67-
} else {
68-
LOG(FATAL) << "Unsupported primary key type";
69-
}
70-
if (GlobalId::get_label_id(gid) == label) {
71-
vid = GlobalId::get_vid(gid);
72-
} else {
73-
LOG(ERROR) << "Global id " << gid << " does not match label " << label;
74-
return Context();
75-
}
76-
builder.push_back_opt(vid);
77-
Context ctx;
78-
ctx.set(alias, builder.finish());
79-
return ctx;
80-
}
81-
}
82-
8348
template <typename T>
8449
static Context _scan_vertex_with_special_vertex_predicate(
8550
const GraphReadInterface& graph, const ScanParams& params,

flex/engines/graph_db/runtime/common/operators/retrieve/scan.h

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ namespace runtime {
2626
struct ScanParams {
2727
int alias;
2828
std::vector<label_t> tables;
29+
int32_t limit;
30+
31+
ScanParams() : alias(-1), limit(std::numeric_limits<int32_t>::max()) {}
2932
};
3033
class Scan {
3134
public:
@@ -61,6 +64,50 @@ class Scan {
6164
return ctx;
6265
}
6366

67+
template <typename PRED_T>
68+
static Context scan_vertex_with_limit(const GraphReadInterface& graph,
69+
const ScanParams& params,
70+
const PRED_T& predicate) {
71+
Context ctx;
72+
int32_t cur_limit = params.limit;
73+
if (params.tables.size() == 1) {
74+
label_t label = params.tables[0];
75+
SLVertexColumnBuilder builder(label);
76+
auto vertices = graph.GetVertexSet(label);
77+
for (auto vid : vertices) {
78+
if (cur_limit <= 0) {
79+
break;
80+
}
81+
if (predicate(label, vid)) {
82+
builder.push_back_opt(vid);
83+
cur_limit--;
84+
}
85+
}
86+
ctx.set(params.alias, builder.finish());
87+
} else if (params.tables.size() > 1) {
88+
MSVertexColumnBuilder builder;
89+
90+
for (auto label : params.tables) {
91+
if (cur_limit <= 0) {
92+
break;
93+
}
94+
auto vertices = graph.GetVertexSet(label);
95+
builder.start_label(label);
96+
for (auto vid : vertices) {
97+
if (cur_limit <= 0) {
98+
break;
99+
}
100+
if (predicate(label, vid)) {
101+
builder.push_back_opt(vid);
102+
cur_limit--;
103+
}
104+
}
105+
}
106+
ctx.set(params.alias, builder.finish());
107+
}
108+
return ctx;
109+
}
110+
64111
static Context scan_vertex_with_special_vertex_predicate(
65112
const GraphReadInterface& graph, const ScanParams& params,
66113
const SPVertexPredicate& pred);
@@ -70,24 +117,36 @@ class Scan {
70117
const ScanParams& params, const PRED_T& predicate,
71118
const std::vector<int64_t>& gids) {
72119
Context ctx;
120+
int32_t cur_limit = params.limit;
73121
if (params.tables.size() == 1) {
74122
label_t label = params.tables[0];
75123
SLVertexColumnBuilder builder(label);
76124
for (auto gid : gids) {
125+
if (cur_limit <= 0) {
126+
break;
127+
}
77128
vid_t vid = GlobalId::get_vid(gid);
78129
if (GlobalId::get_label_id(gid) == label && predicate(label, vid)) {
79130
builder.push_back_opt(vid);
131+
cur_limit--;
80132
}
81133
}
82134
ctx.set(params.alias, builder.finish());
83135
} else if (params.tables.size() > 1) {
84136
MLVertexColumnBuilder builder;
85137

86138
for (auto label : params.tables) {
139+
if (cur_limit <= 0) {
140+
break;
141+
}
87142
for (auto gid : gids) {
143+
if (cur_limit <= 0) {
144+
break;
145+
}
88146
vid_t vid = GlobalId::get_vid(gid);
89147
if (GlobalId::get_label_id(gid) == label && predicate(label, vid)) {
90148
builder.push_back_vertex({label, vid});
149+
cur_limit--;
91150
}
92151
}
93152
}
@@ -105,14 +164,19 @@ class Scan {
105164
const ScanParams& params, const PRED_T& predicate,
106165
const std::vector<Any>& oids) {
107166
Context ctx;
167+
auto limit = params.limit;
108168
if (params.tables.size() == 1) {
109169
label_t label = params.tables[0];
110170
SLVertexColumnBuilder builder(label);
111171
for (auto oid : oids) {
172+
if (limit <= 0) {
173+
break;
174+
}
112175
vid_t vid;
113176
if (graph.GetVertexIndex(label, oid, vid)) {
114177
if (predicate(label, vid)) {
115178
builder.push_back_opt(vid);
179+
--limit;
116180
}
117181
}
118182
}
@@ -121,11 +185,18 @@ class Scan {
121185
std::vector<std::pair<label_t, vid_t>> vids;
122186

123187
for (auto label : params.tables) {
188+
if (limit <= 0) {
189+
break;
190+
}
124191
for (auto oid : oids) {
192+
if (limit <= 0) {
193+
break;
194+
}
125195
vid_t vid;
126196
if (graph.GetVertexIndex(label, oid, vid)) {
127197
if (predicate(label, vid)) {
128198
vids.emplace_back(label, vid);
199+
--limit;
129200
}
130201
}
131202
}
@@ -149,15 +220,12 @@ class Scan {
149220
const GraphReadInterface& graph, const ScanParams& params,
150221
const SPVertexPredicate& predicate, const std::vector<Any>& oids);
151222

152-
static Context find_vertex_with_id(const GraphReadInterface& graph,
153-
label_t label, const Any& pk, int alias,
154-
bool scan_oid);
155-
156223
static Context find_vertex_with_oid(const GraphReadInterface& graph,
157-
label_t label, const Any& pk, int alias);
224+
label_t label, const Any& pk,
225+
int32_t alias);
158226

159227
static Context find_vertex_with_gid(const GraphReadInterface& graph,
160-
label_t label, int64_t pk, int alias);
228+
label_t label, int64_t pk, int32_t alias);
161229
};
162230

163231
} // namespace runtime

flex/engines/graph_db/runtime/execute/ops/retrieve/scan.cc

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -495,17 +495,31 @@ class ScanWithGPredOpr : public IReadOperator {
495495
auto expr =
496496
parse_expression(graph, tmp, params, pred_, VarType::kVertexVar);
497497
if (expr->is_optional()) {
498-
auto ret = Scan::scan_vertex(
499-
graph, scan_params_, [&expr](label_t label, vid_t vid) {
500-
return expr->eval_vertex(label, vid, 0, 0).as_bool();
501-
});
502-
return ret;
498+
if (scan_params_.limit == std::numeric_limits<int32_t>::max()) {
499+
return Scan::scan_vertex(
500+
graph, scan_params_, [&expr](label_t label, vid_t vid) {
501+
return expr->eval_vertex(label, vid, 0, 0).as_bool();
502+
});
503+
} else {
504+
return Scan::scan_vertex_with_limit(
505+
graph, scan_params_, [&expr](label_t label, vid_t vid) {
506+
return expr->eval_vertex(label, vid, 0, 0).as_bool();
507+
});
508+
}
503509
} else {
504-
auto ret = Scan::scan_vertex(
505-
graph, scan_params_, [&expr](label_t label, vid_t vid) {
506-
return expr->eval_vertex(label, vid, 0).as_bool();
507-
});
508-
return ret;
510+
if (scan_params_.limit == std::numeric_limits<int32_t>::max()) {
511+
auto ret = Scan::scan_vertex(
512+
graph, scan_params_, [&expr](label_t label, vid_t vid) {
513+
return expr->eval_vertex(label, vid, 0).as_bool();
514+
});
515+
return ret;
516+
} else {
517+
auto ret = Scan::scan_vertex_with_limit(
518+
graph, scan_params_, [&expr](label_t label, vid_t vid) {
519+
return expr->eval_vertex(label, vid, 0).as_bool();
520+
});
521+
return ret;
522+
}
509523
}
510524
}
511525

@@ -523,8 +537,13 @@ class ScanWithoutPredOpr : public IReadOperator {
523537
const std::map<std::string, std::string>& params,
524538
gs::runtime::Context&& ctx,
525539
gs::runtime::OprTimer& timer) override {
526-
return Scan::scan_vertex(graph, scan_params_,
527-
[](label_t, vid_t) { return true; });
540+
if (scan_params_.limit == std::numeric_limits<int32_t>::max()) {
541+
return Scan::scan_vertex(graph, scan_params_,
542+
[](label_t, vid_t) { return true; });
543+
} else {
544+
return Scan::scan_vertex_with_limit(graph, scan_params_,
545+
[](label_t, vid_t) { return true; });
546+
}
528547
}
529548

530549
private:
@@ -566,6 +585,17 @@ std::pair<std::unique_ptr<IReadOperator>, ContextMeta> ScanOprBuilder::Build(
566585

567586
ScanParams scan_params;
568587
scan_params.alias = scan_opr.has_alias() ? scan_opr.alias().value() : -1;
588+
scan_params.limit = std::numeric_limits<int32_t>::max();
589+
if (scan_opr.params().has_limit()) {
590+
auto& limit_range = scan_opr.params().limit();
591+
if (limit_range.lower() != 0) {
592+
LOG(FATAL) << "Scan with lower limit expect 0, but got "
593+
<< limit_range.lower();
594+
}
595+
if (limit_range.upper() > 0) {
596+
scan_params.limit = limit_range.upper();
597+
}
598+
}
569599
for (auto& table : scan_opr.params().tables()) {
570600
// bug here, exclude invalid vertex label id
571601
if (schema.vertex_label_num() <= table.id()) {

flex/interactive/sdk/python/gs_interactive/tests/test_robustness.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,40 @@ def test_call_proc_in_cypher(interactive_session, neo4j_session, create_modern_g
310310
assert cnt == 8
311311

312312

313+
@pytest.mark.skipif(
314+
os.environ.get("RUN_ON_PROTO", None) != "ON",
315+
reason="Scan+Limit fuse only works on proto",
316+
)
317+
def test_scan_limit_fuse(interactive_session, neo4j_session, create_modern_graph):
318+
print("[Test call procedure in cypher]")
319+
import_data_to_full_modern_graph(interactive_session, create_modern_graph)
320+
start_service_on_graph(interactive_session, create_modern_graph)
321+
ensure_compiler_schema_ready(
322+
interactive_session, neo4j_session, create_modern_graph
323+
)
324+
result = neo4j_session.run(
325+
'MATCH(p: person) with p.id as oid CALL k_neighbors("person", oid, 1) return label_name, vertex_oid;'
326+
)
327+
cnt = 0
328+
for record in result:
329+
cnt += 1
330+
assert cnt == 8
331+
332+
# Q: Why we could use this query to verify whether Scan+Limit fuse works?
333+
# A: If Scan+Limit fuse works, the result of this query should be 2, otherwise it should be 6
334+
result = neo4j_session.run("MATCH(n) return n.id limit 2")
335+
cnt = 0
336+
for record in result:
337+
cnt += 1
338+
assert cnt == 2
339+
340+
result = neo4j_session.run("MATCH(n) return n.id limit 0")
341+
cnt = 0
342+
for record in result:
343+
cnt += 1
344+
assert cnt == 0
345+
346+
313347
def test_custom_pk_name(
314348
interactive_session, neo4j_session, create_graph_with_custom_pk_name
315349
):

flex/tests/hqps/interactive_config_test_cbo.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ compiler:
2020
- NotMatchToAntiJoinRule
2121
- ExtendIntersectRule
2222
- ExpandGetVFusionRule
23+
- ScanExpandFusionRule
24+
- TopKPushDownRule
25+
- ScanEarlyStopRule # This rule must be placed after TopKPushDownRule and ScanExpandFusionRule
2326
meta:
2427
reader:
2528
schema:

0 commit comments

Comments
 (0)