Skip to content

Commit 03219af

Browse files
committed
Fixed topic tests
1 parent 2ab964a commit 03219af

File tree

4 files changed

+28
-34
lines changed

4 files changed

+28
-34
lines changed

src/client/query/client.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -858,8 +858,8 @@ class TTransaction::TImpl : public std::enable_shared_from_this<TImpl> {
858858

859859
private:
860860
bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet
861-
std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
862-
std::vector<TOnFailureTransactionCallback> OnFailureCallbacks;
861+
mutable std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
862+
mutable std::vector<TOnFailureTransactionCallback> OnFailureCallbacks;
863863

864864
std::mutex PrecommitCallbacksMutex;
865865
std::mutex OnFailureCallbacksMutex;

src/client/topic/impl/read_session_impl.ipp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2183,6 +2183,9 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TrySubscribeOnTransact
21832183
return;
21842184
}
21852185

2186+
txInfo->IsActive = true;
2187+
txInfo->Subscribed = true;
2188+
21862189
auto callback = [cbContext = this->SelfContext, txId, txInfo, consumer = Settings.ConsumerName_, client]() {
21872190
std::vector<TTopicOffsets> offsets;
21882191

@@ -2205,9 +2208,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TrySubscribeOnTransact
22052208
};
22062209

22072210
tx.AddPrecommitCallback(std::move(callback));
2208-
2209-
txInfo->IsActive = true;
2210-
txInfo->Subscribed = true;
22112211
}
22122212
}
22132213

src/client/topic/impl/write_session_impl.cpp

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -551,34 +551,34 @@ void TWriteSessionImpl::TrySubscribeOnTransactionCommit(TTransactionBase* tx)
551551
return;
552552
}
553553

554+
txInfo->AllAcksReceived = NThreading::NewPromise<TStatus>();
554555
txInfo->IsActive = true;
555556
txInfo->Subscribed = true;
556-
txInfo->AllAcksReceived = NThreading::NewPromise<TStatus>();
557-
}
558-
559-
auto callback = [cbContext = this->SelfContext, txId, txInfo]() {
560-
with_lock(txInfo->Lock) {
561-
Y_ABORT_UNLESS(!txInfo->CommitCalled);
562557

563-
txInfo->CommitCalled = true;
564-
565-
if (txInfo->WriteCount == txInfo->AckCount) {
566-
txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
567-
if (auto self = cbContext->LockShared()) {
568-
self->DeleteTx(txId);
558+
auto callback = [cbContext = this->SelfContext, txId, txInfo]() {
559+
with_lock(txInfo->Lock) {
560+
Y_ABORT_UNLESS(!txInfo->CommitCalled);
561+
562+
txInfo->CommitCalled = true;
563+
564+
if (txInfo->WriteCount == txInfo->AckCount) {
565+
txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
566+
if (auto self = cbContext->LockShared()) {
567+
self->DeleteTx(txId);
568+
}
569+
return txInfo->AllAcksReceived.GetFuture();
570+
}
571+
572+
if (txInfo->IsActive) {
573+
return txInfo->AllAcksReceived.GetFuture();
569574
}
570-
return txInfo->AllAcksReceived.GetFuture();
571-
}
572-
573-
if (txInfo->IsActive) {
574-
return txInfo->AllAcksReceived.GetFuture();
575575
}
576-
}
577-
578-
return NThreading::MakeFuture(MakeSessionExpiredError());
579-
};
580-
581-
tx->AddPrecommitCallback(std::move(callback));
576+
577+
return NThreading::MakeFuture(MakeSessionExpiredError());
578+
};
579+
580+
tx->AddPrecommitCallback(std::move(callback));
581+
}
582582
}
583583

584584
void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo)

tests/integration/topic/topic_to_table.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,9 +1335,6 @@ void TxUsage::TestWriteToTopic10()
13351335

13361336
void TxUsage::TestWriteToTopic26()
13371337
{
1338-
// TODO(brgayazov): fix test
1339-
GTEST_SKIP() << "Test is flaky";
1340-
13411338
//
13421339
// the test verifies a transaction in which data is read from a partition of one topic and written to
13431340
// another partition of this topic
@@ -1723,9 +1720,6 @@ TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_17))
17231720

17241721
void TxUsage::TestWriteToTopic25()
17251722
{
1726-
// TODO(brgayazov): fix test
1727-
GTEST_SKIP() << "Test is flaky";
1728-
17291723
//
17301724
// the test verifies a transaction in which data is read from one topic and written to another
17311725
//

0 commit comments

Comments
 (0)