Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 88 additions & 32 deletions srcCxx/shape_main.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ class ShapeOptions {

useconds_t periodic_announcement_period_us;

char* cft_expression;

int size_modulo;

public:
//-------------------------------------------------------------
ShapeOptions()
Expand Down Expand Up @@ -330,6 +334,10 @@ class ShapeOptions {
take_read_next_instance = true;

periodic_announcement_period_us = 0;

cft_expression = NULL;

size_modulo = 0; // 0 means disabled
}

//-------------------------------------------------------------
Expand All @@ -338,6 +346,7 @@ class ShapeOptions {
STRING_FREE(topic_name);
STRING_FREE(color);
STRING_FREE(partition);
STRING_FREE(cft_expression);
}

//-------------------------------------------------------------
Expand Down Expand Up @@ -403,6 +412,13 @@ class ShapeOptions {
printf(" read_next_instance()\n");
printf(" --periodic-announcement <ms> : indicates the periodic participant\n");
printf(" announcement period in ms. Default 0 (off)\n");
printf(" --cft <expression> : ContentFilteredTopic filter expression (quotes\n");
printf(" required around the expression). Cannot be used with\n");
printf(" -c on subscriber applications\n");
printf(" --size-modulo <int> : If set, the modulo operation is applied to the\n");
printf(" shapesize. This will make that shapesize is in the\n");
printf(" range [1,N]. This only applies if shapesize is\n");
printf(" increased (-z 0)\n");
}

//-------------------------------------------------------------
Expand All @@ -415,7 +431,7 @@ class ShapeOptions {
logger.log_message("please specify publish [-P] or subscribe [-S]", Verbosity::ERROR);
return false;
}
if ( publish && subscribe ) {
if (publish && subscribe) {
logger.log_message("please specify only one of: publish [-P] or subscribe [-S]", Verbosity::ERROR);
return false;
}
Expand All @@ -432,6 +448,9 @@ class ShapeOptions {
if (publish && take_read_next_instance == false ) {
logger.log_message("warning: --take-read ignored on publisher applications", Verbosity::ERROR);
}
if (publish && cft_expression != NULL) {
logger.log_message("warning: --cft ignored on publisher applications", Verbosity::ERROR);
}
if (subscribe && shapesize != 20) {
logger.log_message("warning: shapesize [-z] ignored on subscriber applications", Verbosity::ERROR);
}
Expand All @@ -456,6 +475,13 @@ class ShapeOptions {
if (!coherent_set_enabled && !ordered_access_enabled && coherent_set_access_scope_set) {
logger.log_message("warning: --access-scope ignored because not coherent, or ordered access enabled", Verbosity::ERROR);
}
if (size_modulo > 0 && shapesize != 0) {
logger.log_message("warning: --size-modulo has no effect unless shapesize (-z) is set to 0", Verbosity::ERROR);
}
if (subscribe && color != NULL && cft_expression != NULL) {
logger.log_message("error: cannot specify both --cft and -c/--color for subscriber applications", Verbosity::ERROR);
return false;
}

return true;
}
Expand Down Expand Up @@ -483,6 +509,8 @@ class ShapeOptions {
{"take-read", no_argument, NULL, 'K'},
{"time-filter", required_argument, NULL, 'i'},
{"periodic-announcement", required_argument, NULL, 'N'},
{"cft", required_argument, NULL, 'F'},
{"size-modulo", required_argument, NULL, 'Q'},
{NULL, 0, NULL, 0 }
};

Expand Down Expand Up @@ -867,6 +895,19 @@ class ShapeOptions {
periodic_announcement_period_us = (useconds_t) converted_param * 1000;
break;
}
case 'F':
cft_expression = strdup(optarg);
break;
case 'Q': {
int converted_param = 0;
if (sscanf(optarg, "%d", &converted_param) == 0 || converted_param < 1) {
logger.log_message("incorrect value for size-modulo, must be >=1", Verbosity::ERROR);
parse_ok = false;
} else {
size_modulo = converted_param;
}
break;
}
case '?':
parse_ok = false;
break;
Expand Down Expand Up @@ -1500,50 +1541,58 @@ class ShapeApplication {
logger.log_message(" HistoryDepth = " + std::to_string(dr_qos.history FIELD_ACCESSOR.depth), Verbosity::DEBUG);
}

if ( options->color != NULL ) {
/* filter on specified color */
if ( options->cft_expression != NULL || options->color != NULL) {
ContentFilteredTopic *cft = NULL;
StringSeq cf_params;
StringSeq cf_params;

for (unsigned int i = 0; i < options->num_topics; ++i) {
const std::string filtered_topic_name_str =
std::string(options->topic_name) +
(i > 0 ? std::to_string(i) : "") +
"_filtered";
const char* filtered_topic_name = filtered_topic_name_str.c_str();
for (unsigned int i = 0; i < options->num_topics; ++i) {
const std::string filtered_topic_name_str =
std::string(options->topic_name) +
(i > 0 ? std::to_string(i) : "") +
"_filtered";
const char* filtered_topic_name = filtered_topic_name_str.c_str();
const char* filter_expr = nullptr;

if (options->cft_expression != NULL) {
filter_expr = options->cft_expression;
cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], filter_expr, cf_params);
logger.log_message(" ContentFilterTopic = \"" + std::string(filter_expr) + "\"", Verbosity::DEBUG);
} else if (options->color != NULL) {
#if defined(RTI_CONNEXT_DDS)
char parameter[64];
snprintf(parameter, 64, "'%s'", options->color);
StringSeq_push(cf_params, parameter);
char parameter[64];
snprintf(parameter, 64, "'%s'", options->color);
StringSeq_push(cf_params, parameter);

cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color MATCH %0", cf_params);
logger.log_message(" ContentFilterTopic = \"color MATCH "
+ std::string(parameter) + std::string("\""), Verbosity::DEBUG);
cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color MATCH %0", cf_params);
logger.log_message(" ContentFilterTopic = \"color MATCH "
+ std::string(parameter) + std::string("\""), Verbosity::DEBUG);
#elif defined(INTERCOM_DDS)
char parameter[64];
snprintf(parameter, 64, "'%s'", options->color);
StringSeq_push(cf_params, parameter);
char parameter[64];
snprintf(parameter, 64, "'%s'", options->color);
StringSeq_push(cf_params, parameter);

cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color = %0", cf_params);
logger.log_message(" ContentFilterTopic = \"color = "
+ std::string(parameter) + std::string("\""), Verbosity::DEBUG);
cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color = %0", cf_params);
logger.log_message(" ContentFilterTopic = \"color = "
+ std::string(parameter) + std::string("\""), Verbosity::DEBUG);
#elif defined(TWINOAKS_COREDX) || defined(OPENDDS)
StringSeq_push(cf_params, options->color);
cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color = %0", cf_params);
logger.log_message(" ContentFilterTopic = \"color = "
+ std::string(options->color) + std::string("\""), Verbosity::DEBUG);
StringSeq_push(cf_params, options->color);
cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color = %0", cf_params);
logger.log_message(" ContentFilterTopic = \"color = "
+ std::string(options->color) + std::string("\""), Verbosity::DEBUG);
#elif defined(EPROSIMA_FAST_DDS)
cf_params.push_back(std::string("'") + options->color + std::string("'"));
cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color = %0", cf_params);
logger.log_message(" ContentFilterTopic = \"color = "
+ cf_params[0] + std::string("\""), Verbosity::DEBUG);
cf_params.push_back(std::string("'") + options->color + std::string("'"));
cft = dp->create_contentfilteredtopic(filtered_topic_name, topics[i], "color = %0", cf_params);
logger.log_message(" ContentFilterTopic = \"color = "
+ cf_params[0] + std::string("\""), Verbosity::DEBUG);
#endif
}

if (cft == NULL) {
logger.log_message("failed to create content filtered topic", Verbosity::ERROR);
return false;
}

printf("Create reader for topic: %s color: %s\n", cft->get_name() NAME_ACCESSOR, options->color );
printf("Create reader for topic: %s\n", cft->get_name() NAME_ACCESSOR);
drs[i] = dynamic_cast<ShapeTypeDataReader *>(sub->create_datareader(cft, dr_qos, NULL, LISTENER_STATUS_MASK_NONE));
if (drs[i] == NULL) {
logger.log_message("failed to create datareader[" + std::to_string(i) + "] topic: " + topics[i]->get_name(), Verbosity::ERROR);
Expand Down Expand Up @@ -1728,6 +1777,7 @@ class ShapeApplication {
} else {
ShapeType shape_key;
shape_initialize_w_color(shape_key, NULL);

#if defined(EPROSIMA_FAST_DDS)
shape_key.color FIELD_ACCESSOR = instance_handle_color[sample_info->instance_handle] NAME_ACCESSOR;
#else
Expand Down Expand Up @@ -1831,7 +1881,13 @@ class ShapeApplication {
moveShape(&shape);

if (options->shapesize == 0) {
shape.shapesize FIELD_ACCESSOR += 1;
if (options->size_modulo > 0) {
// Size cannot be 0, so increase it after modulo operation
shape.shapesize FIELD_ACCESSOR =
(shape.shapesize FIELD_ACCESSOR % options->size_modulo) + 1;
} else {
shape.shapesize FIELD_ACCESSOR += 1;
}
}

if (options->coherent_set_enabled || options->ordered_access_enabled) {
Expand Down
25 changes: 21 additions & 4 deletions test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,22 +425,39 @@

# Content Filtered Topic
'Test_Cft_0' : {
'apps' : ['-P -t Square -r -c BLUE', '-P -t Square -r -c RED', '-S -t Square -r -c RED'],
'apps' : ['-P -t Square -r -k 0 -c BLUE', '-P -t Square -r -k 0 -c RED', '-S -t Square -r -k 0 --cft "color = \'RED\'"'],
'expected_codes' : [ReturnCode.OK, ReturnCode.OK, ReturnCode.RECEIVING_FROM_ONE],
'check_function' : tsf.test_color_receivers,
'title' : 'Use of Content filter to avoid receiving undesired data',
'description' : 'Verifies a subscription using a ContentFilteredTopic does not receive data that does not pass the filter\n\n'
'title' : 'Use of Content filter to avoid receiving undesired data (key)',
'description' : 'Verifies a subscription using a ContentFilteredTopic does not receive data that does not '
'pass the filter. The filter is applied to the key "color"\n\n'
' * Configures a subscriber with a ContentFilteredTopic that selects only the shapes that '
'have "color" equal to "RED"\n'
' * Configures a first publisher to publish samples with "color" equal to "BLUE"\n'
' * Configures a second publisher to publish samples with "color" equal to "RED"\n'
' * Use RELIABLE Qos in all publishers and subscriber to ensure any samples that are not '
'received are due to filtering\n'
' * Configures the publishers / subscriber with history KEEP_ALL\n'
' * Verifies that both publishers discover and match the subscriber and vice-versa\n'
' * Note that this test does not check whether the filtering happens in the publisher side or '
'the subscriber side. It only checks the middleware filters the samples somewhere.\n\n'
f'The test passes if the subscriber receives {tsf.MAX_SAMPLES_READ} samples of one color\n'
},
},

'Test_Cft_1': {
'apps': ['-P -t Square -r -k 0 -z 0 --size-modulo 50', '-S -t Square -r -k 0 --cft "shapesize <= 20"'],
'expected_codes': [ReturnCode.OK, ReturnCode.OK],
'check_function': tsf.test_size_less_than_20,
'title' : 'Use of Content filter to avoid receiving undesired data (non-key)',
'description': 'Verifies a subscription using a ContentFilteredTopic does not receive data that does not '
'pass the filter. The filter is applied to the non-key member "shapesize".\n\n'
' * Use RELIABLE Qos in all publishers and subscriber to avoid samples losses\n'
' * Configures the publisher / subscriber with history KEEP_ALL\n'
' * The publisher application sends samples with increasing value of the "size" member\n'
' * Publisher sends samples with size cycling from 1 to 50 (using --size-modulo 50 and -z 0)\n'
' * Subscriber uses --cft "shapesize <= 20"\n'
' * The test passes if all received samples have size < 20\n'
},

# PARTITION
'Test_Partition_0' : {
Expand Down
35 changes: 35 additions & 0 deletions test_suite_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,41 @@ def test_color_receivers(child_sub, samples_sent, last_sample_saved, timeout):
print(f'Samples read: {samples_read}')
return ReturnCode.RECEIVING_FROM_ONE

def test_size_less_than_20(child_sub, samples_sent, last_sample_saved, timeout):
"""
Checks that all received samples have size between 1 and 20 (inclusive).
Returns ReturnCode.OK if all samples are in range, otherwise ReturnCode.DATA_NOT_CORRECT.
"""
import re
from rtps_test_utilities import ReturnCode

max_samples_received = 500
samples_read = 0

sub_string = re.search('[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after)

while sub_string is not None and samples_read < max_samples_received:
size = int(sub_string.group(1))
if size < 1 or size > 20:
return ReturnCode.DATA_NOT_CORRECT

index = child_sub.expect(
[
'\[[0-9]+\]', # index = 0
pexpect.TIMEOUT # index = 1
],
timeout
)
if index == 1:
break

samples_read += 1
sub_string = re.search('[0-9]+ [0-9]+ \[([0-9]+)\]', child_sub.before + child_sub.after)

print(f'Samples read: {samples_read}')
return ReturnCode.OK


def test_reliability_order(child_sub, samples_sent, last_sample_saved, timeout):
"""
This function tests reliability, it checks whether the subscriber receives
Expand Down