@@ -108,6 +108,7 @@ OSD::OSD(int id, uint32_t nonce,
108108 log_client (cluster_msgr.get(), LogClient::NO_FLAGS),
109109 clog (log_client.create_channel())
110110{
111+ ceph_assert (seastar::this_shard_id () == PRIMARY_CORE);
111112 for (auto msgr : {std::ref (cluster_msgr), std::ref (public_msgr),
112113 std::ref (hb_front_msgr), std::ref (hb_back_msgr)}) {
113114 msgr.get ()->set_auth_server (monc.get ());
@@ -378,8 +379,6 @@ seastar::future<> OSD::start()
378379 }).then ([this ] {
379380 return shard_dispatchers.start (
380381 std::ref (*this ),
381- whoami,
382- std::ref (store),
383382 std::ref (pg_to_shard_mappings));
384383 });
385384 }).then ([this ] {
@@ -776,9 +775,7 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
776775 case MSG_OSD_PG_UPDATE_LOG_MISSING:
777776 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
778777 {
779- return conn.get_foreign ().then ([this , m = std::move (m)](auto f_conn) {
780- return shard_dispatchers.local ().ms_dispatch (std::move (f_conn), std::move (m));
781- });
778+ return shard_dispatchers.local ().ms_dispatch (conn, std::move (m));
782779 }
783780 default :
784781 {
@@ -792,22 +789,26 @@ OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
792789
793790seastar::future<>
794791OSD::ShardDispatcher::ms_dispatch (
795- crimson::net::ConnectionFRef f_conn ,
792+ crimson::net::ConnectionRef conn ,
796793 MessageRef m)
797794{
798795 if (seastar::this_shard_id () != PRIMARY_CORE) {
799796 switch (m->get_type ()) {
800797 case CEPH_MSG_OSD_MAP:
801798 case MSG_COMMAND:
802799 case MSG_OSD_MARK_ME_DOWN:
803- return container ().invoke_on (PRIMARY_CORE,
804- [f_conn = std::move (f_conn), m = std::move (m)]
805- (auto & local_dispatcher) mutable {
806- return local_dispatcher.ms_dispatch (std::move (f_conn), std::move (m));
800+ // FIXME: order is not guaranteed in this path
801+ return conn.get_foreign (
802+ ).then ([this , m=std::move (m)](auto f_conn) {
803+ return container ().invoke_on (PRIMARY_CORE,
804+ [f_conn=std::move (f_conn), m=std::move (m)]
805+ (auto & local_dispatcher) mutable {
806+ auto conn = make_local_shared_foreign (std::move (f_conn));
807+ return local_dispatcher.ms_dispatch (conn, std::move (m));
808+ });
807809 });
808810 }
809811 }
810- crimson::net::ConnectionRef conn = make_local_shared_foreign (std::move (f_conn));
811812
812813 switch (m->get_type ()) {
813814 case CEPH_MSG_OSD_MAP:
@@ -1037,7 +1038,7 @@ seastar::future<> OSD::ShardDispatcher::_handle_osd_map(Ref<MOSDMap> m)
10371038 pg_shard_manager.get_meta_coll ().store_superblock (t, osd.superblock );
10381039 pg_shard_manager.set_superblock (osd.superblock );
10391040 logger ().debug (" OSD::handle_osd_map: do_transaction..." );
1040- return store.get_sharded_store ().do_transaction (
1041+ return osd. store .get_sharded_store ().do_transaction (
10411042 pg_shard_manager.get_meta_coll ().collection (),
10421043 std::move (t));
10431044 });
@@ -1053,7 +1054,7 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
10531054 Ref<MOSDMap> m)
10541055{
10551056 ceph_assert (seastar::this_shard_id () == PRIMARY_CORE);
1056- logger ().info (" osd.{}: committed_osd_maps({}, {})" , whoami, first, last);
1057+ logger ().info (" osd.{}: committed_osd_maps({}, {})" , osd. whoami , first, last);
10571058 // advance through the new maps
10581059 return seastar::do_for_each (boost::make_counting_iterator (first),
10591060 boost::make_counting_iterator (last + 1 ),
@@ -1065,8 +1066,8 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
10651066 return pg_shard_manager.update_map (std::move (o));
10661067 }).then ([this ] {
10671068 if (get_shard_services ().get_up_epoch () == 0 &&
1068- osd.osdmap ->is_up (whoami) &&
1069- osd.osdmap ->get_addrs (whoami) == osd.public_msgr ->get_myaddrs ()) {
1069+ osd.osdmap ->is_up (osd. whoami ) &&
1070+ osd.osdmap ->get_addrs (osd. whoami ) == osd.public_msgr ->get_myaddrs ()) {
10701071 return pg_shard_manager.set_up_epoch (
10711072 osd.osdmap ->get_epoch ()
10721073 ).then ([this ] {
@@ -1080,15 +1081,15 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
10801081 });
10811082 }).then ([m, this ] {
10821083 auto fut = seastar::now ();
1083- if (osd.osdmap ->is_up (whoami)) {
1084- const auto up_from = osd.osdmap ->get_up_from (whoami);
1084+ if (osd.osdmap ->is_up (osd. whoami )) {
1085+ const auto up_from = osd.osdmap ->get_up_from (osd. whoami );
10851086 logger ().info (" osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}" ,
1086- whoami, osd.osdmap ->get_epoch (), up_from, osd.bind_epoch ,
1087+ osd. whoami , osd.osdmap ->get_epoch (), up_from, osd.bind_epoch ,
10871088 pg_shard_manager.get_osd_state_string ());
10881089 if (osd.bind_epoch < up_from &&
1089- osd.osdmap ->get_addrs (whoami) == osd.public_msgr ->get_myaddrs () &&
1090+ osd.osdmap ->get_addrs (osd. whoami ) == osd.public_msgr ->get_myaddrs () &&
10901091 pg_shard_manager.is_booting ()) {
1091- logger ().info (" osd.{}: activating..." , whoami);
1092+ logger ().info (" osd.{}: activating..." , osd. whoami );
10921093 fut = pg_shard_manager.set_active ().then ([this ] {
10931094 osd.beacon_timer .arm_periodic (
10941095 std::chrono::seconds (local_conf ()->osd_beacon_report_interval ));
@@ -1107,15 +1108,15 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
11071108 return check_osdmap_features ().then ([this ] {
11081109 // yay!
11091110 logger ().info (" osd.{}: committed_osd_maps: broadcasting osdmaps up"
1110- " to {} epoch to pgs" , whoami, osd.osdmap ->get_epoch ());
1111+ " to {} epoch to pgs" , osd. whoami , osd.osdmap ->get_epoch ());
11111112 return pg_shard_manager.broadcast_map_to_pgs (osd.osdmap ->get_epoch ());
11121113 });
11131114 });
11141115 }).then ([m, this ] {
11151116 if (pg_shard_manager.is_active ()) {
1116- logger ().info (" osd.{}: now active" , whoami);
1117- if (!osd.osdmap ->exists (whoami) ||
1118- osd.osdmap ->is_stop (whoami)) {
1117+ logger ().info (" osd.{}: now active" , osd. whoami );
1118+ if (!osd.osdmap ->exists (osd. whoami ) ||
1119+ osd.osdmap ->is_stop (osd. whoami )) {
11191120 return osd.shutdown ();
11201121 }
11211122 if (osd.should_restart ()) {
@@ -1124,17 +1125,17 @@ seastar::future<> OSD::ShardDispatcher::committed_osd_maps(
11241125 return seastar::now ();
11251126 }
11261127 } else if (pg_shard_manager.is_preboot ()) {
1127- logger ().info (" osd.{}: now preboot" , whoami);
1128+ logger ().info (" osd.{}: now preboot" , osd. whoami );
11281129
11291130 if (m->get_source ().is_mon ()) {
11301131 return osd._preboot (
11311132 m->cluster_osdmap_trim_lower_bound , m->newest_map );
11321133 } else {
1133- logger ().info (" osd.{}: start_boot" , whoami);
1134+ logger ().info (" osd.{}: start_boot" , osd. whoami );
11341135 return osd.start_boot ();
11351136 }
11361137 } else {
1137- logger ().info (" osd.{}: now {}" , whoami,
1138+ logger ().info (" osd.{}: now {}" , osd. whoami ,
11381139 pg_shard_manager.get_osd_state_string ());
11391140 // XXX
11401141 return seastar::now ();
@@ -1376,8 +1377,9 @@ seastar::future<> OSD::ShardDispatcher::handle_peering_op(
13761377
13771378seastar::future<> OSD::ShardDispatcher::check_osdmap_features ()
13781379{
1379- return store.write_meta (" require_osd_release" ,
1380- stringify ((int )osd.osdmap ->require_osd_release ));
1380+ return osd.store .write_meta (
1381+ " require_osd_release" ,
1382+ stringify ((int )osd.osdmap ->require_osd_release ));
13811383}
13821384
13831385seastar::future<> OSD::prepare_to_stop ()
0 commit comments