@@ -477,7 +477,9 @@ class PipelineExitBarrierI {
477477 // / Waits for exit barrier
478478 virtual std::optional<seastar::future<>> wait () = 0;
479479
480- // / Releases pipeline resources, after or without waiting
480+ // / Releases pipeline resources.
481+ // / If wait() has been called,
482+ // / must release after the wait future is resolved.
481483 virtual ~PipelineExitBarrierI () {}
482484};
483485
@@ -498,18 +500,24 @@ class PipelineHandle {
498500
499501 template <typename OpT, typename T>
500502 std::optional<seastar::future<>>
501- do_enter_maybe_sync (T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
503+ do_enter_maybe_sync (
504+ T &stage,
505+ typename T::BlockingEvent::template Trigger<OpT>&& t,
506+ PipelineExitBarrierI::Ref&& moved_barrier) {
507+ assert (!barrier);
502508 if constexpr (!T::is_enter_sync) {
503509 auto fut = t.maybe_record_blocking (stage.enter (t), stage);
504- return std::move (fut).then (
505- [this , t=std::move (t)](auto &&barrier_ref) {
506- exit ();
510+ return std::move (fut
511+ ).then ([this , t=std::move (t),
512+ moved_barrier=std::move (moved_barrier)](auto &&barrier_ref) {
513+ // destruct moved_barrier and unlock after entered
514+ assert (!barrier);
507515 barrier = std::move (barrier_ref);
508516 return seastar::now ();
509517 });
510518 } else {
511519 auto barrier_ref = stage.enter (t);
512- exit ();
520+ // destruct moved_barrier and unlock after entered
513521 barrier = std::move (barrier_ref);
514522 return std::nullopt ;
515523 }
@@ -520,10 +528,14 @@ class PipelineHandle {
520528 enter_maybe_sync (T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
521529 assert (stage.core == seastar::this_shard_id ());
522530 auto wait_fut = wait_barrier ();
531+ auto moved_barrier = std::move (barrier);
532+ barrier.reset ();
523533 if (wait_fut.has_value ()) {
524534 return wait_fut.value (
525- ).then ([this , &stage, t=std::move (t)]() mutable {
526- auto ret = do_enter_maybe_sync<OpT, T>(stage, std::move (t));
535+ ).then ([this , &stage, t=std::move (t),
536+ moved_barrier=std::move (moved_barrier)]() mutable {
537+ auto ret = do_enter_maybe_sync<OpT, T>(
538+ stage, std::move (t), std::move (moved_barrier));
527539 if constexpr (!T::is_enter_sync) {
528540 return std::move (ret.value ());
529541 } else {
@@ -532,7 +544,8 @@ class PipelineHandle {
532544 }
533545 });
534546 } else {
535- return do_enter_maybe_sync<OpT, T>(stage, std::move (t));
547+ return do_enter_maybe_sync<OpT, T>(
548+ stage, std::move (t), std::move (moved_barrier));
536549 }
537550 }
538551
@@ -581,8 +594,16 @@ class PipelineHandle {
581594 */
582595 seastar::future<> complete () {
583596 auto ret = wait_barrier ();
597+ auto moved_barrier = std::move (barrier);
584598 barrier.reset ();
585- return ret ? std::move (ret.value ()) : seastar::now ();
599+ if (ret) {
600+ return std::move (ret.value ()
601+ ).then ([moved_barrier=std::move (moved_barrier)] {
602+ // destruct moved_barrier and unlock after wait()
603+ });
604+ } else {
605+ return seastar::now ();
606+ }
586607 }
587608
588609 /* *
@@ -729,7 +750,8 @@ class OrderedConcurrentPhaseT : public PipelineStageIT<T> {
729750 phase->mutex .unlock ();
730751 });
731752 } else {
732- // wait() has been called
753+ // wait() has been called, must unlock
754+ // after the wait() future is resolved.
733755 phase->mutex .unlock ();
734756 }
735757 }
0 commit comments