@@ -166,17 +166,37 @@ class PG : public boost::intrusive_ref_counter<
     return std::size(snap_trimq);
   }
 
+  /**
+   * complete_rctx
+   *
+   * complete_rctx is responsible for submitting writes and messages
+   * resulting from processing a PeeringState event as well as resolving
+   * any asynchronous actions initiated by the PeeringState::Listener
+   * callbacks below. The caller is responsible for calling complete_rctx
+   * and waiting for the future to resolve before exiting the
+   * PGPeeringPipeline::process stage (see osd_operations/peering_event.h).
+   *
+   * orderer below ensures that operations submitted on the OSD-wide
+   * OSDSingleton instance are completed in the order initiated. This is
+   * specifically important for operations on the local and remote async
+   * reserver instances, as well as setting and clearing pg_temp mapping
+   * requests.
+   */
+  ShardServices::singleton_orderer_t orderer;
   seastar::future<> complete_rctx(PeeringCtx &&rctx) {
-    return seastar::when_all_succeed(
-      get_need_up_thru()
-      ? shard_services.send_alive(
-          get_same_interval_since())
-      : seastar::now(),
+    shard_services.send_pg_temp(orderer);
+    if (get_need_up_thru()) {
+      shard_services.send_alive(orderer, get_same_interval_since());
+    }
+
+    ShardServices::singleton_orderer_t o;
+    std::swap(o, orderer);
+    return seastar::when_all(
       shard_services.dispatch_context(
         get_collection_ref(),
         std::move(rctx)),
-      shard_services.send_pg_temp()
-    ).then([](auto){});
+      shard_services.run_orderer(std::move(o))
+    ).then([](auto) {});
   }
 
   void send_cluster_message(
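
(A rough illustration of what the doc comment above describes: a hedged sketch,
not the actual ShardServices::singleton_orderer_t, assuming the orderer is
essentially a FIFO of deferred calls that run_orderer() drains in submission
order. All type and function names below are hypothetical stand-ins.)

#include <functional>
#include <utility>
#include <vector>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>

struct orderer_sketch_t {
  // Deferred singleton operations, kept in the order they were requested.
  std::vector<std::function<seastar::future<>()>> ops;

  void enqueue(std::function<seastar::future<>()> op) {
    ops.push_back(std::move(op));
  }
};

// Drain the queue strictly one operation at a time (FIFO), so that e.g. a
// reservation request and a later cancellation of it can never reorder.
seastar::future<> run_orderer_sketch(orderer_sketch_t o) {
  return seastar::do_with(std::move(o), [](orderer_sketch_t &o) {
    return seastar::do_for_each(o.ops, [](auto &op) { return op(); });
  });
}

Note how complete_rctx above swaps the member orderer into a local before
draining it: operations queued while the drain is in flight land in a fresh
queue for the next event rather than mutating the one being replayed.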
@@ -186,13 +206,21 @@ class PG : public boost::intrusive_ref_counter<
     SUBDEBUGDPP(
       osd, "message {} to {} share_map_update {}",
       *this, *m, osd, share_map_update);
-    (void)shard_services.send_to_osd(osd, std::move(m), epoch);
+    /* We don't bother to queue this one in the orderer because capturing the
+     * message ref in std::function is problematic as it isn't copyable. This
+     * is solvable, but it's not quite worth the effort at the moment as we
+     * aren't worried about the ordering of message send events except between
+     * messages to the same target within an interval, which doesn't really
+     * happen while processing a single event. It'll probably be worth
+     * generalizing the orderer structure to fix this in the future, likely
+     * by using std::move_only_function once widely available. */
+    std::ignore = shard_services.send_to_osd(osd, std::move(m), epoch);
   }
 
   void send_pg_created(pg_t pgid) final {
     LOG_PREFIX(PG::send_pg_created);
     SUBDEBUGDPP(osd, "pgid {}", *this, pgid);
-    (void)shard_services.send_pg_created(pgid);
+    shard_services.send_pg_created(orderer, pgid);
   }
 
   bool try_flush_or_schedule_async() final;
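
(The comment in send_cluster_message above alludes to a C++ limitation worth
spelling out. A minimal sketch, assuming a move-only message ref type; the
MockMessage names are hypothetical, not crimson types.)

#include <functional>
#include <memory>

struct MockMessage {};                                // stand-in message type
using MockMessageRef = std::unique_ptr<MockMessage>;  // move-only ref

void send_sketch(MockMessageRef m) {
  // Does not compile: std::function requires a copy-constructible target,
  // but this closure owns a unique_ptr:
  //   std::function<void()> f = [m = std::move(m)] { /* send *m */ };

  // C++23 std::move_only_function accepts move-only closures, which is why
  // the comment suggests it as the eventual fix:
  std::move_only_function<void()> g =
    [m = std::move(m)]() mutable { /* send *m */ };
  g();
}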
@@ -237,37 +265,35 @@ class PG : public boost::intrusive_ref_counter<
     SUBDEBUGDPP(
       osd, "priority {} on_grant {} on_preempt {}",
       *this, on_grant->get_desc(), on_preempt->get_desc());
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.local_request_reservation(
+    shard_services.local_request_reservation(
+      orderer,
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
         start_peering_event_operation(std::move(*on_grant));
       }) : nullptr,
       priority,
       on_preempt ? make_lambda_context(
         [this, on_preempt=std::move(on_preempt)] (int) {
-          start_peering_event_operation(std::move(*on_preempt));
-        }) : nullptr);
+          start_peering_event_operation(std::move(*on_preempt));
+        }) : nullptr
+    );
   }
 
   void update_local_background_io_priority(
     unsigned priority) final {
     LOG_PREFIX(PG::update_local_background_io_priority);
     SUBDEBUGDPP(osd, "priority {}", *this, priority);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.local_update_priority(
+    shard_services.local_update_priority(
+      orderer,
       pgid,
       priority);
   }
 
   void cancel_local_background_io_reservation() final {
     LOG_PREFIX(PG::cancel_local_background_io_reservation);
     SUBDEBUGDPP(osd, "", *this);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.local_cancel_reservation(
+    shard_services.local_cancel_reservation(
+      orderer,
       pgid);
   }
 
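
(The on_grant/on_preempt arguments above are wrapped via make_lambda_context.
As background, a simplified sketch of that Context-adapter pattern with
hypothetical names, not Ceph's actual Context.h: a one-shot completion object
whose finish(int) invokes the wrapped callable.)

#include <utility>

struct ContextSketch {
  virtual void finish(int r) = 0;
  virtual ~ContextSketch() = default;
  // One-shot: run the callback with the result code, then self-destruct.
  void complete(int r) { finish(r); delete this; }
};

template <typename F>
struct LambdaContextSketch final : ContextSketch {
  F f;
  explicit LambdaContextSketch(F fn) : f(std::move(fn)) {}
  void finish(int r) override { f(r); }
};

template <typename F>
ContextSketch *make_lambda_context_sketch(F f) {
  return new LambdaContextSketch<F>(std::move(f));
}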
@@ -279,27 +305,24 @@ class PG : public boost::intrusive_ref_counter<
     SUBDEBUGDPP(
       osd, "priority {} on_grant {} on_preempt {}",
       *this, on_grant->get_desc(), on_preempt->get_desc());
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.remote_request_reservation(
+    shard_services.remote_request_reservation(
+      orderer,
       pgid,
       on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
         start_peering_event_operation(std::move(*on_grant));
       }) : nullptr,
       priority,
       on_preempt ? make_lambda_context(
         [this, on_preempt=std::move(on_preempt)] (int) {
-          start_peering_event_operation(std::move(*on_preempt));
-        }) : nullptr);
+          start_peering_event_operation(std::move(*on_preempt));
+        }) : nullptr
+    );
   }
 
   void cancel_remote_recovery_reservation() final {
     LOG_PREFIX(PG::cancel_remote_recovery_reservation);
     SUBDEBUGDPP(osd, "", *this);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.remote_cancel_reservation(
-      pgid);
+    shard_services.remote_cancel_reservation(orderer, pgid);
   }
 
   void schedule_event_on_commit(
@@ -326,16 +349,12 @@ class PG : public boost::intrusive_ref_counter<
   void queue_want_pg_temp(const std::vector<int> &wanted) final {
     LOG_PREFIX(PG::queue_want_pg_temp);
     SUBDEBUGDPP(osd, "wanted {}", *this, wanted);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.queue_want_pg_temp(pgid.pgid, wanted);
+    shard_services.queue_want_pg_temp(orderer, pgid.pgid, wanted);
   }
   void clear_want_pg_temp() final {
     LOG_PREFIX(PG::clear_want_pg_temp);
     SUBDEBUGDPP(osd, "", *this);
-    // TODO -- we probably want to add a mechanism for blocking on this
-    // after handling the peering event
-    std::ignore = shard_services.remove_want_pg_temp(pgid.pgid);
+    shard_services.remove_want_pg_temp(orderer, pgid.pgid);
   }
   void check_recovery_sources(const OSDMapRef& newmap) final {
     // Not needed yet