
Commit d812176

Merge pull request ceph#58464 from athanatos/sjust/wip-66316-async-reserver
crimson: peering event processing fixes, wait for async operations started during peering events

Reviewed-by: Matan Breizman <[email protected]>

2 parents: baaf7c8 + e12e92c

File tree

7 files changed: +313, -248 lines


src/crimson/osd/osd_operations/peering_event.cc
18 additions, 15 deletions

@@ -89,20 +89,25 @@ seastar::future<> PeeringEvent<T>::with_pg(
     }).then_interruptible([this, pg](auto) {
       return this->template enter_stage<interruptor>(peering_pp(*pg).process);
     }).then_interruptible([this, pg, &shard_services] {
-      return pg->do_peering_event(evt, ctx
-      ).then_interruptible([this] {
+      /* The DeleteSome event invokes PeeringListener::do_delete_work, which
+       * needs to return (without a future) the object to start with on the next
+       * call. As a consequence, crimson's do_delete_work implementation needs
+       * to use get() for the object listing. To support that, we wrap
+       * PG::do_peering_event with interruptor::async here.
+       *
+       * Otherwise, it's not ok to yield during peering event handler. Doing so
+       * allows other continuations to observe PeeringState in the middle
+       * of, for instance, a map advance. The interface *does not* support such
+       * usage. DeleteSome happens not to trigger that problem so it's ok for
+       * now, but we'll want to remove that as well.
+       * https://tracker.ceph.com/issues/66708
+       */
+      return interruptor::async([this, pg, &shard_services] {
+        pg->do_peering_event(evt, ctx);
+        complete_rctx(shard_services, pg).get();
+      }).then_interruptible([this] {
         return that()->get_handle().complete();
-      }).then_interruptible([this, pg, &shard_services] {
-        return complete_rctx(shard_services, pg);
       });
-    }).then_interruptible([pg, &shard_services]()
-      -> typename T::template interruptible_future<> {
-      if (!pg->get_need_up_thru()) {
-        return seastar::now();
-      }
-      return shard_services.send_alive(pg->get_same_interval_since());
-    }).then_interruptible([&shard_services] {
-      return shard_services.send_pg_temp();
     });
   }, [this](std::exception_ptr ep) {
     LOG_PREFIX(PeeringEvent<T>::with_pg);

@@ -128,9 +133,7 @@ PeeringEvent<T>::complete_rctx(ShardServices &shard_services, Ref<PG> pg)
   using interruptor = typename T::interruptor;
   LOG_PREFIX(PeeringEvent<T>::complete_rctx);
   DEBUGI("{}: submitting ctx", *this);
-  return shard_services.dispatch_context(
-    pg->get_collection_ref(),
-    std::move(ctx));
+  return pg->complete_rctx(std::move(ctx));
 }

 ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
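
The inline comment above captures the key constraint: do_delete_work must hand back its next starting object synchronously, so the handler has to be able to block on futures with get(), which is only legal inside a seastar thread. Below is a minimal standalone sketch (not the crimson code) of that pattern using plain seastar::async; crimson's interruptor::async is an interruption-aware wrapper around the same mechanism, and list_next_object() here is a hypothetical stand-in for the object listing.

  #include <seastar/core/app-template.hh>
  #include <seastar/core/future.hh>
  #include <seastar/core/sleep.hh>
  #include <seastar/core/thread.hh>
  #include <iostream>

  using namespace std::chrono_literals;

  // Hypothetical asynchronous operation standing in for the object listing.
  seastar::future<> list_next_object() {
    return seastar::sleep(10ms);
  }

  seastar::future<> handle_delete_some() {
    // seastar::async runs the lambda in a seastar::thread, where get()
    // suspends only that thread (not the reactor) until the future resolves,
    // so the body can be written in a blocking style.
    return seastar::async([] {
      list_next_object().get();
      std::cout << "listing finished, handler continues synchronously\n";
    });
  }

  int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] { return handle_delete_some(); });
  }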

src/crimson/osd/osd_operations/pg_advance_map.cc
24 additions, 39 deletions

@@ -80,48 +80,33 @@ seastar::future<> PGAdvanceMap::start()
    * See: https://tracker.ceph.com/issues/61744
    */
   from = pg->get_osdmap_epoch();
-  auto fut = seastar::now();
   if (do_init) {
-    fut = pg->handle_initialize(rctx
-    ).then([this] {
-      return pg->handle_activate_map(rctx);
-    });
+    pg->handle_initialize(rctx);
+    pg->handle_activate_map(rctx);
   }
-  return fut.then([this] {
-    ceph_assert(std::cmp_less_equal(*from, to));
-    return seastar::do_for_each(
-      boost::make_counting_iterator(*from + 1),
-      boost::make_counting_iterator(to + 1),
-      [this](epoch_t next_epoch) {
-        logger().debug("{}: start: getting map {}",
-                       *this, next_epoch);
-        return shard_services.get_map(next_epoch).then(
-          [this] (cached_map_t&& next_map) {
-            logger().debug("{}: advancing map to {}",
-                           *this, next_map->get_epoch());
-            return pg->handle_advance_map(next_map, rctx);
-          });
-      }).then([this] {
-        return pg->handle_activate_map(rctx).then([this] {
-          logger().debug("{}: map activated", *this);
-          if (do_init) {
-            shard_services.pg_created(pg->get_pgid(), pg);
-            logger().info("PGAdvanceMap::start new pg {}", *pg);
-          }
-          return seastar::when_all_succeed(
-            pg->get_need_up_thru()
-            ? shard_services.send_alive(
-                pg->get_same_interval_since())
-            : seastar::now(),
-            shard_services.dispatch_context(
-              pg->get_collection_ref(),
-              std::move(rctx)));
+  ceph_assert(std::cmp_less_equal(*from, to));
+  return seastar::do_for_each(
+    boost::make_counting_iterator(*from + 1),
+    boost::make_counting_iterator(to + 1),
+    [this](epoch_t next_epoch) {
+      logger().debug("{}: start: getting map {}",
+                     *this, next_epoch);
+      return shard_services.get_map(next_epoch).then(
+        [this] (cached_map_t&& next_map) {
+          logger().debug("{}: advancing map to {}",
+                         *this, next_map->get_epoch());
+          pg->handle_advance_map(next_map, rctx);
+          return seastar::now();
        });
-      }).then_unpack([this] {
-        logger().debug("{}: sending pg temp", *this);
-        return shard_services.send_pg_temp();
-      });
-  });
+  }).then([this] {
+    pg->handle_activate_map(rctx);
+    logger().debug("{}: map activated", *this);
+    if (do_init) {
+      shard_services.pg_created(pg->get_pgid(), pg);
+      logger().info("PGAdvanceMap::start new pg {}", *pg);
+    }
+    return pg->complete_rctx(std::move(rctx));
+  });
 }).then([this] {
   logger().debug("{}: complete", *this);
   return handle.complete();
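
For reference, a minimal standalone sketch (assumptions marked in comments) of the epoch-by-epoch walk that PGAdvanceMap::start() keeps after this change: seastar::do_for_each waits for each step's future before moving on, so maps are applied strictly in increasing epoch order. fetch_map() and apply_map() are hypothetical stand-ins for shard_services.get_map() and the now-synchronous pg->handle_advance_map().

  #include <seastar/core/future.hh>
  #include <seastar/core/loop.hh>
  #include <boost/iterator/counting_iterator.hpp>
  #include <cstdint>

  using epoch_t = std::uint32_t;

  // Hypothetical stand-in for ShardServices::get_map(): resolves to the map
  // for one epoch (here just the epoch value itself).
  seastar::future<epoch_t> fetch_map(epoch_t e) {
    return seastar::make_ready_future<epoch_t>(e);
  }

  // Hypothetical stand-in for PG::handle_advance_map(), synchronous after
  // this change.
  void apply_map(epoch_t /*map*/) {}

  seastar::future<> advance(epoch_t from, epoch_t to) {
    // do_for_each only starts the next epoch once the future returned for
    // the current one has resolved, preserving epoch order.
    return seastar::do_for_each(
      boost::make_counting_iterator(from + 1),
      boost::make_counting_iterator(to + 1),
      [](epoch_t next_epoch) {
        return fetch_map(next_epoch).then([](epoch_t m) {
          apply_map(m);
        });
      });
  }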

src/crimson/osd/pg.cc
27 additions, 39 deletions

@@ -738,61 +738,49 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
   });
 }

-PG::interruptible_future<> PG::do_peering_event(
+void PG::do_peering_event(
   PGPeeringEvent& evt, PeeringCtx &rctx)
 {
   if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
       peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
     logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
-    return interruptor::now();
   } else {
     logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
-    // all peering event handling needs to be run in a dedicated seastar::thread,
-    // so that event processing can involve I/O reqs freely, for example: PG::on_removal,
-    // PG::on_new_interval
-    return interruptor::async([this, &evt, &rctx] {
-      peering_state.handle_event(
-        evt.get_event(),
-        &rctx);
-      peering_state.write_if_dirty(rctx.transaction);
-    });
+    peering_state.handle_event(
+      evt.get_event(),
+      &rctx);
+    peering_state.write_if_dirty(rctx.transaction);
   }
 }

-seastar::future<> PG::handle_advance_map(
+void PG::handle_advance_map(
   cached_map_t next_map, PeeringCtx &rctx)
 {
-  return seastar::async([this, next_map=std::move(next_map), &rctx] {
-    vector<int> newup, newacting;
-    int up_primary, acting_primary;
-    next_map->pg_to_up_acting_osds(
-      pgid.pgid,
-      &newup, &up_primary,
-      &newacting, &acting_primary);
-    peering_state.advance_map(
-      next_map,
-      peering_state.get_osdmap(),
-      newup,
-      up_primary,
-      newacting,
-      acting_primary,
-      rctx);
-    osdmap_gate.got_map(next_map->get_epoch());
-  });
-}
-
-seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
+  vector<int> newup, newacting;
+  int up_primary, acting_primary;
+  next_map->pg_to_up_acting_osds(
+    pgid.pgid,
+    &newup, &up_primary,
+    &newacting, &acting_primary);
+  peering_state.advance_map(
+    next_map,
+    peering_state.get_osdmap(),
+    newup,
+    up_primary,
+    newacting,
+    acting_primary,
+    rctx);
+  osdmap_gate.got_map(next_map->get_epoch());
+}
+
+void PG::handle_activate_map(PeeringCtx &rctx)
 {
-  return seastar::async([this, &rctx] {
-    peering_state.activate_map(rctx);
-  });
+  peering_state.activate_map(rctx);
 }

-seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
+void PG::handle_initialize(PeeringCtx &rctx)
 {
-  return seastar::async([this, &rctx] {
-    peering_state.handle_event(PeeringState::Initialize{}, &rctx);
-  });
+  peering_state.handle_event(PeeringState::Initialize{}, &rctx);
 }

 void PG::init_collection_pool_opts()
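
The pattern across this file: handlers that previously returned a future, each spawning its own seastar::async thread, become plain synchronous functions, because the callers shown in the earlier files now invoke them from inside interruptor::async. A minimal standalone sketch (not the Ceph code; old_handler/new_handler and State are illustrative names) of the two shapes and how a caller bridges them:

  #include <seastar/core/future.hh>
  #include <seastar/core/thread.hh>

  struct State { int epoch = 0; };

  // Old shape: the handler owns its own seastar::thread and returns a future.
  seastar::future<> old_handler(State& s) {
    return seastar::async([&s] { ++s.epoch; });
  }

  // New shape: plain synchronous code. Blocking-style waits inside it are
  // only legal because the caller has already entered a seastar::thread.
  void new_handler(State& s) { ++s.epoch; }

  seastar::future<> caller(State& s) {
    return seastar::async([&s] {
      old_handler(s).get();  // old: must wait on (or chain) the returned future
      new_handler(s);        // new: just call it inline
    });
  }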
