@@ -7668,9 +7668,19 @@ int BlueStore::_is_bluefs(bool create, bool* ret)
76687668* opens both DB and dependant super_meta, FreelistManager and allocator
76697669* in the proper order
76707670*/
7671- int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
7672- {
7673- dout(5) << __func__ << "::NCB::read_only=" << read_only << ", to_repair=" << to_repair << dendl;
7671+ int BlueStore::_open_db_and_around(
7672+ bool read_only,
7673+ bool to_repair,
7674+ bool apply_deferred,
7675+ bool remove_deferred)
7676+ {
7677+ dout(5) << __func__ << "read_only=" << read_only
7678+ << ", to_repair=" << to_repair
7679+ << ", deferred=" << (apply_deferred?"apply;":"noapply;")
7680+ << (remove_deferred?"remove":"noremove") << dendl;
7681+ ceph_assert(remove_deferred == false || apply_deferred == true);
7682+ ceph_assert(read_only == false || remove_deferred == false);
7683+ std::vector<std::string> keys_to_remove;
76747684 {
76757685 string type;
76767686 int r = read_meta("type", &type);
@@ -7730,6 +7740,11 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
77307740 if (bdev_label_multi) {
77317741 _main_bdev_label_try_reserve();
77327742 }
7743+ // This is the place where we can apply deferred writes
7744+ // without risk of some interaction with RocksDB allocating.
7745+ if (apply_deferred) {
7746+ _deferred_replay(remove_deferred ? &keys_to_remove : nullptr);
7747+ }
77337748
77347749 // Re-open in the proper mode(s).
77357750
@@ -7747,6 +7762,14 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
77477762 _post_init_alloc();
77487763 }
77497764
7765+ if (remove_deferred && !keys_to_remove.empty()) {
7766+ KeyValueDB::Transaction deferred_keys_remove_txn = db->get_transaction();
7767+ for (auto& s : keys_to_remove) {
7768+ deferred_keys_remove_txn->rmkey(PREFIX_DEFERRED, s);
7769+ }
7770+ db->submit_transaction_sync(deferred_keys_remove_txn);
7771+ }
7772+
77507773 // when function is called in repair mode (to_repair=true) we skip db->open()/create()
77517774 // we can't change bluestore allocation so no need to invlidate allocation-file
77527775 if (fm->is_null_manager() && !read_only && !to_repair) {
@@ -9081,7 +9104,7 @@ int BlueStore::mount_readonly()
90819104 }
90829105 });
90839106
9084- r = _deferred_replay();
9107+ r = _deferred_replay(nullptr );
90859108 if (r < 0) {
90869109 return r;
90879110 }
@@ -9217,8 +9240,7 @@ int BlueStore::_mount()
92179240 return -EINVAL;
92189241 }
92199242
9220- dout(5) << __func__ << "::NCB::calling open_db_and_around(read/write)" << dendl;
9221- int r = _open_db_and_around(false);
9243+ int r = _open_db_and_around(false, false, true, true);
92229244 if (r < 0) {
92239245 return r;
92249246 }
@@ -9256,11 +9278,6 @@ int BlueStore::_mount()
92569278 }
92579279 });
92589280
9259- r = _deferred_replay();
9260- if (r < 0) {
9261- return r;
9262- }
9263-
92649281 mempool_thread.init();
92659282
92669283 if ((!per_pool_stat_collection || per_pool_omap != OMAP_PER_PG) &&
@@ -10616,7 +10633,7 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
1061610633
1061710634 // in deep mode we need R/W write access to be able to replay deferred ops
1061810635 const bool read_only = !(repair || depth == FSCK_DEEP);
10619- int r = _open_db_and_around(read_only);
10636+ int r = _open_db_and_around(read_only, false, !read_only, !read_only );
1062010637 if (r < 0) {
1062110638 return r;
1062210639 }
@@ -10642,17 +10659,6 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
1064210659 mempool_thread.shutdown();
1064310660 _shutdown_cache();
1064410661 });
10645- // we need finisher and kv_{sync,finalize}_thread *just* for replay
10646- // enable in repair or deep mode modes only
10647- if (!read_only) {
10648- _kv_start();
10649- r = _deferred_replay();
10650- _kv_stop();
10651- }
10652-
10653- if (r < 0) {
10654- return r;
10655- }
1065610662 return _fsck_on_open(depth, repair);
1065710663}
1065810664
@@ -14700,6 +14706,7 @@ void BlueStore::_kv_sync_thread()
1470014706 deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
1470114707 std::unique_lock l{kv_lock};
1470214708 ceph_assert(!kv_sync_started);
14709+ ceph_assert(!db_was_opened_read_only);
1470314710 kv_sync_started = true;
1470414711 kv_cond.notify_all();
1470514712
@@ -15254,7 +15261,7 @@ void BlueStore::_deferred_aio_finish(OpSequencer *osr)
1525415261 }
1525515262}
1525615263
15257- int BlueStore::_deferred_replay()
15264+ int BlueStore::_deferred_replay(std::vector<std::string>* keys_to_remove )
1525815265{
1525915266 dout(10) << __func__ << " start" << dendl;
1526015267 int count = 0;
@@ -15270,50 +15277,46 @@ int BlueStore::_deferred_replay()
1527015277 }
1527115278 if (tracepoint_debug_deferred_replay_start) tracepoint_debug_deferred_replay_start();
1527215279 IOContext ioctx(cct, nullptr);
15273- KeyValueDB::Transaction t = db->get_transaction();
1527415280 KeyValueDB::Iterator it = db->get_iterator(PREFIX_DEFERRED);
15275- for (it->lower_bound(string()); it->valid(); it->next(), ++count ) {
15281+ for (it->lower_bound(string()); it->valid(); /*iterator update outside*/ ) {
1527615282 dout(20) << __func__ << " replay " << pretty_binary_string(it->key())
1527715283 << dendl;
15278- t->rmkey(PREFIX_DEFERRED, it->key());
15279- bluestore_deferred_transaction_t *deferred_txn =
15280- new bluestore_deferred_transaction_t;
15284+ if (keys_to_remove) {
15285+ keys_to_remove->push_back(it->key());
15286+ }
15287+ bluestore_deferred_transaction_t deferred_txn;
1528115288 bufferlist bl = it->value();
1528215289 auto p = bl.cbegin();
1528315290 try {
15284- decode(*deferred_txn, p);
15291+ decode(deferred_txn, p);
15292+
15293+ bool has_some = _eliminate_outdated_deferred(&deferred_txn, bluefs_extents);
15294+ if (has_some) {
15295+ if (tracepoint_debug_deferred_replay_track) tracepoint_debug_deferred_replay_track(deferred_txn);
15296+ for (auto& op: deferred_txn.ops) {
15297+ for (auto& e : op.extents) {
15298+ bufferlist t;
15299+ op.data.splice(0, e.length, &t);
15300+ bdev->aio_write(e.offset, t, &ioctx, false);
15301+ }
15302+ }
15303+ }
1528515304 } catch (ceph::buffer::error& e) {
1528615305 derr << __func__ << " failed to decode deferred txn "
1528715306 << pretty_binary_string(it->key()) << dendl;
15288- delete deferred_txn;
1528915307 r = -EIO;
15290- goto out;
1529115308 }
15292- bool has_some = _eliminate_outdated_deferred(deferred_txn, bluefs_extents);
15293- if (has_some) {
15294- if (tracepoint_debug_deferred_replay_track) tracepoint_debug_deferred_replay_track(*deferred_txn);
15295- for (auto& op: deferred_txn->ops) {
15296- for (auto& e : op.extents) {
15297- bufferlist t;
15298- op.data.splice(0, e.length, &t);
15299- bdev->aio_write(e.offset, t, &ioctx, false);
15300- }
15301- }
15302- } else {
15303- delete deferred_txn;
15309+ // update loop iteration here, so we can inject action
15310+ ++count;
15311+ it->next();
15312+ if (ioctx.num_pending.load() > 100 || !it->valid()) {
15313+ dout(20) << __func__ << "submitting IO batch" << dendl;
15314+ bdev->aio_submit(&ioctx);
15315+ ioctx.aio_wait();
15316+ dout(20) << __func__ << "wait done" << dendl;
15317+ ioctx.release_running_aios();
1530415318 }
1530515319 }
15306- out:
15307- bdev->aio_submit(&ioctx);
15308- dout(20) << __func__ << "waiting to complete IO" << dendl;
15309- ioctx.aio_wait();
15310- dout(20) << __func__ << "wait done" << dendl;
15311- if (!db_was_opened_read_only) {
15312- db->submit_transaction_sync(t);
15313- dout(20) << __func__ << "removed L keys" << dendl;
15314- } else {
15315- dout(10) << __func__ << "DB read-pnly, skipped L keys removal" << dendl;
15316- }
1531715320 if (tracepoint_debug_deferred_replay_end) tracepoint_debug_deferred_replay_end();
1531815321 dout(10) << __func__ << " completed " << count << " events" << dendl;
1531915322 return r;
0 commit comments