@@ -13,14 +13,17 @@ namespace NYql {
1313
1414class TFileTopicReadSession : public NYdb ::NTopic::IReadSession {
1515
16- constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5 );
16+ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5 );
1717
1818public:
19- TFileTopicReadSession (TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = " " ):
20- File_(std::move(file)), Session_(std::move(session)), ProducerId_(producerId),
21- FilePoller_([this ] () {
22- PollFileForChanges ();
23- }), Counters_()
19+ TFileTopicReadSession (TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = " " )
20+ : File_(std::move(file))
21+ , Session_(std::move(session))
22+ , ProducerId_(producerId)
23+ , FilePoller_([this ] () {
24+ PollFileForChanges ();
25+ })
26+ , Counters_()
2427 {
2528 Pool_.Start (1 );
2629 }
@@ -37,7 +40,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
3740 Y_UNUSED (maxByteSize);
3841
3942 TVector<NYdb::NTopic::TReadSessionEvent::TEvent> res;
40- for (auto event = EventsQ_.Pop (block); !event.Empty () && res.size () < maxEventsCount.GetOrElse (std::numeric_limits<size_t >::max ()); event = EventsQ_.Pop (/* block=*/ false )) {
43+ for (auto event = EventsQ_.Pop (block); !event.Empty () && res.size () < maxEventsCount.GetOrElse (std::numeric_limits<size_t >::max ()); event = EventsQ_.Pop (/* block=*/ false )) {
4144 res.push_back (std::move (*event));
4245 }
4346 return res;
@@ -60,13 +63,15 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
6063
6164 bool Close (TDuration timeout = TDuration::Max()) override {
6265 Y_UNUSED (timeout);
66+ // TODO send TSessionClosedEvent
67+ // XXX (... but if we stop queues, nobody will receive it, needs rethinking)
6368 EventsQ_.Stop ();
6469 Pool_.Stop ();
6570
6671 if (FilePoller_.joinable ()) {
6772 FilePoller_.join ();
6873 }
69- return true ;
74+ return true ; // TODO incorrect if EventQ_ was non-empty
7075 }
7176
7277 NYdb::NTopic::TReaderCounters::TPtr GetCounters () const override {
@@ -86,11 +91,11 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
8691 }
8792
8893private:
89- using TMessageInformation = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation;
90- using TMessage = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage;
94+ using TMessageInformation = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation;
95+ using TMessage = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage;
9196
92- TMessageInformation MakeNextMessageInformation (size_t offset, size_t uncompressedSize, const TString& messageGroupId = " " ) {
93- auto now = TInstant::Now ();
97+ TMessageInformation MakeNextMessageInformation (size_t offset, size_t uncompressedSize, const TString& messageGroupId = " " ) {
98+ auto now = TInstant::Now ();
9499 TMessageInformation msgInfo (
95100 offset,
96101 ProducerId_,
@@ -104,7 +109,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
104109 );
105110 return msgInfo;
106111 }
107-
112+
108113 TMessage MakeNextMessage (const TString& msgBuff) {
109114 TMessage msg (msgBuff, nullptr , MakeNextMessageInformation (MsgOffset_, msgBuff.size ()), Session_);
110115 return msg;
@@ -146,12 +151,14 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
146151 ui64 SeqNo_ = 0 ;
147152};
148153
149- class TFileTopicWriteSession : public NYdb ::NTopic::IWriteSession, private NYdb::NTopic::TContinuationTokenIssuer {
154+ class TFileTopicWriteSession : public NYdb ::NTopic::IWriteSession, private NYdb::NTopic::TContinuationTokenIssuer {
150155public:
151- TFileTopicWriteSession (TFile file):
152- File_ (std::move(file)), FileWriter_([this ] () {
153- PushToFile ();
154- }), Counters_()
156+ explicit TFileTopicWriteSession (TFile file)
157+ : File_(std::move(file))
158+ , FileWriter_([this ] () {
159+ PushToFile ();
160+ })
161+ , Counters_()
155162 {
156163 Pool_.Start (1 );
157164 EventsQ_.Push (NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
@@ -170,7 +177,7 @@ class TFileTopicWriteSession : public NYdb::NTopic::IWriteSession, private NYdb
170177
171178 TVector<NYdb::NTopic::TWriteSessionEvent::TEvent> GetEvents (bool block, TMaybe<size_t > maxEventsCount) override {
172179 TVector<NYdb::NTopic::TWriteSessionEvent::TEvent> res;
173- for (auto event = EventsQ_.Pop (block); !event.Empty () && res.size () < maxEventsCount.GetOrElse (std::numeric_limits<size_t >::max ()); event = EventsQ_.Pop (/* block=*/ false )) {
180+ for (auto event = EventsQ_.Pop (block); !event.Empty () && res.size () < maxEventsCount.GetOrElse (std::numeric_limits<size_t >::max ()); event = EventsQ_.Pop (/* block=*/ false )) {
174181 res.push_back (std::move (*event));
175182 }
176183 return res;
@@ -180,10 +187,10 @@ class TFileTopicWriteSession : public NYdb::NTopic::IWriteSession, private NYdb
180187 return NThreading::MakeFuture (SeqNo_);
181188 }
182189
183- void Write (NYdb::NTopic::TContinuationToken&&, NYdb::NTopic::TWriteMessage&& message,
190+ void Write (NYdb::NTopic::TContinuationToken&&, NYdb::NTopic::TWriteMessage&& message,
184191 NYdb::NTable::TTransaction* tx) override {
185192 Y_UNUSED (tx);
186-
193+
187194 auto size = message.Data .size ();
188195 EventsMsgQ_.Push (TOwningWriteMessage (std::move (message)), size);
189196 }
@@ -238,14 +245,16 @@ class TFileTopicWriteSession : public NYdb::NTopic::IWriteSession, private NYdb
238245
239246 bool Close (TDuration timeout = TDuration::Max()) override {
240247 Y_UNUSED (timeout);
248+ // TODO send TSessionClosedEvent
249+ // XXX (... but if we stop queues, nobody will receive it, needs rethinking)
241250 EventsQ_.Stop ();
242251 EventsMsgQ_.Stop ();
243252 Pool_.Stop ();
244253
245254 if (FileWriter_.joinable ()) {
246255 FileWriter_.join ();
247256 }
248- return true ;
257+ return true ; // TODO incorrect if Event*Q_ was non-empty
249258 }
250259
251260 NYdb::NTopic::TWriterCounters::TPtr GetCounters () override {
@@ -289,16 +298,19 @@ class TFileTopicWriteSession : public NYdb::NTopic::IWriteSession, private NYdb
289298 }
290299 }
291300 }
292-
301+
293302 TFile File_;
294-
303+
295304 // We acquire ownership of messages immediately
296305 // TODO: remove extra message copying to and from queue
297306 struct TOwningWriteMessage {
298307 TString content;
299308 NYdb::NTopic::TWriteMessage msg;
300-
301- TOwningWriteMessage (NYdb::NTopic::TWriteMessage&& msg): content(msg.Data), msg(std::move(msg)) {
309+
310+ explicit TOwningWriteMessage (NYdb::NTopic::TWriteMessage&& msg)
311+ : content(msg.Data)
312+ , msg(std::move(msg))
313+ {
302314 msg.Data = content;
303315 }
304316 };
@@ -368,7 +380,7 @@ NYdb::TAsyncStatus TFileTopicClient::DropTopic(const TString& path, const NYdb::
368380 return NThreading::MakeFuture (NYdb::TStatus (NYdb::EStatus::SUCCESS, {}));
369381}
370382
371- NYdb::NTopic::TAsyncDescribeTopicResult TFileTopicClient::DescribeTopic (const TString& path,
383+ NYdb::NTopic::TAsyncDescribeTopicResult TFileTopicClient::DescribeTopic (const TString& path,
372384 const NYdb::NTopic::TDescribeTopicSettings& settings) {
373385 Y_UNUSED (path);
374386 Y_UNUSED (settings);
@@ -377,7 +389,7 @@ NYdb::NTopic::TAsyncDescribeTopicResult TFileTopicClient::DescribeTopic(const TS
377389 return NThreading::MakeFuture (NYdb::NTopic::TDescribeTopicResult (std::move (success), {}));
378390}
379391
380- NYdb::NTopic::TAsyncDescribeConsumerResult TFileTopicClient::DescribeConsumer (const TString& path, const TString& consumer,
392+ NYdb::NTopic::TAsyncDescribeConsumerResult TFileTopicClient::DescribeConsumer (const TString& path, const TString& consumer,
381393 const NYdb::NTopic::TDescribeConsumerSettings& settings) {
382394 Y_UNUSED (path);
383395 Y_UNUSED (consumer);
@@ -387,7 +399,7 @@ NYdb::NTopic::TAsyncDescribeConsumerResult TFileTopicClient::DescribeConsumer(co
387399 return NThreading::MakeFuture (NYdb::NTopic::TDescribeConsumerResult (std::move (success), {}));
388400}
389401
390- NYdb::NTopic::TAsyncDescribePartitionResult TFileTopicClient::DescribePartition (const TString& path, i64 partitionId,
402+ NYdb::NTopic::TAsyncDescribePartitionResult TFileTopicClient::DescribePartition (const TString& path, i64 partitionId,
391403 const NYdb::NTopic::TDescribePartitionSettings& settings) {
392404 Y_UNUSED (path);
393405 Y_UNUSED (partitionId);
0 commit comments