
Commit 048ce81

Merge pull request ceph#56677 from athanatos/sjust/for-review/wip-replica-read

osd,crimson/osd: rework of replica read and related state

Reviewed-by: Matan Breizman <[email protected]>

2 parents 0a586d2 + dda683b

38 files changed: +887 −192 lines

qa/suites/rados/thrash-erasure-code/workloads/ec-small-objects-balanced.yaml

Lines changed: 0 additions & 21 deletions
This file was deleted.

qa/tasks/rados.py

Lines changed: 8 additions & 0 deletions

@@ -36,6 +36,8 @@ def task(ctx, config):
     write_fadvise_dontneed: write behavior like with LIBRADOS_OP_FLAG_FADVISE_DONTNEED.
                             This means the data won't be accessed in the near future,
                             so the OSD backend needn't keep it in cache.
+    pct_update_delay: delay before the primary propagates pct (pg_committed_to)
+                      on write pause; defaults to 5s if balance_reads is set
 
     For example::
 
@@ -139,6 +141,7 @@ def task(ctx, config):
     object_size = int(config.get('object_size', 4000000))
     op_weights = config.get('op_weights', {})
     testdir = teuthology.get_testdir(ctx)
+    pct_update_delay = None
     args = [
         'adjust-ulimits',
         'ceph-coverage',
@@ -166,6 +169,7 @@ def task(ctx, config):
         args.extend(['--pool-snaps'])
     if config.get('balance_reads', False):
         args.extend(['--balance-reads'])
+        pct_update_delay = config.get('pct_update_delay', 5)
     if config.get('localize_reads', False):
         args.extend(['--localize-reads'])
     if config.get('max_attr_len', None):
@@ -274,6 +278,10 @@ def thread():
         if config.get('fast_read', False):
             manager.raw_cluster_cmd(
                 'osd', 'pool', 'set', pool, 'fast_read', 'true')
+        if pct_update_delay:
+            manager.raw_cluster_cmd(
+                'osd', 'pool', 'set', pool,
+                'pct_update_delay', str(pct_update_delay))
         min_size = config.get('min_size', None)
         if min_size is not None:
             manager.raw_cluster_cmd(
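As an illustration, a hypothetical workload fragment exercising the new option might look like the following. This is not one of the suite files touched by this commit; keys other than balance_reads and pct_update_delay follow the rados task's documented conventions.

tasks:
- rados:
    clients: [client.0]
    ops: 4000
    objects: 50
    balance_reads: true
    pct_update_delay: 10

If pct_update_delay is omitted while balance_reads is set, the task applies the 5s default shown in the hunk above.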

src/common/intrusive_timer.h

Lines changed: 222 additions & 0 deletions

This file is new; its full contents follow.

// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <mutex>
#include <condition_variable>
#include <thread>

#include <boost/intrusive/set.hpp>

#include "common/ceph_time.h"

namespace ceph::common {

/**
 * intrusive_timer
 *
 * SafeTimer (common/Timer.h) isn't well suited to use in high-usage
 * pathways for a few reasons:
 * - Usage generally requires allocation of a fresh context for each
 *   scheduled operation. One could override Context::complete to avoid
 *   destroying the instance, but actually reusing the instance is tricky
 *   as SafeTimer doesn't guarantee cancellation if safe_callbacks is false.
 * - SafeTimer only guarantees cancellation if safe_callbacks is true, which
 *   it generally won't be if the user needs to call into SafeTimer while
 *   holding locks taken by callbacks.
 *
 * This implementation allows the user to repeatedly schedule and cancel
 * an object inheriting from the callback_t interface below while
 * guaranteeing cancellation provided that the user holds the lock
 * associated with a particular callback while calling into intrusive_timer.
 */
class intrusive_timer {
  using clock_t = ceph::coarse_real_clock;

public:
  /**
   * callback_t
   *
   * Objects inheriting from callback_t can be scheduled
   * via intrusive_timer.
   */
  class callback_t : public boost::intrusive::set_base_hook<> {
    friend class intrusive_timer;
    clock_t::time_point schedule_point;
    unsigned incarnation = 0;

  public:
    /**
     * add_ref, dec_ref
     *
     * callback_t must remain live and all methods must remain
     * safe to call as long as calls to add_ref() outnumber calls
     * to dec_ref().
     */
    virtual void add_ref() = 0;
    virtual void dec_ref() = 0;

    /**
     * lock, unlock
     *
     * For any specific callback_t, must lock/unlock a lock held while
     * accessing intrusive_timer public methods for that callback_t
     * instance.
     */
    virtual void lock() = 0;
    virtual void unlock() = 0;

    /// Invokes callback, will be called with lock held
    virtual void invoke() = 0;

    /**
     * is_scheduled
     *
     * Returns true iff callback is scheduled to be invoked.
     * May only be validly invoked while the lock associated with
     * the callback_t instance is held.
     */
    bool is_scheduled() const { return incarnation % 2 == 1; }
    virtual ~callback_t() = default;

    /// Order callback_t by schedule_point
    auto operator<=>(const callback_t &rhs) const {
      return std::make_pair(schedule_point, this) <=>
        std::make_pair(rhs.schedule_point, &rhs);
    }
  };

private:
  /// protects events, stopping
  std::mutex lock;

  /// stopping, cv used to signal that t should halt
  std::condition_variable cv;
  bool stopping = false;

  /// queued events ordered by callback_t::schedule_point
  boost::intrusive::set<callback_t> events;

  /// thread responsible for calling scheduled callbacks
  std::thread t;

  /// peek front of queue, null if empty
  callback_t *peek() {
    return events.empty() ? nullptr : &*(events.begin());
  }

  /// entry point for t
  void _run() {
    std::unique_lock l(lock);
    while (true) {
      if (stopping) {
        return;
      }

      auto next = peek();
      if (!next) {
        cv.wait(l);
        continue;
      }

      if (next->schedule_point > clock_t::now()) {
        cv.wait_until(l, next->schedule_point);
        continue;
      }

      // we release the reference below
      events.erase(*next);

      /* cancel() and schedule_after() both hold both intrusive_timer::lock
       * and the callback_t lock (precondition of both) while mutating
       * next->incarnation, so this read is safe. We're relying on the
       * fact that only this method in this thread will access
       * next->incarnation under only one of the two. */
      auto incarnation = next->incarnation;
      l.unlock();
      {
        /* Note that intrusive_timer::cancel may observe that
         * callback_t::is_scheduled() returns true while
         * callback_t::is_linked() is false since we drop
         * intrusive_timer::lock between removing next from the
         * queue and incrementing callback_t::incarnation here
         * under the callback_t lock. In that case, cancel()
         * increments incarnation logically canceling the callback
         * but leaves the reference for us to drop.
         */
        std::unique_lock m(*next);
        if (next->incarnation == incarnation) {
          /* As above, cancel() and schedule_after() hold both locks so this
           * mutation and read are safe. */
          ++next->incarnation;
          next->invoke();
        }
        /* else, next was canceled between l.unlock() and next->lock().
         * Note that if incarnation does not match, we do nothing to next
         * other than drop our reference -- it might well have been
         * rescheduled already! */
      }
      next->dec_ref();
      l.lock();
    }
  }

public:
  intrusive_timer() : t([this] { _run(); }) {}

  /**
   * schedule_after
   *
   * Schedule cb to run after the specified period.
   * The lock associated with cb must be held.
   * cb must not already be scheduled.
   *
   * @param cb [in] callback to schedule
   * @param after [in] period after which to schedule cb
   */
  template <typename T>
  void schedule_after(callback_t &cb, T after) {
    ceph_assert(!cb.is_scheduled());
    std::unique_lock l(lock);
    ceph_assert(!cb.is_linked());

    ++cb.incarnation;
    cb.schedule_point = clock_t::now() + after;

    cb.add_ref();
    events.insert(cb);

    cv.notify_one();
  }

  /**
   * cancel
   *
   * Cancel already scheduled cb.
   * The lock associated with cb must be held.
   *
   * @param cb [in] callback to cancel
   */
  void cancel(callback_t &cb) {
    ceph_assert(cb.is_scheduled());
    std::unique_lock l(lock);
    ++cb.incarnation;

    if (cb.is_linked()) {
      events.erase(cb);
      cb.dec_ref();
    }
  }

  /// Stop intrusive_timer
  void stop() {
    {
      std::unique_lock l(lock);
      stopping = true;
      cv.notify_one();
    }
    t.join();
  }
};

}
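To make the locking and reference-counting contract concrete, here is a minimal usage sketch. It is not part of the commit: flush_cb_t and on_flush are hypothetical names, and a plain std::mutex plus a manual reference count are just one way to satisfy the interface.

#include <atomic>
#include <chrono>
#include <mutex>

#include "common/intrusive_timer.h"

class flush_cb_t : public ceph::common::intrusive_timer::callback_t {
  std::mutex m;                    // the lock associated with this callback
  std::atomic<unsigned> refs{1};   // starts at 1 for the creating owner

public:
  void add_ref() final { ++refs; }
  void dec_ref() final {
    if (--refs == 0) {
      delete this;
    }
  }
  void lock() final { m.lock(); }
  void unlock() final { m.unlock(); }
  void invoke() final {
    // runs on the timer thread with m held
    on_flush();
  }
  void on_flush() { /* hypothetical periodic work */ }
};

int main() {
  ceph::common::intrusive_timer timer;
  auto *cb = new flush_cb_t;

  {
    // callback_t exposes lock()/unlock(), so it models BasicLockable
    std::unique_lock l(*cb);
    timer.schedule_after(*cb, std::chrono::seconds(1));

    // Because cb's lock is still held, cancel() is guaranteed to win:
    // invoke() will not run after this call returns.
    if (cb->is_scheduled()) {
      timer.cancel(*cb);
    }
  }

  timer.stop();   // joins the worker; must run before the timer is destroyed
  cb->dec_ref();  // drop the owner reference
  return 0;
}

Holding the callback's lock across both calls is what buys the cancellation guarantee the header comment contrasts with SafeTimer.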

src/crimson/osd/ops_executer.cc

Lines changed: 1 addition & 1 deletion

@@ -828,7 +828,7 @@ void OpsExecuter::fill_op_params(OpsExecuter::modified_by m)
   osd_op_params->mtime = msg->get_mtime();
   osd_op_params->at_version = pg->get_next_version();
   osd_op_params->pg_trim_to = pg->get_pg_trim_to();
-  osd_op_params->min_last_complete_ondisk = pg->get_min_last_complete_ondisk();
+  osd_op_params->pg_committed_to = pg->get_pg_committed_to();
   osd_op_params->last_complete = pg->get_info().last_complete;
   osd_op_params->user_modify = (m == modified_by::user);
 }

src/crimson/osd/osd_operations/client_request.cc

Lines changed: 13 additions & 2 deletions

@@ -14,6 +14,7 @@
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_connection_priv.h"
 #include "osd/object_state_fmt.h"
+#include "osd/osd_perf_counters.h"
 
 SET_SUBSYS(osd);
 
@@ -190,15 +191,25 @@ ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptible
     DEBUGDPP("{}.{}: dropping misdirected op",
              pg, *this, this_instance_id);
     co_return;
-  } else if (const hobject_t& hoid = m->get_hobj();
-             !pg.get_peering_state().can_serve_replica_read(hoid)) {
+  }
+
+  pg.get_perf_logger().inc(l_osd_replica_read);
+  if (pg.is_unreadable_object(m->get_hobj())) {
+    DEBUGDPP("{}.{}: {} missing on replica, bouncing to primary",
+             pg, *this, this_instance_id, m->get_hobj());
+    pg.get_perf_logger().inc(l_osd_replica_read_redirect_missing);
+    co_await reply_op_error(pgref, -EAGAIN);
+    co_return;
+  } else if (!pg.get_peering_state().can_serve_replica_read(m->get_hobj())) {
     DEBUGDPP("{}.{}: unstable write on replica, bouncing to primary",
              pg, *this, this_instance_id);
+    pg.get_perf_logger().inc(l_osd_replica_read_redirect_conflict);
    co_await reply_op_error(pgref, -EAGAIN);
     co_return;
   } else {
     DEBUGDPP("{}.{}: serving replica read on oid {}",
              pg, *this, this_instance_id, m->get_hobj());
+    pg.get_perf_logger().inc(l_osd_replica_read_served);
   }
 }
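The counters incremented here are declared in osd/osd_perf_counters.h, which this view doesn't show. For orientation only, a sketch of how such counters are typically registered with Ceph's PerfCountersBuilder; the enum anchor values and the build function below are illustrative, not the PR's actual declarations.

#include "common/ceph_context.h"
#include "common/perf_counters.h"

enum {
  l_osd_rr_first = 20000,  // illustrative bounds, not the real anchors
  l_osd_replica_read,
  l_osd_replica_read_redirect_missing,
  l_osd_replica_read_redirect_conflict,
  l_osd_replica_read_served,
  l_osd_rr_last,
};

PerfCounters *build_replica_read_counters(CephContext *cct) {
  PerfCountersBuilder b(cct, "osd_replica_read", l_osd_rr_first, l_osd_rr_last);
  b.add_u64_counter(l_osd_replica_read, "replica_read",
                    "Read requests received by a replica");
  b.add_u64_counter(l_osd_replica_read_redirect_missing, "replica_read_redirect_missing",
                    "Replica reads bounced to the primary: object missing");
  b.add_u64_counter(l_osd_replica_read_redirect_conflict, "replica_read_redirect_conflict",
                    "Replica reads bounced to the primary: unstable write");
  b.add_u64_counter(l_osd_replica_read_served, "replica_read_served",
                    "Replica reads served locally");
  return b.create_perf_counters();
}

In the diff above, each replica read increments l_osd_replica_read once, plus exactly one of the three outcome counters.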

src/crimson/osd/osd_operations/osdop_params.h

Lines changed: 1 addition & 1 deletion

@@ -12,7 +12,7 @@ struct osd_op_params_t {
   utime_t mtime;
   eversion_t at_version;
   eversion_t pg_trim_to;
-  eversion_t min_last_complete_ondisk;
+  eversion_t pg_committed_to;
   eversion_t last_complete;
   bool user_modify = false;
   ObjectCleanRegions clean_regions;

src/crimson/osd/osd_operations/peering_event.cc

Lines changed: 2 additions & 1 deletion

@@ -166,7 +166,8 @@ void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services)
     ctx.send_notify(q.from.osd, {q.query.from, q.query.to,
                                  q.query.epoch_sent,
                                  map_epoch, empty,
-                                 PastIntervals{}});
+                                 PastIntervals{},
+                                 PG_FEATURE_CRIMSON_ALL});
    }
  }
}
