Skip to content

Commit 44fe957

Browse files
committed
test
1 parent a7ea666 commit 44fe957

File tree

3 files changed

+69
-43
lines changed

3 files changed

+69
-43
lines changed

flex/engines/graph_db/app/cypher_read_app.cc

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,52 +10,72 @@ namespace gs {
1010
bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
1111
Encoder& output) {
1212
auto txn = graph.GetReadTransaction();
13-
std::string_view bytes = input.get_bytes();
13+
std::string_view _bytes = input.get_bytes();
14+
uint8_t type = static_cast<uint8_t>(_bytes.back());
15+
std::string_view bytes = std::string_view(_bytes.data(), _bytes.size() - 1);
16+
if (type == Schema::ADHOC_READ_PLUGIN_ID) {
17+
auto txn = graph.GetReadTransaction();
1418

15-
size_t sep = bytes.find_first_of("&?");
16-
auto query_str = bytes.substr(0, sep);
17-
auto params_str = bytes.substr(sep + 2);
18-
std::map<std::string, std::string> params;
19-
parse_params(params_str, params);
20-
auto query = std::string(query_str.data(), query_str.size());
21-
if (!pipeline_cache_.count(query)) {
22-
if (plan_cache_.count(query)) {
23-
// LOG(INFO) << "Hit cache for query ";
24-
} else {
25-
auto& query_cache = db_.getQueryCache();
26-
std::string_view plan_str;
27-
if (query_cache.get(query, plan_str)) {
28-
physical::PhysicalPlan plan;
29-
if (!plan.ParseFromString(std::string(plan_str))) {
30-
return false;
31-
}
32-
plan_cache_[query] = plan;
19+
physical::PhysicalPlan plan;
20+
if (!plan.ParseFromString(std::string(bytes))) {
21+
LOG(ERROR) << "Parse plan failed...";
22+
return false;
23+
}
24+
25+
LOG(INFO) << "plan: " << plan.DebugString();
26+
gs::runtime::GraphReadInterface gri(txn);
27+
auto ctx = runtime::PlanParser::get()
28+
.parse_read_pipeline(graph.schema(),
29+
gs::runtime::ContextMeta(), plan)
30+
.Execute(gri, runtime::Context(), {}, timer_);
31+
32+
runtime::Sink::sink(ctx, txn, output);
33+
} else {
34+
size_t sep = bytes.find_first_of("&?");
35+
auto query_str = bytes.substr(0, sep);
36+
auto params_str = bytes.substr(sep + 2);
37+
std::map<std::string, std::string> params;
38+
parse_params(params_str, params);
39+
auto query = std::string(query_str.data(), query_str.size());
40+
if (!pipeline_cache_.count(query)) {
41+
if (plan_cache_.count(query)) {
42+
// LOG(INFO) << "Hit cache for query ";
3343
} else {
34-
const std::string statistics = db_.work_dir() + "/statistics.json";
35-
const std::string& compiler_yaml = db_.work_dir() + "/graph.yaml";
36-
const std::string& tmp_dir = db_.work_dir() + "/runtime/tmp/";
37-
for (int i = 0; i < 3; ++i) {
38-
if (!generate_plan(query, statistics, compiler_yaml, tmp_dir,
39-
plan_cache_)) {
40-
LOG(ERROR) << "Generate plan failed for query: " << query;
41-
} else {
42-
query_cache.put(query, plan_cache_[query].SerializeAsString());
43-
break;
44+
auto& query_cache = db_.getQueryCache();
45+
std::string_view plan_str;
46+
if (query_cache.get(query, plan_str)) {
47+
physical::PhysicalPlan plan;
48+
if (!plan.ParseFromString(std::string(plan_str))) {
49+
return false;
50+
}
51+
plan_cache_[query] = plan;
52+
} else {
53+
const std::string statistics = db_.work_dir() + "/statistics.json";
54+
const std::string& compiler_yaml = db_.work_dir() + "/graph.yaml";
55+
const std::string& tmp_dir = db_.work_dir() + "/runtime/tmp/";
56+
for (int i = 0; i < 3; ++i) {
57+
if (!generate_plan(query, statistics, compiler_yaml, tmp_dir,
58+
plan_cache_)) {
59+
LOG(ERROR) << "Generate plan failed for query: " << query;
60+
} else {
61+
query_cache.put(query, plan_cache_[query].SerializeAsString());
62+
break;
63+
}
4464
}
4565
}
4666
}
67+
const auto& plan = plan_cache_[query];
68+
pipeline_cache_.emplace(
69+
query, runtime::PlanParser::get().parse_read_pipeline(
70+
db_.schema(), gs::runtime::ContextMeta(), plan));
4771
}
48-
const auto& plan = plan_cache_[query];
49-
pipeline_cache_.emplace(
50-
query, runtime::PlanParser::get().parse_read_pipeline(
51-
db_.schema(), gs::runtime::ContextMeta(), plan));
52-
}
5372

54-
gs::runtime::GraphReadInterface gri(txn);
55-
auto ctx = pipeline_cache_.at(query).Execute(gri, runtime::Context(), params,
56-
timer_);
73+
gs::runtime::GraphReadInterface gri(txn);
74+
auto ctx = pipeline_cache_.at(query).Execute(gri, runtime::Context(),
75+
params, timer_);
5776

58-
runtime::Sink::sink_encoder(ctx, gri, output);
77+
runtime::Sink::sink_encoder(ctx, gri, output);
78+
}
5979
return true;
6080
}
6181
AppWrapper CypherReadAppFactory::CreateApp(const GraphDB& db) {

flex/engines/graph_db/database/graph_db_session.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class GraphDBSession {
4040
kCypherJson = 1, // Json format for cypher query
4141
kCypherProtoAdhoc = 2, // Protobuf format for adhoc query
4242
kCypherProtoProcedure = 3, // Protobuf format for procedure query
43+
kCypherString = 4,
4344
};
4445

4546
static constexpr int32_t MAX_RETRY = 3;
@@ -156,7 +157,7 @@ class GraphDBSession {
156157
// second last byte,which is fixed to 255, and other bytes are a string
157158
// representing the path to generated dynamic lib.
158159
return std::make_pair((uint8_t) input[len - 2],
159-
std::string_view(str_data, len - 2));
160+
std::string_view(str_data, len - 1));
160161
} else if (input_tag == static_cast<uint8_t>(InputFormat::kCypherJson)) {
161162
// For cypherJson there is no query-id provided. The query name is
162163
// provided in the json string.
@@ -171,6 +172,9 @@ class GraphDBSession {
171172
// Same as cypherJson, we don't discard the last byte.
172173
std::string_view str_view(input.data(), len);
173174
return parse_query_type_from_cypher_internal(str_view);
175+
} else if (input_tag == static_cast<uint8_t>(InputFormat::kCypherString)) {
176+
return std::make_pair((uint8_t) input[len - 2],
177+
std::string_view(str_data, len - 1));
174178
} else {
175179
return Result<std::pair<uint8_t, std::string_view>>(
176180
gs::Status(StatusCode::INVALID_ARGUMENT,

flex/engines/http_server/handler/graph_db_http_handler.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,9 +296,8 @@ class stored_proc_handler : public StoppableHandler {
296296
if (req->content.size() > 0) {
297297
// read last byte and get the format info from the byte.
298298
last_byte = req->content.back();
299-
if (last_byte >
300-
static_cast<uint8_t>(
301-
gs::GraphDBSession::InputFormat::kCypherProtoProcedure)) {
299+
if (last_byte > static_cast<uint8_t>(
300+
gs::GraphDBSession::InputFormat::kCypherString)) {
302301
LOG(ERROR) << "Unsupported request format: " << (int) last_byte;
303302
rep->set_status(
304303
seastar::httpd::reply::status_type::internal_server_error);
@@ -339,7 +338,10 @@ class stored_proc_handler : public StoppableHandler {
339338
#endif // HAVE_OPENTELEMETRY_CPP
340339
](auto&& output) {
341340
if (last_byte == static_cast<uint8_t>(
342-
gs::GraphDBSession::InputFormat::kCppEncoder)) {
341+
gs::GraphDBSession::InputFormat::kCppEncoder) ||
342+
last_byte ==
343+
static_cast<uint8_t>(
344+
gs::GraphDBSession::InputFormat::kCypherString)) {
343345
return seastar::make_ready_future<query_param>(
344346
std::move(output.content));
345347
} else {

0 commit comments

Comments
 (0)