@@ -22,11 +22,11 @@ TTopicSdkTestSetup::TTopicSdkTestSetup(const TString& testCaseName, const NKikim
2222 }
2323}
2424
25- void TTopicSdkTestSetup::CreateTopicWithAutoscale (const TString & path, const TString & consumer, size_t partitionCount, size_t maxPartitionCount) {
25+ void TTopicSdkTestSetup::CreateTopicWithAutoscale (const std::string & path, const std::string & consumer, size_t partitionCount, size_t maxPartitionCount) {
2626 CreateTopic (path, consumer, partitionCount, maxPartitionCount);
2727}
2828
29- void TTopicSdkTestSetup::CreateTopic (const TString & path, const TString & consumer, size_t partitionCount, std::optional<size_t > maxPartitionCount)
29+ void TTopicSdkTestSetup::CreateTopic (const std::string & path, const std::string & consumer, size_t partitionCount, std::optional<size_t > maxPartitionCount)
3030{
3131 TTopicClient client (MakeDriver ());
3232
@@ -49,10 +49,10 @@ void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consume
4949 auto status = client.CreateTopic (path, topics).GetValueSync ();
5050 UNIT_ASSERT (status.IsSuccess ());
5151
52- Server.WaitInit (path);
52+ Server.WaitInit (TString{ path} );
5353}
5454
55- TTopicDescription TTopicSdkTestSetup::DescribeTopic (const TString & path)
55+ TTopicDescription TTopicSdkTestSetup::DescribeTopic (const std::string & path)
5656{
5757 TTopicClient client (MakeDriver ());
5858
@@ -66,7 +66,7 @@ TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path)
6666 return status.GetTopicDescription ();
6767}
6868
69- TConsumerDescription TTopicSdkTestSetup::DescribeConsumer (const TString & path, const TString & consumer)
69+ TConsumerDescription TTopicSdkTestSetup::DescribeConsumer (const std::string & path, const std::string & consumer)
7070{
7171 TTopicClient client (MakeDriver ());
7272
@@ -80,22 +80,79 @@ TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, c
8080 return status.GetConsumerDescription ();
8181}
8282
83- void TTopicSdkTestSetup::Write (const std::string& message, ui32 partitionId) {
83+ void TTopicSdkTestSetup::Write (const std::string& message, ui32 partitionId, const std::optional<std::string> producer, std::optional<ui64> seqNo ) {
8484 TTopicClient client (MakeDriver ());
8585
8686 TWriteSessionSettings settings;
8787 settings.Path (TEST_TOPIC);
8888 settings.PartitionId (partitionId);
89- settings.DeduplicationEnabled (false );
89+ settings.DeduplicationEnabled (producer.has_value ());
90+ if (producer) {
91+ settings.ProducerId (producer.value ())
92+ .MessageGroupId (producer.value ());
93+ }
9094 auto session = client.CreateSimpleBlockingWriteSession (settings);
9195
92- TWriteMessage msg (TStringBuilder () << message);
93- UNIT_ASSERT (session->Write (std::move (msg)));
96+ UNIT_ASSERT (session->Write (message, seqNo));
9497
9598 session->Close (TDuration::Seconds (5 ));
9699}
97100
98- TStatus TTopicSdkTestSetup::Commit (const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId) {
101+ TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read (const std::string& topic, const std::string& consumer, std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler, const TDuration timeout) {
102+ TTopicClient client (MakeDriver ());
103+
104+ auto reader = client.CreateReadSession (
105+ TReadSessionSettings ()
106+ .AutoPartitioningSupport (true )
107+ .AppendTopics (TTopicReadSettings (topic))
108+ .ConsumerName (consumer));
109+
110+ TInstant deadlineTime = TInstant::Now () + timeout;
111+
112+ ReadResult result;
113+ result.Reader = reader;
114+
115+ bool continueFlag = true ;
116+ while (continueFlag && deadlineTime > TInstant::Now ()) {
117+ for (auto event : reader->GetEvents (false )) {
118+ if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
119+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
120+ if (!handler (*x)) {
121+ continueFlag = false ;
122+ break ;
123+ }
124+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
125+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
126+ x->Confirm ();
127+ result.StartPartitionSessionEvents .push_back (*x);
128+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
129+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
130+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
131+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
132+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
133+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
134+ x->Confirm ();
135+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
136+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
137+ } else if (auto * x = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
138+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
139+ x->Confirm ();
140+ } else if (auto * x = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
141+ Cerr << " SESSION EVENT " << x->DebugString () << Endl << Flush;
142+ } else {
143+ Cerr << " SESSION EVENT unhandled \n " ;
144+ }
145+ }
146+
147+ Sleep (TDuration::MilliSeconds (250 ));
148+ }
149+
150+ result.Timeout = continueFlag;
151+
152+ return result;
153+ }
154+
155+ TStatus TTopicSdkTestSetup::Commit (const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId) {
99156 TTopicClient client (MakeDriver ());
100157
101158 TCommitOffsetSettings commitSettings {.ReadSessionId_ = sessionId};
0 commit comments