Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/executor/operator/physical_flush_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ void PhysicalFlush::FlushCatalog(QueryContext *query_context, OperatorState *ope

void PhysicalFlush::FlushData(QueryContext *query_context, OperatorState *operator_state) {
auto *wal_manager = query_context->storage()->wal_manager();
if (wal_manager->IsCheckpointing()) {
LOG_ERROR("There is a running checkpoint task, skip this checkpoint triggered by command");
Status status = Status::Checkpointing();
RecoverableError(status);
}

TxnTimeStamp max_commit_ts{};
i64 wal_size{};
std::tie(max_commit_ts, wal_size) = wal_manager->GetCommitState();
Expand Down
2 changes: 1 addition & 1 deletion src/storage/bg_task/periodic_trigger_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void CheckpointPeriodicTrigger::Trigger() {
LOG_DEBUG("No write txn after last checkpoint");
return;
}
if (wal_manager->IsCheckpointing()) {
if (new_txn_mgr->IsCheckpointing()) {
LOG_INFO("There is a running checkpoint task, skip this checkpoint triggered by periodic timer");
return;
}
Expand Down
20 changes: 5 additions & 15 deletions src/storage/new_txn/new_txn_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1625,14 +1625,6 @@ Status NewTxn::Checkpoint(TxnTimeStamp last_ckp_ts, bool auto_checkpoint) {
return Status::OK();
}

auto *wal_manager = InfinityContext::instance().storage()->wal_manager();
if (!wal_manager->SetCheckpointing()) {
// Checkpointing
LOG_INFO(fmt::format("checkpoint ts: {} skipped due to the system is checkpointing.", checkpoint_ts));
return Status::OK();
}
DeferFn defer([&] { wal_manager->UnsetCheckpoint(); });

std::vector<std::string> *db_id_strs_ptr;
std::vector<std::string> *db_names_ptr;
CatalogMeta catalog_meta(this);
Expand Down Expand Up @@ -5426,20 +5418,18 @@ Status NewTxn::CheckpointforSnapshot(TxnTimeStamp last_ckp_ts, CheckpointTxnStor
current_ckp_ts_ = checkpoint_ts;
LOG_INFO(fmt::format("checkpoint ts for snapshot: {}", current_ckp_ts_));

if (!txn_mgr_->SetCheckpointBeginTS(checkpoint_ts)) {
LOG_ERROR(fmt::format("Create snapshot with txn: {} is conflicted with another checkpoint transaction.", this->TxnID()));
return Status::Checkpointing();
}

if (last_ckp_ts > 0 and last_ckp_ts + 2 >= checkpoint_ts) {
// last checkpoint ts: last checkpoint txn begin ts. checkpoint is the begin_ts of current txn
txn_context_ptr_->txn_type_ = TransactionType::kSkippedCheckpoint;
LOG_INFO(fmt::format("Last checkpoint ts {}, this checkpoint begin ts: {}, SKIP CHECKPOINT", last_ckp_ts, checkpoint_ts));
return Status::OK();
}

auto *wal_manager = InfinityContext::instance().storage()->wal_manager();
if (!wal_manager->SetCheckpointing()) {
LOG_ERROR(fmt::format("Create snapshot with txn: {} is conflicted with another checkpoint transaction.", this->TxnID()));
return Status::Checkpointing();
}
DeferFn defer([&] { wal_manager->UnsetCheckpoint(); });

std::vector<std::string> *db_id_strs_ptr;
std::vector<std::string> *db_names_ptr;
CatalogMeta catalog_meta(this);
Expand Down
4 changes: 4 additions & 0 deletions src/storage/new_txn/new_txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ public:

void UpdateTxnBeginTSAndKVInstance(NewTxn *txn);

bool SetCheckpointBeginTS(TxnTimeStamp checkpoint_ts);

bool IsCheckpointing() const;

private:
mutable std::mutex locker_{};
Storage *storage_{};
Expand Down
20 changes: 18 additions & 2 deletions src/storage/new_txn/new_txn_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ Status NewTxnManager::CommitTxn(NewTxn *txn, TxnTimeStamp *commit_ts_ptr) {
}
if (status.ok()) {
TransactionType txn_type = txn->GetTxnType();
if (txn_type == TransactionType::kNewCheckpoint or txn_type == TransactionType::kSkippedCheckpoint) {
if (txn_type == TransactionType::kNewCheckpoint or txn_type == TransactionType::kSkippedCheckpoint or
txn_type == TransactionType::kCreateDB) {
std::lock_guard guard(locker_);
ckp_begin_ts_ = UNCOMMIT_TS;
}
Expand All @@ -342,7 +343,8 @@ Status NewTxnManager::RollBackTxn(NewTxn *txn) {
Status status = txn->Rollback();
if (status.ok()) {
TransactionType txn_type = txn->GetTxnType();
if (txn_type == TransactionType::kNewCheckpoint or txn_type == TransactionType::kSkippedCheckpoint) {
if (txn_type == TransactionType::kNewCheckpoint or txn_type == TransactionType::kSkippedCheckpoint or
txn_type == TransactionType::kCreateTableSnapshot) {
std::lock_guard guard(locker_);
ckp_begin_ts_ = UNCOMMIT_TS;
}
Expand Down Expand Up @@ -900,6 +902,20 @@ void NewTxnManager::UpdateTxnBeginTSAndKVInstance(NewTxn *txn) {
txn->UpdateKVInstance(kv_store_->GetInstance());
}

bool NewTxnManager::SetCheckpointBeginTS(TxnTimeStamp checkpoint_ts) {
std::lock_guard guard(locker_);
if (ckp_begin_ts_ == UNCOMMIT_TS) {
ckp_begin_ts_ = checkpoint_ts;
return true;
}
return false;
}

bool NewTxnManager::IsCheckpointing() const {
std::lock_guard guard(locker_);
return ckp_begin_ts_ != UNCOMMIT_TS;
}

void NewTxnManager::CollectInfo(NewTxn *txn) {
switch (txn->GetTxnType()) {
case TransactionType::kNewCheckpoint: {
Expand Down
4 changes: 0 additions & 4 deletions src/storage/wal/wal_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ public:

void FlushLogByReplication(const std::vector<std::string> &synced_logs, bool on_startup);

bool SetCheckpointing();
bool UnsetCheckpoint();
bool IsCheckpointing() const;

void SwapWalFile(TxnTimeStamp max_commit_ts, bool error_if_duplicate);

std::string GetWalFilename() const;
Expand Down
18 changes: 0 additions & 18 deletions src/storage/wal/wal_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,24 +436,6 @@ void WalManager::FlushLogByReplication(const std::vector<std::string> &synced_lo
ofs_.flush();
}

bool WalManager::SetCheckpointing() {
bool expect = false;
if (checkpoint_in_progress_.compare_exchange_strong(expect, true)) {
return true;
}
return false;
}

bool WalManager::UnsetCheckpoint() {
bool expect = true;
if (checkpoint_in_progress_.compare_exchange_strong(expect, false)) {
return true;
}
return false;
}

bool WalManager::IsCheckpointing() const { return checkpoint_in_progress_; }

/*****************************************************************************
* CHECKPOINT WAL FILE
*****************************************************************************/
Expand Down
23 changes: 23 additions & 0 deletions src/unit_test/storage/new_catalog/checkpoint_internal_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -831,3 +831,26 @@ TEST_P(TestTxnCheckpointInternalTest, test_checkpoint5) {
EXPECT_TRUE(status.ok());
}
}

TEST_P(TestTxnCheckpointInternalTest, test_concurrent_checkpoint) {
std::shared_ptr<std::string> db_name = std::make_shared<std::string>("db1");

{
auto *txn = new_txn_mgr->BeginTxn(std::make_unique<std::string>("create db"), TransactionType::kCreateDB);
Status status = txn->CreateDatabase(*db_name, ConflictType::kError, std::make_shared<std::string>());
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn);
EXPECT_TRUE(status.ok());
}

{
auto *txn1 = new_txn_mgr->BeginTxn(std::make_unique<std::string>("checkpoint"), TransactionType::kNewCheckpoint);
auto *txn2 = new_txn_mgr->BeginTxn(std::make_unique<std::string>("checkpoint"), TransactionType::kNewCheckpoint);
EXPECT_EQ(txn2, nullptr); // Another checkpoint txn is started.

Status status = txn1->Checkpoint(wal_manager_->LastCheckpointTS(), false);
EXPECT_TRUE(status.ok());
status = new_txn_mgr->CommitTxn(txn1);
EXPECT_TRUE(status.ok());
}
}
Loading