Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit 6591523

Browse files
Nov11apavlo
authored andcommitted
fix #1313 Remove read only isolation level (#1322)
* remove 'read_only' from isolation level and add a new 'read only' flag in transaction context * set flag when starting a read only txn * add missing isolation level parameter * fix serializalbe txn test - readonly txn failure. 'read only' is actually of snapshot isolation level in the previous implementation * fix spacing, add comment and remove wrapper function definition * add a test case of two concurrent txns with one read-only one * fix duplicate assertion * add comments to test case * change parameter name * update read only snapshot test case to a more reasonable one and add some comments * wait one epoch before updating snapshot epoch. make test case more reliable.
1 parent 6cdca15 commit 6591523

File tree

10 files changed

+122
-43
lines changed

10 files changed

+122
-43
lines changed

src/common/internal_types.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2403,9 +2403,6 @@ std::string IsolationLevelTypeToString(IsolationLevelType type) {
24032403
case IsolationLevelType::READ_COMMITTED: {
24042404
return "READ_COMMITTED";
24052405
}
2406-
case IsolationLevelType::READ_ONLY: {
2407-
return "READ_ONLY";
2408-
}
24092406
default: {
24102407
throw ConversionException(StringUtil::Format(
24112408
"No string conversion for IsolationLevelType value '%d'",
@@ -2427,8 +2424,6 @@ IsolationLevelType StringToIsolationLevelType(const std::string &str) {
24272424
return IsolationLevelType::REPEATABLE_READS;
24282425
} else if (upper_str == "READ_COMMITTED") {
24292426
return IsolationLevelType::READ_COMMITTED;
2430-
} else if (upper_str == "READ_ONLY") {
2431-
return IsolationLevelType::READ_ONLY;
24322427
} else {
24332428
throw ConversionException(
24342429
StringUtil::Format("No IsolationLevelType conversion from string '%s'",

src/concurrency/timestamp_ordering_transaction_manager.cpp

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ bool TimestampOrderingTransactionManager::PerformRead(
178178
//////////////////////////////////////////////////////////
179179
//// handle READ_ONLY
180180
//////////////////////////////////////////////////////////
181-
if (current_txn->GetIsolationLevel() == IsolationLevelType::READ_ONLY) {
181+
if (current_txn->IsReadOnly()) {
182182
// do not update read set for read-only transactions.
183183
return true;
184184
} // end READ ONLY
@@ -437,8 +437,7 @@ bool TimestampOrderingTransactionManager::PerformRead(
437437
void TimestampOrderingTransactionManager::PerformInsert(
438438
TransactionContext *const current_txn, const ItemPointer &location,
439439
ItemPointer *index_entry_ptr) {
440-
PELOTON_ASSERT(current_txn->GetIsolationLevel() !=
441-
IsolationLevelType::READ_ONLY);
440+
PELOTON_ASSERT(!current_txn->IsReadOnly());
442441

443442
oid_t tile_group_id = location.block;
444443
oid_t tuple_id = location.offset;
@@ -477,8 +476,7 @@ void TimestampOrderingTransactionManager::PerformInsert(
477476
void TimestampOrderingTransactionManager::PerformUpdate(
478477
TransactionContext *const current_txn, const ItemPointer &location,
479478
const ItemPointer &new_location) {
480-
PELOTON_ASSERT(current_txn->GetIsolationLevel() !=
481-
IsolationLevelType::READ_ONLY);
479+
PELOTON_ASSERT(!current_txn->IsReadOnly());
482480

483481
ItemPointer old_location = location;
484482

@@ -562,8 +560,7 @@ void TimestampOrderingTransactionManager::PerformUpdate(
562560
void TimestampOrderingTransactionManager::PerformUpdate(
563561
TransactionContext *const current_txn UNUSED_ATTRIBUTE,
564562
const ItemPointer &location) {
565-
PELOTON_ASSERT(current_txn->GetIsolationLevel() !=
566-
IsolationLevelType::READ_ONLY);
563+
PELOTON_ASSERT(!current_txn->IsReadOnly());
567564

568565
oid_t tile_group_id = location.block;
569566
UNUSED_ATTRIBUTE oid_t tuple_id = location.offset;
@@ -596,8 +593,7 @@ void TimestampOrderingTransactionManager::PerformUpdate(
596593
void TimestampOrderingTransactionManager::PerformDelete(
597594
TransactionContext *const current_txn, const ItemPointer &location,
598595
const ItemPointer &new_location) {
599-
PELOTON_ASSERT(current_txn->GetIsolationLevel() !=
600-
IsolationLevelType::READ_ONLY);
596+
PELOTON_ASSERT(!current_txn->IsReadOnly());
601597

602598
ItemPointer old_location = location;
603599

@@ -684,8 +680,7 @@ void TimestampOrderingTransactionManager::PerformDelete(
684680

685681
void TimestampOrderingTransactionManager::PerformDelete(
686682
TransactionContext *const current_txn, const ItemPointer &location) {
687-
PELOTON_ASSERT(current_txn->GetIsolationLevel() !=
688-
IsolationLevelType::READ_ONLY);
683+
PELOTON_ASSERT(!current_txn->IsReadOnly());
689684

690685
oid_t tile_group_id = location.block;
691686
oid_t tuple_id = location.offset;
@@ -725,7 +720,7 @@ ResultType TimestampOrderingTransactionManager::CommitTransaction(
725720
//////////////////////////////////////////////////////////
726721
//// handle READ_ONLY
727722
//////////////////////////////////////////////////////////
728-
if (current_txn->GetIsolationLevel() == IsolationLevelType::READ_ONLY) {
723+
if (current_txn->IsReadOnly()) {
729724
EndTransaction(current_txn);
730725
return ResultType::SUCCESS;
731726
}
@@ -914,8 +909,7 @@ ResultType TimestampOrderingTransactionManager::CommitTransaction(
914909
ResultType TimestampOrderingTransactionManager::AbortTransaction(
915910
TransactionContext *const current_txn) {
916911
// a pre-declared read-only transaction will never abort.
917-
PELOTON_ASSERT(current_txn->GetIsolationLevel() !=
918-
IsolationLevelType::READ_ONLY);
912+
PELOTON_ASSERT(!current_txn->IsReadOnly());
919913

920914
LOG_TRACE("Aborting peloton txn : %" PRId64, current_txn->GetTransactionId());
921915
auto &manager = catalog::Manager::GetInstance();

src/concurrency/transaction_manager.cpp

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,10 @@ ConflictAvoidanceType TransactionManager::conflict_avoidance_ =
3131
ConflictAvoidanceType::ABORT;
3232

3333
TransactionContext *TransactionManager::BeginTransaction(
34-
const size_t thread_id, const IsolationLevelType type) {
34+
const size_t thread_id, const IsolationLevelType type, bool read_only) {
3535
TransactionContext *txn = nullptr;
3636

37-
if (type == IsolationLevelType::READ_ONLY) {
38-
// transaction processing with decentralized epoch manager
39-
cid_t read_id = EpochManagerFactory::GetInstance().EnterEpoch(
40-
thread_id, TimestampType::SNAPSHOT_READ);
41-
txn = new TransactionContext(thread_id, type, read_id);
42-
43-
} else if (type == IsolationLevelType::SNAPSHOT) {
37+
if (type == IsolationLevelType::SNAPSHOT) {
4438
// transaction processing with decentralized epoch manager
4539
// the DBMS must acquire
4640
cid_t read_id = EpochManagerFactory::GetInstance().EnterEpoch(
@@ -66,6 +60,10 @@ TransactionContext *TransactionManager::BeginTransaction(
6660
txn = new TransactionContext(thread_id, type, read_id);
6761
}
6862

63+
if (read_only) {
64+
txn->SetReadOnly();
65+
}
66+
6967
if (static_cast<StatsType>(settings::SettingsManager::GetInt(
7068
settings::SettingId::stats_mode)) != StatsType::INVALID) {
7169
stats::BackendStatsContext::GetInstance()

src/gc/transaction_level_gc_manager.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void TransactionLevelGCManager::RecycleTransaction(
9191

9292
epoch_manager.ExitEpoch(txn->GetThreadId(), txn->GetEpochId());
9393

94-
if (txn->GetIsolationLevel() != IsolationLevelType::READ_ONLY &&
94+
if (!txn->IsReadOnly() && \
9595
txn->GetResult() != ResultType::SUCCESS && txn->IsGCSetEmpty() != true) {
9696
txn->SetEpochId(epoch_manager.GetNextEpochId());
9797
}
@@ -146,7 +146,7 @@ int TransactionLevelGCManager::Unlink(const int &thread_id,
146146

147147
// Deallocate the Transaction Context of transactions that don't involve
148148
// any garbage collection
149-
if (txn_ctx->GetIsolationLevel() == IsolationLevelType::READ_ONLY ||
149+
if (txn_ctx->IsReadOnly() || \
150150
txn_ctx->IsGCSetEmpty()) {
151151
delete txn_ctx;
152152
continue;

src/include/common/internal_types.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,6 @@ enum class IsolationLevelType {
422422
SNAPSHOT = 2, // snapshot isolation
423423
REPEATABLE_READS = 3, // repeatable reads
424424
READ_COMMITTED = 4, // read committed
425-
READ_ONLY = 5 // read only
426425
};
427426
std::string IsolationLevelTypeToString(IsolationLevelType type);
428427
IsolationLevelType StringToIsolationLevelType(const std::string &str);

src/include/concurrency/transaction_context.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,16 @@ class TransactionContext : public Printable {
262262
*
263263
* @return True if read only, False otherwise.
264264
*/
265-
inline bool IsReadOnly() const {
266-
return is_written_ == false && insert_count_ == 0;
265+
bool IsReadOnly() const {
266+
return read_only_;
267+
}
268+
269+
/**
270+
* @brief mark this context as read only
271+
*
272+
*/
273+
void SetReadOnly() {
274+
read_only_ = true;
267275
}
268276

269277
/**
@@ -335,6 +343,9 @@ class TransactionContext : public Printable {
335343
IsolationLevelType isolation_level_;
336344

337345
std::unique_ptr<trigger::TriggerSet> on_commit_triggers_;
346+
347+
/** one default transaction is NOT 'read only' unless it is marked 'read only' explicitly*/
348+
bool read_only_ = false;
338349
};
339350

340351
} // namespace concurrency

src/include/concurrency/transaction_manager.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,12 @@ class TransactionManager {
219219
}
220220

221221
TransactionContext *BeginTransaction(const IsolationLevelType type) {
222-
return BeginTransaction(0, type);
222+
return BeginTransaction(0, type, false);
223223
}
224224

225225
TransactionContext *BeginTransaction(const size_t thread_id = 0,
226-
const IsolationLevelType type = isolation_level_);
226+
const IsolationLevelType type = isolation_level_,
227+
bool read_only = false);
227228

228229
/**
229230
* @brief Ends a transaction.

test/common/internal_types_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ TEST_F(InternalTypesTests, IsolationLevelTypeTest) {
646646
std::vector<IsolationLevelType> list = {
647647
IsolationLevelType::INVALID, IsolationLevelType::SERIALIZABLE,
648648
IsolationLevelType::SNAPSHOT, IsolationLevelType::REPEATABLE_READS,
649-
IsolationLevelType::READ_COMMITTED, IsolationLevelType::READ_ONLY};
649+
IsolationLevelType::READ_COMMITTED};
650650

651651
// Make sure that ToString and FromString work
652652
for (auto val : list) {

test/concurrency/serializable_transaction_test.cpp

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,96 @@ TEST_F(SerializableTransactionTests, ReadOnlyTransactionTest) {
6464
auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
6565
// Just scan the table
6666
{
67+
//same as 'ConcurrentReadOnlyTransactionTest'
68+
thread_pool.Initialize(0, CONNECTION_THREAD_COUNT + 3);
6769
concurrency::EpochManagerFactory::GetInstance().Reset();
70+
concurrency::EpochManagerFactory::GetInstance().StartEpoch();
71+
gc::GCManagerFactory::Configure();
72+
gc::GCManagerFactory::GetInstance().StartGC();
73+
74+
//this consists of 2 txns. 1.catalog creation 2.test table creation
6875
storage::DataTable *table = TestingTransactionUtil::CreateTable();
69-
70-
TransactionScheduler scheduler(1, table, &txn_manager, true);
76+
77+
//manually update snapshot epoch number, so later snapshot read must get a larger epoch than table creating txn
78+
//or it may read nothing
79+
//wait one epoch. so that global epoch is guaranteed to increase
80+
std::this_thread::sleep_for(std::chrono::milliseconds(EPOCH_LENGTH));
81+
concurrency::EpochManagerFactory::GetInstance().GetExpiredEpochId();
82+
83+
TransactionScheduler scheduler(1, table, &txn_manager, {0});
7184
scheduler.Txn(0).Scan(0);
7285
scheduler.Txn(0).Commit();
7386

7487
scheduler.Run();
7588

76-
// Snapshot read cannot read the recent insert
77-
EXPECT_EQ(0, scheduler.schedules[0].results.size());
89+
//it should read all the 10 tuples
90+
EXPECT_EQ(10, scheduler.schedules[0].results.size());
91+
92+
gc::GCManagerFactory::GetInstance().StopGC();
93+
concurrency::EpochManagerFactory::GetInstance().StopEpoch();
94+
thread_pool.Shutdown();
95+
//reset to default value, other test cases
96+
gc::GCManagerFactory::Configure(0);
97+
}
98+
}
99+
}
100+
101+
// test r/w txn with a read-only txn runs concurrently
102+
TEST_F(SerializableTransactionTests, ConcurrentReadOnlyTransactionTest) {
103+
for (auto protocol_type : PROTOCOL_TYPES) {
104+
concurrency::TransactionManagerFactory::Configure(protocol_type, ISOLATION_LEVEL_TYPE);
105+
auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
106+
// Txn #0 | Txn #1
107+
// ----------------
108+
// BEGIN |
109+
// W(X) |
110+
// | BEGIN R/O
111+
// | R(X)
112+
// W(X) |
113+
// COMMIT |
114+
// | R(X)
115+
// | COMMIT
116+
117+
{
118+
//if gc manager is active, finishing a txn will make the txn be removed from epoch list as well.
119+
//epoch manager needs this behavior to find the largest expired txn id.
120+
//that id is used to determine whether snapshot epoch falls behind and needs update.
121+
//gc and epoch manager both depend on thread pool
122+
thread_pool.Initialize(0, CONNECTION_THREAD_COUNT + 3);
123+
concurrency::EpochManagerFactory::GetInstance().Reset();
124+
concurrency::EpochManagerFactory::GetInstance().StartEpoch();
125+
gc::GCManagerFactory::Configure();
126+
gc::GCManagerFactory::GetInstance().StartGC();
127+
128+
//this contains 2 txns: 1.create catalog table 2.create test table
129+
storage::DataTable *table = TestingTransactionUtil::CreateTable();
130+
131+
//force snapshot epoch to be updated. it should be larger than table creation txn's epoch
132+
std::this_thread::sleep_for(std::chrono::milliseconds(EPOCH_LENGTH));
133+
concurrency::EpochManagerFactory::GetInstance().GetExpiredEpochId();
134+
135+
TransactionScheduler scheduler(2, table, &txn_manager, {1});
136+
scheduler.Txn(0).Update(0, 1);
137+
scheduler.Txn(1).Read(0);
138+
scheduler.Txn(0).Update(0, 2);
139+
scheduler.Txn(0).Commit();
140+
scheduler.Txn(1).Read(0);
141+
scheduler.Txn(1).Commit();
142+
143+
scheduler.Run();
144+
145+
EXPECT_EQ(ResultType::SUCCESS, scheduler.schedules[0].txn_result);
146+
EXPECT_EQ(ResultType::SUCCESS, scheduler.schedules[1].txn_result);
147+
148+
//read only txn should read the same snapshot that exists after table creation and before update txn commits
149+
EXPECT_EQ(0, scheduler.schedules[1].results[0]);
150+
EXPECT_EQ(0, scheduler.schedules[1].results[1]);
151+
152+
gc::GCManagerFactory::GetInstance().StopGC();
153+
concurrency::EpochManagerFactory::GetInstance().StopEpoch();
154+
thread_pool.Shutdown();
155+
//reset it to default value for test cases
156+
gc::GCManagerFactory::Configure(0);
78157
}
79158
}
80159
}

test/include/concurrency/testing_transaction_util.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
*
5555
* See isolation_level_test.cpp for examples.
5656
*/
57-
57+
#include <initializer_list>
5858
#include "catalog/schema.h"
5959
#include "common/harness.h"
6060
#include "concurrency/transaction_context.h"
@@ -241,7 +241,8 @@ class TransactionThread {
241241

242242
if (cur_seq == 0) {
243243
if (schedule->declared_ro == true) {
244-
txn = txn_manager->BeginTransaction(IsolationLevelType::READ_ONLY);
244+
/** starts a read only transaction*/
245+
txn = txn_manager->BeginTransaction(0, IsolationLevelType::SNAPSHOT, true);
245246
} else {
246247
txn = txn_manager->BeginTransaction();
247248
}
@@ -348,13 +349,14 @@ class TransactionScheduler {
348349
public:
349350
TransactionScheduler(size_t num_txn, storage::DataTable *datatable_,
350351
concurrency::TransactionManager *txn_manager_,
351-
bool first_as_ro = false)
352+
std::set<int> read_only_ = {})
352353
: txn_manager(txn_manager_),
353354
table(datatable_),
354355
time(0),
355356
concurrent(false) {
357+
356358
for (size_t i = 0; i < num_txn; i++) {
357-
if (first_as_ro && i == 0) {
359+
if (read_only_.find(i) != read_only_.end()) {
358360
schedules.emplace_back(i, true);
359361
} else {
360362
schedules.emplace_back(i, false);

0 commit comments

Comments
 (0)