@@ -7361,7 +7361,7 @@ int BlueStore::_init_alloc()
73617361 << ", free 0x" << alloc->get_free()
73627362 << ", fragmentation " << alloc->get_fragmentation()
73637363 << std::dec << dendl;
7364-
7364+ if (tracepoint_debug_init_alloc_done) tracepoint_debug_init_alloc_done();
73657365 return 0;
73667366}
73677367
@@ -7769,9 +7769,19 @@ int BlueStore::_is_bluefs(bool create, bool* ret)
77697769* opens both DB and dependant super_meta, FreelistManager and allocator
77707770* in the proper order
77717771*/
7772- int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
7773- {
7774- dout(5) << __func__ << "::NCB::read_only=" << read_only << ", to_repair=" << to_repair << dendl;
7772+ int BlueStore::_open_db_and_around(
7773+ bool read_only,
7774+ bool to_repair,
7775+ bool apply_deferred,
7776+ bool remove_deferred)
7777+ {
7778+ dout(5) << __func__ << "read_only=" << read_only
7779+ << ", to_repair=" << to_repair
7780+ << ", deferred=" << (apply_deferred?"apply;":"noapply;")
7781+ << (remove_deferred?"remove":"noremove") << dendl;
7782+ ceph_assert(remove_deferred == false || apply_deferred == true);
7783+ ceph_assert(read_only == false || remove_deferred == false);
7784+ std::vector<std::string> keys_to_remove;
77757785 {
77767786 string type;
77777787 int r = read_meta("type", &type);
@@ -7831,6 +7841,11 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
78317841 if (bdev_label_multi) {
78327842 _main_bdev_label_try_reserve();
78337843 }
7844+ // This is the place where we can apply deferred writes
7845+ // without risk of some interaction with RocksDB allocating.
7846+ if (apply_deferred) {
7847+ _deferred_replay(remove_deferred ? &keys_to_remove : nullptr);
7848+ }
78347849
78357850 // Re-open in the proper mode(s).
78367851
@@ -7848,6 +7863,14 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
78487863 _post_init_alloc();
78497864 }
78507865
7866+ if (remove_deferred && !keys_to_remove.empty()) {
7867+ KeyValueDB::Transaction deferred_keys_remove_txn = db->get_transaction();
7868+ for (auto& s : keys_to_remove) {
7869+ deferred_keys_remove_txn->rmkey(PREFIX_DEFERRED, s);
7870+ }
7871+ db->submit_transaction_sync(deferred_keys_remove_txn);
7872+ }
7873+
78517874 // when function is called in repair mode (to_repair=true) we skip db->open()/create()
78527875 // we can't change bluestore allocation so no need to invlidate allocation-file
78537876 if (fm->is_null_manager() && !read_only && !to_repair) {
@@ -9229,7 +9252,7 @@ int BlueStore::mount_readonly()
92299252 }
92309253 });
92319254
9232- r = _deferred_replay();
9255+ r = _deferred_replay(nullptr );
92339256 if (r < 0) {
92349257 return r;
92359258 }
@@ -9365,8 +9388,7 @@ int BlueStore::_mount()
93659388 return -EINVAL;
93669389 }
93679390
9368- dout(5) << __func__ << "::NCB::calling open_db_and_around(read/write)" << dendl;
9369- int r = _open_db_and_around(false);
9391+ int r = _open_db_and_around(false, false, true, true);
93709392 if (r < 0) {
93719393 return r;
93729394 }
@@ -9404,11 +9426,6 @@ int BlueStore::_mount()
94049426 }
94059427 });
94069428
9407- r = _deferred_replay();
9408- if (r < 0) {
9409- return r;
9410- }
9411-
94129429 mempool_thread.init();
94139430
94149431 if ((!per_pool_stat_collection || per_pool_omap != OMAP_PER_PG) &&
@@ -10764,7 +10781,7 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
1076410781
1076510782 // in deep mode we need R/W write access to be able to replay deferred ops
1076610783 const bool read_only = !(repair || depth == FSCK_DEEP);
10767- int r = _open_db_and_around(read_only);
10784+ int r = _open_db_and_around(read_only, false, !read_only, !read_only );
1076810785 if (r < 0) {
1076910786 return r;
1077010787 }
@@ -10790,17 +10807,6 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
1079010807 mempool_thread.shutdown();
1079110808 _shutdown_cache();
1079210809 });
10793- // we need finisher and kv_{sync,finalize}_thread *just* for replay
10794- // enable in repair or deep mode modes only
10795- if (!read_only) {
10796- _kv_start();
10797- r = _deferred_replay();
10798- _kv_stop();
10799- }
10800-
10801- if (r < 0) {
10802- return r;
10803- }
1080410810 return _fsck_on_open(depth, repair);
1080510811}
1080610812
@@ -14953,6 +14959,7 @@ void BlueStore::_kv_sync_thread()
1495314959 deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
1495414960 std::unique_lock l{kv_lock};
1495514961 ceph_assert(!kv_sync_started);
14962+ ceph_assert(!db_was_opened_read_only);
1495614963 kv_sync_started = true;
1495714964 kv_cond.notify_all();
1495814965
@@ -15119,7 +15126,7 @@ void BlueStore::_kv_sync_thread()
1511915126 auto sync_start = mono_clock::now();
1512015127#endif
1512115128 // submit synct synchronously (block and wait for it to commit)
15122- int r = db_was_opened_read_only || cct->_conf->bluestore_debug_omit_kv_commit ?
15129+ int r = cct->_conf->bluestore_debug_omit_kv_commit ?
1512315130 0 : db->submit_transaction_sync(synct);
1512415131 ceph_assert(r == 0);
1512515132
@@ -15504,7 +15511,7 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
1550415511 }
1550515512}
1550615513
15507- int BlueStore::_deferred_replay()
15514+ int BlueStore::_deferred_replay(std::vector<std::string>* keys_to_remove )
1550815515{
1550915516 dout(10) << __func__ << " start" << dendl;
1551015517 int count = 0;
@@ -15518,48 +15525,49 @@ int BlueStore::_deferred_replay()
1551815525 }
1551915526 );
1552015527 }
15521- CollectionRef ch = _get_collection(coll_t::meta());
15522- bool fake_ch = false;
15523- if (!ch) {
15524- // hmm, replaying initial mkfs?
15525- ch = static_cast<Collection*>(create_new_collection(coll_t::meta()).get());
15526- fake_ch = true;
15527- }
15528- OpSequencer *osr = static_cast<OpSequencer*>(ch->osr.get());
15528+ if (tracepoint_debug_deferred_replay_start) tracepoint_debug_deferred_replay_start();
15529+ IOContext ioctx(cct, nullptr);
1552915530 KeyValueDB::Iterator it = db->get_iterator(PREFIX_DEFERRED);
15530- for (it->lower_bound(string()); it->valid(); it->next(), ++count ) {
15531+ for (it->lower_bound(string()); it->valid(); /*iterator update outside*/ ) {
1553115532 dout(20) << __func__ << " replay " << pretty_binary_string(it->key())
1553215533 << dendl;
15533- bluestore_deferred_transaction_t *deferred_txn =
15534- new bluestore_deferred_transaction_t;
15534+ if (keys_to_remove) {
15535+ keys_to_remove->push_back(it->key());
15536+ }
15537+ bluestore_deferred_transaction_t deferred_txn;
1553515538 bufferlist bl = it->value();
1553615539 auto p = bl.cbegin();
1553715540 try {
15538- decode(*deferred_txn, p);
15541+ decode(deferred_txn, p);
15542+
15543+ bool has_some = _eliminate_outdated_deferred(&deferred_txn, bluefs_extents);
15544+ if (has_some) {
15545+ if (tracepoint_debug_deferred_replay_track) tracepoint_debug_deferred_replay_track(deferred_txn);
15546+ for (auto& op: deferred_txn.ops) {
15547+ for (auto& e : op.extents) {
15548+ bufferlist t;
15549+ op.data.splice(0, e.length, &t);
15550+ bdev->aio_write(e.offset, t, &ioctx, false);
15551+ }
15552+ }
15553+ }
1553915554 } catch (ceph::buffer::error& e) {
1554015555 derr << __func__ << " failed to decode deferred txn "
1554115556 << pretty_binary_string(it->key()) << dendl;
15542- delete deferred_txn;
1554315557 r = -EIO;
15544- goto out;
1554515558 }
15546- bool has_some = _eliminate_outdated_deferred(deferred_txn, bluefs_extents);
15547- if (has_some) {
15548- TransContext *txc = _txc_create(ch.get(), osr, nullptr);
15549- txc->deferred_txn = deferred_txn;
15550- txc->set_state(TransContext::STATE_KV_DONE);
15551- _txc_state_proc(txc);
15552- } else {
15553- delete deferred_txn;
15559+ // update loop iteration here, so we can inject action
15560+ ++count;
15561+ it->next();
15562+ if (ioctx.num_pending.load() > 100 || !it->valid()) {
15563+ dout(20) << __func__ << "submitting IO batch" << dendl;
15564+ bdev->aio_submit(&ioctx);
15565+ ioctx.aio_wait();
15566+ dout(20) << __func__ << "wait done" << dendl;
15567+ ioctx.release_running_aios();
1555415568 }
1555515569 }
15556- out:
15557- dout(20) << __func__ << " draining osr" << dendl;
15558- _osr_register_zombie(osr);
15559- _osr_drain_all();
15560- if (fake_ch) {
15561- new_coll_map.clear();
15562- }
15570+ if (tracepoint_debug_deferred_replay_end) tracepoint_debug_deferred_replay_end();
1556315571 dout(10) << __func__ << " completed " << count << " events" << dendl;
1556415572 return r;
1556515573}
0 commit comments