1212#include < util/stream/buffer.h>
1313#include < util/generic/guid.h>
1414
15+ template <>
16+ void Out<NYdb::NTopic::TTransactionId>(IOutputStream& s, const NYdb::NTopic::TTransactionId& v)
17+ {
18+ s << " {" << v.SessionId << " , " << v.TxId << " }" ;
19+ }
20+
1521namespace NYdb ::inline V3::NTopic {
1622
1723const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1 );
@@ -35,13 +41,22 @@ TTxIdOpt GetTransactionId(const Ydb::Topic::StreamWriteMessage_WriteRequest& req
3541 return TTxId (tx.session (), tx.id ());
3642}
3743
38- TTxIdOpt GetTransactionId (const NTable::TTransaction* tx)
44+ TTxIdOpt GetTransactionId (const std::optional<TTransactionId>& tx)
45+ {
46+ if (!tx) {
47+ return std::nullopt ;
48+ }
49+
50+ return TTxId (tx->SessionId , tx->TxId );
51+ }
52+
53+ std::optional<TTransactionId> MakeTransactionId (const NTable::TTransaction* tx)
3954{
4055 if (!tx) {
4156 return std::nullopt ;
4257 }
4358
44- return TTxId ( tx->GetSession ().GetId (), tx->GetId ()) ;
59+ return TTransactionId{ tx->GetSession ().GetId (), tx->GetId ()} ;
4560}
4661
4762}
@@ -584,7 +599,7 @@ void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo)
584599 ++txInfo->AckCount ;
585600
586601 LOG_LAZY (DbDriverState->Log , TLOG_DEBUG,
587- LogPrefixImpl () << " OnAck: seqNo=" << seqNo << " , txId=" << GetTxId ( txId) << " , WriteCount=" << txInfo->WriteCount << " , AckCount=" << txInfo->AckCount );
602+ LogPrefixImpl () << " OnAck: seqNo=" << seqNo << " , txId=" << txId << " , WriteCount=" << txInfo->WriteCount << " , AckCount=" << txInfo->AckCount );
588603
589604 if (txInfo->CommitCalled && (txInfo->WriteCount == txInfo->AckCount )) {
590605 txInfo->AllAcksReceived .SetValue (MakeCommitTransactionSuccess ());
@@ -631,15 +646,15 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess
631646 ++txInfo->WriteCount ;
632647
633648 LOG_LAZY (DbDriverState->Log , TLOG_DEBUG,
634- LogPrefixImpl () << " OnWrite: seqNo=" << seqNo << " , txId=" << GetTxId ( txId) << " , WriteCount=" << txInfo->WriteCount << " , AckCount=" << txInfo->AckCount );
649+ LogPrefixImpl () << " OnWrite: seqNo=" << seqNo << " , txId=" << txId << " , WriteCount=" << txInfo->WriteCount << " , AckCount=" << txInfo->AckCount );
635650 }
636651 WrittenInTx[seqNo] = txId;
637652 }
638653
639654 CurrentBatch.Add (
640655 seqNo, createdAtValue, message.Data , message.Codec , message.OriginalSize ,
641656 message.MessageMeta_ ,
642- message.GetTxPtr ()
657+ MakeTransactionId ( message.GetTxPtr () )
643658 );
644659
645660 FlushWriteIfRequiredImpl ();
@@ -1412,10 +1427,10 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
14121427 if (!currMessage.MessageMeta .empty ()) {
14131428 OriginalMessagesToSend.emplace (id, createTs, datum.size (),
14141429 std::move (currMessage.MessageMeta ),
1415- currMessage.Tx );
1430+ std::move ( currMessage.Tx ) );
14161431 } else {
14171432 OriginalMessagesToSend.emplace (id, createTs, datum.size (),
1418- currMessage.Tx );
1433+ std::move ( currMessage.Tx ) );
14191434 }
14201435 }
14211436 block.Data = std::move (CurrentBatch.Data );
@@ -1523,8 +1538,8 @@ void TWriteSessionImpl::SendImpl() {
15231538 auto * msgData = writeRequest->add_messages ();
15241539
15251540 if (message.Tx ) {
1526- writeRequest->mutable_tx ()->set_id (TStringType{ message.Tx ->GetId ()} );
1527- writeRequest->mutable_tx ()->set_session (TStringType{ message.Tx ->GetSession (). GetId ()} );
1541+ writeRequest->mutable_tx ()->set_id (message.Tx ->TxId );
1542+ writeRequest->mutable_tx ()->set_session (message.Tx ->SessionId );
15281543 }
15291544
15301545 msgData->set_seq_no (GetSeqNoImpl (message.Id ));
0 commit comments