Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/cppcmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ jobs:
dpkg -I package/liblsl*.deb
fi
cmake -E remove_directory package/_CPack_Packages
cp testing/lslcfgs/default.cfg .
- name: upload install dir
uses: actions/upload-artifact@master
with:
Expand Down
68 changes: 52 additions & 16 deletions src/api_config.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "api_config.h"
#include "common.h"
#include "util/cast.hpp"
#include "util/inireader.hpp"
#include "util/strfuns.hpp"
#include <algorithm>
Expand Down Expand Up @@ -148,32 +149,29 @@ void api_config::load_from_file(const std::string &filename) {
else
throw std::runtime_error("This ResolveScope setting is unsupported.");

multicast_addresses_.insert(
multicast_addresses_.end(), machine_group.begin(), machine_group.end());
std::vector<std::string> mcasttmp;

mcasttmp.insert(mcasttmp.end(), machine_group.begin(), machine_group.end());
multicast_ttl_ = 0;

if (scope >= link) {
multicast_addresses_.insert(
multicast_addresses_.end(), link_group.begin(), link_group.end());
multicast_addresses_.push_back("FF02:" + ipv6_multicast_group);
mcasttmp.insert(mcasttmp.end(), link_group.begin(), link_group.end());
mcasttmp.push_back("FF02:" + ipv6_multicast_group);
multicast_ttl_ = 1;
}
if (scope >= site) {
multicast_addresses_.insert(
multicast_addresses_.end(), site_group.begin(), site_group.end());
multicast_addresses_.push_back("FF05:" + ipv6_multicast_group);
mcasttmp.insert(mcasttmp.end(), site_group.begin(), site_group.end());
mcasttmp.push_back("FF05:" + ipv6_multicast_group);
multicast_ttl_ = 24;
}
if (scope >= organization) {
multicast_addresses_.insert(
multicast_addresses_.end(), organization_group.begin(), organization_group.end());
multicast_addresses_.push_back("FF08:" + ipv6_multicast_group);
mcasttmp.insert(mcasttmp.end(), organization_group.begin(), organization_group.end());
mcasttmp.push_back("FF08:" + ipv6_multicast_group);
multicast_ttl_ = 32;
}
if (scope >= global) {
multicast_addresses_.insert(
multicast_addresses_.end(), global_group.begin(), global_group.end());
multicast_addresses_.push_back("FF0E:" + ipv6_multicast_group);
mcasttmp.insert(mcasttmp.end(), global_group.begin(), global_group.end());
mcasttmp.push_back("FF0E:" + ipv6_multicast_group);
multicast_ttl_ = 255;
}

Expand All @@ -182,7 +180,45 @@ void api_config::load_from_file(const std::string &filename) {
std::vector<std::string> address_override =
parse_set(pt.get("multicast.AddressesOverride", "{}"));
if (ttl_override >= 0) multicast_ttl_ = ttl_override;
if (!address_override.empty()) multicast_addresses_ = address_override;
if (!address_override.empty()) mcasttmp = address_override;

// Parse, validate and store multicast addresses
for (std::vector<std::string>::iterator it = mcasttmp.begin(); it != mcasttmp.end(); ++it) {
ip::address addr = ip::make_address(*it);
if ((addr.is_v4() && allow_ipv4_) || (addr.is_v6() && allow_ipv6_))
multicast_addresses_.push_back(addr);
}

// The network stack requires the source interfaces for multicast packets to be
// specified as IPv4 address or an IPv6 interface index
// Try getting the interfaces from the configuration files
using namespace asio::ip;
std::vector<std::string> netifs = parse_set(pt.get("multicast.Interfaces", "{}"));
for (const auto &netifstr : netifs) {
netif if_;
if_.name = std::string("Configured in lslapi.cfg");
if_.addr = make_address(netifstr);
if (if_.addr.is_v6()) if_.ifindex = if_.addr.to_v6().scope_id();
multicast_interfaces.push_back(if_);
}
// Try getting the interfaces from the OS
if (multicast_interfaces.empty()) multicast_interfaces = get_local_interfaces();

// Otherwise, let the OS select an appropriate network interface
if (multicast_interfaces.empty()) {
LOG_F(ERROR,
"No local network interface addresses found, resolving streams will likely "
"only work for devices connected to the main network adapter\n");
// Add dummy interface with default settings
netif dummy;
dummy.name = "Dummy interface";
dummy.addr = address_v4::any();
multicast_interfaces.push_back(dummy);
dummy.name = "IPv6 dummy interface";
dummy.addr = address_v6::any();
multicast_interfaces.push_back(dummy);
}


// read the [lab] settings
known_peers_ = parse_set(pt.get("lab.KnownPeers", "{}"));
Expand Down Expand Up @@ -215,7 +251,7 @@ void api_config::load_from_file(const std::string &filename) {
force_default_timestamps_ = pt.get("tuning.ForceDefaultTimestamps", false);

// read the [log] settings
int log_level = pt.get("log.level", (int) loguru::Verbosity_INFO);
int log_level = pt.get("log.level", (int)loguru::Verbosity_INFO);
if (log_level < -3 || log_level > 9)
throw std::runtime_error("Invalid log.level (valid range: -3 to 9");

Expand Down
20 changes: 16 additions & 4 deletions src/api_config.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#ifndef API_CONFIG_H
#define API_CONFIG_H

#include "netinterfaces.h"
#include <cstdint>
#include <loguru.hpp>
#include <string>
#include <vector>

namespace ip = asio::ip;

namespace lsl {
/**
* A configuration object: holds all the configurable settings of liblsl.
Expand Down Expand Up @@ -81,7 +85,7 @@ class api_config {
const std::string &resolve_scope() const { return resolve_scope_; }

/**
* @brief List of multicast addresses on which inlets / outlets advertise/discover streams.
* List of multicast addresses on which inlets / outlets advertise/discover streams.
*
* This is merged from several other config file entries
* (LocalAddresses,SiteAddresses,OrganizationAddresses, GlobalAddresses)
Expand All @@ -96,7 +100,7 @@ class api_config {
* department) or organization (e.g., the campus), or at larger scope, multicast addresses
* with the according scope need to be included.
*/
const std::vector<std::string> &multicast_addresses() const { return multicast_addresses_; }
const std::vector<ip::address> &multicast_addresses() const { return multicast_addresses_; }

/**
* @brief The address of the local interface on which to listen to multicast traffic.
Expand All @@ -106,8 +110,16 @@ class api_config {
const std::string &listen_address() const { return listen_address_; }

/**
* @brief The TTL setting (time-to-live) for the multicast packets.
* A list of local interface addresses the multicast packets should be
* sent from.
*
* The ini file may contain IPv4 addresses and/or IPv6 addresses with the
* interface index as scope id, e.g. `1234:5678::2%3`
**/
std::vector<lsl::netif> multicast_interfaces;

/**
* The TTL setting (time-to-live) for the multicast packets.
* This is determined according to the ResolveScope setting if not overridden by the TTLOverride
* setting. The higher this number (0-255), the broader their distribution. Routers (if
* correctly configured) employ various thresholds below which packets are not further
Expand Down Expand Up @@ -216,7 +228,7 @@ class api_config {
bool allow_random_ports_;
uint16_t multicast_port_;
std::string resolve_scope_;
std::vector<std::string> multicast_addresses_;
std::vector<ip::address> multicast_addresses_;
int multicast_ttl_;
std::string listen_address_;
std::vector<std::string> known_peers_;
Expand Down
62 changes: 40 additions & 22 deletions src/resolve_attempt_udp.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "resolve_attempt_udp.h"
#include "api_config.h"
#include "netinterfaces.h"
#include "resolver_impl.h"
#include "socket_utils.h"
#include "util/strfuns.hpp"
Expand All @@ -11,13 +12,16 @@
#include <sstream>

using namespace lsl;
using err_t = const asio::error_code &;
using asio::ip::multicast::outbound_interface;

resolve_attempt_udp::resolve_attempt_udp(asio::io_context &io, const udp &protocol,
const std::vector<udp::endpoint> &targets, const std::string &query, resolver_impl &resolver,
double cancel_after)
: io_(io), resolver_(resolver), cancel_after_(cancel_after), cancelled_(false),
targets_(targets), query_(query), unicast_socket_(io), broadcast_socket_(io),
multicast_socket_(io), recv_socket_(io), cancel_timer_(io) {
multicast_socket_(io), multicast_interfaces(api_config::get_instance()->multicast_interfaces),
recv_socket_(io), cancel_timer_(io) {
// open the sockets that we might need
recv_socket_.open(protocol);
try {
Expand Down Expand Up @@ -71,7 +75,7 @@ void resolve_attempt_udp::begin() {
// initiate the result gathering chain
receive_next_result();
// initiate the send chain
send_next_query(targets_.begin());
send_next_query(targets_.begin(), multicast_interfaces.begin());

// also initiate the cancel event, if desired
if (cancel_after_ != FOREVER) {
Expand Down Expand Up @@ -153,27 +157,41 @@ void resolve_attempt_udp::handle_receive_outcome(err_t err, std::size_t len) {

// === send loop ===

void resolve_attempt_udp::send_next_query(endpoint_list::const_iterator next) {
if (next == targets_.end() || cancelled_) return;

udp::endpoint ep(*next++);
// endpoint matches our active protocol?
if (ep.protocol() == recv_socket_.local_endpoint().protocol()) {
// select socket to use
udp_socket &sock =
(ep.address() == asio::ip::address_v4::broadcast())
? broadcast_socket_
: (ep.address().is_multicast() ? multicast_socket_ : unicast_socket_);
// and send the query over it
sock.async_send_to(asio::buffer(query_msg_), ep,
[shared_this = shared_from_this(), next](err_t err, size_t /*unused*/) {
if (!shared_this->cancelled_ && err != asio::error::operation_aborted &&
err != asio::error::not_connected && err != asio::error::not_socket)
shared_this->send_next_query(next);
});
void resolve_attempt_udp::send_next_query(
endpoint_list::const_iterator next, mcast_interface_list::const_iterator mcit) {
if (cancelled_ || mcit == multicast_interfaces.end()) return;
auto proto = recv_socket_.local_endpoint().protocol();
if (next == targets_.begin()) {
// Mismatching protocols? Skip this round
if (mcit->addr.is_v4() != (proto == asio::ip::udp::v4()))
next = targets_.end();
else
multicast_socket_.set_option(mcit->addr.is_v4() ? outbound_interface(mcit->addr.to_v4())
: outbound_interface(mcit->ifindex));
}
if (next != targets_.end()) {
udp::endpoint ep(*next++);
// endpoint matches our active protocol?
if (ep.protocol() == recv_socket_.local_endpoint().protocol()) {
// select socket to use
udp_socket &sock =
(ep.address() == asio::ip::address_v4::broadcast())
? broadcast_socket_
: (ep.address().is_multicast() ? multicast_socket_ : unicast_socket_);
// and send the query over it
auto keepalive(shared_from_this());
sock.async_send_to(asio::buffer(query_msg_), ep,
[shared_this = shared_from_this(), next, mcit](err_t err, size_t /*unused*/) {
if (!shared_this->cancelled_ && err != asio::error::operation_aborted &&
err != asio::error::not_connected && err != asio::error::not_socket)
shared_this->send_next_query(next, mcit);
});
} else
// otherwise just go directly to the next query
send_next_query(next, mcit);
} else
// otherwise just go directly to the next query
send_next_query(next);
// Restart from the next interface
send_next_query(targets_.begin(), ++mcit);
}

void resolve_attempt_udp::do_cancel() {
Expand Down
10 changes: 8 additions & 2 deletions src/resolve_attempt_udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

#include "cancellation.h"
#include "forward.h"
#include "netinterfaces.h"
#include "stream_info_impl.h"
#include "socket_utils.h"
#include <asio/ip/udp.hpp>
#include <asio/ip/multicast.hpp>
#include <asio/steady_timer.hpp>
#include <map>
#include <memory>
Expand All @@ -21,6 +22,8 @@ using steady_timer = asio::basic_waitable_timer<asio::chrono::steady_clock, asio

/// A container for resolve results (map from stream instance UID onto (stream_info,receive-time)).
typedef std::map<std::string, std::pair<stream_info_impl, double>> result_container;
/// A container for outgoing multicast interfaces
typedef std::vector<class netif> mcast_interface_list;

/**
* An asynchronous resolve attempt for a single query targeted at a set of endpoints, via UDP.
Expand Down Expand Up @@ -78,7 +81,8 @@ class resolve_attempt_udp final : public cancellable_obj,
void receive_next_result();

/// Thos function starts an async send operation for the given current endpoint.
void send_next_query(endpoint_list::const_iterator next);
void send_next_query(
endpoint_list::const_iterator next, mcast_interface_list::const_iterator mcit);

/// Handler that gets called when a receive has completed.
void handle_receive_outcome(err_t err, std::size_t len);
Expand Down Expand Up @@ -122,6 +126,8 @@ class resolve_attempt_udp final : public cancellable_obj,
udp_socket broadcast_socket_;
/// socket to send data over (for multicasts)
udp_socket multicast_socket_;
/// Interface addresses to send multicast packets from
const mcast_interface_list &multicast_interfaces;
/// socket to receive replies (always unicast)
udp_socket recv_socket_;
/// timer to schedule the cancel action
Expand Down
2 changes: 1 addition & 1 deletion src/resolver_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ resolver_impl::resolver_impl()
uint16_t mcast_port = cfg_->multicast_port();
for (const auto &mcast_addr : cfg_->multicast_addresses()) {
try {
mcast_endpoints_.emplace_back(asio::ip::make_address(mcast_addr), mcast_port);
mcast_endpoints_.emplace_back(mcast_addr, mcast_port);
} catch (std::exception &) {}
}

Expand Down
9 changes: 4 additions & 5 deletions src/stream_outlet_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,15 @@ void stream_outlet_impl::instantiate_stack(udp udp_protocol) {
// create UDP time server
udp_servers_.push_back(std::make_shared<udp_server>(info_, *io_ctx_service_, udp_protocol));
// create UDP multicast responders
for (const auto &mcastaddr : cfg->multicast_addresses()) {
for (const auto &address : cfg->multicast_addresses()) {
try {
// use only addresses for the protocol that we're supposed to use here
auto address = asio::ip::make_address(mcastaddr);
if (udp_protocol == udp::v4() ? address.is_v4() : address.is_v6())
responders_.push_back(std::make_shared<udp_server>(
info_, *io_ctx_service_, mcastaddr, multicast_port, multicast_ttl, listen_address));
info_, *io_ctx_service_, address, multicast_port, multicast_ttl, listen_address));
} catch (std::exception &e) {
LOG_F(WARNING, "Couldn't create multicast responder for %s (%s)", mcastaddr.c_str(),
e.what());
LOG_F(WARNING, "Couldn't create multicast responder for %s (%s)",
address.to_string().c_str(), e.what());
}
}
}
Expand Down
30 changes: 21 additions & 9 deletions src/udp_server.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "udp_server.h"
#include "api_config.h"
#include "socket_utils.h"
#include "stream_info_impl.h"
#include "util/strfuns.hpp"
Expand Down Expand Up @@ -34,11 +35,10 @@ udp_server::udp_server(stream_info_impl_p info, asio::io_context &io, udp protoc
(void *)this);
}

udp_server::udp_server(stream_info_impl_p info, asio::io_context &io, const std::string &address,
udp_server::udp_server(stream_info_impl_p info, asio::io_context &io, ip::address addr,
uint16_t port, int ttl, const std::string &listen_address)
: info_(std::move(info)), io_(io), socket_(std::make_shared<udp_socket>(io)),
time_services_enabled_(false) {
ip::address addr = ip::make_address(address);
bool is_broadcast = addr == ip::address_v4::broadcast();

// set up the endpoint where we listen (note: this is not yet the multicast address)
Expand All @@ -65,16 +65,28 @@ udp_server::udp_server(stream_info_impl_p info, asio::io_context &io, const std:
// bind to the listen endpoint
socket_->bind(listen_endpoint);

// join the multicast group, if any
// join the multicast groups
if (addr.is_multicast() && !is_broadcast) {
if (addr.is_v4())
socket_->set_option(
ip::multicast::join_group(addr.to_v4(), listen_endpoint.address().to_v4()));
else
socket_->set_option(ip::multicast::join_group(addr));
bool joined_anywhere = false;
asio::error_code err;
for (auto &if_ : api_config::get_instance()->multicast_interfaces) {
DLOG_F(
INFO, "Joining %s to %s", if_.addr.to_string().c_str(), addr.to_string().c_str());
if (addr.is_v4() && if_.addr.is_v4())
socket_->set_option(ip::multicast::join_group(addr.to_v4(), if_.addr.to_v4()), err);
else if (addr.is_v6() && if_.addr.is_v6())
socket_->set_option(
ip::multicast::join_group(addr.to_v6(), if_.addr.to_v6().scope_id()), err);
if (err)
LOG_F(WARNING, "Could not bind multicast responder for %s to interface %s (%s)",
addr.to_string().c_str(), if_.addr.to_string().c_str(), err.message().c_str());
else
joined_anywhere = true;
}
if (!joined_anywhere) throw std::runtime_error("Could not join any multicast group");
}
LOG_F(2, "%s: Started multicast udp server at %s port %d (addr %p)",
this->info_->name().c_str(), address.c_str(), port, (void *)this);
this->info_->name().c_str(), addr.to_string().c_str(), port, (void *)this);
}

// === externally issued asynchronous commands ===
Expand Down
Loading