Skip to content

Commit cb5887b

Browse files
Add support for task auto start using zenoh-pico options (#686)
* add support for task auto start using zenoh-pico options * add methods to check if tasks are running; add tests;
1 parent e600b43 commit cb5887b

File tree

5 files changed

+185
-33
lines changed

5 files changed

+185
-33
lines changed

docs/Doxyfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2353,7 +2353,7 @@ INCLUDE_FILE_PATTERNS =
23532353
# recursively expanded use the := operator instead of the = operator.
23542354
# This tag requires that the tag ENABLE_PREPROCESSING is set to YES.
23552355

2356-
PREDEFINED = ZENOHCXX_ZENOHPICO ZENOHCXX_ZENOHC Z_FEATURE_SHARED_MEMORY Z_FEATURE_UNSTABLE_API __DOXYGEN__
2356+
PREDEFINED = ZENOHCXX_ZENOHPICO ZENOHCXX_ZENOHC Z_FEATURE_SHARED_MEMORY Z_FEATURE_UNSTABLE_API Z_FEATURE_MULTI_THREAD=1 Z_FEATURE_PERIODIC_TASKS=1 __DOXYGEN__
23572357

23582358
# If the MACRO_EXPANSION and EXPAND_ONLY_PREDEF tags are set to YES then this
23592359
# tag can be used to specify a list of macro names that should be expanded. The

include/zenoh/api/session.hxx

Lines changed: 84 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -68,21 +68,70 @@ class Session : public Owned<::z_owned_session_t> {
6868
/// @brief Options to be passed when opening a ``Session``.
6969
struct SessionOptions {
7070
/// @name Fields
71-
#ifdef ZENOHCXX_ZENOHPICO
71+
#if defined(ZENOHCXX_ZENOHPICO) && Z_FEATURE_MULTI_THREAD == 1
72+
/// @brief List of background tasks to auto-start, allowing per task granularity.
73+
/// @note Zenoh-pico only.
74+
struct BackgroundTasksAutoStartOptions {
75+
/// @name Fields
76+
77+
/// Auto-start read task
78+
bool auto_start_read_task = true;
79+
/// Auto-start lease task
80+
bool auto_start_lease_task = true;
81+
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
82+
/// Auto-start periodic scheduler task.
83+
/// @note With Z_FEATURE_PERIODIC_TASKS enabled only.
84+
bool auto_start_periodic_task = true;
85+
#endif
86+
/// @brief Create default auto-start settings.
87+
static BackgroundTasksAutoStartOptions create_default() { return {}; }
88+
};
89+
7290
/// @brief If ``true``, start background threads which handle the network
7391
/// traffic. If false, the threads should be called manually with ``Session::start_read_task``,
7492
/// ``Session::start_lease_task`` and ``Session::start_periodic_scheduler_task``
7593
/// or methods ``Session::read``, ``Session::send_keep_alive``,
7694
/// ``Session::send_join`` and ``Session::process_periodic_tasks`` should be called in loop.
77-
/// @note Zenoh-pico only.
78-
bool start_background_tasks = true;
95+
/// If contains ``BackgroundTasksAutoStartOptions`` value, only enabled tasks will start and the
96+
/// remaining ones will need to be started or triggered manually.
97+
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
98+
std::variant<bool, BackgroundTasksAutoStartOptions> start_background_tasks = true;
7999
#endif
100+
/// @name Methods
101+
102+
/// @brief Create default option settings.
80103
static SessionOptions create_default() { return {}; }
104+
105+
private:
106+
friend struct interop::detail::Converter;
107+
::z_open_options_t to_c_opts() {
108+
z_open_options_t opts;
109+
z_open_options_default(&opts);
110+
#if defined(ZENOHCXX_ZENOHPICO) && Z_FEATURE_MULTI_THREAD == 1
111+
std::visit(
112+
detail::commons::overloaded{[&opts](const SessionOptions::BackgroundTasksAutoStartOptions& tasks) {
113+
opts.auto_start_read_task = tasks.auto_start_read_task;
114+
opts.auto_start_lease_task = tasks.auto_start_lease_task;
115+
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
116+
opts.auto_start_periodic_task = tasks.auto_start_periodic_task;
117+
#endif
118+
},
119+
[&opts](const bool& start_all) {
120+
opts.auto_start_read_task = start_all;
121+
opts.auto_start_lease_task = start_all;
122+
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
123+
opts.auto_start_periodic_task = start_all;
124+
#endif
125+
}},
126+
start_background_tasks);
127+
#endif
128+
return opts;
129+
}
81130
};
82131

83132
/// @brief Options to be passed when closing a ``Session``.
84133
struct SessionCloseOptions {
85-
/// @name Fields
134+
/// @name Methods
86135
static SessionCloseOptions create_default() { return {}; }
87136
};
88137

@@ -95,28 +144,9 @@ class Session : public Owned<::z_owned_session_t> {
95144
/// thrown in case of error.
96145
Session(Config&& config, SessionOptions&& options = SessionOptions::create_default(), ZResult* err = nullptr)
97146
: Owned(nullptr) {
98-
__ZENOH_RESULT_CHECK(::z_open(&this->_0, interop::as_moved_c_ptr(config), nullptr), err,
147+
z_open_options_t opts = interop::detail::Converter::to_c_opts(options);
148+
__ZENOH_RESULT_CHECK(::z_open(&this->_0, interop::as_moved_c_ptr(config), &opts), err,
99149
"Failed to open session");
100-
#ifdef ZENOHCXX_ZENOHPICO
101-
if (err != nullptr && *err != Z_OK) return;
102-
if (options.start_background_tasks) {
103-
ZResult err_inner;
104-
this->start_read_task(&err_inner);
105-
if (err_inner == Z_OK) {
106-
this->start_lease_task(&err_inner);
107-
}
108-
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
109-
if (err_inner == Z_OK) {
110-
this->start_periodic_scheduler_task(&err_inner);
111-
}
112-
#endif
113-
if (err_inner == Z_OK) return;
114-
::z_drop(::z_move(this->_0));
115-
__ZENOH_RESULT_CHECK(err_inner, err, "Failed to start background tasks");
116-
}
117-
#else
118-
(void)options;
119-
#endif
120150
}
121151

122152
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_SHARED_MEMORY) && defined(Z_FEATURE_UNSTABLE_API)
@@ -875,11 +905,12 @@ class Session : public Owned<::z_owned_session_t> {
875905
"Failed to fetch peer Ids");
876906
return out;
877907
}
878-
#ifdef ZENOHCXX_ZENOHPICO
908+
#if defined(ZENOHCXX_ZENOHPICO)
909+
#if Z_FEATURE_MULTI_THREAD == 1
879910
/// @brief Start a separate task to read from the network and process the messages as soon as they are received.
880911
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
881912
/// thrown in case of error.
882-
/// @note Zenoh-pico only.
913+
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
883914
void start_read_task(ZResult* err = nullptr) {
884915
__ZENOH_RESULT_CHECK(zp_start_read_task(interop::as_loaned_c_ptr(*this), nullptr), err,
885916
"Failed to start read task");
@@ -888,7 +919,7 @@ class Session : public Owned<::z_owned_session_t> {
888919
/// @brief Stop the read task.
889920
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
890921
/// thrown in case of error.
891-
/// @note Zenoh-pico only.
922+
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
892923
void stop_read_task(ZResult* err = nullptr) {
893924
__ZENOH_RESULT_CHECK(zp_stop_read_task(interop::as_loaned_c_ptr(*this)), err, "Failed to stop read task");
894925
}
@@ -898,7 +929,7 @@ class Session : public Owned<::z_owned_session_t> {
898929
/// periodically sends the Join messages.
899930
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
900931
/// thrown in case of error.
901-
/// @note Zenoh-pico only.
932+
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
902933
void start_lease_task(ZResult* err = nullptr) {
903934
__ZENOH_RESULT_CHECK(zp_start_lease_task(interop::as_loaned_c_ptr(*this), NULL), err,
904935
"Failed to start lease task");
@@ -907,17 +938,27 @@ class Session : public Owned<::z_owned_session_t> {
907938
/// @brief Stop the lease task.
908939
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
909940
/// thrown in case of error.
910-
/// @note Zenoh-pico only.
941+
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
911942
void stop_lease_task(ZResult* err = nullptr) {
912943
__ZENOH_RESULT_CHECK(zp_stop_lease_task(interop::as_loaned_c_ptr(*this)), err, "Failed to stop lease task");
913944
}
914945

946+
/// @brief Verify if read task is currently running.
947+
/// @return ``true`` if read task is running, ``false`` otherwise.
948+
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
949+
bool is_read_task_running() const { return zp_read_task_is_running(interop::as_loaned_c_ptr(*this)); }
950+
951+
/// @brief Verify if lease task is currently running.
952+
/// @return ``true`` if read task is running, ``false`` otherwise.
953+
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
954+
bool is_lease_task_running() const { return zp_lease_task_is_running(interop::as_loaned_c_ptr(*this)); }
955+
915956
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
916957
/// @brief Start the periodic scheduler task. The periodic scheduler task executes registered periodic jobs
917958
/// according to their configured intervals. Jobs are added and removed via the scheduler API.
918959
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
919960
/// thrown in case of error.
920-
/// @note Zenoh-pico only.
961+
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD and Z_FEATURE_PERIODIC_TASKS enabled only.
921962
void start_periodic_scheduler_task(ZResult* err = nullptr) {
922963
__ZENOH_RESULT_CHECK(zp_start_periodic_scheduler_task(interop::as_loaned_c_ptr(*this), NULL), err,
923964
"Failed to start periodic scheduler task");
@@ -926,15 +967,26 @@ class Session : public Owned<::z_owned_session_t> {
926967
/// @brief Stop the periodic scheduler task.
927968
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
928969
/// thrown in case of error.
929-
/// @note Zenoh-pico only.
970+
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD and Z_FEATURE_PERIODIC_TASKS enabled only.
930971
void stop_periodic_scheduler_task(ZResult* err = nullptr) {
931972
__ZENOH_RESULT_CHECK(zp_stop_periodic_scheduler_task(interop::as_loaned_c_ptr(*this)), err,
932973
"Failed to stop periodic scheduler task");
933974
}
934975

976+
/// @brief Verify if periodic scheduler task is currently running.
977+
/// @return ``true`` if read task is running, ``false`` otherwise.
978+
/// @note Zenoh-pico with _FEATURE_MULTI_THREAD and Z_FEATURE_PERIODIC_TASKS enabled only.
979+
bool is_periodic_scheduler_task_running() const {
980+
return zp_periodic_scheduler_task_is_running(interop::as_loaned_c_ptr(*this));
981+
}
982+
#endif
983+
#endif
984+
985+
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
935986
/// @brief Process outstanding periodic tasks.
936987
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
937988
/// thrown in case of error.
989+
/// @note Zenoh-pico with Z_FEATURE_PERIODIC_TASKS enabled only.
938990
void process_periodic_tasks(ZResult* err = nullptr) {
939991
__ZENOH_RESULT_CHECK(zp_process_periodic_tasks(interop::as_loaned_c_ptr(*this)), err,
940992
"Failed to process periodic tasks");

include/zenoh/detail/commons.hxx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,13 @@ auto make_transform_iterator(It const& it, F&& f) {
5454
return TransformIterator<It, F>(it, std::forward<F>(f));
5555
}
5656

57+
template <class... Ts>
58+
struct overloaded : Ts... {
59+
using Ts::operator()...;
60+
};
61+
62+
// Some compilers might require this explicit deduction guide
63+
template <class... Ts>
64+
overloaded(Ts...) -> overloaded<Ts...>;
65+
5766
} // namespace zenoh::detail::commons

tests/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ if(ZENOHCXX_ZENOHPICO)
9999
if ((${file} MATCHES "^.*batching.*$") AND NOT(ZENOHPICO_FEATURE_BATCHING))
100100
continue()
101101
endif()
102+
if ((${file} MATCHES "^.*tasks.*$")
103+
AND NOT((ZENOHPICO_FEATURE_MULTI_THREAD) AND (ZENOHPICO_FEATURE_ADVANCED_PUBLICATION) AND (ZENOHPICO_FEATURE_ADVANCED_SUBSCRIPTION)))
104+
continue()
105+
endif()
102106
add_test_instance(${file} zenohpico zenohcxx::zenohpico Router)
103107
endforeach()
104108
endif()

tests/zenohpico/network/tasks.cxx

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
//
2+
// Copyright (c) 2025 ZettaScale Technology
3+
//
4+
// This program and the accompanying materials are made available under the
5+
// terms of the Eclipse Public License 2.0 which is available at
6+
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
//
9+
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
//
11+
// Contributors:
12+
// ZettaScale Zenoh Team, <[email protected]>
13+
//
14+
15+
#include <chrono>
16+
#include <thread>
17+
18+
#include "zenoh.hxx"
19+
20+
using namespace zenoh;
21+
using namespace std::chrono_literals;
22+
23+
#undef NDEBUG
24+
#include <assert.h>
25+
26+
void test_all_tasks_auto_start() {
27+
std::cout << "Test tasks auto start: all\n";
28+
29+
Session::SessionOptions opts;
30+
opts.start_background_tasks = true;
31+
auto session1 = Session::open(Config::create_default(), std::move(opts));
32+
33+
assert(session1.is_read_task_running());
34+
assert(session1.is_lease_task_running());
35+
assert(session1.is_periodic_scheduler_task_running());
36+
37+
opts.start_background_tasks = false;
38+
auto session2 = Session::open(Config::create_default(), std::move(opts));
39+
40+
assert(!session2.is_read_task_running());
41+
assert(!session2.is_lease_task_running());
42+
assert(!session2.is_periodic_scheduler_task_running());
43+
}
44+
45+
void test_individual_tasks_auto_start() {
46+
std::cout << "Test tasks auto start: individual\n";
47+
48+
Session::SessionOptions opts;
49+
Session::SessionOptions::BackgroundTasksAutoStartOptions tasks;
50+
tasks.auto_start_read_task = true;
51+
tasks.auto_start_lease_task = false;
52+
tasks.auto_start_periodic_task = false;
53+
opts.start_background_tasks = tasks;
54+
55+
auto session1 = Session::open(Config::create_default(), std::move(opts));
56+
57+
assert(session1.is_read_task_running());
58+
assert(!session1.is_lease_task_running());
59+
assert(!session1.is_periodic_scheduler_task_running());
60+
61+
tasks.auto_start_read_task = false;
62+
tasks.auto_start_lease_task = true;
63+
tasks.auto_start_periodic_task = false;
64+
opts.start_background_tasks = tasks;
65+
66+
auto session2 = Session::open(Config::create_default(), std::move(opts));
67+
68+
assert(!session2.is_read_task_running());
69+
assert(session2.is_lease_task_running());
70+
assert(!session2.is_periodic_scheduler_task_running());
71+
72+
tasks.auto_start_read_task = false;
73+
tasks.auto_start_lease_task = false;
74+
tasks.auto_start_periodic_task = true;
75+
opts.start_background_tasks = tasks;
76+
77+
auto session3 = Session::open(Config::create_default(), std::move(opts));
78+
79+
assert(!session3.is_read_task_running());
80+
assert(!session3.is_lease_task_running());
81+
assert(session3.is_periodic_scheduler_task_running());
82+
}
83+
84+
int main(int argc, char** argv) {
85+
test_all_tasks_auto_start();
86+
test_individual_tasks_auto_start();
87+
};

0 commit comments

Comments
 (0)