Skip to content

Commit 5a4ab93

Browse files
kirillvasilenkoKirill Vasilenko
andauthored
Move the breaking of the conflicting tx locks from CommitOnComplete to CommitOnExecute (#26206)
Co-authored-by: Kirill Vasilenko <[email protected]>
1 parent 0bb28a5 commit 5a4ab93

File tree

3 files changed

+37
-16
lines changed

3 files changed

+37
-16
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
118118
pathIds.emplace(op->GetPathId().InternalPathId);
119119
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
120120
if (op->GetBehaviour() != EOperationBehaviour::NoTxWrite || Self->GetOperationsManager().HasReadLocks(writeMeta.GetPathId().InternalPathId)) {
121+
// detect active reads which the given tx has conflicts with
121122
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(
122123
writeMeta.GetPathId().InternalPathId, writeResult.GetPKBatchVerified(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
123124
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
@@ -127,9 +128,11 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
127128
auto& index = Self->MutableIndexAs<NOlap::TColumnEngineForLogs>();
128129
auto& granule = index.MutableGranuleVerified(Pack.GetPathId());
129130
for (auto&& portion : Pack.GetPortions()) {
131+
// make the portions visible as uncommitted for reads
130132
granule.InsertPortionOnComplete(portion.GetPortionInfoPtr(), index);
131133
}
132134
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
135+
// make the portions visible as committed for reads
133136
for (auto&& i : InsertWriteIds) {
134137
granule.CommitPortionOnComplete(i, index);
135138
}
@@ -139,7 +142,10 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
139142
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
140143
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
141144
AFL_VERIFY(CommitSnapshot);
145+
// No tx writes (bulk upsert) must break decent/proper txs.
146+
// Decent/proper txs asked for serializable, so we have to give them serializable.
142147
Self->OperationsManager->AddTemporaryTxLink(op->GetLockId());
148+
Self->OperationsManager->BreakConflictingTxs(op->GetLockId());
143149
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), *CommitSnapshot);
144150
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
145151
} else {

ydb/core/tx/columnshard/operations/manager.cpp

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,25 @@ bool TOperationsManager::Load(NTabletFlatExecutor::TTransactionContext& txc) {
7171
return true;
7272
}
7373

74+
void TOperationsManager::BreakConflictingTxs(const TLockFeatures& lock) {
75+
for (auto&& i : lock.GetBrokeOnCommit()) {
76+
if (auto lockNotify = GetLockOptional(i)) {
77+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("broken_lock_id", i);
78+
lockNotify->SetBroken();
79+
}
80+
}
81+
for (auto&& i : lock.GetNotifyOnCommit()) {
82+
if (auto lockNotify = GetLockOptional(i)) {
83+
lockNotify->AddNotifyCommit(lock.GetLockId());
84+
}
85+
}
86+
}
87+
88+
void TOperationsManager::BreakConflictingTxs(const ui64 txId) {
89+
auto& lock = GetLockFeaturesForTxVerified(txId);
90+
BreakConflictingTxs(lock);
91+
}
92+
7493
void TOperationsManager::CommitTransactionOnExecute(
7594
TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) {
7695
auto& lock = GetLockFeaturesForTxVerified(txId);
@@ -81,6 +100,9 @@ void TOperationsManager::CommitTransactionOnExecute(
81100
opPtr->CommitOnExecute(owner, txc, snapshot);
82101
commited.emplace_back(opPtr);
83102
}
103+
104+
BreakConflictingTxs(lock);
105+
84106
OnTransactionFinishOnExecute(commited, lock, txId, txc);
85107
}
86108

@@ -89,18 +111,6 @@ void TOperationsManager::CommitTransactionOnComplete(
89111
auto& lock = GetLockFeaturesForTxVerified(txId);
90112
TLogContextGuard gLogging(
91113
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("commit_tx_id", txId)("commit_lock_id", lock.GetLockId()));
92-
for (auto&& i : lock.GetBrokeOnCommit()) {
93-
if (auto lockNotify = GetLockOptional(i)) {
94-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("broken_lock_id", i);
95-
lockNotify->SetBroken();
96-
}
97-
}
98-
99-
for (auto&& i : lock.GetNotifyOnCommit()) {
100-
if (auto lockNotify = GetLockOptional(i)) {
101-
lockNotify->AddNotifyCommit(lock.GetLockId());
102-
}
103-
}
104114

105115
TVector<TWriteOperation::TPtr> commited;
106116
for (auto&& opPtr : lock.GetWriteOperations()) {
@@ -317,10 +327,12 @@ void TOperationsManager::AddEventForLock(
317327
NOlap::NTxInteractions::TTxConflicts txConflicts;
318328
auto& txLock = GetLockVerified(lockId);
319329
writer->CheckInteraction(lockId, InteractionsContext, txConflicts, txNotifications);
320-
for (auto&& i : txConflicts) {
321-
if (GetLockOptional(i.first)) {
322-
GetLockVerified(i.first).AddBrokeOnCommit(i.second);
323-
} else if (txLock.IsCommitted(i.first)) {
330+
for (auto& [commitLockId, breakLockIds] : txConflicts) {
331+
if (GetLockOptional(commitLockId)) {
332+
GetLockVerified(commitLockId).AddBrokeOnCommit(breakLockIds);
333+
// if txId not found, it means the conflicting tx is already committed or aborted
334+
} else if (txLock.IsCommitted(commitLockId)) {
335+
// if the conflicting tx is already committed, we cannot commit the given tx, so break its lock
324336
txLock.SetBroken();
325337
}
326338
}

ydb/core/tx/columnshard/operations/manager.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ class TOperationsManager {
236236
void AbortLockOnExecute(TColumnShard& owner, const ui64 lockId, NTabletFlatExecutor::TTransactionContext& txc);
237237
void AbortLockOnComplete(TColumnShard& owner, const ui64 lockId);
238238

239+
void BreakConflictingTxs(const TLockFeatures& lock);
240+
void BreakConflictingTxs(const ui64 txId);
241+
239242
std::optional<ui64> GetLockForTx(const ui64 txId) const;
240243
std::optional<ui64> GetLockForTxOptional(const ui64 txId) const {
241244
return GetLockForTx(txId);

0 commit comments

Comments
 (0)