Skip to content

Commit de7e952

Browse files
Process key-only payloads (#5897)
* Refs #22901. Add `is_serialized_key` to `SerializedPayload_t`. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Fill `is_serialized_key` in `MessageReceiver`. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Handle `is_serialized_key` in `SerializedPayload` methods. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Never filter out key-only payloads Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Handle `is_serialized_key` on CacheChange Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Handle `is_serialized_key` on payload pools. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Handle `is_serialized_key` in DDSFilterExpression. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Remove TODO. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Add BB test. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Handle key computation failure in DataReaderHistory. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Fix encapsulation header in test. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Fix comment in test. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Use CV in test. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Extend test with fragmented data. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Extend fragment management unit test. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Add unit test in DDSSQLFilterValueTests. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Bonus point: fix change kind in data frag processing. Signed-off-by: Miguel Company <[email protected]> * Refs #22901. Add feature to versions.md. Signed-off-by: Miguel Company <[email protected]> * Apply suggestion Co-authored-by: Mario Domínguez López <[email protected]> Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Miguel Company <[email protected]> Co-authored-by: Mario Domínguez López <[email protected]>
1 parent 1b33309 commit de7e952

File tree

15 files changed

+354
-77
lines changed

15 files changed

+354
-77
lines changed

include/fastdds/rtps/common/CacheChange.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ struct FASTDDS_EXPORTED_API CacheChange_t
176176

177177
// Copy certain values from serializedPayload
178178
serializedPayload.encapsulation = ch_ptr->serializedPayload.encapsulation;
179+
serializedPayload.is_serialized_key = ch_ptr->serializedPayload.is_serialized_key;
179180

180181
// Copy fragment size and calculate fragment count
181182
setFragmentSize(ch_ptr->fragment_size_, false);
@@ -287,6 +288,12 @@ struct FASTDDS_EXPORTED_API CacheChange_t
287288
uint32_t incoming_length = fragment_size_ * fragments_in_submessage;
288289
uint32_t last_fragment_index = fragment_starting_num + fragments_in_submessage - 1;
289290

291+
// Validate payload types
292+
if (serializedPayload.is_serialized_key != incoming_data.is_serialized_key)
293+
{
294+
return false;
295+
}
296+
290297
// Validate fragment indexes
291298
if (last_fragment_index > fragment_count_)
292299
{

include/fastdds/rtps/common/SerializedPayload.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ struct FASTDDS_EXPORTED_API SerializedPayload_t
7272
uint32_t pos;
7373
//!Pool that created the payload
7474
IPayloadPool* payload_owner = nullptr;
75+
//!Whether the payload contains a serialized key, or the whole data
76+
bool is_serialized_key = false;
7577

7678
//!Default constructor
7779
SerializedPayload_t()

src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,14 @@ bool DataReaderHistory::received_change_keep_all(
215215
{
216216
if (!compute_key_for_change_fn_(a_change))
217217
{
218-
// Store the sample temporally only in ReaderHistory. When completed it will be stored in DataReaderHistory too.
219-
return add_to_reader_history_if_not_full(a_change, rejection_reason);
218+
if (!a_change->is_fully_assembled())
219+
{
220+
// Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too.
221+
return add_to_reader_history_if_not_full(a_change, rejection_reason);
222+
}
223+
224+
rejection_reason = REJECTED_BY_INSTANCES_LIMIT;
225+
return false;
220226
}
221227

222228
bool ret_value = false;
@@ -251,8 +257,14 @@ bool DataReaderHistory::received_change_keep_last(
251257
{
252258
if (!compute_key_for_change_fn_(a_change))
253259
{
254-
// Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too.
255-
return add_to_reader_history_if_not_full(a_change, rejection_reason);
260+
if (!a_change->is_fully_assembled())
261+
{
262+
// Store the sample temporally only in ReaderHistory. When completed it will be stored in SubscriberHistory too.
263+
return add_to_reader_history_if_not_full(a_change, rejection_reason);
264+
}
265+
266+
rejection_reason = REJECTED_BY_INSTANCES_LIMIT;
267+
return false;
256268
}
257269

258270
bool ret_value = false;

src/cpp/fastdds/topic/DDSSQLFilter/DDSFilterExpression.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ bool DDSFilterExpression::evaluate(
5050
using namespace eprosima::fastdds::dds::xtypes;
5151
using namespace eprosima::fastcdr;
5252

53+
// Always pass filter for key-only payloads
54+
if (payload.is_serialized_key)
55+
{
56+
return true;
57+
}
58+
5359
dyn_data_->clear_all_values();
5460
try
5561
{

src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ bool DataSharingPayloadPool::release_payload(
3434
payload.pos = 0;
3535
payload.max_size = 0;
3636
payload.data = nullptr;
37+
payload.is_serialized_key = false;
3738
payload.payload_owner = nullptr;
3839
return true;
3940
}

src/cpp/rtps/DataSharing/ReaderPool.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class ReaderPool : public DataSharingPayloadPool
5959
payload.data = data.data;
6060
payload.length = data.length;
6161
payload.max_size = data.length;
62+
payload.is_serialized_key = data.is_serialized_key;
6263
payload.payload_owner = this;
6364
return true;
6465
}

src/cpp/rtps/DataSharing/WriterPool.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class WriterPool : public DataSharingPayloadPool
8888
payload.data = data.data;
8989
payload.length = data.length;
9090
payload.max_size = data.length;
91+
payload.is_serialized_key = data.is_serialized_key;
9192
payload.payload_owner = this;
9293
return true;
9394
}

src/cpp/rtps/common/SerializedPayload.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ SerializedPayload_t& SerializedPayload_t::operator = (
3636
max_size = other.max_size;
3737
pos = other.pos;
3838
payload_owner = other.payload_owner;
39+
is_serialized_key = other.is_serialized_key;
3940

4041
other.encapsulation = CDR_BE;
4142
other.length = 0;
4243
other.data = nullptr;
4344
other.max_size = 0;
4445
other.pos = 0;
4546
other.payload_owner = nullptr;
47+
other.is_serialized_key = false;
4648

4749
return *this;
4850
}
@@ -60,6 +62,7 @@ bool SerializedPayload_t::operator == (
6062
const SerializedPayload_t& other) const
6163
{
6264
return ((encapsulation == other.encapsulation) &&
65+
(is_serialized_key == other.is_serialized_key) &&
6366
(length == other.length) &&
6467
(0 == memcmp(data, other.data, length)));
6568
}
@@ -82,6 +85,7 @@ bool SerializedPayload_t::copy(
8285
}
8386
}
8487
encapsulation = serData->encapsulation;
88+
is_serialized_key = serData->is_serialized_key;
8589
if (length == 0)
8690
{
8791
return true;
@@ -96,6 +100,7 @@ bool SerializedPayload_t::reserve_fragmented(
96100
length = serData->length;
97101
max_size = serData->length;
98102
encapsulation = serData->encapsulation;
103+
is_serialized_key = serData->is_serialized_key;
99104
data = (octet*)calloc(length, sizeof(octet));
100105
return true;
101106
}
@@ -112,6 +117,7 @@ void SerializedPayload_t::empty()
112117
free(data);
113118
}
114119
data = nullptr;
120+
is_serialized_key = false;
115121
}
116122

117123
void SerializedPayload_t::reserve(

src/cpp/rtps/history/TopicPayloadPool.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ bool TopicPayloadPool::get_payload(
100100
payload.data = data.data;
101101
payload.length = data.length;
102102
payload.max_size = PayloadNode::data_size(data.data);
103+
payload.is_serialized_key = data.is_serialized_key;
103104
payload.payload_owner = this;
104105
return true;
105106
}
@@ -136,6 +137,7 @@ bool TopicPayloadPool::release_payload(
136137
payload.pos = 0;
137138
payload.max_size = 0;
138139
payload.data = nullptr;
140+
payload.is_serialized_key = false;
139141
payload.payload_owner = nullptr;
140142
return true;
141143
}

src/cpp/rtps/messages/MessageReceiver.cpp

Lines changed: 20 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -867,28 +867,10 @@ bool MessageReceiver::proc_Submsg_Data(
867867
uint32_t next_pos = msg->pos + payload_size;
868868
if (msg->length >= next_pos && payload_size > 0)
869869
{
870-
FASTDDS_TODO_BEFORE(3, 3, "Pass keyFlag in serializedPayload, and always pass input data upwards");
871-
if (dataFlag)
872-
{
873-
ch.serializedPayload.data = &msg->buffer[msg->pos];
874-
ch.serializedPayload.length = payload_size;
875-
ch.serializedPayload.max_size = payload_size;
876-
}
877-
else // keyFlag would be true since we are inside an if (dataFlag || keyFlag)
878-
{
879-
if (payload_size <= PARAMETER_KEY_HASH_LENGTH)
880-
{
881-
if (!ch.instanceHandle.isDefined())
882-
{
883-
memcpy(ch.instanceHandle.value, &msg->buffer[msg->pos], payload_size);
884-
}
885-
}
886-
else
887-
{
888-
EPROSIMA_LOG_WARNING(RTPS_MSG_IN, IDSTRING "Ignoring Serialized Payload for too large key-only data (" <<
889-
payload_size << ")");
890-
}
891-
}
870+
ch.serializedPayload.data = &msg->buffer[msg->pos];
871+
ch.serializedPayload.length = payload_size;
872+
ch.serializedPayload.max_size = payload_size;
873+
ch.serializedPayload.is_serialized_key = keyFlag;
892874
msg->pos = next_pos;
893875
}
894876
else
@@ -977,6 +959,7 @@ bool MessageReceiver::proc_Submsg_DataFrag(
977959
//FOUND THE READER.
978960
//We ask the reader for a cachechange to store the information.
979961
CacheChange_t ch;
962+
ch.kind = ALIVE;
980963
ch.writerGUID.guidPrefix = source_guid_prefix_;
981964
valid &= CDRMessage::readEntityId(msg, &ch.writerGUID.entityId);
982965

@@ -1046,49 +1029,24 @@ bool MessageReceiver::proc_Submsg_DataFrag(
10461029

10471030
// Validations??? XXX TODO
10481031

1049-
if (!keyFlag)
1032+
uint32_t next_pos = msg->pos + payload_size;
1033+
if (msg->length >= next_pos && payload_size > 0)
10501034
{
1051-
uint32_t next_pos = msg->pos + payload_size;
1052-
if (msg->length >= next_pos && payload_size > 0)
1053-
{
1054-
ch.kind = ALIVE;
1055-
ch.serializedPayload.data = &msg->buffer[msg->pos];
1056-
ch.serializedPayload.length = payload_size;
1057-
ch.serializedPayload.max_size = payload_size;
1058-
ch.setFragmentSize(fragmentSize);
1035+
ch.serializedPayload.data = &msg->buffer[msg->pos];
1036+
ch.serializedPayload.length = payload_size;
1037+
ch.serializedPayload.max_size = payload_size;
1038+
ch.serializedPayload.is_serialized_key = keyFlag;
1039+
ch.setFragmentSize(fragmentSize);
10591040

1060-
msg->pos = next_pos;
1061-
}
1062-
else
1063-
{
1064-
EPROSIMA_LOG_WARNING(RTPS_MSG_IN, IDSTRING "Serialized Payload value invalid or larger than maximum allowed size "
1065-
"(" << payload_size << "/" << (msg->length - msg->pos) << ")");
1066-
ch.serializedPayload.data = nullptr;
1067-
ch.inline_qos.data = nullptr;
1068-
return false;
1069-
}
1041+
msg->pos = next_pos;
10701042
}
1071-
else if (keyFlag)
1072-
{
1073-
/* XXX TODO
1074-
Endianness_t previous_endian = msg->msg_endian;
1075-
if (ch->serializedPayload.encapsulation == PL_CDR_BE)
1076-
msg->msg_endian = BIGEND;
1077-
else if (ch->serializedPayload.encapsulation == PL_CDR_LE)
1078-
msg->msg_endian = LITTLEEND;
1079-
else
1080-
{
1081-
EPROSIMA_LOG_ERROR(RTPS_MSG_IN, IDSTRING"Bad encapsulation for KeyHash and status parameter list");
1082-
return false;
1083-
}
1084-
//uint32_t param_size;
1085-
if (ParameterList::readParameterListfromCDRMsg(msg, &m_ParamList, ch, false) <= 0)
1086-
{
1087-
EPROSIMA_LOG_INFO(RTPS_MSG_IN, IDSTRING"SubMessage Data ERROR, keyFlag ParameterList");
1088-
return false;
1089-
}
1090-
msg->msg_endian = previous_endian;
1091-
*/
1043+
else
1044+
{
1045+
EPROSIMA_LOG_WARNING(RTPS_MSG_IN, IDSTRING "Serialized Payload value invalid or larger than maximum allowed size "
1046+
"(" << payload_size << "/" << (msg->length - msg->pos) << ")");
1047+
ch.serializedPayload.data = nullptr;
1048+
ch.inline_qos.data = nullptr;
1049+
return false;
10921050
}
10931051

10941052
// Set sourcetimestamp

0 commit comments

Comments
 (0)