@@ -172,6 +172,10 @@ static seastar::future<> run(
172172 bool is_stopped = false ;
173173 std::optional<seastar::future<>> fut_report;
174174
175+ unsigned conn_count = 0 ;
176+ unsigned msg_count = 0 ;
177+ MessageRef last_msg;
178+
175179 // available in all shards
176180 unsigned msg_len;
177181 bufferlist msg_data;
@@ -188,6 +192,28 @@ static seastar::future<> run(
188192 }
189193 }
190194
195+ void ms_handle_connect (
196+ crimson::net::ConnectionRef,
197+ seastar::shard_id) override {
198+ ceph_abort (" impossible, server won't connect" );
199+ }
200+
201+ void ms_handle_accept (
202+ crimson::net::ConnectionRef,
203+ seastar::shard_id new_shard,
204+ bool is_replace) override {
205+ ceph_assert_always (new_shard == seastar::this_shard_id ());
206+ auto &server = container ().local ();
207+ ++server.conn_count ;
208+ }
209+
210+ void ms_handle_reset (
211+ crimson::net::ConnectionRef,
212+ bool ) override {
213+ auto &server = container ().local ();
214+ --server.conn_count ;
215+ }
216+
191217 std::optional<seastar::future<>> ms_dispatch (
192218 crimson::net::ConnectionRef c, MessageRef m) override {
193219 assert (c->get_shard_id () == seastar::this_shard_id ());
@@ -205,7 +231,12 @@ static seastar::future<> run(
205231 bufferlist data (server.msg_data );
206232 rep->write (0 , server.msg_len , data);
207233 rep->set_tid (m->get_tid ());
234+ ++server.msg_count ;
208235 std::ignore = c->send (std::move (rep));
236+
237+ if (server.msg_count % 16 == 0 ) {
238+ server.last_msg = std::move (m);
239+ }
209240 return {seastar::now ()};
210241 }
211242
@@ -251,51 +282,136 @@ static seastar::future<> run(
251282 }
252283
253284 private:
285+ struct ShardReport {
286+ unsigned msg_count = 0 ;
287+
288+ // per-interval metrics
289+ double reactor_utilization;
290+ unsigned conn_count = 0 ;
291+ int msg_size = 0 ;
292+ unsigned msg_count_interval = 0 ;
293+ };
294+
295+ // should not be called frequently to impact performance
296+ void get_report (ShardReport& last) {
297+ unsigned last_msg_count = last.msg_count ;
298+ int msg_size = -1 ;
299+ if (last_msg) {
300+ auto msg = boost::static_pointer_cast<MOSDOp>(last_msg);
301+ msg->finish_decode ();
302+ ceph_assert_always (msg->ops .size () == 1 );
303+ msg_size = msg->ops [0 ].op .extent .length ;
304+ last_msg.reset ();
305+ }
306+
307+ last.msg_count = msg_count;
308+ last.reactor_utilization = get_reactor_utilization ();
309+ last.conn_count = conn_count;
310+ last.msg_size = msg_size;
311+ last.msg_count_interval = msg_count - last_msg_count;
312+ }
313+
254314 struct TimerReport {
255315 unsigned elapsed = 0u ;
316+ mono_time start_time = mono_clock::zero();
317+ std::vector<ShardReport> reports;
256318
257- seastar::future<> ticktock (bool is_fixed_cpu) {
258- return seastar::sleep (1s
259- ).then ([this , is_fixed_cpu] {
260- ++elapsed;
261- if (is_fixed_cpu) {
262- std::ostringstream sout;
263- sout << elapsed
264- << " s -- server reactor utilization: "
265- << get_reactor_utilization ();
266- std::cout << sout.str () << std::endl;
267- return seastar::now ();
268- } else {
269- return seastar::do_with (
270- std::vector<double >(seastar::smp::count),
271- [this ](auto &rus) {
272- return seastar::smp::invoke_on_all ([&rus] {
273- rus[seastar::this_shard_id ()] = get_reactor_utilization ();
274- }).then ([this , &rus] {
275- std::ostringstream sout;
276- sout << elapsed
277- << " s -- server reactor utilization: " ;
278- for (double ru : rus) {
279- sout << ru << " ," ;
280- }
281- std::cout << sout.str () << std::endl;
282- });
283- });
284- }
285- });
286- }
319+ TimerReport (unsigned shards) : reports(shards) {}
287320 };
288321
289322 void start_report () {
290323 seastar::promise<> pr_report;
291324 fut_report = pr_report.get_future ();
292325 seastar::do_with (
293- TimerReport (),
326+ TimerReport (seastar::smp::count ),
294327 [this ](auto &report) {
295328 return seastar::do_until (
296329 [this ] { return is_stopped; },
297330 [&report, this ] {
298- return report.ticktock (is_fixed_cpu);
331+ return seastar::sleep (2s
332+ ).then ([&report, this ] {
333+ report.elapsed += 2 ;
334+ if (is_fixed_cpu) {
335+ return seastar::smp::submit_to (msgr_sid,
336+ [&report, this ] {
337+ auto &server = container ().local ();
338+ server.get_report (report.reports [seastar::this_shard_id ()]);
339+ }).then ([&report, this ] {
340+ auto now = mono_clock::now ();
341+ auto prv = report.start_time ;
342+ report.start_time = now;
343+ if (prv == mono_clock::zero ()) {
344+ // cannot compute duration
345+ return ;
346+ }
347+ std::chrono::duration<double > duration_d = now - prv;
348+ double duration = duration_d.count ();
349+ auto &ireport = report.reports [msgr_sid];
350+ double iops = ireport.msg_count_interval / duration;
351+ double throughput_MB = -1 ;
352+ if (ireport.msg_size >= 0 ) {
353+ throughput_MB = iops * ireport.msg_size / 1048576 ;
354+ }
355+ std::ostringstream sout;
356+ sout << setfill (' ' )
357+ << report.elapsed
358+ << " (" << std::setw (5 ) << duration << " ) "
359+ << std::setw (9 ) << iops << " IOPS "
360+ << std::setw (8 ) << throughput_MB << " MiB/s "
361+ << ireport.reactor_utilization
362+ << " (" << ireport.conn_count << " )" ;
363+ std::cout << sout.str () << std::endl;
364+ });
365+ } else {
366+ return seastar::smp::invoke_on_all ([&report, this ] {
367+ auto &server = container ().local ();
368+ server.get_report (report.reports [seastar::this_shard_id ()]);
369+ }).then ([&report, this ] {
370+ auto now = mono_clock::now ();
371+ auto prv = report.start_time ;
372+ report.start_time = now;
373+ if (prv == mono_clock::zero ()) {
374+ // cannot compute duration
375+ return ;
376+ }
377+ std::chrono::duration<double > duration_d = now - prv;
378+ double duration = duration_d.count ();
379+ unsigned num_msgs = 0 ;
380+ // -1 means unavailable, -2 means mismatch
381+ int msg_size = -1 ;
382+ for (auto &i : report.reports ) {
383+ if (i.msg_size >= 0 ) {
384+ if (msg_size == -2 ) {
385+ // pass
386+ } else if (msg_size == -1 ) {
387+ msg_size = i.msg_size ;
388+ } else {
389+ if (msg_size != i.msg_size ) {
390+ msg_size = -2 ;
391+ }
392+ }
393+ }
394+ num_msgs += i.msg_count_interval ;
395+ }
396+ double iops = num_msgs / duration;
397+ double throughput_MB = msg_size;
398+ if (msg_size >= 0 ) {
399+ throughput_MB = iops * msg_size / 1048576 ;
400+ }
401+ std::ostringstream sout;
402+ sout << setfill (' ' )
403+ << report.elapsed
404+ << " (" << std::setw (5 ) << duration << " ) "
405+ << std::setw (9 ) << iops << " IOPS "
406+ << std::setw (8 ) << throughput_MB << " MiB/s " ;
407+ for (auto &i : report.reports ) {
408+ sout << i.reactor_utilization
409+ << " (" << i.conn_count << " ) " ;
410+ }
411+ std::cout << sout.str () << std::endl;
412+ });
413+ }
414+ });
299415 }
300416 );
301417 }).then ([this ] {
@@ -699,11 +815,11 @@ static seastar::future<> run(
699815 void report_header () const {
700816 std::ostringstream sout;
701817 sout << std::setfill (' ' )
702- << std::setw (7 ) << " sec"
703- << std::setw (6 ) << " depth"
704- << std::setw (8 ) << " IOPS"
705- << std::setw (8 ) << " MB/s"
706- << std::setw (8 ) << " lat(ms)" ;
818+ << std::setw (6 ) << " sec"
819+ << std::setw (7 ) << " depth"
820+ << std::setw (10 ) << " IOPS"
821+ << std::setw (9 ) << " MB/s"
822+ << std::setw (9 ) << " lat(ms)" ;
707823 std::cout << sout.str () << std::endl;
708824 }
709825
@@ -724,10 +840,14 @@ static seastar::future<> run(
724840 double iops = ops/elapsed_s;
725841 std::ostringstream sout;
726842 sout << setfill (' ' )
727- << std::setw (7 ) << elapsed_s
843+ << std::setw (5 ) << elapsed_s
844+ << " "
728845 << std::setw (6 ) << depth
729- << std::setw (8 ) << iops
846+ << " "
847+ << std::setw (9 ) << iops
848+ << " "
730849 << std::setw (8 ) << iops * bytes_of_block / 1048576
850+ << " "
731851 << std::setw (8 ) << (sampled_total_lat_s / sampled_count * 1000 )
732852 << " -- " ;
733853 if (server_reactor_utilization.has_value ()) {
0 commit comments