Skip to content

Commit ebdf9d1

Browse files
authored
refactor(interactive): refactor runtime (#4418)
Fixes
1 parent 76097db commit ebdf9d1

File tree

186 files changed

+23442
-8309
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

186 files changed

+23442
-8309
lines changed

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@
1010
path = learning_engine/graphlearn-for-pytorch
1111
url = https://github.com/alibaba/graphlearn-for-pytorch.git
1212

13+
[submodule "flex/third_party/parallel-hashmap"]
14+
path = flex/third_party/parallel-hashmap
15+
url = https://github.com/greg7mdp/parallel-hashmap.git

flex/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ add_definitions(-DRAPIDJSON_HAS_CXX11_RANGE_FOR=1)
207207
if (BUILD_ODPS_FRAGMENT_LOADER)
208208
include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/third_party/odps/include)
209209
endif()
210+
include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/third_party/parallel-hashmap)
210211

211212
macro(install_flex_target target)
212213
install(TARGETS ${target}

flex/bin/adhoc_runner.cc

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
#include <iostream>
2121
#include <vector>
2222
#include "flex/engines/graph_db/database/graph_db.h"
23-
#include "flex/engines/graph_db/runtime/adhoc/runtime.h"
23+
#include "flex/engines/graph_db/runtime/common/operators/retrieve/sink.h"
24+
#include "flex/engines/graph_db/runtime/execute/plan_parser.h"
25+
#include "flex/engines/graph_db/runtime/utils/opr_timer.h"
26+
#include "flex/proto_generated_gie/physical.pb.h"
2427

2528
namespace bpo = boost::program_options;
26-
namespace bl = boost::leaf;
2729

2830
std::string read_pb(const std::string& filename) {
2931
std::ifstream file(filename, std::ios::binary);
@@ -78,27 +80,11 @@ void load_params(const std::string& filename,
7880
gs::runtime::Context eval_plan(
7981
const physical::PhysicalPlan& plan, gs::ReadTransaction& txn,
8082
const std::map<std::string, std::string>& params) {
81-
gs::runtime::Context ctx;
82-
{
83-
ctx = bl::try_handle_all(
84-
[&plan, &txn, &params]() {
85-
return gs::runtime::runtime_eval(plan, txn, params);
86-
},
87-
[&ctx](const gs::Status& err) {
88-
LOG(FATAL) << "Error in execution: " << err.error_message();
89-
return ctx;
90-
},
91-
[&](const bl::error_info& err) {
92-
LOG(FATAL) << "boost leaf error: " << err.error().value() << ", "
93-
<< err.exception()->what();
94-
return ctx;
95-
},
96-
[&]() {
97-
LOG(FATAL) << "Unknown error in execution";
98-
return ctx;
99-
});
100-
}
101-
return ctx;
83+
gs::runtime::GraphReadInterface gri(txn);
84+
gs::runtime::OprTimer timer;
85+
return gs::runtime::PlanParser::get()
86+
.parse_read_pipeline(gri.schema(), gs::runtime::ContextMeta(), plan)
87+
.Execute(gri, gs::runtime::Context(), params, timer);
10288
}
10389

10490
int main(int argc, char** argv) {
@@ -187,7 +173,7 @@ int main(int argc, char** argv) {
187173
auto& m = map[i % params_num];
188174
auto ctx = eval_plan(pb, txn, m);
189175
gs::Encoder output(outputs[i]);
190-
gs::runtime::eval_sink(ctx, txn, output);
176+
gs::runtime::Sink::sink(ctx, txn, output);
191177
}
192178
t1 += grape::GetCurrentTime();
193179

@@ -197,7 +183,7 @@ int main(int argc, char** argv) {
197183
auto ctx = eval_plan(pb, txn, m);
198184
outputs[i].clear();
199185
gs::Encoder output(outputs[i]);
200-
gs::runtime::eval_sink(ctx, txn, output);
186+
gs::runtime::Sink::sink(ctx, txn, output);
201187
}
202188
t2 += grape::GetCurrentTime();
203189

@@ -207,7 +193,7 @@ int main(int argc, char** argv) {
207193
auto ctx = eval_plan(pb, txn, m);
208194
outputs[i].clear();
209195
gs::Encoder output(outputs[i]);
210-
gs::runtime::eval_sink(ctx, txn, output);
196+
gs::runtime::Sink::sink(ctx, txn, output);
211197
}
212198
t3 += grape::GetCurrentTime();
213199

@@ -231,4 +217,4 @@ int main(int argc, char** argv) {
231217
}
232218

233219
return 0;
234-
}
220+
}

flex/bin/bulk_loader.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ int main(int argc, char** argv) {
152152
return -1;
153153
}
154154

155+
{
156+
std::error_code ec;
157+
std::filesystem::copy(graph_schema_path, data_dir_path / "graph.yaml",
158+
std::filesystem::copy_options::overwrite_existing,
159+
ec);
160+
if (ec) {
161+
LOG(FATAL) << "Failed to copy graph schema file: " << ec.message();
162+
}
163+
}
164+
155165
work_dir = data_dir_path.string();
156166

157167
// Register handlers for SIGKILL, SIGINT, SIGTERM, SIGSEGV, SIGABRT

flex/bin/rt_server.cc

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@ int main(int argc, char** argv) {
3434
bpo::value<uint32_t>()->default_value(1),
3535
"shard number of actor system")(
3636
"http-port,p", bpo::value<uint16_t>()->default_value(10000),
37-
"http port of query handler")("graph-config,g", bpo::value<std::string>(),
38-
"graph schema config file")(
39-
"data-path,d", bpo::value<std::string>(), "data directory path")(
37+
"http port of query handler")("data-path,d", bpo::value<std::string>(),
38+
"data directory path")(
4039
"warmup,w", bpo::value<bool>()->default_value(false),
4140
"warmup graph data")("memory-level,m",
4241
bpo::value<int>()->default_value(1));
@@ -62,14 +61,8 @@ int main(int argc, char** argv) {
6261
uint32_t shard_num = vm["shard-num"].as<uint32_t>();
6362
uint16_t http_port = vm["http-port"].as<uint16_t>();
6463

65-
std::string graph_schema_path = "";
6664
std::string data_path = "";
6765

68-
if (!vm.count("graph-config")) {
69-
LOG(ERROR) << "graph-config is required";
70-
return -1;
71-
}
72-
graph_schema_path = vm["graph-config"].as<std::string>();
7366
if (!vm.count("data-path")) {
7467
LOG(ERROR) << "data-path is required";
7568
return -1;
@@ -81,7 +74,7 @@ int main(int argc, char** argv) {
8174

8275
double t0 = -grape::GetCurrentTime();
8376
auto& db = gs::GraphDB::get();
84-
77+
std::string graph_schema_path = data_path + "/graph.yaml";
8578
auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
8679
if (!schema.ok()) {
8780
LOG(FATAL) << "Failed to load schema: " << schema.status().error_message();

flex/engines/graph_db/CMakeLists.txt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})
77

88
target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
99
target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
10-
target_link_libraries(flex_graph_db flex_plan_proto runtime_adhoc)
10+
target_link_libraries(flex_graph_db runtime_execute)
1111
install_flex_target(flex_graph_db)
1212

1313
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
@@ -28,7 +28,5 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/app_base.h
2828
DESTINATION include/flex/engines/graph_db/app)
2929
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/hqps_app.h
3030
DESTINATION include/flex/engines/graph_db/app)
31-
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/adhoc_app.h
32-
DESTINATION include/flex/engines/graph_db/app)
3331

3432

flex/engines/graph_db/app/adhoc_app.cc

Lines changed: 0 additions & 77 deletions
This file was deleted.

0 commit comments

Comments
 (0)