Skip to content

Commit 3b212c2

Browse files
fix(interactive): Support sinking paths in Interactive (#4541)
Fix #4540 --------- Co-authored-by: liulx20 <[email protected]>
1 parent d7929f6 commit 3b212c2

File tree

6 files changed

+151
-64
lines changed

6 files changed

+151
-64
lines changed

flex/engines/graph_db/runtime/common/operators/retrieve/path_expand.cc

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,8 @@ bl::result<Context> PathExpand::edge_expand_p(const GraphReadInterface& graph,
203203
label_triplet.edge_label);
204204
while (oe_iter.IsValid()) {
205205
std::unique_ptr<PathImpl> new_path = path->expand(
206-
label_triplet.dst_label, oe_iter.GetNeighbor());
206+
label_triplet.edge_label, label_triplet.dst_label,
207+
oe_iter.GetNeighbor());
207208
output.emplace_back(std::move(new_path), index);
208209
oe_iter.Next();
209210
}
@@ -250,7 +251,8 @@ bl::result<Context> PathExpand::edge_expand_p(const GraphReadInterface& graph,
250251
label_triplet.edge_label);
251252
while (ie_iter.IsValid()) {
252253
std::unique_ptr<PathImpl> new_path = path->expand(
253-
label_triplet.src_label, ie_iter.GetNeighbor());
254+
label_triplet.edge_label, label_triplet.src_label,
255+
ie_iter.GetNeighbor());
254256
output.emplace_back(std::move(new_path), index);
255257
ie_iter.Next();
256258
}
@@ -296,7 +298,8 @@ bl::result<Context> PathExpand::edge_expand_p(const GraphReadInterface& graph,
296298
label_triplet.dst_label,
297299
label_triplet.edge_label);
298300
while (oe_iter.IsValid()) {
299-
auto new_path = path->expand(label_triplet.dst_label,
301+
auto new_path = path->expand(label_triplet.edge_label,
302+
label_triplet.dst_label,
300303
oe_iter.GetNeighbor());
301304
output.emplace_back(std::move(new_path), index);
302305
oe_iter.Next();
@@ -307,7 +310,8 @@ bl::result<Context> PathExpand::edge_expand_p(const GraphReadInterface& graph,
307310
label_triplet.src_label,
308311
label_triplet.edge_label);
309312
while (ie_iter.IsValid()) {
310-
auto new_path = path->expand(label_triplet.src_label,
313+
auto new_path = path->expand(label_triplet.edge_label,
314+
label_triplet.src_label,
311315
ie_iter.GetNeighbor());
312316
output.emplace_back(std::move(new_path), index);
313317
ie_iter.Next();
@@ -503,7 +507,8 @@ bl::result<Context> PathExpand::single_source_single_dest_shortest_path(
503507
dest.second, path)) {
504508
builder.push_back_opt(dest.second);
505509
shuffle_offset.push_back(index);
506-
auto impl = PathImpl::make_path_impl(label_triplet.src_label, path);
510+
auto impl = PathImpl::make_path_impl(label_triplet.src_label,
511+
label_triplet.edge_label, path);
507512
path_builder.push_back_opt(Path(impl.get()));
508513
arena->emplace_back(std::move(impl));
509514
}
@@ -752,7 +757,8 @@ bl::result<Context> PathExpand::all_shortest_paths_with_given_source_and_dest(
752757
all_shortest_path_with_given_source_and_dest_impl(graph, params, v,
753758
dest.second, paths);
754759
for (auto& path : paths) {
755-
auto ptr = PathImpl::make_path_impl(label_triplet.src_label, path);
760+
auto ptr = PathImpl::make_path_impl(label_triplet.src_label,
761+
label_triplet.edge_label, path);
756762
builder.push_back_opt(dest.second);
757763
path_builder.push_back_opt(Path(ptr.get()));
758764
arena->emplace_back(std::move(ptr));

flex/engines/graph_db/runtime/common/operators/retrieve/path_expand_impl.h

Lines changed: 70 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ path_expand_vertex_without_predicate_impl(
177177

178178
template <typename EDATA_T, typename PRED_T>
179179
void sssp_dir(const GraphReadInterface::graph_view_t<EDATA_T>& view,
180-
label_t v_label, vid_t v,
180+
label_t v_label, vid_t v, label_t e_label,
181181
const GraphReadInterface::vertex_set_t& vertices, size_t idx,
182182
int lower, int upper, SLVertexColumnBuilder& dest_col_builder,
183183
GeneralPathColumnBuilder& path_col_builder, Arena& path_impls,
@@ -202,7 +202,7 @@ void sssp_dir(const GraphReadInterface::graph_view_t<EDATA_T>& view,
202202
}
203203

204204
dest_col_builder.push_back_opt(u);
205-
auto impl = PathImpl::make_path_impl(v_label, path);
205+
auto impl = PathImpl::make_path_impl(v_label, e_label, path);
206206
path_col_builder.push_back_opt(Path(impl.get()));
207207
path_impls.emplace_back(std::move(impl));
208208
offsets.push_back(idx);
@@ -219,7 +219,7 @@ void sssp_dir(const GraphReadInterface::graph_view_t<EDATA_T>& view,
219219
}
220220

221221
dest_col_builder.push_back_opt(u);
222-
auto impl = PathImpl::make_path_impl(v_label, path);
222+
auto impl = PathImpl::make_path_impl(v_label, e_label, path);
223223
path_col_builder.push_back_opt(Path(impl.get()));
224224
path_impls.emplace_back(std::move(impl));
225225
offsets.push_back(idx);
@@ -253,7 +253,7 @@ void sssp_dir(const GraphReadInterface::graph_view_t<EDATA_T>& view,
253253
template <typename EDATA_T, typename PRED_T>
254254
void sssp_both_dir(const GraphReadInterface::graph_view_t<EDATA_T>& view0,
255255
const GraphReadInterface::graph_view_t<EDATA_T>& view1,
256-
label_t v_label, vid_t v,
256+
label_t v_label, vid_t v, label_t e_label,
257257
const GraphReadInterface::vertex_set_t& vertices, size_t idx,
258258
int lower, int upper,
259259
SLVertexColumnBuilder& dest_col_builder,
@@ -280,7 +280,7 @@ void sssp_both_dir(const GraphReadInterface::graph_view_t<EDATA_T>& view0,
280280
}
281281

282282
dest_col_builder.push_back_opt(u);
283-
auto impl = PathImpl::make_path_impl(v_label, path);
283+
auto impl = PathImpl::make_path_impl(v_label, e_label, path);
284284
path_col_builder.push_back_opt(Path(impl.get()));
285285
path_impls.emplace_back(std::move(impl));
286286
offsets.push_back(idx);
@@ -297,7 +297,7 @@ void sssp_both_dir(const GraphReadInterface::graph_view_t<EDATA_T>& view0,
297297
}
298298

299299
dest_col_builder.push_back_opt(u);
300-
auto impl = PathImpl::make_path_impl(v_label, path);
300+
auto impl = PathImpl::make_path_impl(v_label, e_label, path);
301301
path_col_builder.push_back_opt(Path(impl.get()));
302302
path_impls.emplace_back(std::move(impl));
303303
offsets.push_back(idx);
@@ -467,8 +467,8 @@ single_source_shortest_path_impl(const GraphReadInterface& graph,
467467
? graph.GetIncomingGraphView<EDATA_T>(v_label, v_label, e_label)
468468
: graph.GetOutgoingGraphView<EDATA_T>(v_label, v_label, e_label);
469469
foreach_vertex(input, [&](size_t idx, label_t label, vid_t v) {
470-
sssp_dir(view, label, v, vertices, idx, lower, upper, dest_col_builder,
471-
path_col_builder, *path_impls, offsets, pred);
470+
sssp_dir(view, label, v, e_label, vertices, idx, lower, upper,
471+
dest_col_builder, path_col_builder, *path_impls, offsets, pred);
472472
});
473473
} else {
474474
CHECK(dir == Direction::kBoth);
@@ -477,9 +477,9 @@ single_source_shortest_path_impl(const GraphReadInterface& graph,
477477
auto ie_view =
478478
graph.GetIncomingGraphView<EDATA_T>(v_label, v_label, e_label);
479479
foreach_vertex(input, [&](size_t idx, label_t label, vid_t v) {
480-
sssp_both_dir(oe_view, ie_view, v_label, v, vertices, idx, lower, upper,
481-
dest_col_builder, path_col_builder, *path_impls, offsets,
482-
pred);
480+
sssp_both_dir(oe_view, ie_view, v_label, v, e_label, vertices, idx, lower,
481+
upper, dest_col_builder, path_col_builder, *path_impls,
482+
offsets, pred);
483483
});
484484
}
485485
return std::make_tuple(dest_col_builder.finish(nullptr),
@@ -529,44 +529,55 @@ default_single_source_shortest_path_impl(
529529
SLVertexColumnBuilder::builder(*dest_labels.begin());
530530

531531
foreach_vertex(input, [&](size_t idx, label_t label, vid_t v) {
532-
std::vector<std::pair<label_t, vid_t>> cur;
533-
std::vector<std::pair<label_t, vid_t>> next;
534-
cur.emplace_back(label, v);
535-
std::map<std::pair<label_t, vid_t>, std::pair<label_t, vid_t>> parent;
532+
std::vector<std::tuple<label_t, label_t, vid_t>> cur;
533+
std::vector<std::tuple<label_t, label_t, vid_t>> next;
534+
cur.emplace_back(std::numeric_limits<label_t>::max(), label, v);
535+
std::map<std::tuple<label_t, label_t, vid_t>,
536+
std::tuple<label_t, label_t, vid_t>>
537+
parent;
538+
std::set<std::pair<label_t, vid_t>> visited;
539+
visited.insert(std::make_pair(label, v));
536540
int depth = 0;
537541
while (depth < upper && !cur.empty()) {
538-
for (auto u : cur) {
539-
if (depth >= lower && pred(u.first, u.second)) {
542+
for (auto [edge_label, v_label, vid] : cur) {
543+
if (depth >= lower && pred(v_label, vid)) {
540544
std::vector<VertexRecord> path;
541-
auto x = u;
542-
while (!(x.first == label && x.second == v)) {
543-
path.emplace_back(VertexRecord{x.first, x.second});
545+
std::vector<label_t> edge_labels;
546+
auto x = std::tie(edge_label, label, vid);
547+
while (!(v_label == label && vid == v)) {
548+
path.push_back(VertexRecord{std::get<1>(x), std::get<2>(x)});
549+
edge_labels.push_back(std::get<0>(x));
544550
x = parent[x];
545551
}
546552
path.emplace_back(VertexRecord{label, v});
553+
std::reverse(edge_labels.begin(), edge_labels.end());
547554
std::reverse(path.begin(), path.end());
548555

549556
if (path.size() > 1) {
550-
auto impl = PathImpl::make_path_impl(std::move(path));
557+
auto impl = PathImpl::make_path_impl(edge_labels, path);
558+
551559
path_col_builder.push_back_opt(Path(impl.get()));
552560
path_impls->emplace_back(std::move(impl));
553561

554-
dest_col_builder.push_back_opt(u.second);
562+
dest_col_builder.push_back_opt(vid);
555563
offsets.push_back(idx);
556564
}
557565
}
558566

559-
for (auto& l : labels_map[u.first]) {
567+
for (auto& l : labels_map[v_label]) {
560568
label_t nbr_label = std::get<0>(l);
561569
auto iter = (std::get<2>(l) == Direction::kOut)
562-
? graph.GetOutEdgeIterator(
563-
u.first, u.second, nbr_label, std::get<1>(l))
564-
: graph.GetInEdgeIterator(
565-
u.first, u.second, nbr_label, std::get<1>(l));
570+
? graph.GetOutEdgeIterator(v_label, vid, nbr_label,
571+
std::get<1>(l))
572+
: graph.GetInEdgeIterator(v_label, vid, nbr_label,
573+
std::get<1>(l));
566574
while (iter.IsValid()) {
567-
auto nbr = std::make_pair(nbr_label, iter.GetNeighbor());
568-
if (parent.find(nbr) == parent.end()) {
569-
parent[nbr] = u;
575+
auto nbr = std::make_tuple(std::get<1>(l), nbr_label,
576+
iter.GetNeighbor());
577+
auto vertex = std::make_pair(nbr_label, iter.GetNeighbor());
578+
if (visited.find(vertex) == visited.end()) {
579+
visited.insert(vertex);
580+
parent[nbr] = std::tie(edge_label, v_label, vid);
570581
next.push_back(nbr);
571582
}
572583
iter.Next();
@@ -585,45 +596,54 @@ default_single_source_shortest_path_impl(
585596
auto dest_col_builder = MLVertexColumnBuilder::builder();
586597

587598
foreach_vertex(input, [&](size_t idx, label_t label, vid_t v) {
588-
std::vector<std::pair<label_t, vid_t>> cur;
589-
std::vector<std::pair<label_t, vid_t>> next;
590-
cur.emplace_back(label, v);
591-
std::map<std::pair<label_t, vid_t>, std::pair<label_t, vid_t>> parent;
599+
std::vector<std::tuple<label_t, label_t, vid_t>> cur;
600+
std::vector<std::tuple<label_t, label_t, vid_t>> next;
601+
cur.emplace_back(std::numeric_limits<label_t>::max(), label, v);
602+
std::map<std::tuple<label_t, label_t, vid_t>,
603+
std::tuple<label_t, label_t, vid_t>>
604+
parent;
605+
std::set<std::pair<label_t, vid_t>> visited;
606+
visited.insert(std::make_pair(label, v));
592607
int depth = 0;
593608
while (depth < upper && !cur.empty()) {
594-
for (auto u : cur) {
595-
if (depth >= lower && pred(u.first, u.second)) {
609+
for (auto [edge_label, v_label, vid] : cur) {
610+
if (depth >= lower && pred(v_label, vid)) {
596611
std::vector<VertexRecord> path;
597-
auto x = u;
598-
while (!(x.first == label && x.second == v)) {
599-
path.emplace_back(VertexRecord{x.first, x.second});
612+
std::vector<label_t> edge_labels;
613+
auto x = std::tie(edge_label, v_label, vid);
614+
while (!(v_label == label && vid == v)) {
615+
path.push_back(VertexRecord{std::get<1>(x), std::get<2>(x)});
616+
edge_labels.push_back(std::get<0>(x));
600617
x = parent[x];
601618
}
602619
path.emplace_back(VertexRecord{label, v});
620+
std::reverse(edge_labels.begin(), edge_labels.end());
603621
std::reverse(path.begin(), path.end());
604622

605623
if (path.size() > 1) {
606-
auto impl = PathImpl::make_path_impl(std::move(path));
607-
624+
auto impl = PathImpl::make_path_impl(edge_labels, path);
608625
path_col_builder.push_back_opt(Path(impl.get()));
609626
path_impls->emplace_back(std::move(impl));
610627

611-
dest_col_builder.push_back_vertex({u.first, u.second});
628+
dest_col_builder.push_back_vertex({v_label, vid});
612629
offsets.push_back(idx);
613630
}
614631
}
615632

616-
for (auto& l : labels_map[u.first]) {
633+
for (auto& l : labels_map[v_label]) {
617634
label_t nbr_label = std::get<0>(l);
618635
auto iter = (std::get<2>(l) == Direction::kOut)
619-
? graph.GetOutEdgeIterator(
620-
u.first, u.second, nbr_label, std::get<1>(l))
621-
: graph.GetInEdgeIterator(
622-
u.first, u.second, nbr_label, std::get<1>(l));
636+
? graph.GetOutEdgeIterator(v_label, vid, nbr_label,
637+
std::get<1>(l))
638+
: graph.GetInEdgeIterator(v_label, vid, nbr_label,
639+
std::get<1>(l));
623640
while (iter.IsValid()) {
624-
auto nbr = std::make_pair(nbr_label, iter.GetNeighbor());
625-
if (parent.find(nbr) == parent.end()) {
626-
parent[nbr] = u;
641+
auto nbr = std::make_tuple(std::get<1>(l), nbr_label,
642+
iter.GetNeighbor());
643+
auto vertex = std::make_pair(nbr_label, iter.GetNeighbor());
644+
if (visited.find(vertex) == visited.end()) {
645+
visited.insert(vertex);
646+
parent[nbr] = std::tie(edge_label, v_label, vid);
627647
next.push_back(nbr);
628648
}
629649
iter.Next();

flex/engines/graph_db/runtime/common/rt_any.cc

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -846,8 +846,43 @@ void RTAny::sink(const GraphReadInterface& graph, int id,
846846
}
847847
}
848848
} else if (type_ == RTAnyType::kPath) {
849-
LOG(FATAL) << "not support path sink";
850-
849+
auto mutable_path =
850+
col->mutable_entry()->mutable_element()->mutable_graph_path();
851+
auto path_nodes = this->as_path().nodes();
852+
auto edge_labels = this->as_path().edge_labels();
853+
// same label for all edges
854+
if (edge_labels.size() == 1) {
855+
for (size_t i = 0; i + 2 < path_nodes.size(); ++i) {
856+
edge_labels.emplace_back(edge_labels[0]);
857+
}
858+
}
859+
assert(edge_labels.size() + 1 == path_nodes.size());
860+
size_t len = path_nodes.size();
861+
for (size_t i = 0; i + 1 < len; ++i) {
862+
auto vertex_in_path = mutable_path->add_path();
863+
864+
auto node = vertex_in_path->mutable_vertex();
865+
node->mutable_label()->set_id(path_nodes[i].label());
866+
node->set_id(
867+
encode_unique_vertex_id(path_nodes[i].label(), path_nodes[i].vid()));
868+
auto edge_in_path = mutable_path->add_path();
869+
870+
auto edge = edge_in_path->mutable_edge();
871+
edge->mutable_src_label()->set_id(path_nodes[i].label());
872+
edge->mutable_dst_label()->set_id(path_nodes[i + 1].label());
873+
edge->mutable_label()->set_id(edge_labels[i]);
874+
edge->set_id(encode_unique_edge_id(edge_labels[i], path_nodes[i].vid(),
875+
path_nodes[i + 1].vid()));
876+
edge->set_src_id(
877+
encode_unique_vertex_id(path_nodes[i].label(), path_nodes[i].vid()));
878+
edge->set_dst_id(encode_unique_vertex_id(path_nodes[i + 1].label(),
879+
path_nodes[i + 1].vid()));
880+
}
881+
auto vertex_in_path = mutable_path->add_path();
882+
auto node = vertex_in_path->mutable_vertex();
883+
node->mutable_label()->set_id(path_nodes[len - 1].label());
884+
node->set_id(encode_unique_vertex_id(path_nodes[len - 1].label(),
885+
path_nodes[len - 1].vid()));
851886
} else {
852887
sink_impl(col->mutable_entry()->mutable_element()->mutable_object());
853888
}

flex/engines/graph_db/runtime/common/rt_any.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,24 +91,27 @@ class PathImpl : public CObject {
9191
return new_path;
9292
}
9393
static std::unique_ptr<PathImpl> make_path_impl(
94-
label_t label, std::vector<vid_t>& path_ids) {
94+
label_t label, label_t edge_label, std::vector<vid_t>& path_ids) {
9595
auto new_path = std::make_unique<PathImpl>();
9696
for (auto id : path_ids) {
9797
new_path->path_.push_back({label, id});
9898
}
99+
new_path->edge_labels_.push_back(edge_label);
99100
return new_path;
100101
}
101-
102102
static std::unique_ptr<PathImpl> make_path_impl(
103+
const std::vector<label_t>& edge_labels,
103104
const std::vector<VertexRecord>& path) {
104105
auto new_path = std::make_unique<PathImpl>();
105-
new_path->path_.insert(new_path->path_.end(), path.begin(), path.end());
106+
new_path->path_ = path;
107+
new_path->edge_labels_ = edge_labels;
106108
return new_path;
107109
}
108-
109-
std::unique_ptr<PathImpl> expand(label_t label, vid_t v) const {
110+
std::unique_ptr<PathImpl> expand(label_t edge_label, label_t label,
111+
vid_t v) const {
110112
auto new_path = std::make_unique<PathImpl>();
111113
new_path->path_ = path_;
114+
new_path->edge_labels_.emplace_back(edge_label);
112115
new_path->path_.push_back({label, v});
113116
return new_path;
114117
}
@@ -130,6 +133,7 @@ class PathImpl : public CObject {
130133
bool operator<(const PathImpl& p) const { return path_ < p.path_; }
131134
bool operator==(const PathImpl& p) const { return path_ == p.path_; }
132135
std::vector<VertexRecord> path_;
136+
std::vector<label_t> edge_labels_;
133137
};
134138
class Path {
135139
public:
@@ -155,6 +159,8 @@ class Path {
155159

156160
std::vector<VertexRecord> nodes() { return impl_->path_; }
157161

162+
std::vector<label_t> edge_labels() const { return impl_->edge_labels_; }
163+
158164
VertexRecord get_start() const { return impl_->get_start(); }
159165
bool operator<(const Path& p) const { return *impl_ < *(p.impl_); }
160166
bool operator==(const Path& p) const { return *(impl_) == *(p.impl_); }

0 commit comments

Comments
 (0)