Skip to content

Commit fc8d170

Browse files
mergify[bot]ma30002000MiguelCompanyJesusPoderoso
authored
Improve resilience against clock adjustments (#5018) (#5189)
* Improve resilience against clock adjustments (#5018) * Use steady_clock instead of high_resolution_clock for status checks (high_resolution_clock might not be steady depending on STL impl) Signed-off-by: Matthias Schneider <[email protected]> * Use steady_clock instead for system_clock for calculating timeouts Signed-off-by: Matthias Schneider <[email protected]> * Use correct clock's duration for duration_cast Signed-off-by: Matthias Schneider <[email protected]> * Use Time_t::now() Signed-off-by: Matthias Schneider <[email protected]> * Fix build. Signed-off-by: Miguel Company <[email protected]> * Refs #21314. Refactor on DataWriterImpl. Signed-off-by: Miguel Company <[email protected]> * Refs #21314. Refactor on DataReaderImpl. Signed-off-by: Miguel Company <[email protected]> * Refs #21314. Refactor on StatefulWriter. Signed-off-by: Miguel Company <[email protected]> * Refs #21314. Protect current_time_since_unix_epoch against clock adjustments. Signed-off-by: Miguel Company <[email protected]> * Revert "Use steady_clock instead of high_resolution_clock for status checks (high_resolution_clock might not be steady depending on STL impl)" This reverts commit d69eb91. --------- Signed-off-by: Matthias Schneider <[email protected]> Signed-off-by: Miguel Company <[email protected]> Co-authored-by: Miguel Company <[email protected]> (cherry picked from commit ccc690c) * Refs #21314: Fix conflicts and build issues Signed-off-by: JesusPoderoso <[email protected]> * Refs #21314: Apply rev suggestions Signed-off-by: JesusPoderoso <[email protected]> * Refs #21314: Please uncrustify Signed-off-by: JesusPoderoso <[email protected]> * Keep comparison Signed-off-by: Miguel Company <[email protected]> * Use rtps data types Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Matthias Schneider <[email protected]> Signed-off-by: Miguel Company <[email protected]> Signed-off-by: JesusPoderoso <[email protected]> Co-authored-by: Matthias Schneider <[email protected]> Co-authored-by: Miguel Company <[email protected]> Co-authored-by: JesusPoderoso <[email protected]>
1 parent 3e59dc3 commit fc8d170

File tree

7 files changed

+61
-96
lines changed

7 files changed

+61
-96
lines changed

include/fastdds/rtps/writer/StatefulWriter.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,8 +515,8 @@ class StatefulWriter : public RTPSWriter
515515
bool disable_heartbeat_piggyback_;
516516
//! True to disable positive ACKs
517517
bool disable_positive_acks_;
518-
//! Keep duration for disable positive ACKs QoS, in microseconds
519-
std::chrono::duration<double, std::ratio<1, 1000000>> keep_duration_us_;
518+
//! Keep duration for disable positive ACKs QoS
519+
Duration_t keep_duration_;
520520
//! Last acknowledged cache change (only used if using disable positive ACKs QoS)
521521
SequenceNumber_t last_sequence_number_;
522522
//! Biggest sequence number removed from history

src/cpp/fastdds/domain/DomainParticipantImpl.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1221,14 +1221,7 @@ bool DomainParticipantImpl::contains_entity(
12211221
ReturnCode_t DomainParticipantImpl::get_current_time(
12221222
fastrtps::Time_t& current_time) const
12231223
{
1224-
auto now = std::chrono::system_clock::now();
1225-
auto duration = now.time_since_epoch();
1226-
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
1227-
duration -= seconds;
1228-
auto nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(duration);
1229-
1230-
current_time.seconds = static_cast<int32_t>(seconds.count());
1231-
current_time.nanosec = static_cast<uint32_t>(nanos.count());
1224+
fastrtps::Time_t::now(current_time);
12321225

12331226
return ReturnCode_t::RETCODE_OK;
12341227
}

src/cpp/fastdds/domain/DomainParticipantImpl.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,10 @@ class DomainParticipantImpl
111111
DomainParticipantListener* listener,
112112
const std::chrono::seconds timeout = std::chrono::seconds::max())
113113
{
114-
auto time_out = std::chrono::time_point<std::chrono::system_clock>::max();
114+
auto time_out = std::chrono::time_point<std::chrono::steady_clock>::max();
115115
if (timeout < std::chrono::seconds::max())
116116
{
117-
auto now = std::chrono::system_clock::now();
117+
auto now = std::chrono::steady_clock::now();
118118
time_out = now + timeout;
119119
}
120120

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <fastdds/publisher/filtering/DataWriterFilteredChangePool.hpp>
3535
#include <fastdds/publisher/PublisherImpl.hpp>
3636
#include <fastdds/rtps/builtin/liveliness/WLP.h>
37+
#include <fastdds/rtps/common/Time_t.h>
3738
#include <fastdds/rtps/participant/RTPSParticipant.h>
3839
#include <fastdds/rtps/resources/ResourceEvent.h>
3940
#include <fastdds/rtps/resources/TimedEvent.h>
@@ -1027,7 +1028,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10271028
{
10281029
if (!history_.set_next_deadline(
10291030
handle,
1030-
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
1031+
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
10311032
{
10321033
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
10331034
}
@@ -1499,7 +1500,7 @@ bool DataWriterImpl::deadline_missed()
14991500

15001501
if (!history_.set_next_deadline(
15011502
timer_owner_,
1502-
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
1503+
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
15031504
{
15041505
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
15051506
return false;
@@ -1549,39 +1550,24 @@ bool DataWriterImpl::lifespan_expired()
15491550
{
15501551
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
15511552

1553+
fastrtps::rtps::Time_t current_ts;
1554+
fastrtps::rtps::Time_t::now(current_ts);
1555+
15521556
CacheChange_t* earliest_change;
15531557
while (history_.get_earliest_change(&earliest_change))
15541558
{
1555-
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
1556-
auto now = system_clock::now();
1559+
fastrtps::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration;
15571560

15581561
// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
1559-
if (now - source_timestamp < lifespan_duration_us_)
1562+
if (current_ts < expiration_ts)
15601563
{
1561-
auto interval = source_timestamp - now + lifespan_duration_us_;
1562-
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
1564+
fastrtps::rtps::Time_t interval = expiration_ts - current_ts;
1565+
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
15631566
return true;
15641567
}
15651568

15661569
// The earliest change has expired
15671570
history_.remove_change_pub(earliest_change);
1568-
1569-
// Set the timer for the next change if there is one
1570-
if (!history_.get_earliest_change(&earliest_change))
1571-
{
1572-
return false;
1573-
}
1574-
1575-
// Calculate when the next change is due to expire and restart
1576-
source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
1577-
now = system_clock::now();
1578-
auto interval = source_timestamp - now + lifespan_duration_us_;
1579-
1580-
if (interval.count() > 0)
1581-
{
1582-
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
1583-
return true;
1584-
}
15851571
}
15861572

15871573
return false;

src/cpp/fastdds/subscriber/DataReaderImpl.cpp

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <fastdds/dds/topic/Topic.hpp>
3636
#include <fastdds/dds/topic/TypeSupport.hpp>
3737
#include <fastdds/domain/DomainParticipantImpl.hpp>
38+
#include <fastdds/rtps/common/Time_t.h>
3839
#include <fastdds/rtps/participant/RTPSParticipant.h>
3940
#include <fastdds/rtps/reader/RTPSReader.h>
4041
#include <fastdds/rtps/resources/ResourceEvent.h>
@@ -1103,7 +1104,7 @@ bool DataReaderImpl::on_new_cache_change_added(
11031104
{
11041105
if (!history_.set_next_deadline(
11051106
change->instanceHandle,
1106-
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
1107+
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
11071108
{
11081109
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
11091110
}
@@ -1122,12 +1123,13 @@ bool DataReaderImpl::on_new_cache_change_added(
11221123
return true;
11231124
}
11241125

1125-
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
1126-
auto now = system_clock::now();
1126+
fastrtps::rtps::Time_t expiration_ts = change->sourceTimestamp + qos_.lifespan().duration;
1127+
fastrtps::rtps::Time_t current_ts;
1128+
fastrtps::rtps::Time_t::now(current_ts);
11271129

11281130
// The new change could have expired if it arrived too late
11291131
// If so, remove it from the history and return false to avoid notifying the listener
1130-
if (now - source_timestamp >= lifespan_duration_us_)
1132+
if (expiration_ts <= current_ts)
11311133
{
11321134
history_.remove_change_sub(new_change);
11331135
return false;
@@ -1149,11 +1151,10 @@ bool DataReaderImpl::on_new_cache_change_added(
11491151
EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved");
11501152
}
11511153

1152-
auto interval = source_timestamp - now + duration_cast<nanoseconds>(lifespan_duration_us_);
1153-
11541154
// Update and restart the timer
11551155
// If the timer is already running this will not have any effect
1156-
lifespan_timer_->update_interval_millisec(interval.count() * 1e-6);
1156+
fastrtps::rtps::Time_t interval = expiration_ts - current_ts;
1157+
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
11571158
lifespan_timer_->restart_timer();
11581159
return true;
11591160
}
@@ -1244,7 +1245,7 @@ bool DataReaderImpl::deadline_missed()
12441245

12451246
if (!history_.set_next_deadline(
12461247
timer_owner_,
1247-
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_), true))
1248+
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_), true))
12481249
{
12491250
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
12501251
return false;
@@ -1275,41 +1276,26 @@ bool DataReaderImpl::lifespan_expired()
12751276
{
12761277
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());
12771278

1279+
fastrtps::rtps::Time_t current_ts;
1280+
fastrtps::rtps::Time_t::now(current_ts);
1281+
12781282
CacheChange_t* earliest_change;
12791283
while (history_.get_earliest_change(&earliest_change))
12801284
{
1281-
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
1282-
auto now = system_clock::now();
1285+
fastrtps::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration;
12831286

12841287
// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
1285-
if (now - source_timestamp < lifespan_duration_us_)
1288+
if (current_ts < expiration_ts)
12861289
{
1287-
auto interval = source_timestamp - now + lifespan_duration_us_;
1288-
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
1290+
fastrtps::rtps::Time_t interval = expiration_ts - current_ts;
1291+
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
12891292
return true;
12901293
}
12911294

12921295
// The earliest change has expired
12931296
history_.remove_change_sub(earliest_change);
12941297

12951298
try_notify_read_conditions();
1296-
1297-
// Set the timer for the next change if there is one
1298-
if (!history_.get_earliest_change(&earliest_change))
1299-
{
1300-
return false;
1301-
}
1302-
1303-
// Calculate when the next change is due to expire and restart
1304-
source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
1305-
now = system_clock::now();
1306-
auto interval = source_timestamp - now + lifespan_duration_us_;
1307-
1308-
if (interval.count() > 0)
1309-
{
1310-
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
1311-
return true;
1312-
}
13131299
}
13141300

13151301
return false;

src/cpp/rtps/writer/StatefulWriter.cpp

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ StatefulWriter::StatefulWriter(
183183
, may_remove_change_(0)
184184
, disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
185185
, disable_positive_acks_(att.disable_positive_acks)
186-
, keep_duration_us_(att.keep_duration.to_ns() * 1e-3)
186+
, keep_duration_(att.keep_duration)
187187
, last_sequence_number_()
188188
, biggest_removed_sequence_number_()
189189
, sendBufferSize_(pimpl->get_min_network_send_buffer_size())
@@ -218,7 +218,7 @@ StatefulWriter::StatefulWriter(
218218
, may_remove_change_(0)
219219
, disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
220220
, disable_positive_acks_(att.disable_positive_acks)
221-
, keep_duration_us_(att.keep_duration.to_ns() * 1e-3)
221+
, keep_duration_(att.keep_duration)
222222
, last_sequence_number_()
223223
, biggest_removed_sequence_number_()
224224
, sendBufferSize_(pimpl->get_min_network_send_buffer_size())
@@ -254,7 +254,7 @@ StatefulWriter::StatefulWriter(
254254
, may_remove_change_(0)
255255
, disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
256256
, disable_positive_acks_(att.disable_positive_acks)
257-
, keep_duration_us_(att.keep_duration.to_ns() * 1e-3)
257+
, keep_duration_(att.keep_duration)
258258
, last_sequence_number_()
259259
, biggest_removed_sequence_number_()
260260
, sendBufferSize_(pimpl->get_min_network_send_buffer_size())
@@ -432,12 +432,13 @@ void StatefulWriter::unsent_change_added_to_history(
432432

433433
if (disable_positive_acks_)
434434
{
435-
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
436-
auto now = system_clock::now();
437-
auto interval = source_timestamp - now + keep_duration_us_;
438-
assert(interval.count() >= 0);
435+
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
436+
Time_t current_ts;
437+
Time_t::now(current_ts);
438+
assert(expiration_ts >= current_ts);
439+
auto interval = (expiration_ts - current_ts).to_duration_t();
439440

440-
ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
441+
ack_event_->update_interval(interval);
441442
ack_event_->restart_timer(max_blocking_time);
442443
}
443444

@@ -953,12 +954,13 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
953954
if ( !(ack_event_->getRemainingTimeMilliSec() > 0))
954955
{
955956
// Restart ack_timer
956-
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
957-
auto now = system_clock::now();
958-
auto interval = source_timestamp - now + keep_duration_us_;
959-
assert(interval.count() >= 0);
957+
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
958+
Time_t current_ts;
959+
Time_t::now(current_ts);
960+
assert(expiration_ts >= current_ts);
961+
auto interval = (expiration_ts - current_ts).to_duration_t();
960962

961-
ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
963+
ack_event_->update_interval(interval);
962964
ack_event_->restart_timer(max_blocking_time);
963965
}
964966
}
@@ -1675,13 +1677,9 @@ void StatefulWriter::updatePositiveAcks(
16751677
const WriterAttributes& att)
16761678
{
16771679
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1678-
if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3))
1679-
{
1680-
// Implicit conversion to microseconds
1681-
keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()};
1682-
}
1680+
keep_duration_ = att.keep_duration;
16831681
// Restart ack timer with new duration
1684-
ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3);
1682+
ack_event_->update_interval(keep_duration_);
16851683
ack_event_->restart_timer();
16861684
}
16871685

@@ -2094,14 +2092,16 @@ bool StatefulWriter::ack_timer_expired()
20942092
// The timer has expired so the earliest non-acked change must be marked as acknowledged
20952093
// This will be done in the first while iteration, as we start with a negative interval
20962094

2097-
auto interval = -keep_duration_us_;
2095+
Time_t expiration_ts;
2096+
Time_t current_ts;
2097+
Time_t::now(current_ts);
20982098

20992099
// On the other hand, we've seen in the tests that if samples are sent very quickly with little
21002100
// time between consecutive samples, the timer interval could end up being negative
21012101
// In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while
21022102
// loop
21032103

2104-
while (interval.count() < 0)
2104+
do
21052105
{
21062106
bool acks_flag = false;
21072107
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
@@ -2141,13 +2141,13 @@ bool StatefulWriter::ack_timer_expired()
21412141
return false;
21422142
}
21432143

2144-
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
2145-
auto now = system_clock::now();
2146-
interval = source_timestamp - now + keep_duration_us_;
2144+
Time_t::now(current_ts);
2145+
expiration_ts = change->sourceTimestamp + keep_duration_;
21472146
}
2148-
assert(interval.count() >= 0);
2147+
while (expiration_ts < current_ts);
21492148

2150-
ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
2149+
auto interval = (expiration_ts - current_ts).to_duration_t();
2150+
ack_event_->update_interval(interval);
21512151
return true;
21522152
}
21532153

src/cpp/utils/SystemInfo.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ bool SystemInfo::wait_for_file_closure(
174174
const std::string& filename,
175175
const std::chrono::seconds timeout)
176176
{
177-
auto start = std::chrono::system_clock::now();
177+
auto start = std::chrono::steady_clock::now();
178178

179179
#ifdef _MSC_VER
180180
std::ofstream os;
@@ -184,7 +184,7 @@ bool SystemInfo::wait_for_file_closure(
184184
os.open(filename, std::ios::out | std::ios::app, _SH_DENYWR);
185185
if (!os.is_open()
186186
// If the file is lock-opened in an external editor do not hang
187-
&& (std::chrono::system_clock::now() - start) < timeout )
187+
&& (std::chrono::steady_clock::now() - start) < timeout )
188188
{
189189
std::this_thread::yield();
190190
}
@@ -199,7 +199,7 @@ bool SystemInfo::wait_for_file_closure(
199199

200200
while (flock(fd, LOCK_EX | LOCK_NB)
201201
// If the file is lock-opened in an external editor do not hang
202-
&& (std::chrono::system_clock::now() - start) < timeout )
202+
&& (std::chrono::steady_clock::now() - start) < timeout )
203203
{
204204
std::this_thread::yield();
205205
}
@@ -214,7 +214,7 @@ bool SystemInfo::wait_for_file_closure(
214214
(void)filename;
215215
#endif // ifdef _MSC_VER
216216

217-
return std::chrono::system_clock::now() - start < timeout;
217+
return std::chrono::steady_clock::now() - start < timeout;
218218
}
219219

220220
ReturnCode_t SystemInfo::set_environment_file()

0 commit comments

Comments
 (0)