@@ -146,13 +146,63 @@ class C_MDL_WriteError : public MDSIOContextBase {
146146};
147147
148148
// Completion context handed to journaler->write_head() by MDLog::write_head().
// When it finishes it asks the owning MDLog to release the callers that are
// queued waiting for the journal head (see MDLog::finish_head_waiters()).
// NOTE(review): the meaning of the `true` argument passed to the
// MDSIOContextBase constructor is not visible in this file section - confirm
// against the base class declaration.
149+ class C_MDL_WriteHead : public MDSIOContextBase {
150+ public:
// m: the MDLog whose head waiters are finished on completion. Stored as a raw
// pointer; nothing in this class manages its lifetime.
151+ explicit C_MDL_WriteHead (MDLog* m)
152+ : MDSIOContextBase(true )
153+ , mdlog(m)
154+ {}
// Writes this context's fixed label to `out`.
155+ void print (ostream& out) const override {
156+ out << " mdlog_write_head" ;
157+ }
158+ protected:
// The result code `r` is not inspected here; the only action is to release the
// head waiters.
// NOTE(review): presumably write errors are handled by the base class before
// finish() is reached - confirm.
159+ void finish (int r) override {
160+ mdlog->finish_head_waiters ();
161+ }
// Returns the MDSRank taken from the owning MDLog.
162+ MDSRank *get_mds () override {return mdlog->mds ;}
163+
164+ MDLog *mdlog;
165+ };
166+
// Releases the waiters queued in waiting_for_expire whose key is less than or
// equal to the expire_pos of the journaler's last committed head, then removes
// those entries from the container.
// Must be called with mds->mds_lock held by the calling thread (asserted).
167+ void MDLog::finish_head_waiters ()
168+ {
169+ ceph_assert (ceph_mutex_is_locked_by_me (mds->mds_lock ));
170+
// expire_pos is a reference into last_committed, so it stays valid only for
// the lifetime of that local binding.
171+ auto && last_committed = journaler->get_last_committed ();
172+ auto & expire_pos = last_committed.expire_pos ;
173+
174+ dout (20 ) << __func__ << " expire_pos=" << std::hex << expire_pos << dendl;
175+
176+ {
// upper_bound() yields the first entry whose key is strictly greater than
// expire_pos, so [begin, last) covers every key <= expire_pos.
// NOTE(review): assumes waiting_for_expire is an ordered map from expire
// position to a list of contexts (MDLog::write_head() fills it that way) -
// confirm against the member declaration.
177+ auto last = waiting_for_expire.upper_bound (expire_pos);
178+ for (auto it = waiting_for_expire.begin (); it != last; it++) {
// presumably completes every context queued under this key - verify against
// the finish_contexts() helper.
179+ finish_contexts (g_ceph_context, it->second );
180+ }
// Drop the entries that were just processed.
181+ waiting_for_expire.erase (waiting_for_expire.begin (), last);
182+ }
183+ }
184+
// Requests a journal head write when the journaler's current expire_pos is
// ahead of the expire_pos recorded in the last written head.
//
// c: optional waiter (may be null). If a head write is issued, c is queued in
//    waiting_for_expire under the current expire_pos and is released later via
//    C_MDL_WriteHead -> MDLog::finish_head_waiters(). If no write is needed,
//    c is completed immediately with result 0.
//
// Must be called with mds->mds_lock held by the calling thread (asserted).
// Lines marked '-' below are the previous implementation shown by the diff:
// it wrapped c in a C_IO_Wrapper and always called journaler->write_head().
149185void MDLog::write_head (MDSContext *c)
150186{
151- Context *fin = NULL ;
152- if (c != NULL ) {
153- fin = new C_IO_Wrapper (mds, c);
187+ ceph_assert (ceph_mutex_is_locked_by_me (mds->mds_lock ));
188+
189+ auto && last_written = journaler->get_last_written ();
190+ auto expire_pos = journaler->get_expire_pos ();
191+ dout (10 ) << __func__ << " last_written=" << last_written << " current expire_pos=" << std::hex << expire_pos << dendl;
192+
// Only issue a write when the expire position has advanced past the one in
// the last written head.
193+ if (last_written.expire_pos < expire_pos) {
194+ if (c != NULL ) {
195+ dout (25 ) << __func__ << " queueing waiter " << c << dendl;
// Keyed by the expire position this write is expected to persist.
196+ waiting_for_expire[expire_pos].push_back (c);
197+ }
198+
// A completion context is created even when c is null, so already-queued
// waiters are still released when this write finishes.
199+ auto * fin = new C_MDL_WriteHead (this );
200+ journaler->write_head (fin);
201+ } else {
// NOTE(review): this branch checks the last *written* head, while
// finish_head_waiters() releases waiters based on the last *committed* head.
// Confirm that completing c right away is intended if a head write may still
// be in flight.
202+ if (c) {
203+ c->complete (0 );
204+ }
154205 }
155- journaler->write_head (fin);
156206}
157207
158208uint64_t MDLog::get_read_pos () const
@@ -174,6 +224,8 @@ uint64_t MDLog::get_safe_pos() const
174224
175225void MDLog::create (MDSContext *c)
176226{
227+ ceph_assert (ceph_mutex_is_locked_by_me (mds->mds_lock ));
228+
177229 dout (5 ) << " create empty log" << dendl;
178230
179231 C_GatherBuilder gather (g_ceph_context);
@@ -287,6 +339,8 @@ LogSegment* MDLog::_start_new_segment(SegmentBoundary* sb)
287339 logger->set (l_mdl_seg, segments.size ());
288340 sb->set_seq (event_seq);
289341
342+ dout (20 ) << __func__ << " : starting new segment " << *ls << dendl;
343+
290344 // Adjust to next stray dir
291345 if (!mds->is_stopping ()) {
292346 mds->mdcache ->advance_stray ();
@@ -583,17 +637,6 @@ void MDLog::shutdown()
583637 }
584638}
585639
// Completion context that calls MDLog::trim_expired_segments() on the given
// MDLog when it finishes. Every line of this class is marked '-' in the diff,
// i.e. the patch removes it.
586- class C_OFT_Committed : public MDSInternalContext {
587- MDLog *mdlog;
// Stored by the constructor but not read anywhere inside this class.
588- uint64_t seq;
589- public:
// l: the MDLog to trim on completion (its mds is passed to the base class).
// s: value stored in `seq`.
590- C_OFT_Committed (MDLog *l, uint64_t s) :
591- MDSInternalContext (l->mds), mdlog(l), seq(s) {}
// The result code `ret` is ignored.
592- void finish (int ret) override {
593- mdlog->trim_expired_segments ();
594- }
595- };
596-
597640void MDLog::try_to_commit_open_file_table (uint64_t last_seq)
598641{
599642 ceph_assert (ceph_mutex_is_locked_by_me (submit_mutex));
@@ -608,8 +651,7 @@ void MDLog::try_to_commit_open_file_table(uint64_t last_seq)
608651 if (mds->mdcache ->open_file_table .is_any_dirty () ||
609652 last_seq > mds->mdcache ->open_file_table .get_committed_log_seq ()) {
610653 submit_mutex.unlock ();
611- mds->mdcache ->open_file_table .commit (new C_OFT_Committed (this , last_seq),
612- last_seq, CEPH_MSG_PRIO_HIGH);
654+ mds->mdcache ->open_file_table .commit (nullptr , last_seq, CEPH_MSG_PRIO_HIGH);
613655 submit_mutex.lock ();
614656 }
615657}
@@ -642,7 +684,7 @@ void MDLog::trim()
642684 max_ev = events_per_segment + 1 ;
643685 }
644686
645- submit_mutex. lock () ;
687+ std::unique_lock locker{ submit_mutex} ;
646688
647689 // trim!
648690 dout (10 ) << " trim "
@@ -653,7 +695,6 @@ void MDLog::trim()
653695 << dendl;
654696
655697 if (segments.empty ()) {
656- submit_mutex.unlock ();
657698 return ;
658699 }
659700
@@ -723,22 +764,23 @@ void MDLog::trim()
723764 new_expiring_segments++;
724765 expiring_segments.insert (ls);
725766 expiring_events += ls->num_events ;
726- submit_mutex .unlock ();
767+ locker .unlock ();
727768
728769 uint64_t last_seq = ls->seq ;
729770 try_expire (ls, op_prio);
730771 log_trim_counter.hit ();
731772 trim_end = ceph::coarse_mono_clock::now ();
732773
733- submit_mutex .lock ();
774+ locker .lock ();
734775 p = segments.lower_bound (last_seq + 1 );
735776 }
736777 }
737778
779+ ceph_assert (locker.owns_lock ());
780+
738781 try_to_commit_open_file_table (get_last_segment_seq ());
739782
740- // discard expired segments and unlock submit_mutex
741- _trim_expired_segments ();
783+ _trim_expired_segments (locker);
742784}
743785
744786class C_MaybeExpiredSegment : public MDSInternalContext {
@@ -760,17 +802,18 @@ class C_MaybeExpiredSegment : public MDSInternalContext {
760802 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
761803 * segment.
762804 */
763- int MDLog::trim_all ( )
805+ int MDLog::trim_to (SegmentBoundary:: seq_t seq )
764806{
765- submit_mutex. lock ( );
807+ std::unique_lock locker (submit_mutex );
766808
767809 dout (10 ) << __func__ << " : "
768- << segments.size ()
810+ << seq
811+ << " " << segments.size ()
769812 << " /" << expiring_segments.size ()
770813 << " /" << expired_segments.size () << dendl;
771814
772- uint64_t last_seq = 0 ;
773- if (!segments.empty ()) {
815+ uint64_t last_seq = seq ;
816+ if (last_seq == 0 || !segments.empty ()) {
774817 last_seq = get_last_segment_seq ();
775818 try_to_commit_open_file_table (last_seq);
776819 }
@@ -785,7 +828,7 @@ int MDLog::trim_all()
785828 // Caller should have flushed journaler before calling this
786829 if (pending_events.count (ls->seq )) {
787830 dout (5 ) << __func__ << " : " << *ls << " has pending events" << dendl;
788- submit_mutex .unlock ();
831+ locker .unlock ();
789832 return -CEPHFS_EAGAIN;
790833 }
791834
@@ -797,17 +840,17 @@ int MDLog::trim_all()
797840 ceph_assert (expiring_segments.count (ls) == 0 );
798841 expiring_segments.insert (ls);
799842 expiring_events += ls->num_events ;
800- submit_mutex .unlock ();
843+ locker .unlock ();
801844
802845 uint64_t next_seq = ls->seq + 1 ;
803846 try_expire (ls, CEPH_MSG_PRIO_DEFAULT);
804847
805- submit_mutex .lock ();
848+ locker .lock ();
806849 p = segments.lower_bound (next_seq);
807850 }
808851 }
809852
810- _trim_expired_segments ();
853+ _trim_expired_segments (locker );
811854
812855 return 0 ;
813856}
@@ -848,14 +891,12 @@ void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
848891 try_expire (ls, op_prio);
849892}
850893
851- void MDLog::_trim_expired_segments ()
894+ void MDLog::_trim_expired_segments (auto & locker, MDSContext* ctx )
852895{
853896 ceph_assert (ceph_mutex_is_locked_by_me (submit_mutex));
854-
855- uint64_t const oft_committed_seq = mds->mdcache ->open_file_table .get_committed_log_seq ();
897+ ceph_assert (locker.owns_lock ());
856898
857899 // trim expired segments?
858- bool trimmed = false ;
859900 uint64_t end = 0 ;
860901 for (auto it = segments.begin (); it != segments.end (); ++it) {
861902 auto & [seq, ls] = *it;
@@ -891,34 +932,20 @@ void MDLog::_trim_expired_segments()
891932 } else {
892933 logger->set (l_mdl_expos, jexpire_pos);
893934 }
894- trimmed = true ;
895935 }
896936
897937 if (!expired_segments.count (ls)) {
898938 dout (10 ) << __func__ << " waiting for expiry " << *ls << dendl;
899939 break ;
900940 }
901941
902- if (!mds_is_shutting_down && ls->seq >= oft_committed_seq) {
903- dout (10 ) << __func__ << " defer expire for open file table committedseq " << oft_committed_seq
904- << " <= " << ls->seq << " /" << ls->offset << dendl;
905- break ;
906- }
907-
908942 end = seq;
909943 dout (10 ) << __func__ << " : maybe expiring " << *ls << dendl;
910944 }
911945
912- submit_mutex .unlock ();
946+ locker .unlock ();
913947
914- if (trimmed)
915- journaler->write_head (0 );
916- }
917-
918- void MDLog::trim_expired_segments ()
919- {
920- submit_mutex.lock ();
921- _trim_expired_segments ();
948+ write_head (ctx);
922949}
923950
924951void MDLog::_expired (LogSegment *ls)
0 commit comments