@@ -485,10 +485,13 @@ void LatencyTestPublisher::CommandReaderListener::on_data_available(
485485 TestCommandType command;
486486 SampleInfo info;
487487
488- if (reader->take_next_sample (
489- &command, &info) == RETCODE_OK
490- && info.valid_data )
488+ while (reader->take_next_sample (&command, &info) == RETCODE_OK)
491489 {
490+ if (!info.valid_data )
491+ {
492+ continue ;
493+ }
494+
492495 if (command.m_command == BEGIN
493496 || command.m_command == END )
494497 {
@@ -498,10 +501,6 @@ void LatencyTestPublisher::CommandReaderListener::on_data_available(
498501 latency_publisher_->command_msg_cv_ .notify_one ();
499502 }
500503 }
501- else
502- {
503- EPROSIMA_LOG_INFO (LatencyTest, " Problem reading command message" );
504- }
505504}
506505
507506void LatencyTestPublisher::LatencyDataReaderListener::on_data_available (
@@ -513,110 +512,132 @@ void LatencyTestPublisher::LatencyDataReaderListener::on_data_available(
513512 LoanableSequence<LatencyType> data_seq;
514513 std::chrono::duration<uint32_t , std::nano> bounce_time (0 );
515514
516- if (pub->data_loans_ )
515+ ReturnCode_t ret_code = RETCODE_OK;
516+ while (RETCODE_OK == ret_code)
517517 {
518- if (RETCODE_OK != reader-> take (data_seq, infos, 1 ) )
518+ if (pub-> data_loans_ )
519519 {
520- EPROSIMA_LOG_ERROR (LatencyTest, " Problem reading Subscriber echoed loaned test data" );
521- return ;
520+ ret_code = reader->take (data_seq, infos, 1 );
521+ if (RETCODE_OK != ret_code)
522+ {
523+ if (RETCODE_NO_DATA != ret_code)
524+ {
525+ EPROSIMA_LOG_ERROR (LatencyTest, " Problem reading Subscriber echoed loaned test data" );
526+ }
527+ return ;
528+ }
522529 }
523- }
524- else
525- {
526- SampleInfo info;
527- void * data = pub->dynamic_types_ ?
528- (void *)pub->dynamic_data_in_ :
529- (void *)pub->latency_data_in_ ;
530-
531- // Retrieved echoed data
532- if (reader->take_next_sample (
533- data, &info) != RETCODE_OK
534- || !info.valid_data )
530+ else
535531 {
536- EPROSIMA_LOG_ERROR (LatencyTest, " Problem reading Subscriber echoed test data" );
537- return ;
538- }
539- }
540-
541- // Atomic managemente of the sample
542- bool notify = false ;
543- {
544- std::lock_guard<std::mutex> lock (pub->mutex_ );
532+ SampleInfo info;
533+ void * data = pub->dynamic_types_ ?
534+ (void *)pub->dynamic_data_in_ :
535+ (void *)pub->latency_data_in_ ;
536+
537+ // Retrieved echoed data
538+ ret_code = reader->take_next_sample (data, &info);
539+ if (RETCODE_OK != ret_code)
540+ {
541+ if (RETCODE_NO_DATA != ret_code)
542+ {
543+ EPROSIMA_LOG_ERROR (LatencyTest, " Problem reading Subscriber echoed test data" );
544+ }
545+ return ;
546+ }
545547
546- if (pub->data_loans_ )
547- {
548- // we have requested a single sample
549- assert (infos.length () == 1 && data_seq.length () == 1 );
550- // we have already released the former loan
551- assert (pub->latency_data_in_ == nullptr );
552- // reference the loaned data
553- pub->latency_data_in_ = &data_seq[0 ];
554- // retrieve the bounce time
555- bounce_time = std::chrono::duration<uint32_t , std::nano>(pub->latency_data_in_ ->bounce );
548+ if (!info.valid_data )
549+ {
550+ // No valid data, continue to next sample
551+ continue ;
552+ }
556553 }
557554
558- // Check if is the expected echo message
559- uint32_t dyn_value_in {0 };
560- uint32_t dyn_value_out {0 };
561- if (pub->dynamic_types_ )
555+ // Atomic management of the sample
556+ bool notify = false ;
562557 {
563- (*pub->dynamic_data_in_ )->get_uint32_value (dyn_value_in, 0 );
564- (*pub->dynamic_data_out_ )->get_uint32_value (dyn_value_out, 0 );
565- }
558+ std::lock_guard<std::mutex> lock (pub->mutex_ );
566559
567- if ((pub->dynamic_types_ && dyn_value_in != dyn_value_out)
568- || (!pub->dynamic_types_ && pub->latency_data_in_ ->seqnum != pub->latency_data_out_ ->seqnum ))
569- {
570- EPROSIMA_LOG_INFO (LatencyTest, " Echo message received is not the expected one" );
571- }
572- else
573- {
574- // Factor of 2 below is to calculate the roundtrip divided by two. Note that nor the overhead does not
575- // need to be halved, as we access the clock twice per round trip
576- pub->end_time_ = std::chrono::steady_clock::now ();
577- pub->end_time_ -= bounce_time;
578- auto roundtrip = std::chrono::duration<double , std::micro>(pub->end_time_ - pub->start_time_ ) / 2.0 ;
579- roundtrip -= pub->overhead_time_ ;
580-
581- // Discard samples were loan failed due to payload outages
582- // in that case the roundtrip will match the os scheduler quantum slice
583- if (roundtrip.count () > 0
584- && !(pub->data_loans_ && roundtrip.count () > 10000 ))
560+ if (pub->data_loans_ )
585561 {
586- pub->times_ .push_back (roundtrip);
587- ++pub->received_count_ ;
562+ // we have requested a single sample
563+ assert (infos.length () == 1 && data_seq.length () == 1 );
564+ // we have already released the former loan
565+ assert (pub->latency_data_in_ == nullptr );
566+ // check if the sample is valid
567+ if (!infos[0 ].valid_data )
568+ {
569+ reader->return_loan (data_seq, infos);
570+ continue ;
571+ }
572+ // reference the loaned data
573+ pub->latency_data_in_ = &data_seq[0 ];
574+ // retrieve the bounce time
575+ bounce_time = std::chrono::duration<uint32_t , std::nano>(pub->latency_data_in_ ->bounce );
588576 }
589577
590- // Reset seqnum from out data
578+ // Check if is the expected echo message
579+ uint32_t dyn_value_in {0 };
580+ uint32_t dyn_value_out {0 };
591581 if (pub->dynamic_types_ )
592582 {
593- (*pub->dynamic_data_out_ )->set_uint32_value (0 , 0 );
583+ (*pub->dynamic_data_in_ )->get_uint32_value (dyn_value_in, 0 );
584+ (*pub->dynamic_data_out_ )->get_uint32_value (dyn_value_out, 0 );
585+ }
586+
587+ if ((pub->dynamic_types_ && dyn_value_in != dyn_value_out)
588+ || (!pub->dynamic_types_ && pub->latency_data_in_ ->seqnum != pub->latency_data_out_ ->seqnum ))
589+ {
590+ EPROSIMA_LOG_INFO (LatencyTest, " Echo message received is not the expected one" );
594591 }
595592 else
596593 {
597- pub->latency_data_out_ ->seqnum = 0 ;
594+ // Factor of 2 below is to calculate the roundtrip divided by two. Note that nor the overhead does not
595+ // need to be halved, as we access the clock twice per round trip
596+ pub->end_time_ = std::chrono::steady_clock::now ();
597+ pub->end_time_ -= bounce_time;
598+ auto roundtrip = std::chrono::duration<double , std::micro>(pub->end_time_ - pub->start_time_ ) / 2.0 ;
599+ roundtrip -= pub->overhead_time_ ;
600+
601+ // Discard samples were loan failed due to payload outages
602+ // in that case the roundtrip will match the os scheduler quantum slice
603+ if (roundtrip.count () > 0
604+ && !(pub->data_loans_ && roundtrip.count () > 10000 ))
605+ {
606+ pub->times_ .push_back (roundtrip);
607+ ++pub->received_count_ ;
608+ }
609+
610+ // Reset seqnum from out data
611+ if (pub->dynamic_types_ )
612+ {
613+ (*pub->dynamic_data_out_ )->set_uint32_value (0 , 0 );
614+ }
615+ else
616+ {
617+ pub->latency_data_out_ ->seqnum = 0 ;
618+ }
619+ }
620+
621+ if (pub->data_loans_ )
622+ {
623+ pub->latency_data_in_ = nullptr ;
598624 }
625+
626+ ++pub->data_msg_count_ ;
627+ notify = pub->data_msg_count_ >= pub->subscribers_ ;
599628 }
600629
601- if (pub-> data_loans_ )
630+ if (notify )
602631 {
603- pub->latency_data_in_ = nullptr ;
632+ pub->data_msg_cv_ . notify_one () ;
604633 }
605634
606- ++pub->data_msg_count_ ;
607- notify = pub->data_msg_count_ >= pub->subscribers_ ;
608- }
609-
610- if (notify)
611- {
612- pub->data_msg_cv_ .notify_one ();
613- }
614-
615- // release the loan if any
616- if (pub->data_loans_
617- && RETCODE_OK != reader->return_loan (data_seq, infos))
618- {
619- EPROSIMA_LOG_ERROR (LatencyTest, " Problem returning loaned test data" );
635+ // release the loan if any
636+ if (pub->data_loans_
637+ && RETCODE_OK != reader->return_loan (data_seq, infos))
638+ {
639+ EPROSIMA_LOG_ERROR (LatencyTest, " Problem returning loaned test data" );
640+ }
620641 }
621642}
622643
0 commit comments