@@ -426,6 +426,146 @@ OMapInnerNode::make_balanced(omap_context_t oc, OMapNodeRef _right)
426426 );
427427}
428428
429+ OMapInnerNode::merge_entry_ret
430+ OMapInnerNode::do_merge (
431+ omap_context_t oc,
432+ internal_const_iterator_t liter,
433+ internal_const_iterator_t riter,
434+ OMapNodeRef l,
435+ OMapNodeRef r)
436+ {
437+ LOG_PREFIX (OMapInnerNode::do_merge);
438+ DEBUGT (" make_full_merge l {} r {} liter {} riter {}" ,
439+ oc.t , *l, *r, liter->get_key (), riter->get_key ());
440+ return l->make_full_merge (oc, r
441+ ).si_then ([liter=liter, riter=riter, l=l, r=r, oc, this , FNAME]
442+ (auto &&replacement) {
443+ DEBUGT (" to update parent: {}" , oc.t , *this );
444+ this ->update_child_ptr (
445+ liter.get_offset (),
446+ dynamic_cast <base_child_t *>(replacement.get ()));
447+ journal_inner_update (
448+ liter,
449+ replacement->get_laddr (),
450+ maybe_get_delta_buffer ());
451+ this ->remove_child_ptr (riter.get_offset ());
452+ journal_inner_remove (riter, maybe_get_delta_buffer ());
453+ // retire extent
454+ std::vector<laddr_t > dec_laddrs {l->get_laddr (), r->get_laddr ()};
455+ auto next = liter + 1 ;
456+ auto end = next == iter_cend () ? get_end () : next.get_key ();
457+ assert (end == r->get_end ());
458+ replacement->init_range (liter.get_key (), std::move (end));
459+ if (get_meta ().depth > 2 ) { // replacement is an inner node
460+ auto &rep = *replacement->template cast <OMapInnerNode>();
461+ rep.adjust_copy_src_dest_on_merge (
462+ oc.t ,
463+ *l->template cast <OMapInnerNode>(),
464+ *r->template cast <OMapInnerNode>());
465+ }
466+ return dec_ref (oc, dec_laddrs
467+ ).si_then ([this , oc, r=std::move (replacement)] {
468+ --(oc.t .get_omap_tree_stats ().extents_num_delta );
469+ if (extent_is_below_min ()) {
470+ return merge_entry_ret (
471+ interruptible::ready_future_marker{},
472+ mutation_result_t (mutation_status_t ::NEED_MERGE,
473+ std::nullopt , this ));
474+ } else {
475+ return merge_entry_ret (
476+ interruptible::ready_future_marker{},
477+ mutation_result_t (mutation_status_t ::SUCCESS,
478+ std::nullopt , std::nullopt ));
479+ }
480+ });
481+ });
482+ }
483+
484+ OMapInnerNode::merge_entry_ret
485+ OMapInnerNode::do_balance (
486+ omap_context_t oc,
487+ internal_const_iterator_t liter,
488+ internal_const_iterator_t riter,
489+ OMapNodeRef l,
490+ OMapNodeRef r)
491+ {
492+ LOG_PREFIX (OMapInnerNode::do_balance);
493+ DEBUGT (" balanced l {} r {} liter {} riter {}" ,
494+ oc.t , *l, *r, liter->get_key (), riter->get_key ());
495+ return l->make_balanced (oc, r
496+ ).si_then ([FNAME, liter=liter, riter=riter, l=l, r=r, oc, this ](auto tuple) {
497+ auto [replacement_l, replacement_r, replacement_pivot] = tuple;
498+ if (!replacement_pivot) {
499+ return merge_entry_ret (
500+ interruptible::ready_future_marker{},
501+ mutation_result_t (mutation_status_t ::SUCCESS,
502+ std::nullopt , std::nullopt ));
503+ }
504+ replacement_l->init_range (l->get_begin (), *replacement_pivot);
505+ replacement_r->init_range (*replacement_pivot, r->get_end ());
506+ DEBUGT (" to update parent: {} {} {}" ,
507+ oc.t , *this , *replacement_l, *replacement_r);
508+ if (get_meta ().depth > 2 ) { // l and r are inner nodes
509+ auto &left = *l->template cast <OMapInnerNode>();
510+ auto &right = *r->template cast <OMapInnerNode>();
511+ auto &rep_left = *replacement_l->template cast <OMapInnerNode>();
512+ auto &rep_right = *replacement_r->template cast <OMapInnerNode>();
513+ this ->adjust_copy_src_dest_on_balance (
514+ oc.t , left, right, true , rep_left, rep_right);
515+ }
516+
517+ // update operation will not cuase node overflow, so we can do it first
518+ this ->update_child_ptr (
519+ liter.get_offset (),
520+ dynamic_cast <base_child_t *>(replacement_l.get ()));
521+ journal_inner_update (
522+ liter,
523+ replacement_l->get_laddr (),
524+ maybe_get_delta_buffer ());
525+ bool overflow = extent_will_overflow (replacement_pivot->size (),
526+ std::nullopt );
527+ if (!overflow) {
528+ this ->update_child_ptr (
529+ riter.get_offset (),
530+ dynamic_cast <base_child_t *>(replacement_r.get ()));
531+ journal_inner_remove (riter, maybe_get_delta_buffer ());
532+ journal_inner_insert (
533+ riter,
534+ replacement_r->get_laddr (),
535+ *replacement_pivot,
536+ maybe_get_delta_buffer ());
537+ std::vector<laddr_t > dec_laddrs{l->get_laddr (), r->get_laddr ()};
538+ return dec_ref (oc, dec_laddrs
539+ ).si_then ([] {
540+ return merge_entry_ret (
541+ interruptible::ready_future_marker{},
542+ mutation_result_t (mutation_status_t ::SUCCESS,
543+ std::nullopt , std::nullopt ));
544+ });
545+ } else {
546+ DEBUGT (" balanced and split {} r {} riter {}" ,
547+ oc.t , *l, *r, riter.get_key ());
548+ // use remove and insert to instead of replace,
549+ // remove operation will not cause node split, so we can do it first
550+ this ->remove_child_ptr (riter.get_offset ());
551+ journal_inner_remove (riter, maybe_get_delta_buffer ());
552+ return make_split_insert (
553+ oc, riter, *replacement_pivot, replacement_r
554+ ).si_then ([this , oc, l = l, r = r](auto mresult) {
555+ std::vector<laddr_t > dec_laddrs{
556+ l->get_laddr (),
557+ r->get_laddr (),
558+ get_laddr ()};
559+ return dec_ref (oc, dec_laddrs
560+ ).si_then ([mresult = std::move (mresult)] {
561+ return merge_entry_ret (
562+ interruptible::ready_future_marker{}, mresult);
563+ });
564+ });
565+ }
566+ });
567+ }
568+
429569OMapInnerNode::merge_entry_ret
430570OMapInnerNode::merge_entry (
431571 omap_context_t oc,
@@ -450,131 +590,12 @@ OMapInnerNode::merge_entry(
450590 auto [liter, riter] = is_left ?
451591 std::make_pair (donor_iter, iter) : std::make_pair (iter, donor_iter);
452592 if (l->can_merge (r)) {
453- DEBUGT (" make_full_merge l {} r {} liter {} riter {}" ,
454- oc.t , *l, *r, liter->get_key (), riter->get_key ());
455593 assert (entry->extent_is_below_min ());
456- return l->make_full_merge (oc, r
457- ).si_then ([liter=liter, riter=riter, l=l, r=r, oc, this ]
458- (auto &&replacement) {
459- LOG_PREFIX (OMapInnerNode::merge_entry);
460- DEBUGT (" to update parent: {}" , oc.t , *this );
461- this ->update_child_ptr (
462- liter.get_offset (),
463- dynamic_cast <base_child_t *>(replacement.get ()));
464- journal_inner_update (
465- liter,
466- replacement->get_laddr (),
467- maybe_get_delta_buffer ());
468- this ->remove_child_ptr (riter.get_offset ());
469- journal_inner_remove (riter, maybe_get_delta_buffer ());
470- // retire extent
471- std::vector<laddr_t > dec_laddrs {l->get_laddr (), r->get_laddr ()};
472- auto next = liter + 1 ;
473- auto end = next == iter_cend () ? get_end () : next.get_key ();
474- assert (end == r->get_end ());
475- replacement->init_range (liter.get_key (), std::move (end));
476- if (get_meta ().depth > 2 ) { // replacement is an inner node
477- auto &rep = *replacement->template cast <OMapInnerNode>();
478- rep.adjust_copy_src_dest_on_merge (
479- oc.t ,
480- *l->template cast <OMapInnerNode>(),
481- *r->template cast <OMapInnerNode>());
482- }
483- return dec_ref (oc, dec_laddrs
484- ).si_then ([this , oc, r=std::move (replacement)] {
485- --(oc.t .get_omap_tree_stats ().extents_num_delta );
486- if (extent_is_below_min ()) {
487- return merge_entry_ret (
488- interruptible::ready_future_marker{},
489- mutation_result_t (mutation_status_t ::NEED_MERGE,
490- std::nullopt , this ));
491- } else {
492- return merge_entry_ret (
493- interruptible::ready_future_marker{},
494- mutation_result_t (mutation_status_t ::SUCCESS,
495- std::nullopt , std::nullopt ));
496- }
497- });
498- });
594+ return do_merge (oc, liter, riter, l, r);
499595 } else { // !l->can_merge(r)
500- DEBUGT (" balanced l {} r {} liter {} riter {}" ,
501- oc.t , *l, *r, liter->get_key (), riter->get_key ());
502- return l->make_balanced (oc, r
503- ).si_then ([liter=liter, riter=riter, l=l, r=r, oc, this ](auto tuple) {
504- LOG_PREFIX (OMapInnerNode::merge_entry);
505- auto [replacement_l, replacement_r, replacement_pivot] = tuple;
506- if (!replacement_pivot) {
507- return merge_entry_ret (
508- interruptible::ready_future_marker{},
509- mutation_result_t (mutation_status_t ::SUCCESS,
510- std::nullopt , std::nullopt ));
511- }
512- replacement_l->init_range (l->get_begin (), *replacement_pivot);
513- replacement_r->init_range (*replacement_pivot, r->get_end ());
514- DEBUGT (" to update parent: {} {} {}" ,
515- oc.t , *this , *replacement_l, *replacement_r);
516- if (get_meta ().depth > 2 ) { // l and r are inner nodes
517- auto &left = *l->template cast <OMapInnerNode>();
518- auto &right = *r->template cast <OMapInnerNode>();
519- auto &rep_left = *replacement_l->template cast <OMapInnerNode>();
520- auto &rep_right = *replacement_r->template cast <OMapInnerNode>();
521- this ->adjust_copy_src_dest_on_balance (
522- oc.t , left, right, true , rep_left, rep_right);
523- }
524-
525- // update operation will not cuase node overflow, so we can do it first
526- this ->update_child_ptr (
527- liter.get_offset (),
528- dynamic_cast <base_child_t *>(replacement_l.get ()));
529- journal_inner_update (
530- liter,
531- replacement_l->get_laddr (),
532- maybe_get_delta_buffer ());
533- bool overflow = extent_will_overflow (replacement_pivot->size (),
534- std::nullopt );
535- if (!overflow) {
536- this ->update_child_ptr (
537- riter.get_offset (),
538- dynamic_cast <base_child_t *>(replacement_r.get ()));
539- journal_inner_remove (riter, maybe_get_delta_buffer ());
540- journal_inner_insert (
541- riter,
542- replacement_r->get_laddr (),
543- *replacement_pivot,
544- maybe_get_delta_buffer ());
545- std::vector<laddr_t > dec_laddrs{l->get_laddr (), r->get_laddr ()};
546- return dec_ref (oc, dec_laddrs
547- ).si_then ([] {
548- return merge_entry_ret (
549- interruptible::ready_future_marker{},
550- mutation_result_t (mutation_status_t ::SUCCESS,
551- std::nullopt , std::nullopt ));
552- });
553- } else {
554- DEBUGT (" balanced and split {} r {} riter {}" ,
555- oc.t , *l, *r, riter.get_key ());
556- // use remove and insert to instead of replace,
557- // remove operation will not cause node split, so we can do it first
558- this ->remove_child_ptr (riter.get_offset ());
559- journal_inner_remove (riter, maybe_get_delta_buffer ());
560- return make_split_insert (
561- oc, riter, *replacement_pivot, replacement_r
562- ).si_then ([this , oc, l = l, r = r](auto mresult) {
563- std::vector<laddr_t > dec_laddrs{
564- l->get_laddr (),
565- r->get_laddr (),
566- get_laddr ()};
567- return dec_ref (oc, dec_laddrs
568- ).si_then ([mresult = std::move (mresult)] {
569- return merge_entry_ret (
570- interruptible::ready_future_marker{}, mresult);
571- });
572- });
573- }
574- });
596+ return do_balance (oc, liter, riter, l, r);
575597 }
576598 });
577-
578599}
579600
580601OMapInnerNode::internal_const_iterator_t
0 commit comments