Skip to content

Commit f00da9d

Browse files
authored
feat(interactive): runtime support update operations (#4490)
Fixes
1 parent ece22ae commit f00da9d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+4449
-720
lines changed

flex/engines/graph_db/app/cypher_app_utils.cc

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,10 @@ void generate_compiler_configs(const std::string& graph_yaml,
7575
out.close();
7676
}
7777

78-
bool generate_plan(
79-
const std::string& query, const std::string& statistics,
80-
const std::string& compiler_jar_path, const std::string& compiler_yaml,
81-
const std::string& tmp_dir,
82-
std::unordered_map<std::string, physical::PhysicalPlan>& plan_cache) {
78+
bool generate_plan(const std::string& query, const std::string& statistics,
79+
const std::string& compiler_jar_path,
80+
const std::string& compiler_yaml, const std::string& tmp_dir,
81+
physical::PhysicalPlan& plan) {
8382
// dump query to file
8483
const char* compiler_jar = compiler_jar_path.c_str();
8584
if (compiler_jar_path == "") {
@@ -157,12 +156,9 @@ bool generate_plan(
157156
file.read(&buffer[0], size);
158157

159158
file.close();
160-
physical::PhysicalPlan plan;
161159
if (!plan.ParseFromString(std::string(buffer))) {
162160
return false;
163161
}
164-
165-
plan_cache[query] = plan;
166162
}
167163
// clean up temp files
168164
{

flex/engines/graph_db/app/cypher_app_utils.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@
2222
#include "flex/proto_generated_gie/physical.pb.h"
2323

2424
namespace gs {
25-
bool generate_plan(
26-
const std::string& query, const std::string& statistics,
27-
const std::string& compiler_jar_path, const std::string& compiler_yaml,
28-
const std::string& tmp_dir,
29-
std::unordered_map<std::string, physical::PhysicalPlan>& plan_cache);
25+
bool generate_plan(const std::string& query, const std::string& statistics,
26+
const std::string& compiler_jar_path,
27+
const std::string& compiler_yaml, const std::string& tmp_dir,
28+
physical::PhysicalPlan& plan_cache);
3029
void parse_params(std::string_view sw,
3130
std::map<std::string, std::string>& params);
3231
} // namespace gs

flex/engines/graph_db/app/cypher_read_app.cc

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "flex/engines/graph_db/database/graph_db.h"
55
#include "flex/engines/graph_db/runtime/common/operators/retrieve/sink.h"
66
#include "flex/engines/graph_db/runtime/execute/plan_parser.h"
7+
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
78

89
namespace gs {
910

@@ -71,29 +72,23 @@ bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
7172
if (!pipeline_cache_.count(query)) {
7273
if (plan_cache_.count(query)) {
7374
} else {
74-
auto& query_cache = db_.getQueryCache();
75-
std::string_view plan_str;
76-
if (query_cache.get(query, plan_str)) {
77-
physical::PhysicalPlan plan;
78-
if (!plan.ParseFromString(std::string(plan_str))) {
79-
return false;
80-
}
81-
plan_cache_[query] = plan;
75+
physical::PhysicalPlan plan;
76+
std::string plan_str;
77+
78+
if (!gs::runtime::CypherRunnerImpl::get().gen_plan(db_, query,
79+
plan_str)) {
80+
LOG(ERROR) << "Generate plan failed for query: " << query;
81+
std::string error =
82+
" Compiler failed to generate physical plan: " + query;
83+
output.put_bytes(error.data(), error.size());
84+
85+
return false;
8286
} else {
83-
const std::string statistics = db_.work_dir() + "/statistics.json";
84-
const std::string& compiler_yaml = db_.work_dir() + "/graph.yaml";
85-
const std::string& tmp_dir = db_.work_dir() + "/runtime/tmp/";
86-
const auto& compiler_path = db_.schema().get_compiler_path();
87-
if (!generate_plan(query, statistics, compiler_path, compiler_yaml,
88-
tmp_dir, plan_cache_)) {
89-
LOG(ERROR) << "Generate plan failed for query: " << query;
90-
std::string error =
91-
" Compiler failed to generate physical plan: " + query;
92-
output.put_bytes(error.data(), error.size());
87+
if (!plan.ParseFromString(plan_str)) {
88+
LOG(ERROR) << "Parse plan failed for query: " << query;
9389
return false;
94-
} else {
95-
query_cache.put(query, plan_cache_[query].SerializeAsString());
9690
}
91+
plan_cache_[query] = std::move(plan);
9792
}
9893
}
9994
const auto& plan = plan_cache_[query];

flex/engines/graph_db/app/cypher_write_app.cc

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "flex/engines/graph_db/database/graph_db.h"
44

55
#include "flex/engines/graph_db/runtime/execute/plan_parser.h"
6+
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
67

78
namespace gs {
89

@@ -21,43 +22,30 @@ bool CypherWriteApp::Query(GraphDBSession& graph, Decoder& input,
2122
if (!pipeline_cache_.count(query)) {
2223
if (plan_cache_.count(query)) {
2324
} else {
24-
const std::string statistics = db_.work_dir() + "/statistics.json";
25-
const std::string& compiler_yaml = db_.work_dir() + "/graph.yaml";
26-
const std::string& tmp_dir = db_.work_dir() + "/runtime/tmp/";
25+
physical::PhysicalPlan plan;
26+
std::string plan_str;
2727

28-
auto& query_cache = db_.getQueryCache();
29-
std::string_view plan_str;
30-
if (query_cache.get(query, plan_str)) {
31-
physical::PhysicalPlan plan;
32-
if (!plan.ParseFromString(std::string(plan_str))) {
33-
return false;
34-
}
35-
plan_cache_[query] = plan;
28+
if (!gs::runtime::CypherRunnerImpl::get().gen_plan(db_, query,
29+
plan_str)) {
30+
return false;
3631
} else {
37-
const auto& compiler_path = db_.schema().get_compiler_path();
38-
39-
for (int i = 0; i < 3; ++i) {
40-
if (!generate_plan(query, statistics, compiler_path, compiler_yaml,
41-
tmp_dir, plan_cache_)) {
42-
LOG(ERROR) << "Generate plan failed for query: " << query;
43-
} else {
44-
query_cache.put(query, plan_cache_[query].SerializeAsString());
45-
break;
46-
}
32+
if (!plan.ParseFromString(plan_str)) {
33+
LOG(ERROR) << "Parse plan failed for query: " << query;
34+
return false;
4735
}
36+
plan_cache_[query] = std::move(plan);
4837
}
4938
}
5039
const auto& plan = plan_cache_[query];
5140
pipeline_cache_.emplace(query, runtime::PlanParser::get()
5241
.parse_write_pipeline(db_.schema(), plan)
5342
.value());
54-
} else {
5543
}
5644

5745
gs::runtime::GraphInsertInterface gri(txn);
5846
auto ctx = pipeline_cache_.at(query).Execute(gri, runtime::WriteContext(),
5947
params, timer_);
60-
48+
txn.Commit();
6149
return true;
6250
}
6351
AppWrapper CypherWriteAppFactory::CreateApp(const GraphDB& db) {

flex/engines/graph_db/database/graph_db.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "flex/engines/graph_db/database/graph_db_session.h"
2626
#include "flex/engines/graph_db/database/wal.h"
2727
#include "flex/engines/graph_db/runtime/execute/plan_parser.h"
28+
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
2829
#include "flex/utils/yaml_utils.h"
2930

3031
#include "flex/third_party/httplib.h"
@@ -239,7 +240,7 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
239240

240241
unlink((work_dir_ + "/statistics.json").c_str());
241242
graph_.generateStatistics(work_dir_);
242-
query_cache_.cache.clear();
243+
runtime::CypherRunnerImpl::get().clear_cache();
243244

244245
return Result<bool>(true);
245246
}
@@ -507,8 +508,6 @@ size_t GraphDB::getExecutedQueryNum() const {
507508
return ret;
508509
}
509510

510-
QueryCache& GraphDB::getQueryCache() const { return query_cache_; }
511-
512511
void GraphDB::OutputCypherProfiles(const std::string& prefix) {
513512
runtime::OprTimer read_timer, write_timer;
514513
int session_num = SessionNum();

flex/engines/graph_db/database/graph_db.h

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
#include <map>
2222
#include <mutex>
23-
#include <shared_mutex>
2423
#include <thread>
2524
#include <vector>
2625

@@ -70,25 +69,6 @@ struct GraphDBConfig {
7069
int memory_level;
7170
};
7271

73-
struct QueryCache {
74-
bool get(const std::string& key, std::string_view& value) {
75-
std::shared_lock<std::shared_mutex> lock(mutex);
76-
auto it = cache.find(key);
77-
if (it != cache.end()) {
78-
value = it->second;
79-
return true;
80-
}
81-
return false;
82-
}
83-
84-
void put(const std::string& key, const std::string_view& value) {
85-
std::unique_lock<std::shared_mutex> lock(mutex);
86-
cache[key] = value;
87-
}
88-
std::shared_mutex mutex;
89-
std::unordered_map<std::string, std::string> cache;
90-
};
91-
9272
class GraphDB {
9373
public:
9474
GraphDB();
@@ -178,8 +158,6 @@ class GraphDB {
178158
void UpdateCompactionTimestamp(timestamp_t ts);
179159
timestamp_t GetLastCompactionTimestamp() const;
180160

181-
QueryCache& getQueryCache() const;
182-
183161
std::string work_dir() const { return work_dir_; }
184162

185163
void OutputCypherProfiles(const std::string& prefix);
@@ -214,8 +192,6 @@ class GraphDB {
214192
std::array<std::string, 256> app_paths_;
215193
std::array<std::shared_ptr<AppFactoryBase>, 256> app_factories_;
216194

217-
mutable QueryCache query_cache_;
218-
219195
std::thread monitor_thread_;
220196
bool monitor_thread_running_;
221197

flex/engines/graph_db/database/graph_db_session.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ ReadTransaction GraphDBSession::GetReadTransaction() const {
3434

3535
InsertTransaction GraphDBSession::GetInsertTransaction() {
3636
uint32_t ts = db_.version_manager_.acquire_insert_timestamp();
37-
return InsertTransaction(db_.graph_, alloc_, logger_, db_.version_manager_,
38-
ts);
37+
return InsertTransaction(*this, db_.graph_, alloc_, logger_,
38+
db_.version_manager_, ts);
3939
}
4040

4141
SingleVertexInsertTransaction
@@ -53,7 +53,7 @@ SingleEdgeInsertTransaction GraphDBSession::GetSingleEdgeInsertTransaction() {
5353

5454
UpdateTransaction GraphDBSession::GetUpdateTransaction() {
5555
uint32_t ts = db_.version_manager_.acquire_update_timestamp();
56-
return UpdateTransaction(db_.graph_, alloc_, work_dir_, logger_,
56+
return UpdateTransaction(*this, db_.graph_, alloc_, work_dir_, logger_,
5757
db_.version_manager_, ts);
5858
}
5959

flex/engines/graph_db/database/insert_transaction.cc

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,22 @@
1414
*/
1515

1616
#include "flex/engines/graph_db/database/insert_transaction.h"
17+
#include "flex/engines/graph_db/database/graph_db_session.h"
1718
#include "flex/engines/graph_db/database/transaction_utils.h"
1819
#include "flex/engines/graph_db/database/version_manager.h"
1920
#include "flex/engines/graph_db/database/wal.h"
21+
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
2022
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
2123
#include "flex/utils/allocators.h"
22-
2324
namespace gs {
2425

25-
InsertTransaction::InsertTransaction(MutablePropertyFragment& graph,
26+
InsertTransaction::InsertTransaction(const GraphDBSession& session,
27+
MutablePropertyFragment& graph,
2628
Allocator& alloc, WalWriter& logger,
2729
VersionManager& vm, timestamp_t timestamp)
2830

29-
: graph_(graph),
31+
: session_(session),
32+
graph_(graph),
3033
alloc_(alloc),
3134
logger_(logger),
3235
vm_(vm),
@@ -36,6 +39,12 @@ InsertTransaction::InsertTransaction(MutablePropertyFragment& graph,
3639

3740
InsertTransaction::~InsertTransaction() { Abort(); }
3841

42+
std::string InsertTransaction::run(
43+
const std::string& cypher,
44+
const std::map<std::string, std::string>& params) {
45+
return gs::runtime::CypherRunnerImpl::get().run(*this, cypher, params);
46+
}
47+
3948
bool InsertTransaction::AddVertex(label_t label, const Any& id,
4049
const std::vector<Any>& props) {
4150
size_t arc_size = arc_.GetSize();
@@ -210,6 +219,8 @@ void InsertTransaction::clear() {
210219

211220
const Schema& InsertTransaction::schema() const { return graph_.schema(); }
212221

222+
const GraphDBSession& InsertTransaction::GetSession() const { return session_; }
223+
213224
#define likely(x) __builtin_expect(!!(x), 1)
214225

215226
bool InsertTransaction::get_vertex_with_retries(MutablePropertyFragment& graph,

flex/engines/graph_db/database/insert_transaction.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@ namespace gs {
2929
class MutablePropertyFragment;
3030
class WalWriter;
3131
class VersionManager;
32+
class GraphDBSession;
3233

3334
class InsertTransaction {
3435
public:
35-
InsertTransaction(MutablePropertyFragment& graph, Allocator& alloc,
36+
std::string run(const std::string& cypher,
37+
const std::map<std::string, std::string>& params);
38+
39+
InsertTransaction(const GraphDBSession& session,
40+
MutablePropertyFragment& graph, Allocator& alloc,
3641
WalWriter& logger, VersionManager& vm,
3742
timestamp_t timestamp);
3843

@@ -54,12 +59,15 @@ class InsertTransaction {
5459

5560
const Schema& schema() const;
5661

62+
const GraphDBSession& GetSession() const;
63+
5764
private:
5865
void clear();
5966

6067
static bool get_vertex_with_retries(MutablePropertyFragment& graph,
6168
label_t label, const Any& oid,
6269
vid_t& lid);
70+
const GraphDBSession& session_;
6371

6472
grape::InArchive arc_;
6573

flex/engines/graph_db/database/read_transaction.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include "flex/engines/graph_db/database/read_transaction.h"
1717
#include "flex/engines/graph_db/database/version_manager.h"
18+
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
1819
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
1920

2021
namespace gs {
@@ -25,6 +26,12 @@ ReadTransaction::ReadTransaction(const GraphDBSession& session,
2526
: session_(session), graph_(graph), vm_(vm), timestamp_(timestamp) {}
2627
ReadTransaction::~ReadTransaction() { release(); }
2728

29+
std::string ReadTransaction::run(
30+
const std::string& cypher,
31+
const std::map<std::string, std::string>& params) const {
32+
return gs::runtime::CypherRunnerImpl::get().run(*this, cypher, params);
33+
}
34+
2835
timestamp_t ReadTransaction::timestamp() const { return timestamp_; }
2936

3037
void ReadTransaction::Commit() { release(); }

0 commit comments

Comments
 (0)