Skip to content

Commit f5b2fc0

Browse files
authored
refactor(interactive): Let transction returns commit result (#4507)
In Interactive's execution engine, transactions such as `ReadTransaction`, `UpdateTransaction`, and `InsertTransaction` are employed to maintain data consistency and integrity throughout operations. Each transaction can either succeed or fail, and it is important to understand the transactional guarantees provided by Interactive: 1. For every transaction, we first write the Write-Ahead Log (WAL) to persistent storage before applying it to the graph data. This ensures a reliable record of the transaction steps. 2. If a transaction returns `false` during the `commit()` process, the error occurred prior to applying the WAL to the graph data. This type of failure could arise during the construction of the WAL or during its writing phase. 3. It is important to note that errors can still occur when replaying the WAL to the graph database. Replaying might fail due to limitations in resources or due to unforeseen bugs. **However,** any errors encountered during this stage will be handled via exceptions or may result in process failure. Currently, there is no established mechanism to handle such failures. Future improvements should focus on implementing failover strategies, potentially allowing the GraphDB to continue replaying the WAL until it succeeds.
1 parent 4111956 commit f5b2fc0

17 files changed

+83
-38
lines changed

docs/flex/interactive/development/dev_and_test.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,4 +250,15 @@ The mapping between status codes and HTTP codes is shown in the table below.
250250
| INVALID_IMPORT_FILE(104) | 400 |
251251
| IO_ERROR(105) | 500 |
252252
| QUERY_FAILED(106) | 500 |
253-
| default | 500 |
253+
| default | 500 |
254+
255+
256+
### Transactions
257+
258+
In Interactive's execution engine, transactions such as `ReadTransaction`, `UpdateTransaction`, and `InsertTransaction` are employed to maintain data consistency and integrity throughout operations. Each transaction can either succeed or fail, and it is important to understand the transactional guarantees provided by Interactive:
259+
260+
1. For every transaction, we first write the Write-Ahead Log (WAL) to persistent storage before applying it to the graph data. This ensures a reliable record of the transaction steps.
261+
262+
2. If a transaction returns `false` during the `commit()` process, the error occurred prior to applying the WAL to the graph data. This type of failure could arise during the construction of the WAL or during its writing phase.
263+
264+
3. It is important to note that errors can still occur when replaying the WAL to the graph database. Replaying might fail due to limitations in resources or due to unforeseen bugs. **However,** any errors encountered during this stage will be handled via exceptions or may result in process failure. Currently, there is no established mechanism to handle such failures. Future improvements should focus on implementing failover strategies, potentially allowing the GraphDB to continue replaying the WAL until it succeeds.

flex/engines/graph_db/database/compact_transaction.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ CompactTransaction::~CompactTransaction() { Abort(); }
3131

3232
timestamp_t CompactTransaction::timestamp() const { return timestamp_; }
3333

34-
void CompactTransaction::Commit() {
34+
bool CompactTransaction::Commit() {
3535
if (timestamp_ != std::numeric_limits<timestamp_t>::max()) {
3636
auto* header = reinterpret_cast<WalHeader*>(arc_.GetBuffer());
3737
header->length = 0;
3838
header->timestamp = timestamp_;
3939
header->type = 1;
4040

41-
logger_.append(arc_.GetBuffer(), arc_.GetSize());
41+
if (!logger_.append(arc_.GetBuffer(), arc_.GetSize())) {
42+
LOG(ERROR) << "Failed to append wal log";
43+
Abort();
44+
return false;
45+
}
4246
arc_.Clear();
4347

4448
LOG(INFO) << "before compact - " << timestamp_;
@@ -48,6 +52,7 @@ void CompactTransaction::Commit() {
4852
vm_.release_update_timestamp(timestamp_);
4953
timestamp_ = std::numeric_limits<timestamp_t>::max();
5054
}
55+
return true;
5156
}
5257

5358
void CompactTransaction::Abort() {

flex/engines/graph_db/database/compact_transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class CompactTransaction {
3333

3434
timestamp_t timestamp() const;
3535

36-
void Commit();
36+
bool Commit();
3737

3838
void Abort();
3939

flex/engines/graph_db/database/graph_db_session.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ UpdateTransaction GraphDBSession::GetUpdateTransaction() {
5858
}
5959

6060
bool GraphDBSession::BatchUpdate(UpdateBatch& batch) {
61-
GetUpdateTransaction().batch_commit(batch);
62-
return true;
61+
return GetUpdateTransaction().batch_commit(batch);
6362
}
6463

6564
const MutablePropertyFragment& GraphDBSession::graph() const {

flex/engines/graph_db/database/insert_transaction.cc

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,26 +144,31 @@ bool InsertTransaction::AddEdge(label_t src_label, const Any& src,
144144
return true;
145145
}
146146

147-
void InsertTransaction::Commit() {
147+
bool InsertTransaction::Commit() {
148148
if (timestamp_ == std::numeric_limits<timestamp_t>::max()) {
149-
return;
149+
return true;
150150
}
151151
if (arc_.GetSize() == sizeof(WalHeader)) {
152152
vm_.release_insert_timestamp(timestamp_);
153153
clear();
154-
return;
154+
return true;
155155
}
156156
auto* header = reinterpret_cast<WalHeader*>(arc_.GetBuffer());
157157
header->length = arc_.GetSize() - sizeof(WalHeader);
158158
header->type = 0;
159159
header->timestamp = timestamp_;
160160

161-
logger_.append(arc_.GetBuffer(), arc_.GetSize());
161+
if (!logger_.append(arc_.GetBuffer(), arc_.GetSize())) {
162+
LOG(ERROR) << "Failed to append wal log";
163+
Abort();
164+
return false;
165+
}
162166
IngestWal(graph_, timestamp_, arc_.GetBuffer() + sizeof(WalHeader),
163167
header->length, alloc_);
164168

165169
vm_.release_insert_timestamp(timestamp_);
166170
clear();
171+
return true;
167172
}
168173

169174
void InsertTransaction::Abort() {

flex/engines/graph_db/database/insert_transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class InsertTransaction {
4848
bool AddEdge(label_t src_label, const Any& src, label_t dst_label,
4949
const Any& dst, label_t edge_label, const Any& prop);
5050

51-
void Commit();
51+
bool Commit();
5252

5353
void Abort();
5454

flex/engines/graph_db/database/read_transaction.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ std::string ReadTransaction::run(
3434

3535
timestamp_t ReadTransaction::timestamp() const { return timestamp_; }
3636

37-
void ReadTransaction::Commit() { release(); }
37+
bool ReadTransaction::Commit() {
38+
release();
39+
return true;
40+
}
3841

3942
void ReadTransaction::Abort() { release(); }
4043

flex/engines/graph_db/database/read_transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ class ReadTransaction {
384384

385385
timestamp_t timestamp() const;
386386

387-
void Commit();
387+
bool Commit();
388388

389389
void Abort();
390390

flex/engines/graph_db/database/single_edge_insert_transaction.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,19 @@ timestamp_t SingleEdgeInsertTransaction::timestamp() const {
109109
return timestamp_;
110110
}
111111

112-
void SingleEdgeInsertTransaction::Commit() {
112+
bool SingleEdgeInsertTransaction::Commit() {
113113
if (timestamp_ == std::numeric_limits<timestamp_t>::max()) {
114-
return;
114+
return false;
115115
}
116116
auto* header = reinterpret_cast<WalHeader*>(arc_.GetBuffer());
117117
header->length = arc_.GetSize() - sizeof(WalHeader);
118118
header->type = 0;
119119
header->timestamp = timestamp_;
120-
logger_.append(arc_.GetBuffer(), arc_.GetSize());
120+
if (!logger_.append(arc_.GetBuffer(), arc_.GetSize())) {
121+
LOG(ERROR) << "Failed to append wal log";
122+
Abort();
123+
return false;
124+
}
121125

122126
grape::OutArchive arc;
123127
{
@@ -134,6 +138,7 @@ void SingleEdgeInsertTransaction::Commit() {
134138
timestamp_, arc, alloc_);
135139
vm_.release_insert_timestamp(timestamp_);
136140
clear();
141+
return true;
137142
}
138143

139144
void SingleEdgeInsertTransaction::clear() {

flex/engines/graph_db/database/single_edge_insert_transaction.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class SingleEdgeInsertTransaction {
4141

4242
timestamp_t timestamp() const;
4343

44-
void Commit();
44+
bool Commit();
4545

4646
private:
4747
void clear();

0 commit comments

Comments
 (0)