Skip to content

Commit 55652f0

Browse files
committed
osdc: properly acquire locks for getters
This was left as a TODO. : / Signed-off-by: Patrick Donnelly <[email protected]>
1 parent ce5d84d commit 55652f0

File tree

2 files changed

+81
-38
lines changed

2 files changed

+81
-38
lines changed

src/osdc/Journaler.cc

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class Journaler::C_ReProbe : public Context {
158158
void Journaler::recover(Context *onread)
159159
{
160160
lock_guard l(lock);
161-
if (is_stopping()) {
161+
if (state == STATE_STOPPING) {
162162
onread->complete(-EAGAIN);
163163
return;
164164
}
@@ -218,7 +218,7 @@ void Journaler::_reread_head(Context *onfinish)
218218
void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
219219
{
220220
lock_guard l(lock);
221-
if (is_stopping()) {
221+
if (state == STATE_STOPPING) {
222222
finish->complete(-EAGAIN);
223223
return;
224224
}
@@ -250,7 +250,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
250250
void Journaler::_finish_read_head(int r, bufferlist& bl)
251251
{
252252
lock_guard l(lock);
253-
if (is_stopping())
253+
if (state == STATE_STOPPING)
254254
return;
255255

256256
ceph_assert(state == STATE_READHEAD);
@@ -342,7 +342,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
342342
C_OnFinisher *onfinish)
343343
{
344344
lock_guard l(lock);
345-
if (is_stopping()) {
345+
if (state == STATE_STOPPING) {
346346
onfinish->complete(-EAGAIN);
347347
return;
348348
}
@@ -359,7 +359,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
359359
void Journaler::_finish_probe_end(int r, uint64_t end)
360360
{
361361
lock_guard l(lock);
362-
if (is_stopping())
362+
if (state == STATE_STOPPING)
363363
return;
364364

365365
ceph_assert(state == STATE_PROBING);
@@ -413,7 +413,7 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
413413
{
414414
// Expect to be called back from finish_reread_head, which already takes lock
415415
// lock is locked
416-
if (is_stopping()) {
416+
if (state == STATE_STOPPING) {
417417
onfinish->complete(-EAGAIN);
418418
return;
419419
}
@@ -605,7 +605,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
605605
write_pos += wrote;
606606

607607
// flush previous object?
608-
uint64_t su = get_layout_period();
608+
uint64_t su = layout.get_period();
609609
ceph_assert(su > 0);
610610
uint64_t write_off = write_pos % su;
611611
uint64_t write_obj = write_pos / su;
@@ -630,7 +630,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
630630

631631
void Journaler::_do_flush(unsigned amount)
632632
{
633-
if (is_stopping())
633+
if (state == STATE_STOPPING)
634634
return;
635635
if (write_pos == flush_pos)
636636
return;
@@ -645,7 +645,7 @@ void Journaler::_do_flush(unsigned amount)
645645

646646
// zero at least two full periods ahead. this ensures
647647
// that the next object will not exist.
648-
uint64_t period = get_layout_period();
648+
uint64_t period = layout.get_period();
649649
if (flush_pos + len + 2*period > prezero_pos) {
650650
_issue_prezero();
651651

@@ -718,7 +718,7 @@ void Journaler::_do_flush(unsigned amount)
718718
void Journaler::wait_for_flush(Context *onsafe)
719719
{
720720
lock_guard l(lock);
721-
if (is_stopping()) {
721+
if (state == STATE_STOPPING) {
722722
if (onsafe)
723723
onsafe->complete(-EAGAIN);
724724
return;
@@ -752,7 +752,7 @@ void Journaler::_wait_for_flush(Context *onsafe)
752752
void Journaler::flush(Context *onsafe)
753753
{
754754
lock_guard l(lock);
755-
if (is_stopping()) {
755+
if (state == STATE_STOPPING) {
756756
if (onsafe)
757757
onsafe->complete(-EAGAIN);
758758
return;
@@ -812,7 +812,7 @@ void Journaler::_issue_prezero()
812812
* issue zero requests based on write_pos, even though the invariant
813813
* is that we zero ahead of flush_pos.
814814
*/
815-
uint64_t period = get_layout_period();
815+
uint64_t period = layout.get_period();
816816
uint64_t to = write_pos + period * num_periods + period - 1;
817817
to -= to % period;
818818

@@ -1062,7 +1062,7 @@ void Journaler::_issue_read(uint64_t len)
10621062
// here because it will wait for all object reads to complete before
10631063
// giving us back any data. this way we can process whatever bits
10641064
// come in that are contiguous.
1065-
uint64_t period = get_layout_period();
1065+
uint64_t period = layout.get_period();
10661066
while (len > 0) {
10671067
uint64_t e = requested_pos + period;
10681068
e -= e % period;
@@ -1079,7 +1079,7 @@ void Journaler::_issue_read(uint64_t len)
10791079

10801080
void Journaler::_prefetch()
10811081
{
1082-
if (is_stopping())
1082+
if (state == STATE_STOPPING)
10831083
return;
10841084

10851085
ldout(cct, 10) << "_prefetch" << dendl;
@@ -1096,7 +1096,7 @@ void Journaler::_prefetch()
10961096
uint64_t raw_target = read_pos + pf;
10971097

10981098
// read full log segments, so increase if necessary
1099-
uint64_t period = get_layout_period();
1099+
uint64_t period = layout.get_period();
11001100
uint64_t remainder = raw_target % period;
11011101
uint64_t adjustment = remainder ? period - remainder : 0;
11021102
uint64_t target = raw_target + adjustment;
@@ -1215,8 +1215,8 @@ void Journaler::erase(Context *completion)
12151215
lock_guard l(lock);
12161216

12171217
// Async delete the journal data
1218-
uint64_t first = trimmed_pos / get_layout_period();
1219-
uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
1218+
uint64_t first = trimmed_pos / layout.get_period();
1219+
uint64_t num = (write_pos - trimmed_pos) / layout.get_period() + 2;
12201220
filer.purge_range(ino, &layout, SnapContext(), first, num,
12211221
ceph::real_clock::now(), 0,
12221222
wrap_finisher(new C_EraseFinish(
@@ -1231,7 +1231,7 @@ void Journaler::erase(Context *completion)
12311231
void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
12321232
{
12331233
lock_guard l(lock);
1234-
if (is_stopping()) {
1234+
if (state == STATE_STOPPING) {
12351235
completion->complete(-EAGAIN);
12361236
return;
12371237
}
@@ -1309,7 +1309,7 @@ void Journaler::wait_for_readable(Context *onreadable)
13091309

13101310
void Journaler::_wait_for_readable(Context *onreadable)
13111311
{
1312-
if (is_stopping()) {
1312+
if (state == STATE_STOPPING) {
13131313
finisher->queue(onreadable, -EAGAIN);
13141314
return;
13151315
}
@@ -1354,11 +1354,11 @@ void Journaler::trim()
13541354

13551355
void Journaler::_trim()
13561356
{
1357-
if (is_stopping())
1357+
if (state == STATE_STOPPING)
13581358
return;
13591359

13601360
ceph_assert(!readonly);
1361-
uint64_t period = get_layout_period();
1361+
uint64_t period = layout.get_period();
13621362
uint64_t trim_to = last_committed.expire_pos;
13631363
trim_to -= trim_to % period;
13641364
ldout(cct, 10) << "trim last_commited head was " << last_committed
@@ -1633,8 +1633,8 @@ void Journaler::check_isreadable()
16331633
{
16341634
std::unique_lock l(lock);
16351635
while (!_is_readable() &&
1636-
get_read_pos() < get_write_pos() &&
1637-
!get_error()) {
1636+
read_pos < write_pos &&
1637+
!error) {
16381638
C_SaferCond readable_waiter;
16391639
_wait_for_readable(&readable_waiter);
16401640
l.unlock();

src/osdc/Journaler.h

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,10 @@ class Journaler {
222222
private:
223223
// me
224224
CephContext *cct;
225-
std::mutex lock;
225+
mutable ceph::mutex lock;
226226
const std::string name;
227-
typedef std::lock_guard<std::mutex> lock_guard;
228-
typedef std::unique_lock<std::mutex> unique_lock;
227+
typedef std::lock_guard<ceph::mutex> lock_guard;
228+
typedef std::unique_lock<ceph::mutex> unique_lock;
229229
Finisher *finisher;
230230
Header last_written;
231231
inodeno_t ino;
@@ -408,7 +408,7 @@ class Journaler {
408408
Journaler(const std::string &name_, inodeno_t ino_, int64_t pool,
409409
const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) :
410410
last_committed(mag),
411-
cct(obj->cct), name(name_), finisher(f), last_written(mag),
411+
cct(obj->cct), lock(ceph::make_mutex("Journaler::" + name_)), name(name_), finisher(f), last_written(mag),
412412
ino(ino_), pg_pool(pool), readonly(true),
413413
stream_format(-1), journal_stream(-1),
414414
magic(mag),
@@ -528,24 +528,67 @@ class Journaler {
528528

529529
// Synchronous getters
530530
// ===================
531-
// TODO: need some locks on reads for true safety
532531
uint64_t get_layout_period() const {
532+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
533+
lock_guard l(lock);
533534
return layout.get_period();
534535
}
535-
file_layout_t& get_layout() { return layout; }
536-
bool is_active() { return state == STATE_ACTIVE; }
537-
bool is_stopping() { return state == STATE_STOPPING; }
538-
int get_error() { return error; }
539-
bool is_readonly() { return readonly; }
536+
file_layout_t get_layout() const {
537+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
538+
lock_guard l(lock);
539+
return layout;
540+
}
541+
bool is_active() const {
542+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
543+
lock_guard l(lock);
544+
return state == STATE_ACTIVE;
545+
}
546+
bool is_stopping() const {
547+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
548+
lock_guard l(lock);
549+
return state == STATE_STOPPING;
550+
}
551+
int get_error() const {
552+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
553+
lock_guard l(lock);
554+
return error;
555+
}
556+
bool is_readonly() const {
557+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
558+
lock_guard l(lock);
559+
return readonly;
560+
}
540561
bool is_readable();
541562
bool _is_readable();
542563
bool try_read_entry(bufferlist& bl);
543-
uint64_t get_write_pos() const { return write_pos; }
544-
uint64_t get_write_safe_pos() const { return safe_pos; }
545-
uint64_t get_read_pos() const { return read_pos; }
546-
uint64_t get_expire_pos() const { return expire_pos; }
547-
uint64_t get_trimmed_pos() const { return trimmed_pos; }
564+
uint64_t get_write_pos() const {
565+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
566+
lock_guard l(lock);
567+
return write_pos;
568+
}
569+
uint64_t get_write_safe_pos() const {
570+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
571+
lock_guard l(lock);
572+
return safe_pos;
573+
}
574+
uint64_t get_read_pos() const {
575+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
576+
lock_guard l(lock);
577+
return read_pos;
578+
}
579+
uint64_t get_expire_pos() const {
580+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
581+
lock_guard l(lock);
582+
return expire_pos;
583+
}
584+
uint64_t get_trimmed_pos() const {
585+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
586+
lock_guard l(lock);
587+
return trimmed_pos;
588+
}
548589
size_t get_journal_envelope_size() const {
590+
ceph_assert(!ceph_mutex_is_locked_by_me(lock));
591+
lock_guard l(lock);
549592
return journal_stream.get_envelope_size();
550593
}
551594
void check_isreadable();

0 commit comments

Comments
 (0)