@@ -477,12 +477,9 @@ class PipelineExitBarrierI {
477477 // / Waits for exit barrier
478478 virtual std::optional<seastar::future<>> wait () = 0;
479479
480- // / Releases pipeline resources, after or without waiting
481- // FIXME: currently, exit() will discard the associated future even if it is
482- // still unresolved, which is discouraged by seastar.
483- virtual void exit () = 0;
484-
485- // / Must ensure that resources are released, likely by calling exit()
480+ // / Releases pipeline resources.
481+ // / If wait() has been called,
482+ // / must release after the wait future is resolved.
486483 virtual ~PipelineExitBarrierI () {}
487484};
488485
@@ -492,11 +489,6 @@ class PipelineStageIT : public BlockerT<T> {
492489#ifndef NDEBUG
493490 const core_id_t core = seastar::this_shard_id();
494491#endif
495-
496- template <class ... Args>
497- decltype (auto ) enter(Args&&... args) {
498- return static_cast <T*>(this )->enter (std::forward<Args>(args)...);
499- }
500492};
501493
502494class PipelineHandle {
@@ -506,6 +498,57 @@ class PipelineHandle {
506498 return barrier ? barrier->wait () : std::nullopt ;
507499 }
508500
501+ template <typename OpT, typename T>
502+ std::optional<seastar::future<>>
503+ do_enter_maybe_sync (
504+ T &stage,
505+ typename T::BlockingEvent::template Trigger<OpT>&& t,
506+ PipelineExitBarrierI::Ref&& moved_barrier) {
507+ assert (!barrier);
508+ if constexpr (!T::is_enter_sync) {
509+ auto fut = t.maybe_record_blocking (stage.enter (t), stage);
510+ return std::move (fut
511+ ).then ([this , t=std::move (t),
512+ moved_barrier=std::move (moved_barrier)](auto &&barrier_ref) {
513+ // destruct moved_barrier and unlock after entered
514+ assert (!barrier);
515+ barrier = std::move (barrier_ref);
516+ return seastar::now ();
517+ });
518+ } else {
519+ auto barrier_ref = stage.enter (t);
520+ // destruct moved_barrier and unlock after entered
521+ barrier = std::move (barrier_ref);
522+ return std::nullopt ;
523+ }
524+ }
525+
526+ template <typename OpT, typename T>
527+ std::optional<seastar::future<>>
528+ enter_maybe_sync (T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
529+ assert (stage.core == seastar::this_shard_id ());
530+ auto wait_fut = wait_barrier ();
531+ auto moved_barrier = std::move (barrier);
532+ barrier.reset ();
533+ if (wait_fut.has_value ()) {
534+ return wait_fut.value (
535+ ).then ([this , &stage, t=std::move (t),
536+ moved_barrier=std::move (moved_barrier)]() mutable {
537+ auto ret = do_enter_maybe_sync<OpT, T>(
538+ stage, std::move (t), std::move (moved_barrier));
539+ if constexpr (!T::is_enter_sync) {
540+ return std::move (ret.value ());
541+ } else {
542+ assert (ret == std::nullopt );
543+ return seastar::now ();
544+ }
545+ });
546+ } else {
547+ return do_enter_maybe_sync<OpT, T>(
548+ stage, std::move (t), std::move (moved_barrier));
549+ }
550+ }
551+
509552public:
510553 PipelineHandle () = default ;
511554
@@ -523,36 +566,44 @@ class PipelineHandle {
523566 template <typename OpT, typename T>
524567 seastar::future<>
525568 enter (T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
526- assert (stage.core == seastar::this_shard_id ());
527- auto wait_fut = wait_barrier ();
528- if (wait_fut.has_value ()) {
529- return wait_fut.value ().then ([this , &stage, t=std::move (t)] () mutable {
530- auto fut = t.maybe_record_blocking (stage.enter (t), stage);
531- return std::move (fut).then (
532- [this , t=std::move (t)](auto &&barrier_ref) mutable {
533- exit ();
534- barrier = std::move (barrier_ref);
535- return seastar::now ();
536- });
537- });
569+ auto ret = enter_maybe_sync<OpT, T>(stage, std::move (t));
570+ if (ret.has_value ()) {
571+ return std::move (ret.value ());
538572 } else {
539- auto fut = t.maybe_record_blocking (stage.enter (t), stage);
540- return std::move (fut).then (
541- [this , t=std::move (t)](auto &&barrier_ref) mutable {
542- exit ();
543- barrier = std::move (barrier_ref);
544- return seastar::now ();
545- });
573+ return seastar::now ();
546574 }
547575 }
548576
577+ /* *
578+ * Synchronously leaves the previous stage and enters the next stage.
579+ * Required for the use case which needs ordering upon entering an
580+ * ordered concurrent phase.
581+ */
582+ template <typename OpT, typename T>
583+ void
584+ enter_sync (T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
585+ static_assert (T::is_enter_sync);
586+ auto ret = enter_maybe_sync<OpT, T>(stage, std::move (t));
587+ // Expect that barrier->wait() (leaving the previous stage)
588+ // also returns nullopt, see enter_maybe_sync() above
589+ ceph_assert (!ret.has_value ());
590+ }
591+
549592 /* *
550593 * Completes pending exit barrier without entering a new one.
551594 */
552595 seastar::future<> complete () {
553596 auto ret = wait_barrier ();
597+ auto moved_barrier = std::move (barrier);
554598 barrier.reset ();
555- return ret ? std::move (ret.value ()) : seastar::now ();
599+ if (ret) {
600+ return std::move (ret.value ()
601+ ).then ([moved_barrier=std::move (moved_barrier)] {
602+ // destruct moved_barrier and unlock after wait()
603+ });
604+ } else {
605+ return seastar::now ();
606+ }
556607 }
557608
558609 /* *
@@ -592,16 +643,10 @@ class OrderedExclusivePhaseT : public PipelineStageIT<T> {
592643 return std::nullopt ;
593644 }
594645
595- void exit () final {
596- if (phase) {
597- assert (phase->core == seastar::this_shard_id ());
598- phase->exit (op_id);
599- phase = nullptr ;
600- }
601- }
602-
603646 ~ExitBarrier () final {
604- exit ();
647+ assert (phase);
648+ assert (phase->core == seastar::this_shard_id ());
649+ phase->exit (op_id);
605650 }
606651 };
607652
@@ -611,6 +656,8 @@ class OrderedExclusivePhaseT : public PipelineStageIT<T> {
611656 }
612657
613658public:
659+ static constexpr bool is_enter_sync = false ;
660+
614661 template <class TriggerT >
615662 seastar::future<PipelineExitBarrierI::Ref> enter (TriggerT& t) {
616663 waiting++;
@@ -690,33 +737,32 @@ class OrderedConcurrentPhaseT : public PipelineStageIT<T> {
690737 return trigger.maybe_record_exit_barrier (std::move (ret));
691738 }
692739
693- void exit () final {
740+ ~ExitBarrier () final {
741+ assert (phase);
742+ assert (phase->core == seastar::this_shard_id ());
694743 if (barrier) {
695- assert (phase);
696- assert (phase->core == seastar::this_shard_id ());
744+ // wait() hasn't been called
745+
746+ // FIXME: should not discard future,
747+ // it's discouraged by seastar and may cause shutdown issues.
697748 std::ignore = std::move (*barrier
698749 ).then ([phase=this ->phase ] {
699750 phase->mutex .unlock ();
700751 });
701- barrier = std::nullopt ;
702- phase = nullptr ;
703- } else if (phase) {
704- assert (phase->core == seastar::this_shard_id ());
752+ } else {
753+ // wait() has been called, must unlock
754+ // after the wait() future is resolved.
705755 phase->mutex .unlock ();
706- phase = nullptr ;
707756 }
708757 }
709-
710- ~ExitBarrier () final {
711- exit ();
712- }
713758 };
714759
715760public:
761+ static constexpr bool is_enter_sync = true ;
762+
716763 template <class TriggerT >
717- seastar::future<PipelineExitBarrierI::Ref> enter (TriggerT& t) {
718- return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
719- new ExitBarrier<TriggerT>{*this , mutex.lock (), t});
764+ PipelineExitBarrierI::Ref enter (TriggerT& t) {
765+ return std::make_unique<ExitBarrier<TriggerT>>(*this , mutex.lock (), t);
720766 }
721767
722768private:
@@ -740,16 +786,15 @@ class UnorderedStageT : public PipelineStageIT<T> {
740786 return std::nullopt ;
741787 }
742788
743- void exit () final {}
744-
745789 ~ExitBarrier () final {}
746790 };
747791
748792public:
749- template <class ... IgnoreArgs>
750- seastar::future<PipelineExitBarrierI::Ref> enter (IgnoreArgs&&...) {
751- return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
752- new ExitBarrier);
793+ static constexpr bool is_enter_sync = true ;
794+
795+ template <class TriggerT >
796+ PipelineExitBarrierI::Ref enter (TriggerT&) {
797+ return std::make_unique<ExitBarrier>();
753798 }
754799};
755800
0 commit comments