Skip to content

Commit 05ecabe

Browse files
committed
fix
1 parent 19e2c90 commit 05ecabe

File tree

6 files changed

+213
-125
lines changed

6 files changed

+213
-125
lines changed

flex/engines/graph_db/database/graph_db.cc

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ GraphDB& GraphDB::get() {
7171
return db;
7272
}
7373

74-
QueryCache& GraphDB::getQueryCache() const { return query_cache_; }
75-
7674
Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
7775
int32_t thread_num, bool warmup, bool memory_only,
7876
bool enable_auto_compaction) {
@@ -231,12 +229,17 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
231229
timestamp_t ts = this->version_manager_.acquire_update_timestamp();
232230
auto txn = CompactTransaction(this->graph_, this->contexts_[0].logger,
233231
this->version_manager_, ts);
232+
OutputCypherProfiles("./" + std::to_string(ts) + "_");
234233
txn.Commit();
235234
VLOG(10) << "Finish compaction";
236235
}
237236
}
238237
});
239238
}
239+
240+
unlink((work_dir_ + "/statistics.json").c_str());
241+
unlink((work_dir_ + "/.compiler.yaml").c_str());
242+
graph_.generateStatistics(work_dir_);
240243
query_cache_.cache.clear();
241244

242245
return Result<bool>(true);
@@ -265,8 +268,9 @@ void GraphDB::Close() {
265268
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
266269
}
267270

268-
ReadTransaction GraphDB::GetReadTransaction(int thread_id) {
269-
return contexts_[thread_id].session.GetReadTransaction();
271+
// Creates a read transaction directly on the database: a read timestamp is
// acquired from version_manager_ and the transaction is built from graph_,
// version_manager_ and that timestamp. No per-thread session context is
// consulted.
ReadTransaction GraphDB::GetReadTransaction() {
272+
// NOTE(review): other timestamp values in this file are held in timestamp_t
// (e.g. the compaction timestamp); confirm uint32_t is the same type.
uint32_t ts = version_manager_.acquire_read_timestamp();
273+
return {graph_, version_manager_, ts};
270274
}
271275

272276
InsertTransaction GraphDB::GetInsertTransaction(int thread_id) {
@@ -304,21 +308,6 @@ timestamp_t GraphDB::GetLastCompactionTimestamp() const {
304308
return last_compaction_ts_;
305309
}
306310

307-
const MutablePropertyFragment& GraphDB::graph() const { return graph_; }
308-
MutablePropertyFragment& GraphDB::graph() { return graph_; }
309-
310-
const Schema& GraphDB::schema() const { return graph_.schema(); }
311-
312-
std::shared_ptr<ColumnBase> GraphDB::get_vertex_property_column(
313-
uint8_t label, const std::string& col_name) const {
314-
return graph_.get_vertex_property_column(label, col_name);
315-
}
316-
317-
std::shared_ptr<RefColumnBase> GraphDB::get_vertex_id_column(
318-
uint8_t label) const {
319-
return graph_.get_vertex_id_column(label);
320-
}
321-
322311
AppWrapper GraphDB::CreateApp(uint8_t app_type, int thread_id) {
323312
if (app_factories_[app_type] == nullptr) {
324313
LOG(ERROR) << "Stored procedure " << static_cast<int>(app_type)
@@ -516,4 +505,27 @@ size_t GraphDB::getExecutedQueryNum() const {
516505
return ret;
517506
}
518507

508+
// Returns a non-const reference to query_cache_. This is allowed from a const
// member function because query_cache_ is declared `mutable` in the class.
QueryCache& GraphDB::getQueryCache() const { return query_cache_; }
509+
510+
// Aggregates the timers of the Cypher read and Cypher write apps over all
// sessions and writes the two totals to files.
//
// @param prefix String prepended to the output file names; the results go to
//               prefix + "read_profile.log" and prefix + "write_profile.log".
void GraphDB::OutputCypherProfiles(const std::string& prefix) {
511+
runtime::OprTimer read_timer, write_timer;
512+
int session_num = SessionNum();
513+
for (int i = 0; i < session_num; ++i) {
514+
auto read_app_ptr = GetSession(i).GetApp(Schema::CYPHER_READ_PLUGIN_ID);
515+
// dynamic_cast yields nullptr when the app is absent or of another type;
// such sessions contribute nothing to the total.
auto casted_read_app = dynamic_cast<CypherReadApp*>(read_app_ptr);
516+
if (casted_read_app) {
517+
read_timer += casted_read_app->timer();
518+
}
519+
520+
auto write_app_ptr = GetSession(i).GetApp(Schema::CYPHER_WRITE_PLUGIN_ID);
521+
// Same null-tolerant handling for the write app.
auto casted_write_app = dynamic_cast<CypherWriteApp*>(write_app_ptr);
522+
if (casted_write_app) {
523+
write_timer += casted_write_app->timer();
524+
}
525+
}
526+
527+
read_timer.output(prefix + "read_profile.log");
528+
write_timer.output(prefix + "write_profile.log");
529+
}
530+
519531
} // namespace gs

flex/engines/graph_db/database/graph_db.h

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include <map>
2222
#include <mutex>
23+
#include <shared_mutex>
2324
#include <thread>
2425
#include <vector>
2526

@@ -93,8 +94,6 @@ class GraphDB {
9394

9495
static GraphDB& get();
9596

96-
QueryCache& getQueryCache() const;
97-
9897
/**
9998
* @brief Load the graph from data directory.
10099
* @param schema The schema of graph. It should be the same as the schema,
@@ -119,7 +118,7 @@ class GraphDB {
119118
*
120119
* @return A ReadTransaction for reading the graph.
121120
*/
122-
ReadTransaction GetReadTransaction(int thread_id = 0);
121+
ReadTransaction GetReadTransaction();
123122

124123
/** @brief Create a transaction to insert vertices and edges with a default
125124
* allocator.
@@ -150,15 +149,20 @@ class GraphDB {
150149
*/
151150
UpdateTransaction GetUpdateTransaction(int thread_id = 0);
152151

153-
const MutablePropertyFragment& graph() const;
154-
MutablePropertyFragment& graph();
152+
/** @brief Read-only access to the underlying graph_ fragment. */
inline const MutablePropertyFragment& graph() const { return graph_; }
153+
/** @brief Mutable access to the underlying graph_ fragment. */
inline MutablePropertyFragment& graph() { return graph_; }
155154

156-
const Schema& schema() const;
155+
/** @brief Returns the schema held by graph_ (forwards to graph_.schema()). */
inline const Schema& schema() const { return graph_.schema(); }
157156

158-
std::shared_ptr<ColumnBase> get_vertex_property_column(
159-
uint8_t label, const std::string& col_name) const;
157+
/**
 * @brief Looks up a vertex property column by name.
 * @param label Vertex label id used to select the vertex table of graph_.
 * @param col_name Name of the column requested from that table.
 * @return Whatever Table::get_column(col_name) returns for that table.
 *         NOTE(review): presumably null when the column does not exist;
 *         confirm against Table::get_column.
 */
inline std::shared_ptr<ColumnBase> get_vertex_property_column(
158+
uint8_t label, const std::string& col_name) const {
159+
return graph_.get_vertex_table(label).get_column(col_name);
160+
}
160161

161-
std::shared_ptr<RefColumnBase> get_vertex_id_column(uint8_t label) const;
162+
/**
 * @brief Returns the id column for a vertex label.
 * @param label Vertex label id.
 * @return The result of graph_.get_vertex_id_column(label), forwarded
 *         unchanged.
 */
inline std::shared_ptr<RefColumnBase> get_vertex_id_column(
163+
uint8_t label) const {
164+
return graph_.get_vertex_id_column(label);
165+
}
162166

163167
AppWrapper CreateApp(uint8_t app_type, int thread_id);
164168

@@ -172,8 +176,12 @@ class GraphDB {
172176
void UpdateCompactionTimestamp(timestamp_t ts);
173177
timestamp_t GetLastCompactionTimestamp() const;
174178

179+
QueryCache& getQueryCache() const;
180+
175181
std::string work_dir() const { return work_dir_; }
176182

183+
void OutputCypherProfiles(const std::string& prefix);
184+
177185
private:
178186
bool registerApp(const std::string& path, uint8_t index = 0);
179187

@@ -193,8 +201,6 @@ class GraphDB {
193201

194202
friend class GraphDBSession;
195203

196-
mutable QueryCache query_cache_;
197-
198204
std::string work_dir_;
199205
SessionLocalContext* contexts_;
200206

@@ -206,6 +212,8 @@ class GraphDB {
206212
std::array<std::string, 256> app_paths_;
207213
std::array<std::shared_ptr<AppFactoryBase>, 256> app_factories_;
208214

215+
mutable QueryCache query_cache_;
216+
209217
std::thread monitor_thread_;
210218
bool monitor_thread_running_;
211219

flex/engines/graph_db/runtime/common/columns/edge_columns.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -656,8 +656,7 @@ class SDSLEdgeColumnBuilder : public IContextColumnBuilder {
656656
label_(label),
657657
prop_type_(prop_type),
658658
prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)),
659-
sub_types_(sub_types),
660-
cap_(0) {}
659+
sub_types_(sub_types) {}
661660
~SDSLEdgeColumnBuilder() = default;
662661

663662
void reserve(size_t size) override { edges_.reserve(size); }
@@ -686,7 +685,6 @@ class SDSLEdgeColumnBuilder : public IContextColumnBuilder {
686685
PropertyType prop_type_;
687686
std::shared_ptr<EdgePropVecBase> prop_col_;
688687
std::vector<PropertyType> sub_types_;
689-
size_t cap_;
690688
};
691689

692690
template <typename T>

flex/engines/graph_db/runtime/common/rt_any.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -958,7 +958,7 @@ class SetImpl : public SetImplBase {
958958
return set_ == (dynamic_cast<const SetImpl<T>&>(p)).set_;
959959
}
960960

961-
void insert(const RTAny& val) {
961+
// Overrides the base-class insert: converts the RTAny to T with
// TypedConverter<T>::to_typed and stores the result in set_.
void insert(const RTAny& val) override {
962962
set_.insert(TypedConverter<T>::to_typed(val));
963963
}
964964
void insert(const T& val) { set_.insert(val); }
@@ -989,7 +989,7 @@ class SetImpl<VertexRecord> : public SetImplBase {
989989
return set_ == (dynamic_cast<const SetImpl<VertexRecord>&>(p)).set_;
990990
}
991991

992-
void insert(const RTAny& val) {
992+
// Overrides the base-class insert: converts the RTAny to a VertexRecord with
// TypedConverter<VertexRecord>::to_typed and forwards to the
// insert(VertexRecord) overload of this class.
void insert(const RTAny& val) override {
993993
insert(TypedConverter<VertexRecord>::to_typed(val));
994994
}
995995
void insert(VertexRecord val) {

flex/storages/rt_mutable_graph/mutable_property_fragment.cc

Lines changed: 92 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ void MutablePropertyFragment::Open(const std::string& work_dir,
194194
// We will reserve at least 4096 slots for each vertex label
195195
size_t vertex_capacity =
196196
std::max(lf_indexers_[i].capacity(), (size_t) 4096);
197-
if (vertex_capacity >= lf_indexers_[i].size()) {
197+
if (vertex_capacity >= lf_indexers_[i].capacity()) {
198198
lf_indexers_[i].reserve(vertex_capacity);
199199
}
200200
vertex_data_[i].resize(vertex_capacity);
@@ -389,15 +389,6 @@ const Schema& MutablePropertyFragment::schema() const { return schema_; }
389389

390390
Schema& MutablePropertyFragment::mutable_schema() { return schema_; }
391391

392-
Table& MutablePropertyFragment::get_vertex_table(label_t vertex_label) {
393-
return vertex_data_[vertex_label];
394-
}
395-
396-
const Table& MutablePropertyFragment::get_vertex_table(
397-
label_t vertex_label) const {
398-
return vertex_data_[vertex_label];
399-
}
400-
401392
vid_t MutablePropertyFragment::vertex_num(label_t vertex_label) const {
402393
return static_cast<vid_t>(lf_indexers_[vertex_label].size());
403394
}
@@ -464,69 +455,98 @@ MutablePropertyFragment::get_incoming_edges_mut(label_t label, vid_t u,
464455
return get_ie_csr(label, neighbor_label, edge_label)->edge_iter_mut(u);
465456
}
466457

467-
CsrBase* MutablePropertyFragment::get_oe_csr(label_t label,
468-
label_t neighbor_label,
469-
label_t edge_label) {
470-
size_t index = label * vertex_label_num_ * edge_label_num_ +
471-
neighbor_label * edge_label_num_ + edge_label;
472-
return oe_[index];
473-
}
474-
475-
const CsrBase* MutablePropertyFragment::get_oe_csr(label_t label,
476-
label_t neighbor_label,
477-
label_t edge_label) const {
478-
size_t index = label * vertex_label_num_ * edge_label_num_ +
479-
neighbor_label * edge_label_num_ + edge_label;
480-
return oe_[index];
481-
}
482-
483-
CsrBase* MutablePropertyFragment::get_ie_csr(label_t label,
484-
label_t neighbor_label,
485-
label_t edge_label) {
486-
size_t index = neighbor_label * vertex_label_num_ * edge_label_num_ +
487-
label * edge_label_num_ + edge_label;
488-
return ie_[index];
489-
}
490-
491-
const CsrBase* MutablePropertyFragment::get_ie_csr(label_t label,
492-
label_t neighbor_label,
493-
label_t edge_label) const {
494-
size_t index = neighbor_label * vertex_label_num_ * edge_label_num_ +
495-
label * edge_label_num_ + edge_label;
496-
return ie_[index];
497-
}
498-
499-
std::shared_ptr<ColumnBase> MutablePropertyFragment::get_vertex_property_column(
500-
uint8_t label, const std::string& prop) const {
501-
return vertex_data_[label].get_column(prop);
502-
}
458+
// Writes a JSON summary of vertex and edge counts to
// work_dir + "/statistics.json".
//
// The emitted object contains, in this order: "total_vertex_count",
// "total_edge_count", a "vertex_type_statistics" array (type_id, type_name,
// count per vertex label) and an "edge_type_statistics" array (type_id,
// type_name and a "vertex_type_pair_statistics" list of source_vertex /
// destination_vertex / count for every triple the schema reports as existing).
//
// @param work_dir Directory in which statistics.json is created.
//
// NOTE(review): label names are concatenated into the JSON text without
// escaping, and the output stream is never checked for a successful open.
void MutablePropertyFragment::generateStatistics(
459+
const std::string& work_dir) const {
460+
std::string filename = work_dir + "/statistics.json";
461+
size_t vertex_count = 0;
462+
463+
// `ss` accumulates the two statistics arrays; the totals are only known once
// both have been built, so they are written to the file ahead of `ss` later.
std::string ss = "\"vertex_type_statistics\": [\n";
464+
size_t vertex_label_num = schema_.vertex_label_num();
465+
for (size_t idx = 0; idx < vertex_label_num; ++idx) {
466+
ss += "{\n\"type_id\": " + std::to_string(idx) + ", \n";
467+
ss += "\"type_name\": \"" + schema_.get_vertex_label_name(idx) + "\", \n";
468+
// The per-label vertex count is taken from the size of that label's indexer.
size_t count = lf_indexers_[idx].size();
469+
ss += "\"count\": " + std::to_string(count) + "\n}";
470+
vertex_count += count;
471+
// Separator after every entry except the last.
if (idx != vertex_label_num - 1) {
472+
ss += ", \n";
473+
} else {
474+
ss += "\n";
475+
}
476+
}
477+
ss += "]\n";
478+
size_t edge_count = 0;
479+
480+
size_t edge_label_num = schema_.edge_label_num();
481+
std::vector<std::thread> count_threads;
482+
// One slot per dual_csr_list_ entry; slots of triples that are not counted
// stay at 0.
std::vector<size_t> edge_count_list(dual_csr_list_.size(), 0);
483+
for (size_t src_label = 0; src_label < vertex_label_num; ++src_label) {
484+
const auto& src_label_name = schema_.get_vertex_label_name(src_label);
485+
for (size_t dst_label = 0; dst_label < vertex_label_num; ++dst_label) {
486+
const auto& dst_label_name = schema_.get_vertex_label_name(dst_label);
487+
for (size_t edge_label = 0; edge_label < edge_label_num; ++edge_label) {
488+
const auto& edge_label_name = schema_.get_edge_label_name(edge_label);
489+
if (schema_.exist(src_label_name, dst_label_name, edge_label_name)) {
490+
// Flattened (src, dst, edge) index; the same formula is used again when
// the counts are emitted below.
size_t index = src_label * vertex_label_num * edge_label_num +
491+
dst_label * edge_label_num + edge_label;
492+
if (dual_csr_list_[index] != NULL) {
493+
// Each thread writes only its own slot of edge_count_list.
// NOTE(review): this starts one std::thread per existing triple.
count_threads.emplace_back([&edge_count_list, index, this] {
494+
edge_count_list[index] = dual_csr_list_[index]->EdgeNum();
495+
});
496+
}
497+
}
498+
}
499+
}
500+
}
501+
// All counts must be complete before they are read below.
for (auto& t : count_threads) {
502+
t.join();
503+
}
504+
ss += ",\n";
505+
ss += "\"edge_type_statistics\": [";
506+
507+
for (size_t edge_label = 0; edge_label < edge_label_num; ++edge_label) {
508+
const auto& edge_label_name = schema_.get_edge_label_name(edge_label);
509+
510+
ss += "{\n\"type_id\": " + std::to_string(edge_label) + ", \n";
511+
ss += "\"type_name\": \"" + edge_label_name + "\", \n";
512+
ss += "\"vertex_type_pair_statistics\": [\n";
513+
// `first` suppresses the comma before the first emitted pair.
bool first = true;
514+
// NOTE(review): props_content is not used anywhere in the visible body.
std::string props_content{};
515+
for (size_t src_label = 0; src_label < vertex_label_num; ++src_label) {
516+
const auto& src_label_name = schema_.get_vertex_label_name(src_label);
517+
for (size_t dst_label = 0; dst_label < vertex_label_num; ++dst_label) {
518+
const auto& dst_label_name = schema_.get_vertex_label_name(dst_label);
519+
size_t index = src_label * vertex_label_num * edge_label_num +
520+
dst_label * edge_label_num + edge_label;
521+
if (schema_.exist(src_label_name, dst_label_name, edge_label_name)) {
522+
if (!first) {
523+
ss += ",\n";
524+
}
525+
first = false;
526+
ss += "{\n\"source_vertex\" : \"" + src_label_name + "\", \n";
527+
ss += "\"destination_vertex\" : \"" + dst_label_name + "\", \n";
528+
ss += "\"count\" : " + std::to_string(edge_count_list[index]) + "\n";
529+
edge_count += edge_count_list[index];
530+
ss += "}";
531+
}
532+
}
533+
}
503534

504-
std::shared_ptr<RefColumnBase> MutablePropertyFragment::get_vertex_id_column(
505-
uint8_t label) const {
506-
if (lf_indexers_[label].get_type() == PropertyType::kInt64) {
507-
return std::make_shared<TypedRefColumn<int64_t>>(
508-
dynamic_cast<const TypedColumn<int64_t>&>(
509-
lf_indexers_[label].get_keys()));
510-
} else if (lf_indexers_[label].get_type() == PropertyType::kInt32) {
511-
return std::make_shared<TypedRefColumn<int32_t>>(
512-
dynamic_cast<const TypedColumn<int32_t>&>(
513-
lf_indexers_[label].get_keys()));
514-
} else if (lf_indexers_[label].get_type() == PropertyType::kUInt64) {
515-
return std::make_shared<TypedRefColumn<uint64_t>>(
516-
dynamic_cast<const TypedColumn<uint64_t>&>(
517-
lf_indexers_[label].get_keys()));
518-
} else if (lf_indexers_[label].get_type() == PropertyType::kUInt32) {
519-
return std::make_shared<TypedRefColumn<uint32_t>>(
520-
dynamic_cast<const TypedColumn<uint32_t>&>(
521-
lf_indexers_[label].get_keys()));
522-
} else if (lf_indexers_[label].get_type() == PropertyType::kStringView) {
523-
return std::make_shared<TypedRefColumn<std::string_view>>(
524-
dynamic_cast<const TypedColumn<std::string_view>&>(
525-
lf_indexers_[label].get_keys()));
526-
} else {
527-
LOG(ERROR) << "Unsupported vertex id type: "
528-
<< lf_indexers_[label].get_type();
529-
return nullptr;
535+
ss += "\n]\n}";
536+
if (edge_label != edge_label_num - 1) {
537+
ss += ", \n";
538+
} else {
539+
ss += "\n";
540+
}
541+
}
542+
ss += "]\n";
543+
{
544+
// Totals first, then the accumulated arrays, then the closing brace.
std::ofstream out(filename);
545+
out << "{\n\"total_vertex_count\": " << vertex_count << ",\n";
546+
out << "\"total_edge_count\": " << edge_count << ",\n";
547+
out << ss;
548+
out << "}\n";
549+
out.close();
530550
}
531551
}
532552

0 commit comments

Comments
 (0)