Skip to content

Commit 7d13f48

Browse files
authored
fix(interactive): add a cypher client to process cypher queries and remove duplicate dependencies from CMakeLists.txt (#4446)
Fixes
1 parent 3f19bfd commit 7d13f48

File tree

12 files changed

+123
-33
lines changed

12 files changed

+123
-33
lines changed

flex/bin/CMakeLists.txt

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,37 @@
11
if(Hiactor_FOUND)
22
add_executable(rt_server rt_server.cc)
3-
target_link_libraries(rt_server flex_utils flex_rt_mutable_graph flex_graph_db flex_server)
3+
target_link_libraries(rt_server flex_server)
44
install_without_export_flex_target(rt_server)
55
endif()
66

77
if(Hiactor_FOUND)
88
include_directories(../engines/http_server)
99
add_executable(rt_bench rt_bench.cc)
10-
target_link_libraries(rt_bench flex_utils flex_rt_mutable_graph flex_graph_db flex_server)
10+
target_link_libraries(rt_bench flex_server)
1111
install_without_export_flex_target(rt_bench)
1212
endif()
1313

1414
add_executable(rt_admin rt_admin.cc)
15-
target_link_libraries(rt_admin flex_utils flex_rt_mutable_graph flex_graph_db)
15+
target_link_libraries(rt_admin flex_utils)
1616
install_without_export_flex_target(rt_admin)
1717

1818

1919
add_executable(adhoc_runner adhoc_runner.cc)
20-
target_link_libraries(adhoc_runner flex_utils flex_graph_db)
20+
target_link_libraries(adhoc_runner flex_graph_db)
2121
install_without_export_flex_target(adhoc_runner)
2222

23+
add_executable(cypher_client cypher_client.cc)
24+
target_link_libraries(cypher_client flex_utils)
25+
install_without_export_flex_target(cypher_client)
26+
2327
add_executable(flex_analytical_engine flex_analytical_engine.cc)
2428
target_link_libraries(flex_analytical_engine flex_immutable_graph flex_bsp ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
2529
install_without_export_flex_target(flex_analytical_engine)
2630

2731

2832
if(Hiactor_FOUND)
2933
add_executable(interactive_server interactive_server.cc)
30-
target_link_libraries(interactive_server flex_utils flex_graph_db flex_server flex_plan_proto flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES})
34+
target_link_libraries(interactive_server flex_server ${GFLAGS_LIBRARIES})
3135
if (OPENTELEMETRY_CPP_FOUND)
3236
target_link_libraries(interactive_server otel)
3337
endif()
@@ -39,9 +43,9 @@ install(PROGRAMS load_plan_and_gen.sh DESTINATION bin)
3943

4044
include_directories(${Boost_INCLUDE_DIRS})
4145
add_executable(bulk_loader bulk_loader.cc)
42-
target_link_libraries(bulk_loader flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES})
46+
target_link_libraries(bulk_loader flex_rt_mutable_graph ${GFLAGS_LIBRARIES})
4347
install_without_export_flex_target(bulk_loader)
4448

4549
add_executable(stored_procedure_runner stored_procedure_runner.cc)
46-
target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils flex_graph_db ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES})
50+
target_link_libraries(stored_procedure_runner flex_graph_db ${GFLAGS_LIBRARIES})
4751
install_without_export_flex_target(stored_procedure_runner)

flex/bin/cypher_client.cc

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
#include "grape/util.h"
17+
18+
#include <boost/program_options.hpp>
19+
#include <fstream>
20+
#include <iostream>
21+
#include <vector>
22+
#include "flex/engines/graph_db/database/graph_db_session.h"
23+
#include "flex/storages/rt_mutable_graph/schema.h"
24+
#include "flex/third_party/httplib.h"
25+
26+
namespace bpo = boost::program_options;
27+
28+
int main(int argc, char** argv) {
29+
bpo::options_description desc("Usage:");
30+
desc.add_options()("help", "Display help message")("version,v",
31+
"Display version")(
32+
"uri,u", bpo::value<std::string>()->default_value("127.0.0.1"),
33+
"uri of the db")("port,p", bpo::value<int>()->default_value(10000),
34+
"port number");
35+
google::InitGoogleLogging(argv[0]);
36+
FLAGS_logtostderr = true;
37+
38+
bpo::variables_map vm;
39+
bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
40+
bpo::notify(vm);
41+
42+
if (vm.count("help")) {
43+
std::cout << desc << std::endl;
44+
return 0;
45+
}
46+
if (vm.count("version")) {
47+
std::cout << "GraphScope/Flex version " << FLEX_VERSION << std::endl;
48+
return 0;
49+
}
50+
51+
std::string uri = vm["uri"].as<std::string>();
52+
int port = vm["port"].as<int>();
53+
httplib::Client cli(uri, port);
54+
setenv("TZ", "Asia/Shanghai", 1);
55+
tzset();
56+
57+
while (true) {
58+
std::cout << ">>> ";
59+
std::string query;
60+
getline(std::cin, query);
61+
if (query == "exit") {
62+
break;
63+
}
64+
if (query == "") {
65+
continue;
66+
}
67+
68+
query.append(1, *gs::Schema::CYPHER_READ_DEBUG_PLUGIN_ID_STR);
69+
char input_format =
70+
static_cast<char>(gs::GraphDBSession::InputFormat::kCypherString);
71+
query.append(1, input_format);
72+
auto res = cli.Post("/v1/graph/current/query", query, "text/plain");
73+
std::string ret = res->body;
74+
std::cout << ret << std::endl;
75+
}
76+
return 0;
77+
}

flex/engines/graph_db/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
66
add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})
77

88
target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
9-
target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
9+
target_link_libraries(flex_graph_db flex_rt_mutable_graph ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
1010
target_link_libraries(flex_graph_db runtime_execute)
1111
install_flex_target(flex_graph_db)
1212

flex/engines/graph_db/app/cypher_app_utils.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
#include "flex/engines/graph_db/app/cypher_app_utils.h"
17+
#include <glog/logging.h>
1718

1819
#include <sys/wait.h> // for waitpid()
1920
#include <unistd.h> // for fork() and execvp()
@@ -135,8 +136,8 @@ bool generate_plan(
135136
int status;
136137
waitpid(pid, &status, 0);
137138
if (WIFEXITED(status)) {
138-
std::cout << "Child exited with status " << WEXITSTATUS(status)
139-
<< std::endl;
139+
VLOG(1) << "Child exited with status " << WEXITSTATUS(status)
140+
<< std::endl;
140141
}
141142

142143
{

flex/engines/graph_db/app/cypher_read_app.cc

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,15 @@ bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
8484
const std::string& compiler_yaml = db_.work_dir() + "/graph.yaml";
8585
const std::string& tmp_dir = db_.work_dir() + "/runtime/tmp/";
8686
const auto& compiler_path = db_.schema().get_compiler_path();
87-
for (int i = 0; i < 3; ++i) {
88-
if (!generate_plan(query, statistics, compiler_path, compiler_yaml,
89-
tmp_dir, plan_cache_)) {
90-
LOG(ERROR) << "Generate plan failed for query: " << query;
91-
} else {
92-
query_cache.put(query, plan_cache_[query].SerializeAsString());
93-
break;
94-
}
87+
if (!generate_plan(query, statistics, compiler_path, compiler_yaml,
88+
tmp_dir, plan_cache_)) {
89+
LOG(ERROR) << "Generate plan failed for query: " << query;
90+
std::string error =
91+
" Compiler failed to generate physical plan: " + query;
92+
output.put_bytes(error.data(), error.size());
93+
return false;
94+
} else {
95+
query_cache.put(query, plan_cache_[query].SerializeAsString());
9596
}
9697
}
9798
}
@@ -107,8 +108,11 @@ bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
107108
gs::runtime::GraphReadInterface gri(txn);
108109
auto ctx = pipeline_cache_.at(query).Execute(gri, runtime::Context(),
109110
params, timer_);
110-
111-
runtime::Sink::sink_encoder(ctx.value(), gri, output);
111+
if (type == Schema::CYPHER_READ_PLUGIN_ID) {
112+
runtime::Sink::sink_encoder(ctx.value(), gri, output);
113+
} else {
114+
runtime::Sink::sink_beta(ctx.value(), gri, output);
115+
}
112116
}
113117
return true;
114118
}

flex/engines/graph_db/database/graph_db.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,8 @@ void GraphDB::initApps(
421421
std::make_shared<HQPSAdhocWriteAppFactory>();
422422
app_factories_[Schema::ADHOC_READ_PLUGIN_ID] =
423423
std::make_shared<CypherReadAppFactory>();
424+
app_factories_[Schema::CYPHER_READ_DEBUG_PLUGIN_ID] =
425+
std::make_shared<CypherReadAppFactory>();
424426

425427
auto& parser = gs::runtime::PlanParser::get();
426428
parser.init();

flex/engines/graph_db/runtime/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
file(GLOB_RECURSE COMMON_SOURCES "common/*.cc")
22
add_library(runtime_common SHARED ${COMMON_SOURCES})
3-
target_link_libraries(runtime_common ${Boost_LIBRARIES} flex_rt_mutable_graph flex_utils flex_plan_proto)
3+
target_link_libraries(runtime_common flex_rt_mutable_graph flex_plan_proto)
44
install_flex_target(runtime_common)
55

66
file(GLOB_RECURSE EXECUTE_SOURCES "execute/*.cc" "utils/*.cc")

flex/engines/graph_db/runtime/common/operators/retrieve/sink.cc

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,25 +58,23 @@ void Sink::sink_encoder(const Context& ctx, const GraphReadInterface& graph,
5858
void Sink::sink_beta(const Context& ctx, const GraphReadInterface& graph,
5959
Encoder& output) {
6060
size_t row_num = ctx.row_num();
61-
results::CollectiveResults results;
61+
std::stringstream ss;
62+
6263
for (size_t i = 0; i < row_num; ++i) {
63-
auto result = results.add_results();
64-
std::stringstream ss;
6564
for (size_t j : ctx.tag_ids) {
6665
auto col = ctx.get(j);
6766
if (col == nullptr) {
6867
continue;
6968
}
70-
auto column = result->mutable_record()->add_columns();
7169
auto val = col->get_elem(i);
7270
ss << val.to_string() << "|";
73-
val.sink(graph, j, column);
7471
}
75-
std::cout << ss.str() << std::endl;
72+
ss << std::endl;
7673
}
77-
std::cout << "========================================================="
78-
<< std::endl;
79-
auto res = results.SerializeAsString();
74+
ss << "========================================================="
75+
<< std::endl;
76+
// std::cout << ss.str();
77+
auto res = ss.str();
8078
output.put_bytes(res.data(), res.size());
8179
}
8280

flex/engines/graph_db/runtime/common/rt_any.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,8 @@ static void sink_any(const Any& any, common::Value* value) {
695695
value->set_f64(any.AsDouble());
696696
} else if (any.type == PropertyType::Empty()) {
697697
value->mutable_none();
698+
} else if (any.type == PropertyType::Day()) {
699+
value->set_i64(any.AsDay().to_timestamp());
698700
} else {
699701
LOG(FATAL) << "Any value: " << any.to_string()
700702
<< ", type = " << any.type.type_enum;

flex/storages/rt_mutable_graph/schema.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,18 @@ class Schema {
3131
// How many built-in plugins are there.
3232
// Currently only one builtin plugin, SERVER_APP is supported.
3333
static constexpr uint8_t RESERVED_PLUGIN_NUM = 1;
34-
static constexpr uint8_t MAX_PLUGIN_ID = 246;
34+
static constexpr uint8_t MAX_PLUGIN_ID = 245;
3535
static constexpr uint8_t ADHOC_READ_PLUGIN_ID = 253;
3636
static constexpr uint8_t HQPS_ADHOC_READ_PLUGIN_ID = 254;
3737
static constexpr uint8_t HQPS_ADHOC_WRITE_PLUGIN_ID = 255;
3838

3939
static constexpr uint8_t CYPHER_READ_PLUGIN_ID = 248;
4040
static constexpr uint8_t CYPHER_WRITE_PLUGIN_ID = 247;
41+
static constexpr uint8_t CYPHER_READ_DEBUG_PLUGIN_ID = 246;
4142
static constexpr const char* HQPS_ADHOC_READ_PLUGIN_ID_STR = "\xFE";
4243
static constexpr const char* HQPS_ADHOC_WRITE_PLUGIN_ID_STR = "\xFF";
4344
static constexpr const char* ADHOC_READ_PLUGIN_ID_STR = "\xFD";
45+
static constexpr const char* CYPHER_READ_DEBUG_PLUGIN_ID_STR = "\xF6";
4446
static constexpr const char* PRIMITIVE_TYPE_KEY = "primitive_type";
4547
static constexpr const char* VARCHAR_KEY = "varchar";
4648
static constexpr const char* MAX_LENGTH_KEY = "max_length";

0 commit comments

Comments
 (0)