@@ -485,10 +485,13 @@ void LatencyTestPublisher::CommandReaderListener::on_data_available(
485485 TestCommandType command;
486486 SampleInfo info;
487487
488- if (reader->take_next_sample (
489- &command, &info) == RETCODE_OK
490- && info.valid_data )
488+ while (reader->take_next_sample (&command, &info) == RETCODE_OK)
491489 {
490+ if (!info.valid_data )
491+ {
492+ continue ;
493+ }
494+
492495 if (command.m_command == BEGIN
493496 || command.m_command == END )
494497 {
@@ -498,10 +501,6 @@ void LatencyTestPublisher::CommandReaderListener::on_data_available(
498501 latency_publisher_->command_msg_cv_ .notify_one ();
499502 }
500503 }
501- else
502- {
503- EPROSIMA_LOG_INFO (LatencyTest, " Problem reading command message" );
504- }
505504}
506505
507506void LatencyTestPublisher::LatencyDataReaderListener::on_data_available (
@@ -513,110 +512,139 @@ void LatencyTestPublisher::LatencyDataReaderListener::on_data_available(
513512 LoanableSequence<LatencyType> data_seq;
514513 std::chrono::duration<uint32_t , std::nano> bounce_time (0 );
515514
516- if (pub->data_loans_ )
515+ ReturnCode_t ret_code = RETCODE_OK;
516+ while (RETCODE_OK == ret_code)
517517 {
518- if (RETCODE_OK != reader-> take (data_seq, infos, 1 ) )
518+ if (pub-> data_loans_ )
519519 {
520- EPROSIMA_LOG_ERROR (LatencyTest, " Problem reading Subscriber echoed loaned test data" );
521- return ;
520+ ret_code = reader->take (data_seq, infos, 1 );
521+ if (RETCODE_OK != ret_code)
522+ {
523+ if (RETCODE_NO_DATA != ret_code)
524+ {
525+ EPROSIMA_LOG_ERROR (LatencyTest, " Problem reading Subscriber echoed loaned test data" );
526+ }
527+ return ;
528+ }
522529 }
523- }
524- else
525- {
526- SampleInfo info;
527- void * data = pub->dynamic_types_ ?
528- (void *)pub->dynamic_data_in_ :
529- (void *)pub->latency_data_in_ ;
530-
531- // Retrieved echoed data
532- if (reader->take_next_sample (
533- data, &info) != RETCODE_OK
534- || !info.valid_data )
530+ else
535531 {
536- EPROSIMA_LOG_ERROR (LatencyTest, " Problem reading Subscriber echoed test data" );
537- return ;
538- }
539- }
540-
541- // Atomic managemente of the sample
542- bool notify = false ;
543- {
544- std::lock_guard<std::mutex> lock (pub->mutex_ );
532+ SampleInfo info;
533+ void * data = pub->dynamic_types_ ?
534+ (void *)pub->dynamic_data_in_ :
535+ (void *)pub->latency_data_in_ ;
536+
537+ // Retrieved echoed data
538+ ret_code = reader->take_next_sample (data, &info);
539+ if (RETCODE_OK != ret_code)
540+ {
541+ if (RETCODE_NO_DATA != ret_code)
542+ {
543+ EPROSIMA_LOG_ERROR (LatencyTest, " Problem reading Subscriber echoed test data" );
544+ }
545+ return ;
546+ }
545547
546- if (pub->data_loans_ )
547- {
548- // we have requested a single sample
549- assert (infos.length () == 1 && data_seq.length () == 1 );
550- // we have already released the former loan
551- assert (pub->latency_data_in_ == nullptr );
552- // reference the loaned data
553- pub->latency_data_in_ = &data_seq[0 ];
554- // retrieve the bounce time
555- bounce_time = std::chrono::duration<uint32_t , std::nano>(pub->latency_data_in_ ->bounce );
548+ if (!info.valid_data )
549+ {
550+ // No valid data, continue to next sample
551+ continue ;
552+ }
556553 }
557554
558- // Check if is the expected echo message
559- uint32_t dyn_value_in {0 };
560- uint32_t dyn_value_out {0 };
561- if (pub->dynamic_types_ )
555+ // Atomic management of the sample
556+ bool notify = false ;
557+ // This loop allows us to the scope of the lock_guard without using goto.
558+ // We need this to avoid calling return_loan() while the mutex is locked, as it
559+ // may cause an ABBA deadlock.
560+ while (true )
562561 {
563- (*pub->dynamic_data_in_ )->get_uint32_value (dyn_value_in, 0 );
564- (*pub->dynamic_data_out_ )->get_uint32_value (dyn_value_out, 0 );
565- }
562+ std::lock_guard<std::mutex> lock (pub->mutex_ );
566563
567- if ((pub->dynamic_types_ && dyn_value_in != dyn_value_out)
568- || (!pub->dynamic_types_ && pub->latency_data_in_ ->seqnum != pub->latency_data_out_ ->seqnum ))
569- {
570- EPROSIMA_LOG_INFO (LatencyTest, " Echo message received is not the expected one" );
571- }
572- else
573- {
574- // Factor of 2 below is to calculate the roundtrip divided by two. Note that nor the overhead does not
575- // need to be halved, as we access the clock twice per round trip
576- pub->end_time_ = std::chrono::steady_clock::now ();
577- pub->end_time_ -= bounce_time;
578- auto roundtrip = std::chrono::duration<double , std::micro>(pub->end_time_ - pub->start_time_ ) / 2.0 ;
579- roundtrip -= pub->overhead_time_ ;
580-
581- // Discard samples were loan failed due to payload outages
582- // in that case the roundtrip will match the os scheduler quantum slice
583- if (roundtrip.count () > 0
584- && !(pub->data_loans_ && roundtrip.count () > 10000 ))
564+ if (pub->data_loans_ )
585565 {
586- pub->times_ .push_back (roundtrip);
587- ++pub->received_count_ ;
566+ // we have requested a single sample
567+ assert (infos.length () == 1 && data_seq.length () == 1 );
568+ // we have already released the former loan
569+ assert (pub->latency_data_in_ == nullptr );
570+ // check if the sample is valid
571+ if (!infos[0 ].valid_data )
572+ {
573+ // Avoid processing when the sample does not have data
574+ break ;
575+ }
576+ // reference the loaned data
577+ pub->latency_data_in_ = &data_seq[0 ];
578+ // retrieve the bounce time
579+ bounce_time = std::chrono::duration<uint32_t , std::nano>(pub->latency_data_in_ ->bounce );
588580 }
589581
590- // Reset seqnum from out data
582+ // Check if is the expected echo message
583+ uint32_t dyn_value_in {0 };
584+ uint32_t dyn_value_out {0 };
591585 if (pub->dynamic_types_ )
592586 {
593- (*pub->dynamic_data_out_ )->set_uint32_value (0 , 0 );
587+ (*pub->dynamic_data_in_ )->get_uint32_value (dyn_value_in, 0 );
588+ (*pub->dynamic_data_out_ )->get_uint32_value (dyn_value_out, 0 );
589+ }
590+
591+ if ((pub->dynamic_types_ && dyn_value_in != dyn_value_out)
592+ || (!pub->dynamic_types_ && pub->latency_data_in_ ->seqnum != pub->latency_data_out_ ->seqnum ))
593+ {
594+ EPROSIMA_LOG_INFO (LatencyTest, " Echo message received is not the expected one" );
594595 }
595596 else
596597 {
597- pub->latency_data_out_ ->seqnum = 0 ;
598+ // Factor of 2 below is to calculate the roundtrip divided by two. Note that nor the overhead does not
599+ // need to be halved, as we access the clock twice per round trip
600+ pub->end_time_ = std::chrono::steady_clock::now ();
601+ pub->end_time_ -= bounce_time;
602+ auto roundtrip = std::chrono::duration<double , std::micro>(pub->end_time_ - pub->start_time_ ) / 2.0 ;
603+ roundtrip -= pub->overhead_time_ ;
604+
605+ // Discard samples were loan failed due to payload outages
606+ // in that case the roundtrip will match the os scheduler quantum slice
607+ if (roundtrip.count () > 0
608+ && !(pub->data_loans_ && roundtrip.count () > 10000 ))
609+ {
610+ pub->times_ .push_back (roundtrip);
611+ ++pub->received_count_ ;
612+ }
613+
614+ // Reset seqnum from out data
615+ if (pub->dynamic_types_ )
616+ {
617+ (*pub->dynamic_data_out_ )->set_uint32_value (0 , 0 );
618+ }
619+ else
620+ {
621+ pub->latency_data_out_ ->seqnum = 0 ;
622+ }
623+ }
624+
625+ if (pub->data_loans_ )
626+ {
627+ pub->latency_data_in_ = nullptr ;
598628 }
629+
630+ ++pub->data_msg_count_ ;
631+ notify = pub->data_msg_count_ >= pub->subscribers_ ;
632+
633+ // Break the loop (i.e. exit the lock_guard scope)
634+ break ;
599635 }
600636
601- if (pub-> data_loans_ )
637+ if (notify )
602638 {
603- pub->latency_data_in_ = nullptr ;
639+ pub->data_msg_cv_ . notify_one () ;
604640 }
605641
606- ++pub->data_msg_count_ ;
607- notify = pub->data_msg_count_ >= pub->subscribers_ ;
608- }
609-
610- if (notify)
611- {
612- pub->data_msg_cv_ .notify_one ();
613- }
614-
615- // release the loan if any
616- if (pub->data_loans_
617- && RETCODE_OK != reader->return_loan (data_seq, infos))
618- {
619- EPROSIMA_LOG_ERROR (LatencyTest, " Problem returning loaned test data" );
642+ // release the loan if any
643+ if (pub->data_loans_
644+ && RETCODE_OK != reader->return_loan (data_seq, infos))
645+ {
646+ EPROSIMA_LOG_ERROR (LatencyTest, " Problem returning loaned test data" );
647+ }
620648 }
621649}
622650
0 commit comments