Skip to content

Commit d256d9b

Browse files
committed
Merge branch 'w-utter-cache-change-iter'
2 parents bdb8cda + f92b029 commit d256d9b

File tree

1 file changed

+222
-141
lines changed

1 file changed

+222
-141
lines changed

src/rtps/writer.rs

Lines changed: 222 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::{
4242
entity::RTPSEntity,
4343
guid::{EntityId, GuidPrefix, GUID},
4444
locator::Locator,
45-
sequence_number::{FragmentNumber, SequenceNumber},
45+
sequence_number::{FragmentNumber, FragmentNumberRange, SequenceNumber},
4646
time::Timestamp,
4747
},
4848
};
@@ -699,145 +699,8 @@ impl Writer {
699699
}
700700
}
701701

702-
// All the messages are pushed to a vector first before sending them.
703-
// If this hinders performance when many datafrag messages need to be
704-
// sent, optimize.
705-
let mut messages_to_send: Vec<Message> = vec![];
706-
707-
// The EntityId of the destination
708-
let reader_entity_id =
709-
target_reader_opt.map_or(EntityId::UNKNOWN, |p| p.remote_reader_guid.entity_id);
710-
711-
let data_size = cc.data_value.payload_size();
712-
let fragmentation_needed = data_size > self.data_max_size_serialized;
713-
714-
if !fragmentation_needed {
715-
// We can send DATA
716-
let mut message_builder = MessageBuilder::new();
717-
718-
// If DataWriter sent us a source timestamp, then add that.
719-
// Timestamp has to go before Data to have effect on Data.
720-
if let Some(src_ts) = cc.write_options.source_timestamp() {
721-
message_builder = message_builder.ts_msg(self.endianness, Some(src_ts));
722-
}
723-
724-
if let Some(reader) = target_reader_opt {
725-
// Add info_destination
726-
message_builder =
727-
message_builder.dst_submessage(self.endianness, reader.remote_reader_guid.prefix);
728-
729-
// If the reader is pending GAPs on any sequence numbers, add a GAP
730-
if !reader.get_pending_gap().is_empty() {
731-
message_builder = message_builder.gap_msg(
732-
reader.get_pending_gap(),
733-
self.entity_id(),
734-
self.endianness,
735-
reader.remote_reader_guid,
736-
);
737-
}
738-
}
739-
740-
// Add the DATA submessage
741-
message_builder = message_builder.data_msg(
742-
cc,
743-
reader_entity_id,
744-
self.my_guid, // writer
745-
self.endianness,
746-
self.security_plugins.as_ref(),
747-
);
748-
749-
// Add HEARTBEAT if needed
750-
if send_also_heartbeat && !self.like_stateless {
751-
let final_flag = false; // false = request that readers acknowledge with ACKNACK.
752-
let liveliness_flag = false; // This is not a manual liveliness assertion (DDS API call), but side-effect of
753-
// writing new data.
754-
message_builder = message_builder.heartbeat_msg(
755-
self.entity_id(), // from Writer
756-
self.history_buffer.first_change_sequence_number(),
757-
self.history_buffer.last_change_sequence_number(),
758-
self.next_heartbeat_count(),
759-
self.endianness,
760-
reader_entity_id, // to Reader
761-
final_flag,
762-
liveliness_flag,
763-
);
764-
}
765-
766-
let data_message = message_builder.add_header_and_build(self.my_guid.prefix);
767-
768-
messages_to_send.push(data_message);
769-
} else {
770-
// fragmentation_needed: We need to send DATAFRAGs
771-
772-
// If sending to a single reader, add a GAP message with pending gaps if any
773-
if let Some(reader) = target_reader_opt {
774-
if !reader.get_pending_gap().is_empty() {
775-
let gap_msg = MessageBuilder::new()
776-
.dst_submessage(self.endianness, reader.remote_reader_guid.prefix)
777-
.gap_msg(
778-
reader.get_pending_gap(),
779-
self.entity_id(),
780-
self.endianness,
781-
reader.remote_reader_guid,
782-
)
783-
.add_header_and_build(self.my_guid.prefix);
784-
messages_to_send.push(gap_msg);
785-
}
786-
}
787-
788-
let (num_frags, fragment_size) = self.num_frags_and_frag_size(data_size);
789-
790-
for frag_num in
791-
FragmentNumber::range_inclusive(FragmentNumber::new(1), FragmentNumber::new(num_frags))
792-
{
793-
let mut message_builder = MessageBuilder::new(); // fresh builder
794-
795-
if let Some(src_ts) = cc.write_options.source_timestamp() {
796-
// Add timestamp
797-
message_builder = message_builder.ts_msg(self.endianness, Some(src_ts));
798-
}
799-
800-
if let Some(reader) = target_reader_opt {
801-
// Add info_destination
802-
message_builder =
803-
message_builder.dst_submessage(self.endianness, reader.remote_reader_guid.prefix);
804-
}
805-
806-
message_builder = message_builder.data_frag_msg(
807-
cc,
808-
reader_entity_id, // reader
809-
self.my_guid, // writer
810-
frag_num,
811-
fragment_size,
812-
data_size.try_into().unwrap(),
813-
self.endianness,
814-
self.security_plugins.as_ref(),
815-
);
816-
817-
let datafrag_msg = message_builder.add_header_and_build(self.my_guid.prefix);
818-
messages_to_send.push(datafrag_msg);
819-
} // end for
820-
821-
// Add HEARTBEAT message if needed
822-
if send_also_heartbeat && !self.like_stateless {
823-
let final_flag = false; // false = request that readers acknowledge with ACKNACK.
824-
let liveliness_flag = false; // This is not a manual liveliness assertion (DDS API call), but side-effect of
825-
// writing new data.
826-
let hb_msg = MessageBuilder::new()
827-
.heartbeat_msg(
828-
self.entity_id(), // from Writer
829-
self.history_buffer.first_change_sequence_number(),
830-
self.history_buffer.last_change_sequence_number(),
831-
self.next_heartbeat_count(),
832-
self.endianness,
833-
reader_entity_id, // to Reader
834-
final_flag,
835-
liveliness_flag,
836-
)
837-
.add_header_and_build(self.my_guid.prefix);
838-
messages_to_send.push(hb_msg);
839-
}
840-
}
702+
let messages_to_send = FragmentationIter::new(self, cc, target_reader_opt, send_also_heartbeat);
703+
let fragmentation_needed = messages_to_send.fragmentation_needed();
841704

842705
// Send the messages, either to all readers or just one
843706
for msg in messages_to_send {
@@ -856,7 +719,6 @@ impl Writer {
856719
}
857720
}
858721
}
859-
860722
// The return value tells if the data had to be fragmented
861723
fragmentation_needed
862724
}
@@ -1739,6 +1601,225 @@ impl HasQoSPolicy for Writer {
17391601
}
17401602
}
17411603

1604+
struct FragmentationIter<'a> {
1605+
writer: &'a Writer,
1606+
cache_change: &'a CacheChange,
1607+
target_reader_opt: Option<&'a RtpsReaderProxy>,
1608+
reader_entity_id: EntityId,
1609+
send_heartbeat: bool,
1610+
finished: bool,
1611+
state: FragmentationIterState,
1612+
}
1613+
1614+
impl<'a> FragmentationIter<'a> {
1615+
fn new(
1616+
writer: &'a Writer,
1617+
cache_change: &'a CacheChange,
1618+
target_reader_opt: Option<&'a RtpsReaderProxy>,
1619+
send_heartbeat: bool,
1620+
) -> Self {
1621+
// The EntityId of the destination
1622+
let reader_entity_id =
1623+
target_reader_opt.map_or(EntityId::UNKNOWN, |p| p.remote_reader_guid.entity_id);
1624+
1625+
let data_size = cache_change.data_value.payload_size();
1626+
let fragmentation_needed = data_size > writer.data_max_size_serialized;
1627+
1628+
let state = if fragmentation_needed {
1629+
FragmentationIterState::Fragmented(FragmentedState::TargetReader, data_size)
1630+
} else {
1631+
FragmentationIterState::Unfragmented
1632+
};
1633+
1634+
Self {
1635+
writer,
1636+
cache_change,
1637+
target_reader_opt,
1638+
state,
1639+
reader_entity_id,
1640+
finished: false,
1641+
send_heartbeat,
1642+
}
1643+
}
1644+
1645+
fn fragmentation_needed(&self) -> bool {
1646+
matches!(self.state, FragmentationIterState::Fragmented(..))
1647+
}
1648+
}
1649+
1650+
enum FragmentationIterState {
1651+
Fragmented(FragmentedState, usize),
1652+
Unfragmented,
1653+
}
1654+
1655+
enum FragmentedState {
1656+
TargetReader,
1657+
Fragments(FragmentNumberRange, u16),
1658+
Heartbeat,
1659+
}
1660+
1661+
impl<'a> Iterator for FragmentationIter<'a> {
1662+
type Item = Message;
1663+
fn next(&mut self) -> Option<Self::Item> {
1664+
if self.finished {
1665+
return None;
1666+
}
1667+
1668+
let cc = self.cache_change;
1669+
let writer = self.writer;
1670+
let target_reader_opt = self.target_reader_opt;
1671+
let reader_entity_id = self.reader_entity_id;
1672+
let send_heartbeat = self.send_heartbeat;
1673+
1674+
match &mut self.state {
1675+
FragmentationIterState::Fragmented(state, data_size) => {
1676+
// fragmentation_needed: We need to send DATAFRAGs
1677+
match state {
1678+
FragmentedState::TargetReader => {
1679+
let (num_frags, fragment_size) = writer.num_frags_and_frag_size(*data_size);
1680+
*state = FragmentedState::Fragments(
1681+
FragmentNumber::range_inclusive(
1682+
FragmentNumber::new(1),
1683+
FragmentNumber::new(num_frags),
1684+
),
1685+
fragment_size,
1686+
);
1687+
1688+
// If sending to a single reader, add a GAP message with pending gaps if any
1689+
if let Some(reader) = target_reader_opt {
1690+
if !reader.get_pending_gap().is_empty() {
1691+
let gap_msg = MessageBuilder::new()
1692+
.dst_submessage(writer.endianness, reader.remote_reader_guid.prefix)
1693+
.gap_msg(
1694+
reader.get_pending_gap(),
1695+
writer.entity_id(),
1696+
writer.endianness,
1697+
reader.remote_reader_guid,
1698+
)
1699+
.add_header_and_build(writer.my_guid.prefix);
1700+
return Some(gap_msg);
1701+
}
1702+
}
1703+
self.next()
1704+
}
1705+
FragmentedState::Fragments(fragments, fragment_size) => {
1706+
if let Some(frag_num) = fragments.next() {
1707+
let mut message_builder = MessageBuilder::new(); // fresh builder
1708+
1709+
if let Some(src_ts) = cc.write_options.source_timestamp() {
1710+
// Add timestamp
1711+
message_builder = message_builder.ts_msg(writer.endianness, Some(src_ts));
1712+
}
1713+
1714+
if let Some(reader) = target_reader_opt {
1715+
// Add info_destination
1716+
message_builder = message_builder
1717+
.dst_submessage(writer.endianness, reader.remote_reader_guid.prefix);
1718+
}
1719+
1720+
message_builder = message_builder.data_frag_msg(
1721+
cc,
1722+
reader_entity_id, // reader
1723+
writer.my_guid,
1724+
frag_num,
1725+
*fragment_size,
1726+
(*data_size).try_into().unwrap(),
1727+
writer.endianness,
1728+
writer.security_plugins.as_ref(),
1729+
);
1730+
1731+
let datafrag_msg = message_builder.add_header_and_build(writer.my_guid.prefix);
1732+
return Some(datafrag_msg);
1733+
}
1734+
*state = FragmentedState::Heartbeat;
1735+
self.next()
1736+
}
1737+
FragmentedState::Heartbeat => {
1738+
self.finished = true;
1739+
1740+
// Add HEARTBEAT message if needed
1741+
if send_heartbeat && !writer.like_stateless {
1742+
let final_flag = false; // false = request that readers acknowledge with ACKNACK.
1743+
let liveliness_flag = false; // This is not a manual liveliness assertion (DDS API call), but side-effect of
1744+
// writing new data.
1745+
let hb_msg = MessageBuilder::new()
1746+
.heartbeat_msg(
1747+
writer.entity_id(), // from Writer
1748+
writer.history_buffer.first_change_sequence_number(),
1749+
writer.history_buffer.last_change_sequence_number(),
1750+
writer.next_heartbeat_count(),
1751+
writer.endianness,
1752+
reader_entity_id, // to Reader
1753+
final_flag,
1754+
liveliness_flag,
1755+
)
1756+
.add_header_and_build(writer.my_guid.prefix);
1757+
return Some(hb_msg);
1758+
}
1759+
None
1760+
}
1761+
}
1762+
}
1763+
FragmentationIterState::Unfragmented => {
1764+
// We can send DATA
1765+
let mut message_builder = MessageBuilder::new();
1766+
1767+
// If DataWriter sent us a source timestamp, then add that.
1768+
// Timestamp has to go before Data to have effect on Data.
1769+
if let Some(src_ts) = cc.write_options.source_timestamp() {
1770+
message_builder = message_builder.ts_msg(writer.endianness, Some(src_ts));
1771+
}
1772+
1773+
if let Some(reader) = target_reader_opt {
1774+
// Add info_destination
1775+
message_builder =
1776+
message_builder.dst_submessage(writer.endianness, reader.remote_reader_guid.prefix);
1777+
1778+
// If the reader is pending GAPs on any sequence numbers, add a GAP
1779+
if !reader.get_pending_gap().is_empty() {
1780+
message_builder = message_builder.gap_msg(
1781+
reader.get_pending_gap(),
1782+
writer.entity_id(),
1783+
writer.endianness,
1784+
reader.remote_reader_guid,
1785+
);
1786+
}
1787+
}
1788+
1789+
// Add the DATA submessage
1790+
message_builder = message_builder.data_msg(
1791+
cc,
1792+
reader_entity_id,
1793+
writer.my_guid,
1794+
writer.endianness,
1795+
writer.security_plugins.as_ref(),
1796+
);
1797+
1798+
// Add HEARTBEAT if needed
1799+
if send_heartbeat && !writer.like_stateless {
1800+
let final_flag = false; // false = request that readers acknowledge with ACKNACK.
1801+
let liveliness_flag = false; // This is not a manual liveliness assertion (DDS API call), but side-effect of
1802+
// writing new data.
1803+
message_builder = message_builder.heartbeat_msg(
1804+
writer.entity_id(),
1805+
writer.history_buffer.first_change_sequence_number(),
1806+
writer.history_buffer.last_change_sequence_number(),
1807+
writer.next_heartbeat_count(),
1808+
writer.endianness,
1809+
reader_entity_id, // to Reader
1810+
final_flag,
1811+
liveliness_flag,
1812+
);
1813+
}
1814+
1815+
let data_message = message_builder.add_header_and_build(writer.my_guid.prefix);
1816+
self.finished = true;
1817+
Some(data_message)
1818+
}
1819+
}
1820+
}
1821+
}
1822+
17421823
// -------------------------------------------------------------------------------------
17431824
// -------------------------------------------------------------------------------------
17441825
// -------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)