Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,7 @@ INCLUDE_FILE_PATTERNS =
# recursively expanded use the := operator instead of the = operator.
# This tag requires that the tag ENABLE_PREPROCESSING is set to YES.

PREDEFINED = ZENOHCXX_ZENOHPICO ZENOHCXX_ZENOHC Z_FEATURE_SHARED_MEMORY Z_FEATURE_UNSTABLE_API __DOXYGEN__
PREDEFINED = ZENOHCXX_ZENOHPICO ZENOHCXX_ZENOHC Z_FEATURE_SHARED_MEMORY Z_FEATURE_UNSTABLE_API Z_FEATURE_MULTI_THREAD=1 Z_FEATURE_PERIODIC_TASKS=1 __DOXYGEN__

# If the MACRO_EXPANSION and EXPAND_ONLY_PREDEF tags are set to YES then this
# tag can be used to specify a list of macro names that should be expanded. The
Expand Down
116 changes: 84 additions & 32 deletions include/zenoh/api/session.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,70 @@ class Session : public Owned<::z_owned_session_t> {
/// @brief Options to be passed when opening a ``Session``.
struct SessionOptions {
/// @name Fields
#ifdef ZENOHCXX_ZENOHPICO
#if defined(ZENOHCXX_ZENOHPICO) && Z_FEATURE_MULTI_THREAD == 1
/// @brief List of background tasks to auto-start, allowing per task granularity.
/// @note Zenoh-pico only.
struct BackgroundTasksAutoStartOptions {
/// @name Fields

/// Auto-start read task
bool auto_start_read_task = true;
/// Auto-start lease task
bool auto_start_lease_task = true;
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
/// Auto-start periodic scheduler task.
/// @note With Z_FEATURE_PERIODIC_TASKS enabled only.
bool auto_start_periodic_task = true;
#endif
/// @brief Create default auto-start settings.
static BackgroundTasksAutoStartOptions create_default() { return {}; }
};

/// @brief If ``true``, start background threads which handle the network
/// traffic. If false, the threads should be called manually with ``Session::start_read_task``,
/// ``Session::start_lease_task`` and ``Session::start_periodic_scheduler_task``
/// or methods ``Session::read``, ``Session::send_keep_alive``,
/// ``Session::send_join`` and ``Session::process_periodic_tasks`` should be called in loop.
/// @note Zenoh-pico only.
bool start_background_tasks = true;
/// If contains ``BackgroundTasksAutoStartOptions`` value, only enabled tasks will start and the
/// remaining ones will need to be started or triggered manually.
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
std::variant<bool, BackgroundTasksAutoStartOptions> start_background_tasks = true;
#endif
/// @name Methods

/// @brief Create default option settings.
static SessionOptions create_default() { return {}; }

private:
friend struct interop::detail::Converter;
::z_open_options_t to_c_opts() {
z_open_options_t opts;
z_open_options_default(&opts);
#if defined(ZENOHCXX_ZENOHPICO) && Z_FEATURE_MULTI_THREAD == 1
std::visit(
detail::commons::overloaded{[&opts](const SessionOptions::BackgroundTasksAutoStartOptions& tasks) {
opts.auto_start_read_task = tasks.auto_start_read_task;
opts.auto_start_lease_task = tasks.auto_start_lease_task;
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
opts.auto_start_periodic_task = tasks.auto_start_periodic_task;
#endif
},
[&opts](const bool& start_all) {
opts.auto_start_read_task = start_all;
opts.auto_start_lease_task = start_all;
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
opts.auto_start_periodic_task = start_all;
#endif
}},
start_background_tasks);
#endif
return opts;
}
};

/// @brief Options to be passed when closing a ``Session``.
struct SessionCloseOptions {
/// @name Fields
/// @name Methods
static SessionCloseOptions create_default() { return {}; }
};

Expand All @@ -95,28 +144,9 @@ class Session : public Owned<::z_owned_session_t> {
/// thrown in case of error.
Session(Config&& config, SessionOptions&& options = SessionOptions::create_default(), ZResult* err = nullptr)
: Owned(nullptr) {
__ZENOH_RESULT_CHECK(::z_open(&this->_0, interop::as_moved_c_ptr(config), nullptr), err,
z_open_options_t opts = interop::detail::Converter::to_c_opts(options);
__ZENOH_RESULT_CHECK(::z_open(&this->_0, interop::as_moved_c_ptr(config), &opts), err,
"Failed to open session");
#ifdef ZENOHCXX_ZENOHPICO
if (err != nullptr && *err != Z_OK) return;
if (options.start_background_tasks) {
ZResult err_inner;
this->start_read_task(&err_inner);
if (err_inner == Z_OK) {
this->start_lease_task(&err_inner);
}
#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
if (err_inner == Z_OK) {
this->start_periodic_scheduler_task(&err_inner);
}
#endif
if (err_inner == Z_OK) return;
::z_drop(::z_move(this->_0));
__ZENOH_RESULT_CHECK(err_inner, err, "Failed to start background tasks");
}
#else
(void)options;
#endif
}

#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_SHARED_MEMORY) && defined(Z_FEATURE_UNSTABLE_API)
Expand Down Expand Up @@ -875,11 +905,12 @@ class Session : public Owned<::z_owned_session_t> {
"Failed to fetch peer Ids");
return out;
}
#ifdef ZENOHCXX_ZENOHPICO
#if defined(ZENOHCXX_ZENOHPICO)
#if Z_FEATURE_MULTI_THREAD == 1
/// @brief Start a separate task to read from the network and process the messages as soon as they are received.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @note Zenoh-pico only.
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
void start_read_task(ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(zp_start_read_task(interop::as_loaned_c_ptr(*this), nullptr), err,
"Failed to start read task");
Expand All @@ -888,7 +919,7 @@ class Session : public Owned<::z_owned_session_t> {
/// @brief Stop the read task.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @note Zenoh-pico only.
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
void stop_read_task(ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(zp_stop_read_task(interop::as_loaned_c_ptr(*this)), err, "Failed to stop read task");
}
Expand All @@ -898,7 +929,7 @@ class Session : public Owned<::z_owned_session_t> {
/// periodically sends the Join messages.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @note Zenoh-pico only.
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
void start_lease_task(ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(zp_start_lease_task(interop::as_loaned_c_ptr(*this), NULL), err,
"Failed to start lease task");
Expand All @@ -907,17 +938,27 @@ class Session : public Owned<::z_owned_session_t> {
/// @brief Stop the lease task.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @note Zenoh-pico only.
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
void stop_lease_task(ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(zp_stop_lease_task(interop::as_loaned_c_ptr(*this)), err, "Failed to stop lease task");
}

/// @brief Verify if read task is currently running.
/// @return ``true`` if read task is running, ``false`` otherwise.
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
bool is_read_task_running() const { return zp_read_task_is_running(interop::as_loaned_c_ptr(*this)); }

/// @brief Verify if lease task is currently running.
/// @return ``true`` if read task is running, ``false`` otherwise.
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD enabled only.
bool is_lease_task_running() const { return zp_lease_task_is_running(interop::as_loaned_c_ptr(*this)); }

#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
/// @brief Start the periodic scheduler task. The periodic scheduler task executes registered periodic jobs
/// according to their configured intervals. Jobs are added and removed via the scheduler API.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @note Zenoh-pico only.
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD and Z_FEATURE_PERIODIC_TASKS enabled only.
void start_periodic_scheduler_task(ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(zp_start_periodic_scheduler_task(interop::as_loaned_c_ptr(*this), NULL), err,
"Failed to start periodic scheduler task");
Expand All @@ -926,15 +967,26 @@ class Session : public Owned<::z_owned_session_t> {
/// @brief Stop the periodic scheduler task.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @note Zenoh-pico only.
/// @note Zenoh-pico with Z_FEATURE_MULTI_THREAD and Z_FEATURE_PERIODIC_TASKS enabled only.
void stop_periodic_scheduler_task(ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(zp_stop_periodic_scheduler_task(interop::as_loaned_c_ptr(*this)), err,
"Failed to stop periodic scheduler task");
}

/// @brief Verify if periodic scheduler task is currently running.
/// @return ``true`` if read task is running, ``false`` otherwise.
/// @note Zenoh-pico with _FEATURE_MULTI_THREAD and Z_FEATURE_PERIODIC_TASKS enabled only.
bool is_periodic_scheduler_task_running() const {
return zp_periodic_scheduler_task_is_running(interop::as_loaned_c_ptr(*this));
}
#endif
#endif

#if defined(Z_FEATURE_UNSTABLE_API) && Z_FEATURE_PERIODIC_TASKS == 1
/// @brief Process outstanding periodic tasks.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @note Zenoh-pico with Z_FEATURE_PERIODIC_TASKS enabled only.
void process_periodic_tasks(ZResult* err = nullptr) {
__ZENOH_RESULT_CHECK(zp_process_periodic_tasks(interop::as_loaned_c_ptr(*this)), err,
"Failed to process periodic tasks");
Expand Down
9 changes: 9 additions & 0 deletions include/zenoh/detail/commons.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,13 @@ auto make_transform_iterator(It const& it, F&& f) {
return TransformIterator<It, F>(it, std::forward<F>(f));
}

template <class... Ts>
struct overloaded : Ts... {
using Ts::operator()...;
};

// Some compilers might require this explicit deduction guide
template <class... Ts>
overloaded(Ts...) -> overloaded<Ts...>;

} // namespace zenoh::detail::commons
4 changes: 4 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ if(ZENOHCXX_ZENOHPICO)
if ((${file} MATCHES "^.*batching.*$") AND NOT(ZENOHPICO_FEATURE_BATCHING))
continue()
endif()
if ((${file} MATCHES "^.*tasks.*$")
AND NOT((ZENOHPICO_FEATURE_MULTI_THREAD) AND (ZENOHPICO_FEATURE_ADVANCED_PUBLICATION) AND (ZENOHPICO_FEATURE_ADVANCED_SUBSCRIPTION)))
continue()
endif()
add_test_instance(${file} zenohpico zenohcxx::zenohpico Router)
endforeach()
endif()
Expand Down
87 changes: 87 additions & 0 deletions tests/zenohpico/network/tasks.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//
// Copyright (c) 2025 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include <chrono>
#include <thread>

#include "zenoh.hxx"

using namespace zenoh;
using namespace std::chrono_literals;

#undef NDEBUG
#include <assert.h>

void test_all_tasks_auto_start() {
std::cout << "Test tasks auto start: all\n";

Session::SessionOptions opts;
opts.start_background_tasks = true;
auto session1 = Session::open(Config::create_default(), std::move(opts));

assert(session1.is_read_task_running());
assert(session1.is_lease_task_running());
assert(session1.is_periodic_scheduler_task_running());

opts.start_background_tasks = false;
auto session2 = Session::open(Config::create_default(), std::move(opts));

assert(!session2.is_read_task_running());
assert(!session2.is_lease_task_running());
assert(!session2.is_periodic_scheduler_task_running());
}

void test_individual_tasks_auto_start() {
std::cout << "Test tasks auto start: individual\n";

Session::SessionOptions opts;
Session::SessionOptions::BackgroundTasksAutoStartOptions tasks;
tasks.auto_start_read_task = true;
tasks.auto_start_lease_task = false;
tasks.auto_start_periodic_task = false;
opts.start_background_tasks = tasks;

auto session1 = Session::open(Config::create_default(), std::move(opts));

assert(session1.is_read_task_running());
assert(!session1.is_lease_task_running());
assert(!session1.is_periodic_scheduler_task_running());

tasks.auto_start_read_task = false;
tasks.auto_start_lease_task = true;
tasks.auto_start_periodic_task = false;
opts.start_background_tasks = tasks;

auto session2 = Session::open(Config::create_default(), std::move(opts));

assert(!session2.is_read_task_running());
assert(session2.is_lease_task_running());
assert(!session2.is_periodic_scheduler_task_running());

tasks.auto_start_read_task = false;
tasks.auto_start_lease_task = false;
tasks.auto_start_periodic_task = true;
opts.start_background_tasks = tasks;

auto session3 = Session::open(Config::create_default(), std::move(opts));

assert(!session3.is_read_task_running());
assert(!session3.is_lease_task_running());
assert(session3.is_periodic_scheduler_task_running());
}

int main(int argc, char** argv) {
test_all_tasks_auto_start();
test_individual_tasks_auto_start();
};
Loading