2626#include " files-async.hpp"
2727#include " db-utils.h"
2828#include " td/utils/PathView.h"
29+ #include " td/utils/Random.h"
30+
31+ #include < csignal>
2932
3033namespace ton {
3134
@@ -107,6 +110,10 @@ std::string DbStatistics::to_string_and_reset() {
107110 return ss.str ();
108111}
109112
113+ void PackageWriter::tear_down () {
114+ sync_now ();
115+ }
116+
110117void PackageWriter::append (std::string filename, td::BufferSlice data,
111118 td::Promise<std::pair<td::uint64, td::uint64>> promise) {
112119 td::uint64 offset, size;
@@ -119,14 +126,45 @@ void PackageWriter::append(std::string filename, td::BufferSlice data,
119126 return ;
120127 }
121128 start = td::Timestamp::now ();
122- offset = p->append (std::move (filename), std::move (data), !async_mode_ );
129+ offset = p->append (std::move (filename), std::move (data), false );
123130 end = td::Timestamp::now ();
124131 size = p->size ();
125132 }
126133 if (statistics_) {
127134 statistics_->record_write ((end.at () - start.at ()) * 1e6 , data_size);
128135 }
129- promise.set_value (std::pair<td::uint64, td::uint64>{offset, size});
136+ sync ([=, promise = std::move (promise)](td::Result<td::Unit>) mutable {
137+ promise.set_value (std::pair<td::uint64, td::uint64>{offset, size});
138+ });
139+ }
140+
141+ void PackageWriter::sync (td::Promise<td::Unit> promise) {
142+ if (!async_mode_) {
143+ sync_now ();
144+ promise.set_value (td::Unit{});
145+ return ;
146+ }
147+ sync_waiters_.push_back (std::move (promise));
148+ if (!sync_pending_) {
149+ sync_pending_ = true ;
150+ td::actor::send_closure_later (actor_id (this ), &PackageWriter::sync_now);
151+ }
152+ }
153+
154+ void PackageWriter::sync_now () {
155+ auto p = package_.lock ();
156+ if (p) {
157+ p->sync ();
158+ }
159+ sync_pending_ = false ;
160+ auto waiters = std::move (sync_waiters_);
161+ sync_waiters_.clear ();
162+ if (!waiters.empty ()) {
163+ LOG (DEBUG) << " Writer huge transaction size = " << waiters.size ();
164+ }
165+ for (auto &promise: waiters) {
166+ promise.set_value (td::Unit{});
167+ }
130168}
131169
132170class PackageReader : public td ::actor::Actor {
@@ -168,6 +206,10 @@ static std::string package_info_to_str(BlockSeqno seqno, ShardIdFull shard_prefi
168206 return PSTRING () << seqno << " ." << shard_prefix.workchain << " :" << shard_to_str (shard_prefix.shard );
169207}
170208
209+ void ArchiveSlice::tear_down () {
210+ commit_transaction_now ();
211+ }
212+
171213void ArchiveSlice::add_handle (BlockHandle handle, td::Promise<td::Unit> promise) {
172214 if (destroyed_) {
173215 promise.set_error (td::Status::Error (ErrorCode::notready, " package already gc'd" ));
@@ -239,16 +281,17 @@ void ArchiveSlice::add_handle(BlockHandle handle, td::Promise<td::Unit> promise)
239281 kv_->set (shard_key.as_slice (), shard_value.as_slice ()).ensure ();
240282 }
241283 kv_->set (get_db_key_block_info (handle->id ()), handle->serialize ().as_slice ()).ensure ();
242- commit_transaction ();
243-
244- handle->flushed_upto (version);
245- handle->set_handle_moved_to_archive ();
284+ commit_transaction ([=, this , promise = std::move (promise)](td::Result<td::Unit> R) mutable {
285+ R. ensure ();
286+ handle->flushed_upto (version);
287+ handle->set_handle_moved_to_archive ();
246288
247- if (handle->need_flush ()) {
248- update_handle (std::move (handle), std::move (promise));
249- } else {
250- promise.set_value (td::Unit ());
251- }
289+ if (handle->need_flush ()) {
290+ update_handle (std::move (handle), std::move (promise));
291+ } else {
292+ promise.set_value (td::Unit ());
293+ }
294+ });
252295}
253296
254297void ArchiveSlice::update_handle (BlockHandle handle, td::Promise<td::Unit> promise) {
@@ -269,12 +312,13 @@ void ArchiveSlice::update_handle(BlockHandle handle, td::Promise<td::Unit> promi
269312 kv_->set (get_db_key_block_info (handle->id ()), handle->serialize ().as_slice ()).ensure ();
270313 handle->flushed_upto (version);
271314 } while (handle->need_flush ());
272- commit_transaction ();
273- if (!temp_) {
274- handle->set_handle_moved_to_archive ();
275- }
276-
277- promise.set_value (td::Unit ());
315+ commit_transaction ([=, this , promise = std::move (promise)](td::Result<td::Unit> R) mutable {
316+ R.ensure ();
317+ if (!temp_) {
318+ handle->set_handle_moved_to_archive ();
319+ }
320+ promise.set_value (td::Unit ());
321+ });
278322}
279323
280324void ArchiveSlice::add_file (BlockHandle handle, FileReference ref_id, td::BufferSlice data,
@@ -326,7 +370,7 @@ void ArchiveSlice::add_file_cont(size_t idx, FileReference ref_id, td::uint64 of
326370 kv_->set (" status" , td::to_string (size)).ensure ();
327371 kv_->set (ref_id.hash ().to_hex (), td::to_string (offset)).ensure ();
328372 }
329- commit_transaction ();
373+ commit_transaction (std::move (promise) );
330374 promise.set_value (td::Unit ());
331375}
332376
@@ -712,6 +756,7 @@ void ArchiveSlice::do_close() {
712756 if (destroyed_) {
713757 return ;
714758 }
759+ commit_transaction_now ();
715760 CHECK (status_ != st_closed && active_queries_ == 0 );
716761 LOG (DEBUG) << " Closing archive slice " << db_path_;
717762 status_ = st_closed;
@@ -741,30 +786,48 @@ void ArchiveSlice::end_async_query() {
741786}
742787
743788void ArchiveSlice::begin_transaction () {
744- if (!async_mode_ || !huge_transaction_started_) {
745- kv_->begin_transaction ().ensure ();
746- if (async_mode_) {
747- huge_transaction_started_ = true ;
748- }
789+ if (transaction_started_) {
790+ return ;
749791 }
792+ transaction_started_ = true ;
793+ kv_->begin_transaction ().ensure ();
750794}
751795
752- void ArchiveSlice::commit_transaction () {
753- if (!async_mode_ || huge_transaction_size_++ >= 100 ) {
754- kv_->commit_transaction ().ensure ();
755- if (async_mode_) {
756- huge_transaction_size_ = 0 ;
757- huge_transaction_started_ = false ;
758- }
796+ void ArchiveSlice::commit_transaction (td::Promise<td::Unit> promise) {
797+ if (!transaction_started_ || !async_mode_) {
798+ commit_transaction_now ();
799+ promise.set_value (td::Unit{});
800+ return ;
801+ }
802+ transaction_commit_waiters_.push_back (std::move (promise));
803+ if (!transaction_commit_pending_) {
804+ transaction_commit_pending_ = true ;
805+ td::actor::send_closure_later (actor_id (this ), &ArchiveSlice::commit_transaction_now);
806+ }
807+ }
808+
809+ void ArchiveSlice::commit_transaction_now () {
810+ if (!transaction_started_) {
811+ return ;
812+ }
813+ kv_->commit_transaction ().ensure ();
814+ transaction_started_ = false ;
815+ transaction_commit_pending_ = false ;
816+ auto waiters = std::move (transaction_commit_waiters_);
817+ transaction_commit_waiters_.clear ();
818+ if (!waiters.empty ()) {
819+ LOG (DEBUG) << " Huge transaction size = " << waiters.size ();
820+ }
821+ for (auto &promise: waiters) {
822+ promise.set_value (td::Unit{});
759823 }
760824}
761825
826+
762827void ArchiveSlice::set_async_mode (bool mode, td::Promise<td::Unit> promise) {
763828 async_mode_ = mode;
764- if (!async_mode_ && huge_transaction_started_ && kv_) {
765- kv_->commit_transaction ().ensure ();
766- huge_transaction_size_ = 0 ;
767- huge_transaction_started_ = false ;
829+ if (!async_mode_) {
830+ commit_transaction_now ();
768831 }
769832
770833 td::MultiPromise mp;
@@ -820,7 +883,7 @@ td::Result<ArchiveSlice::PackageInfo *> ArchiveSlice::choose_package(BlockSeqno
820883 if (shard_separated_) {
821884 kv_->set (PSTRING () << " info." << v, package_info_to_str (masterchain_seqno, shard_prefix)).ensure ();
822885 }
823- commit_transaction ();
886+ commit_transaction_now ();
824887 add_package (masterchain_seqno, shard_prefix, 0 , default_package_version ());
825888 return &packages_[v];
826889 } else {
@@ -874,6 +937,7 @@ void destroy_db(std::string name, td::uint32 attempt, td::Promise<td::Unit> prom
874937} // namespace
875938
876939void ArchiveSlice::destroy (td::Promise<td::Unit> promise) {
940+ commit_transaction_now ();
877941 before_query ();
878942 destroyed_ = true ;
879943
@@ -1081,7 +1145,8 @@ void ArchiveSlice::truncate(BlockSeqno masterchain_seqno, ConstBlockHandle, td::
10811145 package->writer .reset ();
10821146 td::unlink (package->path ).ensure ();
10831147 td::rename (package->path + " .new" , package->path ).ensure ();
1084- package->writer = td::actor::create_actor<PackageWriter>(" writer" , new_package, async_mode_);
1148+ package->writer = td::actor::create_actor<PackageWriter>(
1149+ PSTRING () << " writer." << td::PathView (package->path ).file_name (), new_package, async_mode_);
10851150 }
10861151
10871152 std::vector<PackageInfo> new_packages_info;
0 commit comments