 #include <fmt/os.h>
 #include <fmt/ostream.h>
 #include <seastar/core/timer.hh>
+#include <seastar/coroutine/parallel_for_each.hh>
 
 #include "common/pick_address.h"
 #include "include/util.h"
@@ -1146,11 +1147,11 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
   INFO("{}", *m);
   if (m->fsid != superblock.cluster_fsid) {
     WARN("fsid mismatched");
-    return seastar::now();
+    co_return;
   }
   if (pg_shard_manager.is_initializing()) {
     WARN("i am still initializing");
-    return seastar::now();
+    co_return;
   }
 
   const auto first = m->get_first();
@@ -1171,183 +1172,145 @@ seastar::future<> OSD::_handle_osd_map(Ref<MOSDMap> m)
   // make sure there is something new, here, before we bother flushing
   // the queues and such
   if (last <= superblock.get_newest_map()) {
-    return seastar::now();
+    co_return;
   }
   // missing some?
   epoch_t start = superblock.get_newest_map() + 1;
   if (first > start) {
     INFO("message skips epochs {}..{}",
          start, first - 1);
     if (m->cluster_osdmap_trim_lower_bound <= start) {
-      return get_shard_services().osdmap_subscribe(start, false);
+      co_return co_await get_shard_services().osdmap_subscribe(start, false);
     }
     // always try to get the full range of maps--as many as we can. this
     // 1- is good to have
     // 2- is at present the only way to ensure that we get a *full* map as
     // the first map!
     if (m->cluster_osdmap_trim_lower_bound < first) {
-      return get_shard_services().osdmap_subscribe(
+      co_return co_await get_shard_services().osdmap_subscribe(
         m->cluster_osdmap_trim_lower_bound - 1, true);
     }
   }
 
-  return seastar::do_with(ceph::os::Transaction{},
-                          [=, this](auto& t) {
-    return pg_shard_manager.store_maps(t, start, m).then([=, this, &t] {
-      // even if this map isn't from a mon, we may have satisfied our subscription
-      monc->sub_got("osdmap", last);
+  ceph::os::Transaction t;
+  co_await pg_shard_manager.store_maps(t, start, m);
 
-      if (!superblock.is_maps_empty()) {
-        pg_shard_manager.trim_maps(t, superblock);
-        // TODO: once we support pg splitting, update pg_num_history here
-        // pg_num_history.prune(superblock.get_oldest_map());
-      }
+  // even if this map isn't from a mon, we may have satisfied our subscription
+  monc->sub_got("osdmap", last);
 
-      superblock.insert_osdmap_epochs(first, last);
-      superblock.current_epoch = last;
+  if (!superblock.is_maps_empty()) {
+    pg_shard_manager.trim_maps(t, superblock);
+  }
 
-      // note in the superblock that we were clean thru the prior epoch
-      if (boot_epoch && boot_epoch >= superblock.mounted) {
-        superblock.mounted = boot_epoch;
-        superblock.clean_thru = last;
-      }
-      pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
-      return pg_shard_manager.set_superblock(superblock).then(
-        [FNAME, this, &t] {
-          DEBUG("submitting transaction");
-          return store.get_sharded_store().do_transaction(
-            pg_shard_manager.get_meta_coll().collection(),
-            std::move(t));
-      });
-    });
-  }).then([=, this] {
-    // TODO: write to superblock and commit the transaction
-    return committed_osd_maps(start, last, m);
-  });
+  superblock.insert_osdmap_epochs(first, last);
+  superblock.current_epoch = last;
+
+  // note in the superblock that we were clean thru the prior epoch
+  if (boot_epoch && boot_epoch >= superblock.mounted) {
+    superblock.mounted = boot_epoch;
+    superblock.clean_thru = last;
+  }
+  pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
+  co_await pg_shard_manager.set_superblock(superblock);
+
+  DEBUG("submitting transaction");
+  co_await store.get_sharded_store().do_transaction(
+    pg_shard_manager.get_meta_coll().collection(), std::move(t));
+
+  // TODO: write to superblock and commit the transaction
+  co_await committed_osd_maps(start, last, m);
 }
 
 seastar::future<> OSD::committed_osd_maps(
-  version_t first,
-  version_t last,
+  epoch_t first,
+  epoch_t last,
   Ref<MOSDMap> m)
 {
   LOG_PREFIX(OSD::committed_osd_maps);
   ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
   INFO("osd.{} ({}, {})", whoami, first, last);
   // advance through the new maps
   auto old_map = osdmap;
-  return seastar::do_for_each(boost::make_counting_iterator(first),
-                              boost::make_counting_iterator(last + 1),
-                              [this, old_map, FNAME](epoch_t cur) {
-    return pg_shard_manager.get_local_map(
-      cur
-    ).then([this, old_map, FNAME](OSDMapService::local_cached_map_t&& o) {
-      osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
-      std::set<int> old_osds;
-      old_map->get_all_osds(old_osds);
-      return seastar::parallel_for_each(
-        old_osds,
-        [this, FNAME, old_map](auto &osd_id) {
-          DEBUG("osd.{}: whoami ? {}, old up ? {} , now down ? {}",
-                osd_id, osd_id != whoami,
-                old_map->is_up(osd_id), osdmap->is_down(osd_id));
-          if (osd_id != whoami &&
-              old_map->is_up(osd_id) &&
-              osdmap->is_down(osd_id)) {
-            DEBUG("osd.{}: mark osd.{} down", whoami, osd_id);
-            return cluster_msgr->mark_down(
-              osdmap->get_cluster_addrs(osd_id).front());
-          }
-          return seastar::now();
-      }).then([this, o=std::move(o)]() mutable {
-        return pg_shard_manager.update_map(std::move(o));
-      });
-    }).then([this] {
-      if (get_shard_services().get_up_epoch() == 0 &&
-          osdmap->is_up(whoami) &&
-          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
-        return pg_shard_manager.set_up_epoch(
-          osdmap->get_epoch()
-        ).then([this] {
-          if (!boot_epoch) {
-            boot_epoch = osdmap->get_epoch();
-          }
-        });
-      } else {
-        return seastar::now();
+  for (epoch_t cur = first; cur <= last; cur++) {
+    OSDMapService::local_cached_map_t&& o = co_await pg_shard_manager.get_local_map(cur);
+    osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
+    std::set<int> old_osds;
+    old_map->get_all_osds(old_osds);
+    co_await seastar::coroutine::parallel_for_each(old_osds,
+      [this, FNAME, old_map](auto &osd_id) -> seastar::future<> {
+      DEBUG("osd.{}: whoami ? {}, old up ? {} , now down ? {}",
+            osd_id, osd_id != whoami,
+            old_map->is_up(osd_id), osdmap->is_down(osd_id));
+      if (osd_id != whoami &&
+          old_map->is_up(osd_id) &&
+          osdmap->is_down(osd_id)) {
+        DEBUG("osd.{}: mark osd.{} down", whoami, osd_id);
+        co_await cluster_msgr->mark_down(osdmap->get_cluster_addrs(osd_id).front());
       }
     });
-  }).then([FNAME, m, this] {
-    auto fut = seastar::now();
-    if (osdmap->is_up(whoami)) {
-      const auto up_from = osdmap->get_up_from(whoami);
-      INFO("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
-           whoami, osdmap->get_epoch(), up_from, bind_epoch,
-           pg_shard_manager.get_osd_state_string());
-      if (bind_epoch < up_from &&
-          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
-          pg_shard_manager.is_booting()) {
-        INFO("osd.{}: activating...", whoami);
-        fut = pg_shard_manager.set_active().then([this] {
-          beacon_timer.arm_periodic(
-            std::chrono::seconds(local_conf()->osd_beacon_report_interval));
-          // timer continuation rearms when complete
-          tick_timer.arm(
-            std::chrono::seconds(TICK_INTERVAL));
-        });
-      }
-    } else {
-      if (pg_shard_manager.is_prestop()) {
-        got_stop_ack();
-        return seastar::now();
+
+    co_await pg_shard_manager.update_map(std::move(o));
+    if (get_shard_services().get_up_epoch() == 0 &&
+        osdmap->is_up(whoami) &&
+        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+      co_await pg_shard_manager.set_up_epoch(osdmap->get_epoch());
+      if (!boot_epoch) {
+        boot_epoch = osdmap->get_epoch();
       }
     }
-    return fut.then([this] {
-      return update_heartbeat_peers();
-    }).then([FNAME, this] {
-      return check_osdmap_features().then([FNAME, this] {
-        // yay!
-        INFO("osd.{}: committed_osd_maps: broadcasting osdmaps up"
-             " to {} epoch to pgs", whoami, osdmap->get_epoch());
-        return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
-      });
-    });
-  }).then([FNAME, m, this] {
-    if (pg_shard_manager.is_active()) {
-      INFO("osd.{}: now active", whoami);
-      if (!osdmap->exists(whoami) ||
-          osdmap->is_stop(whoami)) {
-        return shutdown();
-      }
-      if (should_restart()) {
-        return restart();
-      } else if (!pg_shard_manager.is_stopping()) {
-        /*
-         * TODO: Missing start_waiting_for_healthy() counterpart.
-         * Only subscribe to the next map until implemented.
-         * See https://tracker.ceph.com/issues/66832
-         */
-        return get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false);
-      } else {
-        return seastar::now();
-      }
-    } else if (pg_shard_manager.is_preboot()) {
-      INFO("osd.{}: now preboot", whoami);
+  }
 
-      if (m->get_source().is_mon()) {
-        return _preboot(
-          m->cluster_osdmap_trim_lower_bound, m->newest_map);
-      } else {
-        INFO("osd.{}: start_boot", whoami);
-        return start_boot();
-      }
+  if (osdmap->is_up(whoami)) {
+    const auto up_from = osdmap->get_up_from(whoami);
+    INFO("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
+         whoami, osdmap->get_epoch(), up_from, bind_epoch,
+         pg_shard_manager.get_osd_state_string());
+    if (bind_epoch < up_from &&
+        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+        pg_shard_manager.is_booting()) {
+      INFO("osd.{}: activating...", whoami);
+      co_await pg_shard_manager.set_active();
+      beacon_timer.arm_periodic(
+        std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+      // timer continuation rearms when complete
+      tick_timer.arm(std::chrono::seconds(TICK_INTERVAL));
+    }
+    co_await update_heartbeat_peers();
+    co_await check_osdmap_features();
+    // yay!
+    INFO("osd.{}: committed_osd_maps: broadcasting osdmaps up"
+         " to {} epoch to pgs", whoami, osdmap->get_epoch());
+    co_await pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
+  } else {
+    if (pg_shard_manager.is_prestop()) {
+      got_stop_ack();
+    }
+  }
+
+  if (pg_shard_manager.is_active()) {
+    INFO("osd.{}: now active", whoami);
+    if (!osdmap->exists(whoami) || osdmap->is_stop(whoami)) {
+      co_await shutdown();
+    } else if (should_restart()) {
+      co_await restart();
+    } else if (!pg_shard_manager.is_stopping()) {
+      /*
+       * TODO: Missing start_waiting_for_healthy() counterpart.
+       * Only subscribe to the next map until implemented.
+       * See https://tracker.ceph.com/issues/66832
+       */
+      co_await get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false);
+    }
+  } else if (pg_shard_manager.is_preboot()) {
+    INFO("osd.{}: now preboot", whoami);
+    if (m->get_source().is_mon()) {
+      co_await _preboot(m->cluster_osdmap_trim_lower_bound, m->newest_map);
     } else {
-      INFO("osd.{}: now {}", whoami,
-           pg_shard_manager.get_osd_state_string());
-      // XXX
-      return seastar::now();
+      INFO("osd.{}: start_boot", whoami);
+      co_await start_boot();
     }
-  });
+  }
+  INFO("osd.{}: now {}", whoami, pg_shard_manager.get_osd_state_string());
 }
 
 seastar::future<> OSD::handle_osd_op(