@@ -36,8 +36,8 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
3636 Y_UNUSED (maxByteSize);
3737
3838 TVector<NYdb::NTopic::TReadSessionEvent::TEvent> res;
39- for (auto event = EventsQ_.Pop (block); !event.Empty () && res.size () <= maxEventsCount.GetOrElse (std::numeric_limits<size_t >::max ()); event = EventsQ_.Pop (/* block=*/ false )) {
40- res.push_back (*event);
39+ for (auto event = EventsQ_.Pop (block); !event.Empty () && res.size () < maxEventsCount.GetOrElse (std::numeric_limits<size_t >::max ()); event = EventsQ_.Pop (/* block=*/ false )) {
40+ res.push_back (std::move ( *event) );
4141 }
4242 return res;
4343 }
@@ -59,7 +59,6 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
5959
6060 bool Close (TDuration timeout = TDuration::Max()) override {
6161 Y_UNUSED (timeout);
62- // TOOD send TSessionClosedEvent
6362 EventsQ_.Stop ();
6463 Pool_.Stop ();
6564
@@ -133,7 +132,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
133132 Sleep (FILE_POLL_PERIOD);
134133 }
135134 }
136-
135+
137136 TFile File_;
138137 TBlockingEQueue<NYdb::NTopic::TReadSessionEvent::TEvent> EventsQ_ {4_MB};
139138 NYdb::NTopic::TPartitionSession::TPtr Session_;
@@ -146,6 +145,172 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
146145 ui64 SeqNo_ = 0 ;
147146};
148147
148+ class TFileTopicWriteSession : public NYdb ::NTopic::IWriteSession, private NYdb::NTopic::TContinuationTokenIssuer {
149+ public:
150+ TFileTopicWriteSession (TFile file):
151+ File_ (std::move(file)), FileWriter_([this ] () {
152+ PushToFile ();
153+ }), Counters_()
154+ {
155+ Pool_.Start (1 );
156+ EventsQ_.Push (NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
157+ }
158+
159+ NThreading::TFuture<void > WaitEvent () override {
160+ return NThreading::Async ([this ] () {
161+ EventsQ_.BlockUntilEvent ();
162+ return NThreading::MakeFuture ();
163+ }, Pool_);
164+ }
165+
166+ TMaybe<NYdb::NTopic::TWriteSessionEvent::TEvent> GetEvent (bool block) override {
167+ return EventsQ_.Pop (block);
168+ }
169+
170+ TVector<NYdb::NTopic::TWriteSessionEvent::TEvent> GetEvents (bool block, TMaybe<size_t > maxEventsCount) override {
171+ TVector<NYdb::NTopic::TWriteSessionEvent::TEvent> res;
172+ for (auto event = EventsQ_.Pop (block); !event.Empty () && res.size () < maxEventsCount.GetOrElse (std::numeric_limits<size_t >::max ()); event = EventsQ_.Pop (/* block=*/ false )) {
173+ res.push_back (std::move (*event));
174+ }
175+ return res;
176+ }
177+
178+ NThreading::TFuture<ui64> GetInitSeqNo () override {
179+ return NThreading::MakeFuture (SeqNo_);
180+ }
181+
182+ void Write (NYdb::NTopic::TContinuationToken&&, NYdb::NTopic::TWriteMessage&& message,
183+ NYdb::NTable::TTransaction* tx) override {
184+ Y_UNUSED (tx);
185+
186+ auto size = message.Data .size ();
187+ EventsMsgQ_.Push (TOwningWriteMessage (std::move (message)), size);
188+ }
189+
190+ void Write (NYdb::NTopic::TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
191+ TMaybe<TInstant> createTimestamp) override {
192+ NYdb::NTopic::TWriteMessage message (data);
193+ if (seqNo.Defined ()) {
194+ message.SeqNo (*seqNo);
195+ }
196+ if (createTimestamp.Defined ()) {
197+ message.CreateTimestamp (*createTimestamp);
198+ }
199+
200+ Write (std::move (token), std::move (message), nullptr );
201+ }
202+
203+ // Ignores codec in message and always writes raw for debugging purposes
204+ void WriteEncoded (NYdb::NTopic::TContinuationToken&& token, NYdb::NTopic::TWriteMessage&& params,
205+ NYdb::NTable::TTransaction* tx) override {
206+ Y_UNUSED (tx);
207+
208+ NYdb::NTopic::TWriteMessage message (params.Data );
209+
210+ if (params.CreateTimestamp_ .Defined ()) {
211+ message.CreateTimestamp (*params.CreateTimestamp_ );
212+ }
213+ if (params.SeqNo_ ) {
214+ message.SeqNo (*params.SeqNo_ );
215+ }
216+ message.MessageMeta (params.MessageMeta_ );
217+
218+ Write (std::move (token), std::move (message), nullptr );
219+ }
220+
221+ // Ignores codec in message and always writes raw for debugging purposes
222+ void WriteEncoded (NYdb::NTopic::TContinuationToken&& token, TStringBuf data, NYdb::NTopic::ECodec codec, ui32 originalSize,
223+ TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) override {
224+ Y_UNUSED (codec);
225+ Y_UNUSED (originalSize);
226+
227+ NYdb::NTopic::TWriteMessage message (data);
228+ if (seqNo.Defined ()) {
229+ message.SeqNo (*seqNo);
230+ }
231+ if (createTimestamp.Defined ()) {
232+ message.CreateTimestamp (*createTimestamp);
233+ }
234+
235+ Write (std::move (token), std::move (message), nullptr );
236+ }
237+
238+ bool Close (TDuration timeout = TDuration::Max()) override {
239+ Y_UNUSED (timeout);
240+ EventsQ_.Stop ();
241+ EventsMsgQ_.Stop ();
242+ Pool_.Stop ();
243+
244+ if (FileWriter_.joinable ()) {
245+ FileWriter_.join ();
246+ }
247+ return true ;
248+ }
249+
250+ NYdb::NTopic::TWriterCounters::TPtr GetCounters () override {
251+ return Counters_;
252+ }
253+
254+ ~TFileTopicWriteSession () override {
255+ EventsQ_.Stop ();
256+ EventsMsgQ_.Stop ();
257+ Pool_.Stop ();
258+ if (FileWriter_.joinable ()) {
259+ FileWriter_.join ();
260+ }
261+ }
262+
263+ private:
264+ void PushToFile () {
265+ TFileOutput fo (File_);
266+ ui64 offset = 0 ; // FIXME dummy
267+ ui64 partitionId = 0 ; // FIXME dummy
268+ while (auto maybeMsg = EventsMsgQ_.Pop (true )) {
269+ NYdb::NTopic::TWriteSessionEvent::TAcksEvent acks;
270+ do {
271+ auto & [content, msg] = *maybeMsg;
272+ NYdb::NTopic::TWriteSessionEvent::TWriteAck ack;
273+ if (msg.SeqNo_ .Defined ()) { // FIXME should be auto generated otherwise
274+ ack.SeqNo = *msg.SeqNo_ ;
275+ }
276+ ack.State = NYdb::NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN;
277+ ack.Details .ConstructInPlace (offset, partitionId);
278+ acks.Acks .emplace_back (std::move (ack));
279+ offset += content.size () + 1 ;
280+ fo.Write (content);
281+ fo.Write (' \n ' );
282+ } while ((maybeMsg = EventsMsgQ_.Pop (false )));
283+ fo.Flush ();
284+ EventsQ_.Push (std::move (acks), 1 + acks.Acks .size ());
285+ EventsQ_.Push (NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()}, 1 );
286+ if (EventsQ_.IsStopped ()) {
287+ break ;
288+ }
289+ }
290+ }
291+
292+ TFile File_;
293+
294+ // We acquire ownership of messages immediately
295+ // TODO: remove extra message copying to and from queue
296+ struct TOwningWriteMessage {
297+ TString content;
298+ NYdb::NTopic::TWriteMessage msg;
299+
300+ TOwningWriteMessage (NYdb::NTopic::TWriteMessage&& msg): content(msg.Data), msg(std::move(msg)) {
301+ msg.Data = content;
302+ }
303+ };
304+ TBlockingEQueue<TOwningWriteMessage> EventsMsgQ_ {4_MB};
305+
306+ TBlockingEQueue<NYdb::NTopic::TWriteSessionEvent::TEvent> EventsQ_ {128_KB};
307+ std::thread FileWriter_;
308+
309+ TThreadPool Pool_;
310+ NYdb::NTopic::TWriterCounters::TPtr Counters_;
311+ ui64 SeqNo_ = 0 ;
312+ };
313+
149314struct TDummyPartitionSession : public NYdb ::NTopic::TPartitionSession {
150315 TDummyPartitionSession (ui64 sessionId, const TString& topicPath, ui64 partId) {
151316 PartitionSessionId = sessionId;
@@ -231,8 +396,13 @@ std::shared_ptr<NYdb::NTopic::ISimpleBlockingWriteSession> TFileTopicClient::Cre
231396}
232397
233398std::shared_ptr<NYdb::NTopic::IWriteSession> TFileTopicClient::CreateWriteSession (const NYdb::NTopic::TWriteSessionSettings& settings) {
234- Y_UNUSED (settings);
235- return nullptr ;
399+ TString topicPath = settings.Path_ ;
400+ auto topicsIt = Topics_.find (make_pair (" pq" , topicPath));
401+ Y_ENSURE (topicsIt != Topics_.end ());
402+ auto filePath = topicsIt->second .FilePath ;
403+ Y_ENSURE (filePath);
404+
405+ return std::make_shared<TFileTopicWriteSession>(TFile (*filePath, EOpenMode::TEnum::RdWr));
236406}
237407
238408NYdb::TAsyncStatus TFileTopicClient::CommitOffset (const TString& path, ui64 partitionId, const TString& consumerName, ui64 offset,
0 commit comments