Skip to content

Commit 5d6d660

Browse files
authored
refactor(interactive): Refactoring Wal parser and writer for GraphDB (#4504)
- Extract two interfaces`IWalWriter` and `IWalParser` to decouple wal-related implementation from GraphDB. - Provide local-file based WalWriter and WalParser. - Introducing configuration `wal_uri`, which is by default set to `{GRAPH_DATA_DIR}/wal`
1 parent cb73658 commit 5d6d660

29 files changed

+626
-245
lines changed

docs/flex/interactive/configuration.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ In this following table, we use the `.` notation to represent the hierarchy with
115115
| log_level | INFO | The level of database log, INFO/WARNING/ERROR/FATAL | 0.0.1 |
116116
| verbose_level | 0 | The verbose level of database log, should be a int | 0.0.3 |
117117
| compute_engine.thread_num_per_worker | 1 | The number of threads will be used to process the queries. Increase the number can benefit the query throughput | 0.0.1 |
118+
| compute_engine.wal_uri | file://{GRAPH_DATA_DIR}/wal | The location where Interactive will store and access WALs. `GRAPH_DATA_DIR` is a placeholder that will be populated by Interactive. | 0.5 |
118119
| compiler.planner.is_on | true | Determines if query optimization is enabled for compiling Cypher queries | 0.0.1 |
119120
| compiler.planner.opt | RBO | Specifies the optimizer to be used for query optimization. Currently, only the Rule-Based Optimizer (RBO) is supported | 0.0.1 |
120121
| compiler.planner.rules.FilterMatchRule | N/A | An optimization rule that pushes filter (`Where`) conditions into the `Match` clause | 0.0.1 |
@@ -125,6 +126,7 @@ In this following table, we use the `.` notation to represent the hierarchy with
125126
| http_service.max_content_length | 1GB | The maximum length of a http request that admin http service could handle | 0.5 |
126127
| storage.string_default_max_length | 256 | The default maximum size for a string field | 0.5 |
127128

129+
128130
#### TODOs
129131

130132
We currently only allow service configuration during instance deployment. In the near future, we will support:

flex/bin/interactive_server.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,10 @@ int main(int argc, char** argv) {
240240
gs::init_codegen_proxy(vm, engine_config_file, graph_schema_path);
241241
}
242242
db.Close();
243-
auto load_res =
244-
db.Open(schema_res.value(), data_path, service_config.shard_num);
243+
gs::GraphDBConfig config(schema_res.value(), data_path, "",
244+
service_config.shard_num);
245+
config.wal_uri = service_config.wal_uri;
246+
auto load_res = db.Open(config);
245247
if (!load_res.ok()) {
246248
LOG(FATAL) << "Failed to load graph from data directory: "
247249
<< load_res.status().error_message();

flex/bin/rt_server.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ int main(int argc, char** argv) {
4040
"warmup graph data")("memory-level,m",
4141
bpo::value<int>()->default_value(1))(
4242
"compiler-path,c", bpo::value<std::string>()->default_value(""))(
43-
"sharding-mode", bpo::value<std::string>()->default_value("cooperative"));
43+
"sharding-mode", bpo::value<std::string>()->default_value("cooperative"))(
44+
"wal-uri",
45+
bpo::value<std::string>()->default_value("file://{GRAPH_DATA_DIR}/wal"));
4446
google::InitGoogleLogging(argv[0]);
4547
FLAGS_logtostderr = true;
4648

@@ -84,6 +86,7 @@ int main(int argc, char** argv) {
8486
}
8587
gs::GraphDBConfig config(schema.value(), data_path, compiler_path, shard_num);
8688
config.memory_level = memory_level;
89+
config.wal_uri = vm["wal-uri"].as<std::string>();
8790
if (config.memory_level >= 2) {
8891
config.enable_auto_compaction = true;
8992
}

flex/engines/graph_db/CMakeLists.txt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
add_subdirectory(runtime)
22
file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
33
"${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc"
4+
"${CMAKE_CURRENT_SOURCE_DIR}/database/wal/*.cc"
45
"${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc")
56

67
add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})
@@ -19,7 +20,6 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
1920
${CMAKE_CURRENT_SOURCE_DIR}/database/update_transaction.h
2021
${CMAKE_CURRENT_SOURCE_DIR}/database/compact_transaction.h
2122
${CMAKE_CURRENT_SOURCE_DIR}/database/version_manager.h
22-
${CMAKE_CURRENT_SOURCE_DIR}/database/wal.h
2323
${CMAKE_CURRENT_SOURCE_DIR}/database/transaction_utils.h
2424
${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db_operations.h
2525
DESTINATION include/flex/engines/graph_db/database)
@@ -29,4 +29,10 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/app_base.h
2929
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/hqps_app.h
3030
DESTINATION include/flex/engines/graph_db/app)
3131

32+
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/wal/wal.h
33+
${CMAKE_CURRENT_SOURCE_DIR}/database/wal/local_wal_parser.h
34+
${CMAKE_CURRENT_SOURCE_DIR}/database/wal/local_wal_writer.h
35+
DESTINATION include/flex/engines/graph_db/database/wal)
36+
37+
3238

flex/engines/graph_db/database/compact_transaction.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515

1616
#include "flex/engines/graph_db/database/compact_transaction.h"
1717
#include "flex/engines/graph_db/database/version_manager.h"
18-
#include "flex/engines/graph_db/database/wal.h"
18+
#include "flex/engines/graph_db/database/wal/wal.h"
1919
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
2020

2121
namespace gs {
2222

2323
CompactTransaction::CompactTransaction(MutablePropertyFragment& graph,
24-
WalWriter& logger, VersionManager& vm,
24+
IWalWriter& logger, VersionManager& vm,
2525
timestamp_t timestamp)
2626
: graph_(graph), logger_(logger), vm_(vm), timestamp_(timestamp) {
2727
arc_.Resize(sizeof(WalHeader));

flex/engines/graph_db/database/compact_transaction.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
namespace gs {
2323

2424
class MutablePropertyFragment;
25-
class WalWriter;
25+
class IWalWriter;
2626
class VersionManager;
2727

2828
class CompactTransaction {
2929
public:
30-
CompactTransaction(MutablePropertyFragment& graph, WalWriter& logger,
30+
CompactTransaction(MutablePropertyFragment& graph, IWalWriter& logger,
3131
VersionManager& vm, timestamp_t timestamp);
3232
~CompactTransaction();
3333

@@ -39,7 +39,7 @@ class CompactTransaction {
3939

4040
private:
4141
MutablePropertyFragment& graph_;
42-
WalWriter& logger_;
42+
IWalWriter& logger_;
4343
VersionManager& vm_;
4444
timestamp_t timestamp_;
4545

flex/engines/graph_db/database/graph_db.cc

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
#include "flex/engines/graph_db/app/hqps_app.h"
2424
#include "flex/engines/graph_db/app/server_app.h"
2525
#include "flex/engines/graph_db/database/graph_db_session.h"
26-
#include "flex/engines/graph_db/database/wal.h"
26+
#include "flex/engines/graph_db/database/wal/wal.h"
2727
#include "flex/engines/graph_db/runtime/execute/plan_parser.h"
2828
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
2929
#include "flex/utils/yaml_utils.h"
@@ -34,18 +34,20 @@ namespace gs {
3434

3535
struct SessionLocalContext {
3636
SessionLocalContext(GraphDB& db, const std::string& work_dir, int thread_id,
37-
MemoryStrategy allocator_strategy)
37+
MemoryStrategy allocator_strategy,
38+
std::unique_ptr<IWalWriter> in_logger)
3839
: allocator(allocator_strategy,
3940
(allocator_strategy != MemoryStrategy::kSyncToFile
4041
? ""
4142
: thread_local_allocator_prefix(work_dir, thread_id))),
42-
session(db, allocator, logger, work_dir, thread_id) {}
43-
~SessionLocalContext() { logger.close(); }
43+
logger(std::move(in_logger)),
44+
session(db, allocator, *logger, work_dir, thread_id) {}
45+
~SessionLocalContext() { logger->close(); }
4446
Allocator allocator;
4547
char _padding0[128 - sizeof(Allocator) % 128];
46-
WalWriter logger;
47-
char _padding1[4096 - sizeof(WalWriter) - sizeof(Allocator) -
48-
sizeof(_padding0)];
48+
std::unique_ptr<IWalWriter> logger;
49+
char _padding1[4096 - sizeof(std::unique_ptr<IWalWriter>) -
50+
sizeof(Allocator) - sizeof(_padding0)];
4951
GraphDBSession session;
5052
char _padding2[4096 - sizeof(GraphDBSession) % 4096];
5153
};
@@ -64,6 +66,8 @@ GraphDB::~GraphDB() {
6466

6567
free(contexts_);
6668
}
69+
WalWriterFactory::Finalize();
70+
WalParserFactory::Finalize();
6771
}
6872

6973
GraphDB& GraphDB::get() {
@@ -86,6 +90,7 @@ Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
8690
}
8791

8892
Result<bool> GraphDB::Open(const GraphDBConfig& config) {
93+
config_ = config;
8994
const std::string& data_dir = config.data_dir;
9095
const Schema& schema = config.schema;
9196
if (!std::filesystem::exists(data_dir)) {
@@ -142,7 +147,7 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
142147
allocator_strategy = MemoryStrategy::kHugepagePrefered;
143148
}
144149

145-
openWalAndCreateContexts(data_dir, allocator_strategy);
150+
openWalAndCreateContexts(config, data_dir, allocator_strategy);
146151

147152
if ((!create_empty_graph) && config.warmup) {
148153
graph_.Warmup(thread_num_);
@@ -228,8 +233,9 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
228233
VLOG(10) << "Trigger auto compaction";
229234
last_compaction_at = query_num_after;
230235
timestamp_t ts = this->version_manager_.acquire_update_timestamp();
231-
auto txn = CompactTransaction(this->graph_, this->contexts_[0].logger,
232-
this->version_manager_, ts);
236+
auto txn =
237+
CompactTransaction(this->graph_, *this->contexts_[0].logger,
238+
this->version_manager_, ts);
233239
OutputCypherProfiles("./" + std::to_string(ts) + "_");
234240
txn.Commit();
235241
VLOG(10) << "Finish compaction";
@@ -344,7 +350,7 @@ void GraphDB::GetAppInfo(Encoder& output) {
344350

345351
static void IngestWalRange(SessionLocalContext* contexts,
346352
MutablePropertyFragment& graph,
347-
const WalsParser& parser, uint32_t from, uint32_t to,
353+
const IWalParser& parser, uint32_t from, uint32_t to,
348354
int thread_num) {
349355
std::atomic<uint32_t> cur_ts(from);
350356
std::vector<std::thread> threads(thread_num);
@@ -372,11 +378,10 @@ static void IngestWalRange(SessionLocalContext* contexts,
372378
}
373379
}
374380

375-
void GraphDB::ingestWals(const std::vector<std::string>& wals,
376-
const std::string& work_dir, int thread_num) {
377-
WalsParser parser(wals);
381+
void GraphDB::ingestWals(IWalParser& parser, const std::string& work_dir,
382+
int thread_num) {
378383
uint32_t from_ts = 1;
379-
for (auto& update_wal : parser.update_wals()) {
384+
for (auto& update_wal : parser.get_update_wals()) {
380385
uint32_t to_ts = update_wal.timestamp;
381386
if (from_ts < to_ts) {
382387
IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num);
@@ -452,30 +457,36 @@ void GraphDB::initApps(
452457
<< ", from " << plugins.size();
453458
}
454459

455-
void GraphDB::openWalAndCreateContexts(const std::string& data_dir,
460+
void GraphDB::openWalAndCreateContexts(const GraphDBConfig& config,
461+
const std::string& data_dir,
456462
MemoryStrategy allocator_strategy) {
457-
std::string wal_dir_path = wal_dir(data_dir);
458-
if (!std::filesystem::exists(wal_dir_path)) {
459-
std::filesystem::create_directory(wal_dir_path);
460-
}
461-
462-
std::vector<std::string> wal_files;
463-
for (const auto& entry : std::filesystem::directory_iterator(wal_dir_path)) {
464-
wal_files.push_back(entry.path().string());
465-
}
466-
463+
WalWriterFactory::Init();
464+
WalParserFactory::Init();
467465
contexts_ = static_cast<SessionLocalContext*>(
468466
aligned_alloc(4096, sizeof(SessionLocalContext) * thread_num_));
469467
std::filesystem::create_directories(allocator_dir(data_dir));
468+
469+
// Open the wal writer.
470+
std::string wal_uri = config.wal_uri;
471+
if (wal_uri.empty()) {
472+
LOG(ERROR) << "wal_uri is not set, use default wal_uri";
473+
wal_uri = wal_dir(data_dir);
474+
} else if (wal_uri.find("{GRAPH_DATA_DIR}") != std::string::npos) {
475+
LOG(INFO) << "Template {GRAPH_DATA_DIR} found in wal_uri, replace it with "
476+
"data_dir";
477+
wal_uri = std::regex_replace(wal_uri, std::regex("\\{GRAPH_DATA_DIR\\}"),
478+
data_dir);
479+
}
480+
VLOG(1) << "Using wal uri: " << wal_uri;
470481
for (int i = 0; i < thread_num_; ++i) {
471482
new (&contexts_[i])
472-
SessionLocalContext(*this, data_dir, i, allocator_strategy);
483+
SessionLocalContext(*this, data_dir, i, allocator_strategy,
484+
WalWriterFactory::CreateWalWriter(wal_uri));
485+
contexts_[i].logger->open(wal_uri, i);
473486
}
474-
ingestWals(wal_files, data_dir, thread_num_);
475487

476-
for (int i = 0; i < thread_num_; ++i) {
477-
contexts_[i].logger.open(wal_dir_path, i);
478-
}
488+
auto wal_parser = WalParserFactory::CreateWalParser(wal_uri);
489+
ingestWals(*wal_parser, data_dir, thread_num_);
479490

480491
initApps(graph_.schema().GetPlugins());
481492
VLOG(1) << "Successfully restore load plugins";

flex/engines/graph_db/database/graph_db.h

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class GraphDBSession;
4141
struct SessionLocalContext;
4242

4343
struct GraphDBConfig {
44+
GraphDBConfig() = default;
4445
GraphDBConfig(const Schema& schema_, const std::string& data_dir_,
4546
const std::string& compiler_path_ = "", int thread_num_ = 1)
4647
: schema(schema_),
@@ -50,7 +51,8 @@ struct GraphDBConfig {
5051
warmup(false),
5152
enable_monitoring(false),
5253
enable_auto_compaction(false),
53-
memory_level(1) {}
54+
memory_level(1),
55+
wal_uri("") {}
5456

5557
Schema schema;
5658
std::string data_dir;
@@ -67,6 +69,9 @@ struct GraphDBConfig {
6769
3 - force hugepages;
6870
*/
6971
int memory_level;
72+
std::string wal_uri; // Indicate the where shall we store the wal files.
73+
// could be file://{GRAPH_DATA_DIR}/wal or other scheme
74+
// that interactive supports
7075
};
7176

7277
class GraphDB {
@@ -162,17 +167,20 @@ class GraphDB {
162167

163168
void OutputCypherProfiles(const std::string& prefix);
164169

170+
inline const GraphDBConfig& config() const { return config_; }
171+
165172
private:
166173
bool registerApp(const std::string& path, uint8_t index = 0);
167174

168-
void ingestWals(const std::vector<std::string>& wals,
169-
const std::string& work_dir, int thread_num);
175+
void ingestWals(IWalParser& parser, const std::string& work_dir,
176+
int thread_num);
170177

171178
void initApps(
172179
const std::unordered_map<std::string, std::pair<std::string, uint8_t>>&
173180
plugins);
174181

175-
void openWalAndCreateContexts(const std::string& data_dir_path,
182+
void openWalAndCreateContexts(const GraphDBConfig& config,
183+
const std::string& data_dir,
176184
MemoryStrategy allocator_strategy);
177185

178186
void showAppMetrics() const;
@@ -181,6 +189,7 @@ class GraphDB {
181189

182190
friend class GraphDBSession;
183191

192+
GraphDBConfig config_;
184193
std::string work_dir_;
185194
SessionLocalContext* contexts_;
186195

flex/engines/graph_db/database/graph_db_session.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
namespace gs {
3232

3333
class GraphDB;
34-
class WalWriter;
34+
class IWalWriter;
3535

3636
class GraphDBSession {
3737
public:
@@ -49,7 +49,7 @@ class GraphDBSession {
4949
static constexpr const char* kCypherJsonStr = "\x01";
5050
static constexpr const char* kCypherProtoAdhocStr = "\x02";
5151
static constexpr const char* kCypherProtoProcedureStr = "\x03";
52-
GraphDBSession(GraphDB& db, Allocator& alloc, WalWriter& logger,
52+
GraphDBSession(GraphDB& db, Allocator& alloc, IWalWriter& logger,
5353
const std::string& work_dir, int thread_id)
5454
: db_(db),
5555
alloc_(alloc),
@@ -183,7 +183,7 @@ class GraphDBSession {
183183
}
184184
GraphDB& db_;
185185
Allocator& alloc_;
186-
WalWriter& logger_;
186+
IWalWriter& logger_;
187187
std::string work_dir_;
188188
int thread_id_;
189189

flex/engines/graph_db/database/insert_transaction.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717
#include "flex/engines/graph_db/database/graph_db_session.h"
1818
#include "flex/engines/graph_db/database/transaction_utils.h"
1919
#include "flex/engines/graph_db/database/version_manager.h"
20-
#include "flex/engines/graph_db/database/wal.h"
20+
#include "flex/engines/graph_db/database/wal/wal.h"
2121
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
2222
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
2323
#include "flex/utils/allocators.h"
2424
namespace gs {
2525

2626
InsertTransaction::InsertTransaction(const GraphDBSession& session,
2727
MutablePropertyFragment& graph,
28-
Allocator& alloc, WalWriter& logger,
28+
Allocator& alloc, IWalWriter& logger,
2929
VersionManager& vm, timestamp_t timestamp)
3030

3131
: session_(session),

0 commit comments

Comments
 (0)