Skip to content

Commit cad1f03

Browse files
committed
Remove timeout from generate_test_publisher_tasks functions, as these make the tests fail silently if they time out
1 parent 422de40 commit cad1f03

File tree

4 files changed

+72
-122
lines changed

4 files changed

+72
-122
lines changed

src/io/mqtt/reconfigurable_input_provider.rs

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -312,28 +312,21 @@ mod integration_tests {
312312
let (y_tick, y_pub_stream) = tick_stream(stream::iter(ys.clone()).boxed());
313313

314314
// Spawn dummy MQTT publisher nodes and keep handles to wait for completion
315-
let x_publisher_task = executor.spawn(with_timeout_res(
316-
dummy_stream_mqtt_publisher(
317-
"x_publisher".to_string(),
318-
"mqtt_input_x".to_string(),
319-
x_pub_stream,
320-
xs.len(),
321-
mqtt_port,
322-
),
323-
5,
324-
"x_publisher_task",
315+
// Note: Without timeout as this fails silently due to spawn
316+
let x_publisher_task = executor.spawn(dummy_stream_mqtt_publisher(
317+
"x_publisher".to_string(),
318+
"mqtt_input_x".to_string(),
319+
x_pub_stream,
320+
xs.len(),
321+
mqtt_port,
325322
));
326323

327-
let y_publisher_task = executor.spawn(with_timeout_res(
328-
dummy_stream_mqtt_publisher(
329-
"y_publisher".to_string(),
330-
"mqtt_input_y".to_string(),
331-
y_pub_stream,
332-
ys.len(),
333-
mqtt_port,
334-
),
335-
5,
336-
"y_publisher_task",
324+
let y_publisher_task = executor.spawn(dummy_stream_mqtt_publisher(
325+
"y_publisher".to_string(),
326+
"mqtt_input_y".to_string(),
327+
y_pub_stream,
328+
ys.len(),
329+
mqtt_port,
337330
));
338331
((x_tick, x_publisher_task), (y_tick, y_publisher_task))
339332
}

tests/test_distributed_mqtt.rs

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -46,39 +46,27 @@ mod integration_tests {
4646
let (z_tick, z_pub_stream) = tick_stream(stream::iter(zs.clone()).boxed());
4747

4848
// Spawn dummy MQTT publisher nodes and keep handles to wait for completion
49-
let x_publisher_task = executor.spawn(with_timeout_res(
50-
dummy_stream_mqtt_publisher(
51-
"x_publisher".to_string(),
52-
"x".to_string(),
53-
x_pub_stream,
54-
xs.len(),
55-
mqtt_port,
56-
),
57-
5,
58-
"x_publisher_task",
49+
// Note: Without timeout as this fails silently due to spawn
50+
let x_publisher_task = executor.spawn(dummy_stream_mqtt_publisher(
51+
"x_publisher".to_string(),
52+
"x".to_string(),
53+
x_pub_stream,
54+
xs.len(),
55+
mqtt_port,
5956
));
60-
61-
let y_publisher_task = executor.spawn(with_timeout_res(
62-
dummy_stream_mqtt_publisher(
63-
"y_publisher".to_string(),
64-
"y".to_string(),
65-
y_pub_stream,
66-
ys.len(),
67-
mqtt_port,
68-
),
69-
5,
70-
"y_publisher_task",
57+
let y_publisher_task = executor.spawn(dummy_stream_mqtt_publisher(
58+
"y_publisher".to_string(),
59+
"y".to_string(),
60+
y_pub_stream,
61+
ys.len(),
62+
mqtt_port,
7163
));
72-
let z_publisher_task = executor.spawn(with_timeout_res(
73-
dummy_stream_mqtt_publisher(
74-
"z_publisher".to_string(),
75-
"z".to_string(),
76-
z_pub_stream,
77-
zs.len(),
78-
mqtt_port,
79-
),
80-
5,
81-
"z_publisher_task",
64+
let z_publisher_task = executor.spawn(dummy_stream_mqtt_publisher(
65+
"z_publisher".to_string(),
66+
"z".to_string(),
67+
z_pub_stream,
68+
zs.len(),
69+
mqtt_port,
8270
));
8371

8472
(

tests/test_mqtt_io.rs

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -97,28 +97,21 @@ mod integration_tests {
9797
let (y_tick, y_pub_stream) = tick_stream(stream::iter(ys.clone()).boxed());
9898

9999
// Spawn dummy MQTT publisher nodes and keep handles to wait for completion
100-
let x_publisher_task = executor.spawn(with_timeout_res(
101-
dummy_stream_mqtt_publisher(
102-
"x_publisher".to_string(),
103-
X_TOPIC.to_string(),
104-
x_pub_stream,
105-
xs.len(),
106-
mqtt_port,
107-
),
108-
5,
109-
"x_publisher_task",
100+
// Note: Without timeout as this fails silently due to spawn
101+
let x_publisher_task = executor.spawn(dummy_stream_mqtt_publisher(
102+
"x_publisher".to_string(),
103+
X_TOPIC.to_string(),
104+
x_pub_stream,
105+
xs.len(),
106+
mqtt_port,
110107
));
111108

112-
let y_publisher_task = executor.spawn(with_timeout_res(
113-
dummy_stream_mqtt_publisher(
114-
"y_publisher".to_string(),
115-
Y_TOPIC.to_string(),
116-
y_pub_stream,
117-
ys.len(),
118-
mqtt_port,
119-
),
120-
5,
121-
"y_publisher_task",
109+
let y_publisher_task = executor.spawn(dummy_stream_mqtt_publisher(
110+
"y_publisher".to_string(),
111+
Y_TOPIC.to_string(),
112+
y_pub_stream,
113+
ys.len(),
114+
mqtt_port,
122115
));
123116

124117
((x_tick, x_publisher_task), (y_tick, y_publisher_task))
@@ -485,28 +478,21 @@ mod reconf_tests {
485478
let (y_tick, y_pub_stream) = tick_stream(stream::iter(ys.clone()).boxed());
486479

487480
// Spawn dummy MQTT publisher nodes and keep handles to wait for completion
488-
let x_publisher_task = executor.spawn(with_timeout_res(
489-
dummy_stream_mqtt_publisher(
490-
"x_publisher".to_string(),
491-
X_TOPIC.to_string(),
492-
x_pub_stream,
493-
xs.len(),
494-
mqtt_port,
495-
),
496-
5,
497-
"x_publisher_task",
481+
// Note: Without timeout as this fails silently due to spawn
482+
let x_publisher_task = executor.spawn(dummy_stream_mqtt_publisher(
483+
"x_publisher".to_string(),
484+
X_TOPIC.to_string(),
485+
x_pub_stream,
486+
xs.len(),
487+
mqtt_port,
498488
));
499489

500-
let y_publisher_task = executor.spawn(with_timeout_res(
501-
dummy_stream_mqtt_publisher(
502-
"y_publisher".to_string(),
503-
Y_TOPIC.to_string(),
504-
y_pub_stream,
505-
ys.len(),
506-
mqtt_port,
507-
),
508-
5,
509-
"y_publisher_task",
490+
let y_publisher_task = executor.spawn(dummy_stream_mqtt_publisher(
491+
"y_publisher".to_string(),
492+
Y_TOPIC.to_string(),
493+
y_pub_stream,
494+
ys.len(),
495+
mqtt_port,
510496
));
511497

512498
((x_tick, x_publisher_task), (y_tick, y_publisher_task))

tests/test_redis_io.rs

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ mod integration_tests {
3232
use tc_testutils::streams::interleave_with_constant;
3333
use tc_testutils::streams::receive_values_serially;
3434
use tc_testutils::streams::tick_stream;
35-
use tc_testutils::streams::with_timeout_res;
3635
use tracing::error;
3736
use tracing::{debug, info};
3837
use trustworthiness_checker::async_test;
@@ -59,26 +58,19 @@ mod integration_tests {
5958
let (y_tick, y_pub_stream) = tick_stream(stream::iter(ys.clone()).boxed());
6059

6160
// Spawn dummy MQTT publisher nodes and keep handles to wait for completion
62-
let x_publisher_task = executor.spawn(with_timeout_res(
63-
dummy_redis_stream_sender(
64-
REDIS_HOSTNAME,
65-
Some(port),
66-
X_TOPIC.to_string(),
67-
x_pub_stream,
68-
),
69-
5,
70-
"x_publisher_task",
61+
// Note: Without timeout as this fails silently due to spawn
62+
let x_publisher_task = executor.spawn(dummy_redis_stream_sender(
63+
REDIS_HOSTNAME,
64+
Some(port),
65+
X_TOPIC.to_string(),
66+
x_pub_stream,
7167
));
7268

73-
let y_publisher_task = executor.spawn(with_timeout_res(
74-
dummy_redis_stream_sender(
75-
REDIS_HOSTNAME,
76-
Some(port),
77-
Y_TOPIC.to_string(),
78-
y_pub_stream,
79-
),
80-
5,
81-
"y_publisher_task",
69+
let y_publisher_task = executor.spawn(dummy_redis_stream_sender(
70+
REDIS_HOSTNAME,
71+
Some(port),
72+
Y_TOPIC.to_string(),
73+
y_pub_stream,
8274
));
8375

8476
((x_tick, x_publisher_task), (y_tick, y_publisher_task))
@@ -179,10 +171,7 @@ mod integration_tests {
179171
let input_provider_future = Box::pin(async move {
180172
while let Some(res) = input_provider_stream.next().await {
181173
if res.is_err() {
182-
error!(
183-
"Input provider stream returned error: {:?}",
184-
res
185-
);
174+
error!("Input provider stream returned error: {:?}", res);
186175
return res;
187176
}
188177
}
@@ -256,10 +245,7 @@ mod integration_tests {
256245
let input_provider_future = Box::pin(async move {
257246
while let Some(res) = input_provider_stream.next().await {
258247
if res.is_err() {
259-
error!(
260-
"Input provider stream returned error: {:?}",
261-
res
262-
);
248+
error!("Input provider stream returned error: {:?}", res);
263249
return res;
264250
}
265251
}
@@ -330,10 +316,7 @@ mod integration_tests {
330316
let input_provider_future = Box::pin(async move {
331317
while let Some(res) = input_provider_stream.next().await {
332318
if res.is_err() {
333-
error!(
334-
"Input provider stream returned error: {:?}",
335-
res
336-
);
319+
error!("Input provider stream returned error: {:?}", res);
337320
return res;
338321
}
339322
}

0 commit comments

Comments
 (0)