@@ -396,36 +396,191 @@ OMapInnerNode::make_full_merge(omap_context_t oc, OMapNodeRef right)
396396}
397397
398398OMapInnerNode::make_balanced_ret
399- OMapInnerNode::make_balanced (omap_context_t oc, OMapNodeRef _right)
399+ OMapInnerNode::make_balanced (
400+ omap_context_t oc, OMapNodeRef _right, uint32_t pivot_idx)
400401{
401402 LOG_PREFIX (OMapInnerNode::make_balanced);
402403 DEBUGT (" l: {}, r: {}" , oc.t , *this , *_right);
403404 ceph_assert (_right->get_type () == TYPE);
404405 auto &right = *_right->cast <OMapInnerNode>();
405- auto pivot_idx = get_balance_pivot_idx (*this , right);
406- if (!pivot_idx) {
407- return make_balanced_ret (
408- interruptible::ready_future_marker{},
409- std::make_tuple (OMapNodeRef{}, OMapNodeRef{}, std::nullopt ));
410- }
411406 return oc.tm .alloc_extents <OMapInnerNode>(oc.t , oc.hint ,
412407 OMAP_INNER_BLOCK_SIZE, 2 )
413408 .si_then ([this , &right, pivot_idx, oc] (auto &&replacement_pair){
414409 auto replacement_left = replacement_pair.front ();
415410 auto replacement_right = replacement_pair.back ();
416- this ->balance_child_ptrs (oc.t , *this , right, * pivot_idx,
411+ this ->balance_child_ptrs (oc.t , *this , right, pivot_idx,
417412 *replacement_left, *replacement_right);
418413 return make_balanced_ret (
419414 interruptible::ready_future_marker{},
420415 std::make_tuple (replacement_left, replacement_right,
421- balance_into_new_nodes (*this , right, * pivot_idx,
416+ balance_into_new_nodes (*this , right, pivot_idx,
422417 *replacement_left, *replacement_right)));
423418 }).handle_error_interruptible (
424419 crimson::ct_error::enospc::assert_failure{" unexpected enospc" },
425420 make_balanced_iertr::pass_further{}
426421 );
427422}
428423
424+ OMapInnerNode::merge_entry_ret
425+ OMapInnerNode::do_merge (
426+ omap_context_t oc,
427+ internal_const_iterator_t liter,
428+ internal_const_iterator_t riter,
429+ OMapNodeRef l,
430+ OMapNodeRef r)
431+ {
432+ LOG_PREFIX (OMapInnerNode::do_merge);
433+ if (!is_mutable ()) {
434+ auto mut = oc.tm .get_mutable_extent (oc.t , this )->cast <OMapInnerNode>();
435+ auto mut_liter = mut->iter_idx (liter->get_offset ());
436+ auto mut_riter = mut->iter_idx (riter->get_offset ());
437+ return mut->do_merge (oc, mut_liter, mut_riter, l, r);
438+ }
439+ DEBUGT (" make_full_merge l {} r {} liter {} riter {}" ,
440+ oc.t , *l, *r, liter->get_key (), riter->get_key ());
441+ return l->make_full_merge (oc, r
442+ ).si_then ([liter=liter, riter=riter, l=l, r=r, oc, this , FNAME]
443+ (auto &&replacement) {
444+ DEBUGT (" to update parent: {}" , oc.t , *this );
445+ this ->update_child_ptr (
446+ liter.get_offset (),
447+ dynamic_cast <base_child_t *>(replacement.get ()));
448+ journal_inner_update (
449+ liter,
450+ replacement->get_laddr (),
451+ maybe_get_delta_buffer ());
452+ this ->remove_child_ptr (riter.get_offset ());
453+ journal_inner_remove (riter, maybe_get_delta_buffer ());
454+ // retire extent
455+ std::vector<laddr_t > dec_laddrs {l->get_laddr (), r->get_laddr ()};
456+ auto next = liter + 1 ;
457+ auto end = next == iter_cend () ? get_end () : next.get_key ();
458+ assert (end == r->get_end ());
459+ replacement->init_range (liter.get_key (), std::move (end));
460+ if (get_meta ().depth > 2 ) { // replacement is an inner node
461+ auto &rep = *replacement->template cast <OMapInnerNode>();
462+ rep.adjust_copy_src_dest_on_merge (
463+ oc.t ,
464+ *l->template cast <OMapInnerNode>(),
465+ *r->template cast <OMapInnerNode>());
466+ }
467+ return dec_ref (oc, dec_laddrs
468+ ).si_then ([this , oc, r=std::move (replacement)] {
469+ --(oc.t .get_omap_tree_stats ().extents_num_delta );
470+ if (extent_is_below_min ()) {
471+ return merge_entry_ret (
472+ interruptible::ready_future_marker{},
473+ mutation_result_t (mutation_status_t ::NEED_MERGE,
474+ std::nullopt , this ));
475+ } else {
476+ return merge_entry_ret (
477+ interruptible::ready_future_marker{},
478+ mutation_result_t (mutation_status_t ::SUCCESS,
479+ std::nullopt , std::nullopt ));
480+ }
481+ });
482+ });
483+ }
484+
485+ OMapInnerNode::merge_entry_ret
486+ OMapInnerNode::do_balance (
487+ omap_context_t oc,
488+ internal_const_iterator_t liter,
489+ internal_const_iterator_t riter,
490+ OMapNodeRef l,
491+ OMapNodeRef r)
492+ {
493+ LOG_PREFIX (OMapInnerNode::do_balance);
494+ std::optional<uint32_t > pivot_idx = 0 ;
495+ if (get_meta ().depth > 2 ) {
496+ pivot_idx = OMapInnerNode::get_balance_pivot_idx (
497+ static_cast <OMapInnerNode&>(*l), static_cast <OMapInnerNode&>(*r));
498+ } else {
499+ pivot_idx = OMapLeafNode::get_balance_pivot_idx (
500+ static_cast <OMapLeafNode&>(*l), static_cast <OMapLeafNode&>(*r));
501+ }
502+ if (!pivot_idx) {
503+ return merge_entry_ret (
504+ interruptible::ready_future_marker{},
505+ mutation_result_t (mutation_status_t ::SUCCESS,
506+ std::nullopt , std::nullopt ));
507+ }
508+ if (!is_mutable ()) {
509+ auto mut = oc.tm .get_mutable_extent (oc.t , this )->cast <OMapInnerNode>();
510+ auto mut_liter = mut->iter_idx (liter->get_offset ());
511+ auto mut_riter = mut->iter_idx (riter->get_offset ());
512+ return mut->do_balance (oc, mut_liter, mut_riter, l, r);
513+ }
514+ DEBUGT (" balanced l {} r {} liter {} riter {}" ,
515+ oc.t , *l, *r, liter->get_key (), riter->get_key ());
516+ return l->make_balanced (oc, r, *pivot_idx
517+ ).si_then ([FNAME, liter=liter, riter=riter, l=l, r=r, oc, this ](auto tuple) {
518+ auto [replacement_l, replacement_r, replacement_pivot] = tuple;
519+ replacement_l->init_range (l->get_begin (), replacement_pivot);
520+ replacement_r->init_range (replacement_pivot, r->get_end ());
521+ DEBUGT (" to update parent: {} {} {}" ,
522+ oc.t , *this , *replacement_l, *replacement_r);
523+ if (get_meta ().depth > 2 ) { // l and r are inner nodes
524+ auto &left = *l->template cast <OMapInnerNode>();
525+ auto &right = *r->template cast <OMapInnerNode>();
526+ auto &rep_left = *replacement_l->template cast <OMapInnerNode>();
527+ auto &rep_right = *replacement_r->template cast <OMapInnerNode>();
528+ this ->adjust_copy_src_dest_on_balance (
529+ oc.t , left, right, true , rep_left, rep_right);
530+ }
531+
532+ // update operation will not cause node overflow, so we can do it first
533+ this ->update_child_ptr (
534+ liter.get_offset (),
535+ dynamic_cast <base_child_t *>(replacement_l.get ()));
536+ journal_inner_update (
537+ liter,
538+ replacement_l->get_laddr (),
539+ maybe_get_delta_buffer ());
540+ bool overflow = extent_will_overflow (replacement_pivot.size (),
541+ std::nullopt );
542+ if (!overflow) {
543+ this ->update_child_ptr (
544+ riter.get_offset (),
545+ dynamic_cast <base_child_t *>(replacement_r.get ()));
546+ journal_inner_remove (riter, maybe_get_delta_buffer ());
547+ journal_inner_insert (
548+ riter,
549+ replacement_r->get_laddr (),
550+ replacement_pivot,
551+ maybe_get_delta_buffer ());
552+ std::vector<laddr_t > dec_laddrs{l->get_laddr (), r->get_laddr ()};
553+ return dec_ref (oc, dec_laddrs
554+ ).si_then ([] {
555+ return merge_entry_ret (
556+ interruptible::ready_future_marker{},
557+ mutation_result_t (mutation_status_t ::SUCCESS,
558+ std::nullopt , std::nullopt ));
559+ });
560+ } else {
561+ DEBUGT (" balanced and split {} r {} riter {}" ,
562+ oc.t , *l, *r, riter.get_key ());
563+ // use remove and insert instead of replace,
564+ // remove operation will not cause node split, so we can do it first
565+ this ->remove_child_ptr (riter.get_offset ());
566+ journal_inner_remove (riter, maybe_get_delta_buffer ());
567+ return make_split_insert (
568+ oc, riter, replacement_pivot, replacement_r
569+ ).si_then ([this , oc, l = l, r = r](auto mresult) {
570+ std::vector<laddr_t > dec_laddrs{
571+ l->get_laddr (),
572+ r->get_laddr (),
573+ get_laddr ()};
574+ return dec_ref (oc, dec_laddrs
575+ ).si_then ([mresult = std::move (mresult)] {
576+ return merge_entry_ret (
577+ interruptible::ready_future_marker{}, mresult);
578+ });
579+ });
580+ }
581+ });
582+ }
583+
429584OMapInnerNode::merge_entry_ret
430585OMapInnerNode::merge_entry (
431586 omap_context_t oc,
@@ -434,147 +589,22 @@ OMapInnerNode::merge_entry(
434589{
435590 LOG_PREFIX (OMapInnerNode::merge_entry);
436591 DEBUGT (" {}, parent: {}" , oc.t , *entry, *this );
437- if (!is_mutable ()) {
438- auto mut = oc.tm .get_mutable_extent (oc.t , this )->cast <OMapInnerNode>();
439- auto mut_iter = mut->iter_idx (iter->get_offset ());
440- return mut->merge_entry (oc, mut_iter, entry);
441- }
442592 auto is_left = (iter + 1 ) == iter_cend ();
443593 auto donor_iter = is_left ? iter - 1 : iter + 1 ;
444594 return get_child_node (oc, donor_iter
445595 ).si_then ([=, this ](auto &&donor) mutable {
446596 ceph_assert (!donor->is_btree_root ());
447- LOG_PREFIX (OMapInnerNode::merge_entry);
448597 auto [l, r] = is_left ?
449598 std::make_pair (donor, entry) : std::make_pair (entry, donor);
450599 auto [liter, riter] = is_left ?
451600 std::make_pair (donor_iter, iter) : std::make_pair (iter, donor_iter);
452601 if (l->can_merge (r)) {
453- DEBUGT (" make_full_merge l {} r {} liter {} riter {}" ,
454- oc.t , *l, *r, liter->get_key (), riter->get_key ());
455602 assert (entry->extent_is_below_min ());
456- return l->make_full_merge (oc, r
457- ).si_then ([liter=liter, riter=riter, l=l, r=r, oc, this ]
458- (auto &&replacement) {
459- LOG_PREFIX (OMapInnerNode::merge_entry);
460- DEBUGT (" to update parent: {}" , oc.t , *this );
461- this ->update_child_ptr (
462- liter.get_offset (),
463- dynamic_cast <base_child_t *>(replacement.get ()));
464- journal_inner_update (
465- liter,
466- replacement->get_laddr (),
467- maybe_get_delta_buffer ());
468- this ->remove_child_ptr (riter.get_offset ());
469- journal_inner_remove (riter, maybe_get_delta_buffer ());
470- // retire extent
471- std::vector<laddr_t > dec_laddrs {l->get_laddr (), r->get_laddr ()};
472- auto next = liter + 1 ;
473- auto end = next == iter_cend () ? get_end () : next.get_key ();
474- assert (end == r->get_end ());
475- replacement->init_range (liter.get_key (), std::move (end));
476- if (get_meta ().depth > 2 ) { // replacement is an inner node
477- auto &rep = *replacement->template cast <OMapInnerNode>();
478- rep.adjust_copy_src_dest_on_merge (
479- oc.t ,
480- *l->template cast <OMapInnerNode>(),
481- *r->template cast <OMapInnerNode>());
482- }
483- return dec_ref (oc, dec_laddrs
484- ).si_then ([this , oc, r=std::move (replacement)] {
485- --(oc.t .get_omap_tree_stats ().extents_num_delta );
486- if (extent_is_below_min ()) {
487- return merge_entry_ret (
488- interruptible::ready_future_marker{},
489- mutation_result_t (mutation_status_t ::NEED_MERGE,
490- std::nullopt , this ));
491- } else {
492- return merge_entry_ret (
493- interruptible::ready_future_marker{},
494- mutation_result_t (mutation_status_t ::SUCCESS,
495- std::nullopt , std::nullopt ));
496- }
497- });
498- });
603+ return do_merge (oc, liter, riter, l, r);
499604 } else { // !l->can_merge(r)
500- DEBUGT (" balanced l {} r {} liter {} riter {}" ,
501- oc.t , *l, *r, liter->get_key (), riter->get_key ());
502- return l->make_balanced (oc, r
503- ).si_then ([liter=liter, riter=riter, l=l, r=r, oc, this ](auto tuple) {
504- LOG_PREFIX (OMapInnerNode::merge_entry);
505- auto [replacement_l, replacement_r, replacement_pivot] = tuple;
506- if (!replacement_pivot) {
507- return merge_entry_ret (
508- interruptible::ready_future_marker{},
509- mutation_result_t (mutation_status_t ::SUCCESS,
510- std::nullopt , std::nullopt ));
511- }
512- replacement_l->init_range (l->get_begin (), *replacement_pivot);
513- replacement_r->init_range (*replacement_pivot, r->get_end ());
514- DEBUGT (" to update parent: {} {} {}" ,
515- oc.t , *this , *replacement_l, *replacement_r);
516- if (get_meta ().depth > 2 ) { // l and r are inner nodes
517- auto &left = *l->template cast <OMapInnerNode>();
518- auto &right = *r->template cast <OMapInnerNode>();
519- auto &rep_left = *replacement_l->template cast <OMapInnerNode>();
520- auto &rep_right = *replacement_r->template cast <OMapInnerNode>();
521- this ->adjust_copy_src_dest_on_balance (
522- oc.t , left, right, true , rep_left, rep_right);
523- }
524-
525- // update operation will not cuase node overflow, so we can do it first
526- this ->update_child_ptr (
527- liter.get_offset (),
528- dynamic_cast <base_child_t *>(replacement_l.get ()));
529- journal_inner_update (
530- liter,
531- replacement_l->get_laddr (),
532- maybe_get_delta_buffer ());
533- bool overflow = extent_will_overflow (replacement_pivot->size (),
534- std::nullopt );
535- if (!overflow) {
536- this ->update_child_ptr (
537- riter.get_offset (),
538- dynamic_cast <base_child_t *>(replacement_r.get ()));
539- journal_inner_remove (riter, maybe_get_delta_buffer ());
540- journal_inner_insert (
541- riter,
542- replacement_r->get_laddr (),
543- *replacement_pivot,
544- maybe_get_delta_buffer ());
545- std::vector<laddr_t > dec_laddrs{l->get_laddr (), r->get_laddr ()};
546- return dec_ref (oc, dec_laddrs
547- ).si_then ([] {
548- return merge_entry_ret (
549- interruptible::ready_future_marker{},
550- mutation_result_t (mutation_status_t ::SUCCESS,
551- std::nullopt , std::nullopt ));
552- });
553- } else {
554- DEBUGT (" balanced and split {} r {} riter {}" ,
555- oc.t , *l, *r, riter.get_key ());
556- // use remove and insert to instead of replace,
557- // remove operation will not cause node split, so we can do it first
558- this ->remove_child_ptr (riter.get_offset ());
559- journal_inner_remove (riter, maybe_get_delta_buffer ());
560- return make_split_insert (
561- oc, riter, *replacement_pivot, replacement_r
562- ).si_then ([this , oc, l = l, r = r](auto mresult) {
563- std::vector<laddr_t > dec_laddrs{
564- l->get_laddr (),
565- r->get_laddr (),
566- get_laddr ()};
567- return dec_ref (oc, dec_laddrs
568- ).si_then ([mresult = std::move (mresult)] {
569- return merge_entry_ret (
570- interruptible::ready_future_marker{}, mresult);
571- });
572- });
573- }
574- });
605+ return do_balance (oc, liter, riter, l, r);
575606 }
576607 });
577-
578608}
579609
580610OMapInnerNode::internal_const_iterator_t
@@ -805,13 +835,14 @@ OMapLeafNode::make_full_merge(omap_context_t oc, OMapNodeRef right)
805835}
806836
807837OMapLeafNode::make_balanced_ret
808- OMapLeafNode::make_balanced (omap_context_t oc, OMapNodeRef _right)
838+ OMapLeafNode::make_balanced (
839+ omap_context_t oc, OMapNodeRef _right, uint32_t pivot_idx)
809840{
810841 ceph_assert (_right->get_type () == TYPE);
811842 LOG_PREFIX (OMapLeafNode::make_balanced);
812843 DEBUGT (" this: {}" , oc.t , *this );
813844 return oc.tm .alloc_extents <OMapLeafNode>(oc.t , oc.hint , get_len (), 2 )
814- .si_then ([this , _right] (auto &&replacement_pair) {
845+ .si_then ([this , _right, pivot_idx ] (auto &&replacement_pair) {
815846 auto replacement_left = replacement_pair.front ();
816847 auto replacement_right = replacement_pair.back ();
817848 auto &right = *_right->cast <OMapLeafNode>();
@@ -820,7 +851,7 @@ OMapLeafNode::make_balanced(omap_context_t oc, OMapNodeRef _right)
820851 std::make_tuple (
821852 replacement_left, replacement_right,
822853 balance_into_new_nodes (
823- *this , right,
854+ *this , right, pivot_idx,
824855 *replacement_left, *replacement_right)));
825856 }).handle_error_interruptible (
826857 crimson::ct_error::enospc::assert_failure{" unexpected enospc" },
0 commit comments