@@ -1479,7 +1479,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
14791479 }
14801480
14811481 if (partSwitch.FollowerUpdateStep ) {
1482- auto subset = Database->Subset (partSwitch.TableId , partSwitch.Leaving , partSwitch.Head );
1482+ auto subset = Database->PartSwitchSubset (partSwitch.TableId , partSwitch.Head , partSwitch. Leaving , partSwitch.LeavingTxStatus );
14831483
14841484 if (partSwitch.Head != subset->Head ) {
14851485 Y_ABORT (" Follower table epoch head has diverged from leader" );
@@ -1488,30 +1488,36 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
14881488 }
14891489
14901490 Y_ABORT_UNLESS (newColdParts.empty (), " Unexpected cold part at a follower" );
1491- Database->Replace (partSwitch.TableId , std::move (newParts), *subset);
1492- Database->ReplaceTxStatus (partSwitch.TableId , std::move (newTxStatus), *subset);
1491+ Database->Replace (partSwitch.TableId , *subset, std::move (newParts), std::move (newTxStatus));
14931492
14941493 for (auto &gone : subset->Flatten )
14951494 DropCachesOfBundle (*gone);
14961495
14971496 Send (Owner->Tablet (), new TEvTablet::TEvFGcAck (Owner->TabletID (), Generation (), partSwitch.FollowerUpdateStep ));
14981497 } else {
1498+ bool merged = false ;
14991499 for (auto &partView : newParts) {
15001500 Database->Merge (partSwitch.TableId , partView);
1501+ merged = true ;
15011502
15021503 if (CompactionLogic) {
15031504 CompactionLogic->BorrowedPart (partSwitch.TableId , std::move (partView));
15041505 }
15051506 }
15061507 for (auto &part : newColdParts) {
15071508 Database->Merge (partSwitch.TableId , part);
1509+ merged = true ;
15081510
15091511 if (CompactionLogic) {
15101512 CompactionLogic->BorrowedPart (partSwitch.TableId , std::move (part));
15111513 }
15121514 }
15131515 for (auto &txStatus : newTxStatus) {
15141516 Database->Merge (partSwitch.TableId , txStatus);
1517+ merged = true ;
1518+ }
1519+ if (merged) {
1520+ Database->MergeDone (partSwitch.TableId );
15151521 }
15161522 }
15171523
@@ -1533,7 +1539,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
15331539 // N.B. there should be a single source table per part switch
15341540 for (auto & [sourceTable, state] : perTable) {
15351541 // Rebase source parts to their respective new epochs
1536- auto srcSubset = Database->Subset (sourceTable, state. Bundles , NTable::TEpoch::Zero ());
1542+ auto srcSubset = Database->PartSwitchSubset (sourceTable, NTable::TEpoch::Zero (), state. Bundles , { } );
15371543 TVector<NTable::TPartView> rebased (Reserve (srcSubset->Flatten .size ()));
15381544 for (const auto & partView : srcSubset->Flatten ) {
15391545 Y_ABORT_UNLESS (!partView->TxIdStats , " Cannot move parts with uncommitted deltas" );
@@ -1542,7 +1548,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
15421548 }
15431549
15441550 // Remove source parts from the source table
1545- Database->Replace (sourceTable, { }, *srcSubset );
1551+ Database->Replace (sourceTable, *srcSubset, { }, { } );
15461552
15471553 if (CompactionLogic) {
15481554 CompactionLogic->RemovedParts (sourceTable, state.Bundles );
@@ -1557,6 +1563,8 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
15571563 }
15581564 }
15591565 }
1566+
1567+ Database->MergeDone (partSwitch.TableId );
15601568 }
15611569}
15621570
@@ -2310,7 +2318,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
23102318 }
23112319
23122320 // Remove source parts from the source table
2313- Database->Replace (src, { }, *srcSubset );
2321+ Database->Replace (src, *srcSubset, { }, { } );
23142322
23152323 const auto logicResult = CompactionLogic->RemovedParts (src, labels);
23162324
@@ -2342,6 +2350,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
23422350 Database->Merge (dst, partView);
23432351 CompactionLogic->BorrowedPart (dst, partView);
23442352 }
2353+ Database->MergeDone (dst);
23452354
23462355 // Serialize rebased parts as moved from the source table
23472356 NKikimrExecutorFlat::TTablePartSwitch proto;
@@ -3050,7 +3059,7 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable::
30503059 TAutoPtr<NTable::TSubset> subset;
30513060
30523061 if (params) {
3053- subset = Database->Subset (table, { }, params->Edge .Head );
3062+ subset = Database->CompactionSubset (table, params->Edge .Head , { } );
30543063
30553064 if (params->Parts ) {
30563065 subset->Flatten .insert (subset->Flatten .end (), params->Parts .begin (), params->Parts .end ());
@@ -3397,8 +3406,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
33973406 newParts.emplace_back (result.Part );
33983407 }
33993408
3400- Database->Replace (tableId, newParts, *ops->Subset );
3401- Database->ReplaceTxStatus (tableId, newTxStatus, *ops->Subset );
3409+ Database->Replace (tableId, *ops->Subset , newParts, newTxStatus);
34023410
34033411 TVector<TLogoBlobID> bundles (Reserve (ops->Subset ->Flatten .size () + ops->Subset ->ColdParts .size ()));
34043412 for (auto &part: ops->Subset ->Flatten ) {
@@ -4525,23 +4533,28 @@ ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)
45254533 if (!memTableSnapshot->GetCommittedTransactions ().empty () || !memTableSnapshot->GetRemovedTransactions ().empty ()) {
45264534 // We must compact tx status when mem table has changes
45274535 compactTxStatus = true ;
4536+ break ;
45284537 }
45294538 }
45304539 for (const auto & txStatus : snapshot->Subset ->TxStatus ) {
45314540 if (txStatus->Label .TabletID () != Owner->TabletID ()) {
45324541 // We want to compact borrowed tx status
45334542 compactTxStatus = true ;
4543+ break ;
45344544 }
45354545 }
4546+ if (snapshot->Subset ->TxStatus && snapshot->Subset ->GarbageTransactions ) {
4547+ // We want to remove garbage transactions
4548+ compactTxStatus = true ;
4549+ }
45364550
45374551 if (compactTxStatus) {
4538- comp->CommittedTransactions = snapshot->Subset ->CommittedTransactions ;
4539- comp->RemovedTransactions = snapshot->Subset ->RemovedTransactions ;
45404552 comp->Frozen .reserve (snapshot->Subset ->Frozen .size ());
45414553 for (auto & memTableSnapshot : snapshot->Subset ->Frozen ) {
45424554 comp->Frozen .push_back (memTableSnapshot.MemTable );
45434555 }
45444556 comp->TxStatus = snapshot->Subset ->TxStatus ;
4557+ comp->GarbageTransactions = snapshot->Subset ->GarbageTransactions ;
45454558 } else {
45464559 // We are not compacting tx status, avoid deleting current blobs
45474560 snapshot->Subset ->TxStatus .clear ();
0 commit comments