1616#include < library/cpp/threading/future/future.h>
1717#include < library/cpp/threading/future/async.h>
1818
19+ #include < util/stream/zlib.h>
20+
1921#include < future>
2022
2123namespace NYdb ::NTopic::NTests {
@@ -99,7 +101,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
99101 NYdb::TDriverConfig cfg;
100102 cfg.SetEndpoint (TStringBuilder () << " invalid:" << setup.GetServer ().GrpcPort );
101103 cfg.SetDatabase (" /Invalid" );
102- cfg.SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG));
104+ cfg.SetLog (std::unique_ptr<TLogBackend>( CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG). Release () ));
103105 auto driver = NYdb::TDriver (cfg);
104106
105107 {
@@ -113,13 +115,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
113115 auto writeSession = client.CreateWriteSession (writeSettings);
114116
115117 auto event = writeSession->GetEvent (true );
116- UNIT_ASSERT (event. Defined () && std::holds_alternative<TSessionClosedEvent>(event.GetRef ()));
118+ UNIT_ASSERT (event && std::holds_alternative<TSessionClosedEvent>(event.value ()));
117119 }
118120
119121 {
120122 auto settings = TTopicClientSettings ()
121123 .Database ({" /Root" })
122- .DiscoveryEndpoint ({ TStringBuilder () << " localhost:" << setup.GetServer ().GrpcPort } );
124+ .DiscoveryEndpoint (" localhost:" + std::to_string ( setup.GetServer ().GrpcPort ) );
123125
124126 TTopicClient client (driver, settings);
125127
@@ -130,7 +132,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
130132 auto writeSession = client.CreateWriteSession (writeSettings);
131133
132134 auto event = writeSession->GetEvent (true );
133- UNIT_ASSERT (event. Defined () && !std::holds_alternative<TSessionClosedEvent>(event.GetRef ()));
135+ UNIT_ASSERT (event && !std::holds_alternative<TSessionClosedEvent>(event.value ()));
134136 }
135137 }
136138
@@ -171,13 +173,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
171173 auto readSession = client.CreateReadSession (readSettings);
172174
173175 auto event = readSession->GetEvent (true );
174- UNIT_ASSERT (event.Defined ());
176+ UNIT_ASSERT (event.has_value ());
175177
176178 auto & startPartitionSession = std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event);
177179 startPartitionSession.Confirm ();
178180
179181 event = readSession->GetEvent (true );
180- UNIT_ASSERT (event.Defined ());
182+ UNIT_ASSERT (event.has_value ());
181183
182184 auto & dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
183185 dataReceived.Commit ();
@@ -234,16 +236,16 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
234236 auto readSession = client.CreateReadSession (readSettings);
235237
236238 auto event = readSession->GetEvent (true );
237- UNIT_ASSERT (event.Defined ());
239+ UNIT_ASSERT (event.has_value ());
238240
239241 auto & startPartitionSession = std::get<TReadSessionEvent::TStartPartitionSessionEvent>(*event);
240242 startPartitionSession.Confirm ();
241243
242244 UNIT_CHECK_GENERATED_EXCEPTION (readSession->GetEvent (true , 0 ), TContractViolation);
243- UNIT_CHECK_GENERATED_EXCEPTION (readSession->GetEvents (true , Nothing () , 0 ), TContractViolation);
245+ UNIT_CHECK_GENERATED_EXCEPTION (readSession->GetEvents (true , std:: nullopt , 0 ), TContractViolation);
244246
245247 event = readSession->GetEvent (true , 1 );
246- UNIT_ASSERT (event.Defined ());
248+ UNIT_ASSERT (event.has_value ());
247249
248250 auto & dataReceived = std::get<TReadSessionEvent::TDataReceivedEvent>(*event);
249251 dataReceived.Commit ();
@@ -335,7 +337,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
335337 auto description = result.GetConsumerDescription ();
336338 UNIT_ASSERT (description.GetPartitions ().size () == 1 );
337339 auto stats = description.GetPartitions ().front ().GetPartitionConsumerStats ();
338- UNIT_ASSERT (stats.Defined ());
340+ UNIT_ASSERT (stats.has_value ());
339341 UNIT_ASSERT (stats->GetCommittedOffset () == 50 );
340342 }
341343
@@ -695,7 +697,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
695697 auto description = result.GetTopicDescription ();
696698 UNIT_ASSERT (description.GetPartitions ().size () == 1 );
697699 auto stats = description.GetPartitions ().front ().GetPartitionStats ();
698- UNIT_ASSERT (stats.Defined ());
700+ UNIT_ASSERT (stats.has_value ());
699701 UNIT_ASSERT_VALUES_EQUAL (stats->GetEndOffset (), count);
700702
701703 }
@@ -779,7 +781,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
779781 std::visit (TOverloaded {
780782 [&](TReadSessionEvent::TDataReceivedEvent& event) {
781783 for (auto & message: event.GetMessages ()) {
782- TString sourceId = message.GetMessageGroupId ();
784+ std::string sourceId = message.GetMessageGroupId ();
783785 ui32 seqNo = message.GetSeqNo ();
784786 UNIT_ASSERT_VALUES_EQUAL (readMessageCount + 1 , seqNo);
785787 ++readMessageCount;
@@ -829,11 +831,11 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) {
829831
830832 auto client = setup.MakeClient ();
831833 ui64 producerIndex = 0u ;
832- auto runTest = [&](TString producer, TString msgGroup, const TMaybe <bool >& useDedup, bool useSeqNo, EExpectedTestResult result) ->bool
834+ auto runTest = [&](TString producer, TString msgGroup, const std::optional <bool >& useDedup, bool useSeqNo, EExpectedTestResult result) ->bool
833835 {
834836 TWriteSessionSettings writeSettings;
835837 writeSettings.Path (setup.GetTopicPath ()).Codec (NTopic::ECodec::RAW);
836- TString useDedupStr = useDedup.Defined () ? ToString (*useDedup) : " <unset>" ;
838+ TString useDedupStr = useDedup.has_value () ? ToString (*useDedup) : " <unset>" ;
837839 if (producer) {
838840 producer += ToString (producerIndex);
839841 }
@@ -848,7 +850,7 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) {
848850 << useDedupStr << " , manual SeqNo: " << useSeqNo << Endl;
849851
850852 try {
851- if (useDedup.Defined ()) {
853+ if (useDedup.has_value ()) {
852854 writeSettings.DeduplicationEnabled (useDedup);
853855 }
854856 auto session = client.CreateWriteSession (writeSettings);
@@ -857,12 +859,12 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) {
857859 ui64 written = 0 ;
858860 while (written < 10 ) {
859861 auto event = session->GetEvent (true );
860- if (std::holds_alternative<TSessionClosedEvent>(event.GetRef ())) {
862+ if (std::holds_alternative<TSessionClosedEvent>(event.value ())) {
861863 auto closed = std::get<TSessionClosedEvent>(*event);
862864 Cerr << " Session failed with error: " << closed.DebugString () << Endl;
863865 UNIT_ASSERT (result == EExpectedTestResult::FAIL_ON_RPC);
864866 return false ;
865- } else if (std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event.GetRef ())) {
867+ } else if (std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event.value ())) {
866868 token = std::move (std::get<TWriteSessionEvent::TReadyToAcceptEvent>(*event).ContinuationToken );
867869 if (useSeqNo) {
868870 session->Write (std::move (*token), " data" , seqNo++);
@@ -948,10 +950,10 @@ Y_UNIT_TEST_SUITE(TSettingsValidation) {
948950
949951 auto readSession = client.CreateReadSession (readSettings);
950952 auto event = readSession->GetEvent (true );
951- UNIT_ASSERT (event.Defined ());
953+ UNIT_ASSERT (event.has_value ());
952954
953955 auto & closeEvent = std::get<NYdb::NTopic::TSessionClosedEvent>(*event);
954- UNIT_ASSERT (closeEvent.DebugString ().Contains (" Too small max memory usage" ));
956+ UNIT_ASSERT (closeEvent.DebugString ().contains (" Too small max memory usage" ));
955957 }
956958
957959} // Y_UNIT_TEST_SUITE(TSettingsValidation)
0 commit comments