@@ -146,13 +146,63 @@ class C_MDL_WriteError : public MDSIOContextBase {
146146};
147147
148148
149+ class C_MDL_WriteHead : public MDSIOContextBase {
150+ public:
151+ explicit C_MDL_WriteHead (MDLog* m)
152+ : MDSIOContextBase(true )
153+ , mdlog(m)
154+ {}
155+ void print (ostream& out) const override {
156+ out << " mdlog_write_head" ;
157+ }
158+ protected:
159+ void finish (int r) override {
160+ mdlog->finish_head_waiters ();
161+ }
162+ MDSRank *get_mds () override {return mdlog->mds ;}
163+
164+ MDLog *mdlog;
165+ };
166+
167+ void MDLog::finish_head_waiters ()
168+ {
169+ ceph_assert (ceph_mutex_is_locked_by_me (mds->mds_lock ));
170+
171+ auto && last_committed = journaler->get_last_committed ();
172+ auto & expire_pos = last_committed.expire_pos ;
173+
174+ dout (20 ) << __func__ << " expire_pos=" << std::hex << expire_pos << dendl;
175+
176+ {
177+ auto last = waiting_for_expire.upper_bound (expire_pos);
178+ for (auto it = waiting_for_expire.begin (); it != last; it++) {
179+ finish_contexts (g_ceph_context, it->second );
180+ }
181+ waiting_for_expire.erase (waiting_for_expire.begin (), last);
182+ }
183+ }
184+
149185void MDLog::write_head (MDSContext *c)
150186{
151- Context *fin = NULL ;
152- if (c != NULL ) {
153- fin = new C_IO_Wrapper (mds, c);
187+ ceph_assert (ceph_mutex_is_locked_by_me (mds->mds_lock ));
188+
189+ auto && last_written = journaler->get_last_written ();
190+ auto expire_pos = journaler->get_expire_pos ();
191+ dout (10 ) << __func__ << " last_written=" << last_written << " current expire_pos=" << std::hex << expire_pos << dendl;
192+
193+ if (last_written.expire_pos < expire_pos) {
194+ if (c != NULL ) {
195+ dout (25 ) << __func__ << " queueing waiter " << c << dendl;
196+ waiting_for_expire[expire_pos].push_back (c);
197+ }
198+
199+ auto * fin = new C_MDL_WriteHead (this );
200+ journaler->write_head (fin);
201+ } else {
202+ if (c) {
203+ c->complete (0 );
204+ }
154205 }
155- journaler->write_head (fin);
156206}
157207
158208uint64_t MDLog::get_read_pos () const
@@ -174,6 +224,8 @@ uint64_t MDLog::get_safe_pos() const
174224
175225void MDLog::create (MDSContext *c)
176226{
227+ ceph_assert (ceph_mutex_is_locked_by_me (mds->mds_lock ));
228+
177229 dout (5 ) << " create empty log" << dendl;
178230
179231 C_GatherBuilder gather (g_ceph_context);
@@ -845,7 +897,6 @@ void MDLog::_trim_expired_segments(auto& locker, MDSContext* ctx)
845897 ceph_assert (locker.owns_lock ());
846898
847899 // trim expired segments?
848- bool trimmed = false ;
849900 uint64_t end = 0 ;
850901 for (auto it = segments.begin (); it != segments.end (); ++it) {
851902 auto & [seq, ls] = *it;
@@ -881,7 +932,6 @@ void MDLog::_trim_expired_segments(auto& locker, MDSContext* ctx)
881932 } else {
882933 logger->set (l_mdl_expos, jexpire_pos);
883934 }
884- trimmed = true ;
885935 }
886936
887937 if (!expired_segments.count (ls)) {
@@ -895,13 +945,7 @@ void MDLog::_trim_expired_segments(auto& locker, MDSContext* ctx)
895945
896946 locker.unlock ();
897947
898- if (trimmed) {
899- write_head (ctx);
900- } else {
901- if (ctx) {
902- ctx->complete (0 );
903- }
904- }
948+ write_head (ctx);
905949}
906950
907951void MDLog::_expired (LogSegment *ls)
0 commit comments