Skip to content

Commit 048f425

Browse files
authored
p4: add test cases for bonus tasks (#653)
Signed-off-by: Alex Chi <[email protected]>
1 parent 12101ef commit 048f425

File tree

8 files changed

+649
-394
lines changed

8 files changed

+649
-394
lines changed

src/concurrency/transaction_manager.cpp

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,52 @@ auto TransactionManager::Begin(IsolationLevel isolation_level) -> Transaction *
4242
auto *txn_ref = txn.get();
4343
txn_map_.insert(std::make_pair(txn_id, std::move(txn)));
4444

45-
// TODO(fall2023): set the timestamps and compute watermark.
45+
// TODO(fall2023): set the timestamps here. Watermark updated below.
4646

47+
running_txns_.AddTxn(txn_ref->read_ts_);
4748
return txn_ref;
4849
}
4950

51+
auto TransactionManager::VerifyTxn(Transaction *txn) -> bool { return true; }
52+
5053
auto TransactionManager::Commit(Transaction *txn) -> bool {
51-
std::lock_guard<std::mutex> commit_lck(commit_mutex_);
52-
// TODO(fall2023): Implement me!
54+
std::unique_lock<std::mutex> commit_lck(commit_mutex_);
55+
56+
if (txn->state_ != TransactionState::RUNNING) {
57+
throw Exception("txn not in running state");
58+
}
59+
60+
if (txn->GetIsolationLevel() == IsolationLevel::SERIALIZABLE) {
61+
if (!VerifyTxn(txn)) {
62+
commit_lck.unlock();
63+
Abort(txn);
64+
return false;
65+
}
66+
}
67+
68+
// TODO(fall2023): Implement the commit logic!
69+
70+
std::unique_lock<std::shared_mutex> lck(txn_map_mutex_);
71+
72+
// TODO(fall2023): set commit timestamp + update last committed timestamp here.
73+
5374
txn->state_ = TransactionState::COMMITTED;
75+
running_txns_.UpdateCommitTs(txn->commit_ts_);
76+
running_txns_.RemoveTxn(txn->read_ts_);
77+
5478
return true;
5579
}
5680

5781
void TransactionManager::Abort(Transaction *txn) {
58-
// TODO(fall2023): Implement me!
82+
if (txn->state_ != TransactionState::RUNNING && txn->state_ != TransactionState::TAINTED) {
83+
throw Exception("txn not in running / tainted state");
84+
}
85+
86+
// TODO(fall2023): Implement the abort logic!
87+
88+
std::unique_lock<std::shared_mutex> lck(txn_map_mutex_);
5989
txn->state_ = TransactionState::ABORTED;
90+
running_txns_.RemoveTxn(txn->read_ts_);
6091
}
6192

6293
void TransactionManager::GarbageCollection() { UNIMPLEMENTED("not implemented"); }

src/concurrency/watermark.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,16 @@
44

55
namespace bustub {
66

7-
auto Watermark::AddTxn(timestamp_t read_ts) -> void { throw NotImplementedException("unimplemented"); }
7+
auto Watermark::AddTxn(timestamp_t read_ts) -> void {
8+
if (read_ts < commit_ts_) {
9+
throw Exception("read ts < commit ts");
10+
}
811

9-
auto Watermark::RemoveTxn(timestamp_t read_ts) -> void { throw NotImplementedException("unimplemented"); }
12+
// TODO(fall2023): implement me!
13+
}
14+
15+
auto Watermark::RemoveTxn(timestamp_t read_ts) -> void {
16+
// TODO(fall2023): implement me!
17+
}
1018

1119
} // namespace bustub

src/include/common/bustub_instance.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ class StringVectorWriter : public ResultWriter {
121121
void EndHeader() override {}
122122
void BeginRow() override { values_.emplace_back(); }
123123
void EndRow() override {}
124-
void BeginTable(bool simplified_output) override {}
124+
void BeginTable(bool simplified_output) override { values_.clear(); }
125125
void EndTable() override {}
126126

127127
std::vector<std::vector<std::string>> values_;

src/include/concurrency/transaction_manager.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ class TransactionManager {
127127
Catalog *catalog_;
128128

129129
std::atomic<txn_id_t> next_txn_id_{TXN_START_ID};
130+
131+
private:
132+
auto VerifyTxn(Transaction *txn) -> bool;
130133
};
131134

132135
} // namespace bustub

test/txn/txn_common.h

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -248,28 +248,67 @@ void ExecuteTxn(BustubInstance &instance, const std::string &txn_var_name, Trans
248248
}
249249
}
250250

251-
auto BeginTxn(BustubInstance &instance, const std::string &txn_var_name) -> Transaction * {
252-
auto txn = instance.txn_manager_->Begin();
253-
fmt::println(stderr, "- txn_begin var={} id={} status={} read_ts={}", txn_var_name,
254-
txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs());
251+
auto BeginTxn(BustubInstance &instance, const std::string &txn_var_name,
252+
IsolationLevel iso_lvl = IsolationLevel::SNAPSHOT_ISOLATION) -> Transaction * {
253+
auto txn = instance.txn_manager_->Begin(iso_lvl);
254+
fmt::println(stderr, "- txn_begin var={} id={} status={} read_ts={} iso_lvl={}", txn_var_name,
255+
txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs(),
256+
txn->GetIsolationLevel());
255257
return txn;
256258
}
257259

258-
auto CommitTxn(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn) {
260+
auto BeginTxnSerializable(BustubInstance &instance, const std::string &txn_var_name) -> Transaction * {
261+
return BeginTxn(instance, txn_var_name, IsolationLevel::SERIALIZABLE);
262+
}
263+
264+
const bool EXPECT_FAIL = true;
265+
266+
auto CommitTxn(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn, bool expect_fail = false) {
259267
if (txn->GetTransactionState() != TransactionState::RUNNING) {
260268
fmt::println(stderr, "txn not running");
261269
std::terminate();
262270
}
263-
if (!instance.txn_manager_->Commit(txn)) {
264-
fmt::println(stderr, "failed to commit txn: var={} id={}", txn_var_name, txn->GetTransactionId());
271+
auto res = instance.txn_manager_->Commit(txn);
272+
if (!expect_fail) {
273+
if (!res) {
274+
fmt::println(stderr, "failed to commit txn: var={} id={}", txn_var_name, txn->GetTransactionId());
275+
std::terminate();
276+
}
277+
if (txn->GetTransactionState() != TransactionState::COMMITTED) {
278+
fmt::println(stderr, "should set to committed state var={} id={}", txn_var_name, txn->GetTransactionId());
279+
std::terminate();
280+
}
281+
fmt::println(stderr, "- txn_commit var={} id={} status={} read_ts={} commit_ts={}", txn_var_name,
282+
txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs(),
283+
txn->GetCommitTs());
284+
return;
285+
}
286+
if (res) {
287+
fmt::println(stderr, "expect txn fail to commit, but committed: var={} id={}", txn_var_name,
288+
txn->GetTransactionId());
265289
std::terminate();
266290
}
267-
if (txn->GetTransactionState() != TransactionState::COMMITTED) {
268-
fmt::println(stderr, "should set to committed state var={} id={}", txn_var_name, txn->GetTransactionId());
291+
if (txn->GetTransactionState() != TransactionState::ABORTED) {
292+
fmt::println(stderr, "should set to aborted state var={} id={}", txn_var_name, txn->GetTransactionId());
269293
std::terminate();
270294
}
271-
fmt::println(stderr, "- txn_commit var={} id={} status={} read_ts={} commit_ts={}", txn_var_name,
272-
txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs(), txn->GetCommitTs());
295+
fmt::println(stderr, "- txn_commit_fail var={} id={} status={} read_ts={}", txn_var_name,
296+
txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs());
297+
}
298+
299+
auto AbortTxn(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn) {
300+
if (txn->GetTransactionState() != TransactionState::RUNNING &&
301+
txn->GetTransactionState() != TransactionState::TAINTED) {
302+
fmt::println(stderr, "txn not running / tainted");
303+
std::terminate();
304+
}
305+
instance.txn_manager_->Abort(txn);
306+
if (txn->GetTransactionState() != TransactionState::ABORTED) {
307+
fmt::println(stderr, "should set to aborted state var={} id={}", txn_var_name, txn->GetTransactionId());
308+
std::terminate();
309+
}
310+
fmt::println(stderr, "- txn_abort var={} id={} status={} read_ts={}", txn_var_name,
311+
txn->GetTransactionIdHumanReadable(), txn->GetTransactionState(), txn->GetReadTs());
273312
}
274313

275314
auto CheckTainted(BustubInstance &instance, const std::string &txn_var_name, Transaction *txn) {

test/txn/txn_index_concurrent_test.cpp

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <exception>
33
#include <memory>
44
#include <mutex> // NOLINT
5+
#include <random>
56
#include <string>
67
#include <thread> // NOLINT
78
#include "common/bustub_instance.h"
@@ -149,6 +150,7 @@ TEST(TxnIndexTest, DISABLED_IndexConcurrentUpdateTest) { // NOLINT
149150
if (add_delete_insert) {
150151
StringVectorWriter data_writer;
151152
BUSTUB_ENSURE(bustub->ExecuteSqlTxn(generate_select_sql(i), data_writer, txn), "cannot retrieve data");
153+
BUSTUB_ENSURE(data_writer.values_.size() == 1, "more than 1 row fetched??");
152154
const auto b_val = std::stoi(data_writer.values_[0][0]);
153155
BUSTUB_ENSURE(bustub->ExecuteSqlTxn(generate_delete_sql(i), data_writer, txn), "cannot delete data");
154156
BUSTUB_ENSURE(bustub->ExecuteSqlTxn(generate_txn_insert_sql(b_val, i), data_writer, txn),
@@ -190,6 +192,99 @@ TEST(TxnIndexTest, DISABLED_IndexConcurrentUpdateTest) { // NOLINT
190192
}
191193
}
192194

195+
TEST(TxnIndexTest, DISABLED_IndexConcurrentUpdateAbortTest) { // NOLINT
196+
const auto generate_sql = [](int n) -> std::string {
197+
return fmt::format("UPDATE maintable SET b = b + {} WHERE a = {}", 1, n);
198+
};
199+
const int thread_cnt = 8;
200+
const int number_cnt = 5;
201+
const auto generate_insert_sql = [](int n) -> std::string {
202+
std::vector<std::string> data;
203+
data.reserve(n);
204+
for (int i = 0; i < n; i++) {
205+
data.push_back(fmt::format("({}, {})", i, 0));
206+
}
207+
return fmt::format("INSERT INTO maintable VALUES {}", fmt::join(data, ","));
208+
};
209+
const int trials = 10;
210+
const int operation_cnt = 100;
211+
for (int n = 0; n < trials; n++) {
212+
auto bustub = std::make_unique<BustubInstance>();
213+
EnsureIndexScan(*bustub);
214+
Execute(*bustub, "CREATE TABLE maintable(a int primary key, b int)");
215+
std::vector<std::thread> update_threads;
216+
Execute(*bustub, generate_insert_sql(number_cnt), false);
217+
TableHeapEntryNoMoreThan(*bustub, bustub->catalog_->GetTable("maintable"), number_cnt);
218+
update_threads.reserve(thread_cnt);
219+
std::map<int, std::vector<int>> operation_result;
220+
std::mutex result_mutex;
221+
fmt::println(stderr, "trial {}: running with {} threads with {} rows ", n + 1, thread_cnt, number_cnt);
222+
global_disable_execution_exception_print.store(true);
223+
for (int thread = 0; thread < thread_cnt; thread++) {
224+
update_threads.emplace_back([&bustub, thread, generate_sql, &result_mutex, &operation_result]() {
225+
NoopWriter writer;
226+
std::vector<int> result(number_cnt, 0);
227+
std::random_device dev;
228+
std::mt19937 rng(dev());
229+
std::uniform_int_distribution<std::mt19937::result_type> dist(0, number_cnt - 1);
230+
for (int i = 0; i < operation_cnt; i++) {
231+
int x = 0;
232+
int y = 0;
233+
do {
234+
x = dist(rng);
235+
y = dist(rng);
236+
} while (x == y);
237+
auto *txn = bustub->txn_manager_->Begin();
238+
auto sql = generate_sql(x);
239+
if (!bustub->ExecuteSqlTxn(sql, writer, txn)) {
240+
bustub->txn_manager_->Abort(txn);
241+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
242+
continue;
243+
}
244+
sql = generate_sql(y);
245+
if (!bustub->ExecuteSqlTxn(sql, writer, txn)) {
246+
bustub->txn_manager_->Abort(txn);
247+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
248+
continue;
249+
}
250+
BUSTUB_ENSURE(bustub->txn_manager_->Commit(txn), "cannot commit??");
251+
result[x] += 1;
252+
result[y] += 1;
253+
}
254+
{
255+
std::lock_guard<std::mutex> lck(result_mutex);
256+
operation_result.emplace(thread, std::move(result));
257+
}
258+
});
259+
}
260+
for (auto &&thread : update_threads) {
261+
thread.join();
262+
}
263+
global_disable_execution_exception_print.store(false);
264+
std::vector<std::vector<int>> expected_rows;
265+
for (int i = 0; i < number_cnt; i++) {
266+
int total = 0;
267+
for (int j = 0; j < thread_cnt; j++) {
268+
total += operation_result[j][i];
269+
}
270+
expected_rows.push_back({i, total});
271+
if (total < 10) {
272+
fmt::println(stderr, "abort rate too high, {} txn succeeded", total);
273+
std::terminate();
274+
}
275+
}
276+
auto *table_info = bustub->catalog_->GetTable("maintable");
277+
auto query_txn = BeginTxn(*bustub, "query_txn");
278+
WithTxn(query_txn, QueryShowResult(*bustub, _var, _txn, "SELECT * FROM maintable", expected_rows));
279+
TableHeapEntryNoMoreThan(*bustub, table_info, number_cnt);
280+
if (n >= trials - 2) {
281+
SimpleStreamWriter writer(std::cerr);
282+
fmt::println(stderr, "--- the following data might be manually inspected by TAs ---");
283+
bustub->ExecuteSqlTxn("SELECT * FROM maintable", writer, query_txn);
284+
}
285+
}
286+
}
287+
193288
// NOLINTEND(bugprone-unchecked-optional-access))
194289

195290
} // namespace bustub

0 commit comments

Comments
 (0)