diff --git a/include/liboculus/DataRx.h b/include/liboculus/DataRx.h index 3e9738b..3b66f98 100644 --- a/include/liboculus/DataRx.h +++ b/include/liboculus/DataRx.h @@ -30,6 +30,7 @@ #pragma once +#include #include #include #include @@ -44,21 +45,58 @@ namespace liboculus { using std::shared_ptr; +template +class MutexedVariable { + public: + MutexedVariable(const T &initial_value) : var_(initial_value), mutex_() { ; } + + T get() const { + std::lock_guard guard(mutex_); + return var_; + } + + T set(const T &value) { + std::lock_guard guard(mutex_); + var_ = value; + return var_; + } + + private: + // Mutable to allow locking mutex w/o breaking const correctness + mutable std::mutex mutex_; + T var_; +}; + class DataRx : public OculusMessageHandler { public: explicit DataRx(const IoServiceThread::IoContextPtr &iosrv); ~DataRx(); - bool isConnected() const { return _socket.is_open(); } + // The socket interface can't actually track if a socket is _good_ + // just that it's _open_. + // So we track this state outselves. + bool isConnected() const { return is_connected_.get(); } void connect(const boost::asio::ip::address &addr); void connect(const std::string &strAddr); + void disconnect(); + typedef std::function OnConnectCallback; void setOnConnectCallback(OnConnectCallback callback) { _onConnectCallback = callback; } + typedef std::function OnDisconnectCallback; + void setOnDisconnectCallback(OnDisconnectCallback callback) { + _onDisconnectCallback = callback; + } + + typedef std::function OnTimeoutCallback; + void setOnTimeoutCallback(OnTimeoutCallback callback) { + _onTimeoutCallback = callback; + } + // By default, this function sends the config to the sonar // as an OculusSimpleFireMessage2. // @@ -74,6 +112,7 @@ class DataRx : public OculusMessageHandler { private: void onConnect(const boost::system::error_code &error); + void onTimeout(const boost::system::error_code &error); // Initiates a network read. // Note this function reads until the **total number of bytes @@ -105,7 +144,21 @@ class DataRx : public OculusMessageHandler { shared_ptr _buffer; OnConnectCallback _onConnectCallback; + OnDisconnectCallback _onDisconnectCallback; + OnTimeoutCallback _onTimeoutCallback; + + int timeout_secs_; + boost::asio::deadline_timer timeout_timer_; + MutexedVariable is_connected_; + + // There are many cases where a sonar disappearing (e.g. failing) + // can't be distinguished from a sonar simply not being present. + // + // This flag is essentially "send each error message once" + // to reduce driver verbosity. It is reset on good communications + // with the sonar. + bool has_complained_; }; // class DataRx template @@ -122,9 +175,14 @@ void DataRx::sendSimpleFireMessage(const SonarConfiguration &config) { data = config.serialize(); if (data.size() > 0) { - auto result = _socket.send(boost::asio::buffer(data)); - LOG(DEBUG) << "Sent " << result << " bytes to sonar"; - haveWritten(data); + try { + auto result = _socket.send(boost::asio::buffer(data)); + LOG(DEBUG) << "Sent " << result << " bytes to sonar"; + haveWritten(data); + } catch (boost::system::system_error &ex) { + LOG(WARNING) << "Exception when sending: " << ex.what(); + disconnect(); + } } } diff --git a/lib/DataRx.cpp b/lib/DataRx.cpp index 494b45f..3bfcf2f 100644 --- a/lib/DataRx.cpp +++ b/lib/DataRx.cpp @@ -42,7 +42,10 @@ DataRx::DataRx(const IoServiceThread::IoContextPtr& iosrv) : OculusMessageHandler(), _socket(*iosrv), _buffer(std::make_shared()), - _onConnectCallback() {} + _onConnectCallback(), + is_connected_(false), + timeout_secs_(2), + timeout_timer_(*iosrv) {} DataRx::~DataRx() {} @@ -52,7 +55,8 @@ void DataRx::connect(const asio::ip::address& addr) { uint16_t port = liboculus::DataPort; boost::asio::ip::tcp::endpoint sonarEndpoint(addr, port); - LOG(INFO) << "Connecting to sonar at " << sonarEndpoint; + LOG(DEBUG) << "Attempting to connect to sonar at " << sonarEndpoint; + is_connected_.set(true); _socket.async_connect(sonarEndpoint, boost::bind(&DataRx::onConnect, this, _1)); @@ -67,17 +71,37 @@ void DataRx::connect(const std::string& strAddr) { void DataRx::onConnect(const boost::system::error_code& ec) { if (ec) { - LOG(WARNING) << "Error on connect: " << ec.message(); - _socket.close(); - + LOG(DEBUG) << "Error on connect: " << ec.message(); + disconnect(); + return; + } else if (!isConnected()) { + // A separate thread could have failed independently.. return; } - LOG(DEBUG) << "Connected to sonar!"; + LOG(INFO) << "Successful connection to sonar!"; restartReceiveCycle(); if (_onConnectCallback) _onConnectCallback(); } +void DataRx::disconnect() { + LOG(DEBUG) << " ... disconnecting"; + _socket.close(); + is_connected_.set(false); + if (_onDisconnectCallback) _onDisconnectCallback(); +} + +void DataRx::onTimeout(const boost::system::error_code& ec) { + if (ec == boost::asio::error::operation_aborted) { + return; + } else if (ec) { + LOG(WARNING) << "Error on timeout " << ec.message(); + } + LOG(DEBUG) << "!! No data from sonar in " << timeout_secs_ + << " seconds, timeout"; + if (_onTimeoutCallback) _onTimeoutCallback(); +} + //=== Readers void DataRx::readUpTo(size_t bytes, StateMachineCallback callback) { const size_t current_sz = _buffer->size(); @@ -101,6 +125,13 @@ void DataRx::restartReceiveCycle() { } else { _buffer->clear(); } + + // Reset timeout timer; this should cancel existing pending timeouts + const auto timeout = boost::posix_time::seconds(timeout_secs_); + timeout_timer_.expires_from_now(timeout); + timeout_timer_.async_wait( + boost::bind(&DataRx::onTimeout, this, boost::placeholders::_1)); + readUpTo(sizeof(uint8_t), boost::bind(&DataRx::rxFirstByteOculusId, this, _1, _2)); } @@ -109,29 +140,36 @@ void DataRx::restartReceiveCycle() { void DataRx::rxFirstByteOculusId(const boost::system::error_code& ec, std::size_t bytes_transferred) { - if (ec) { - LOG(WARNING) << "Error on receive of header: " << ec.message(); - goto exit; + if (ec.value() == boost::asio::error::basic_errors::operation_aborted) { + LOG(DEBUG) << "Receive cancelled, giving up..."; + return; + } else if (ec) { + // Failure of this first read usually indicates a network failure + LOG(WARNING) << "Error on receive of header: " << ec.value() << " " + << ec.message(); + disconnect(); + return; } if (bytes_transferred != sizeof(uint8_t)) { - goto exit; + restartReceiveCycle(); + return; } if (_buffer->data()[0] == liboculus::PacketHeaderLSB) { readUpTo(sizeof(uint16_t), boost::bind(&DataRx::rxSecondByteOculusId, this, _1, _2)); - return; } - -exit: - restartReceiveCycle(); } void DataRx::rxSecondByteOculusId(const boost::system::error_code& ec, std::size_t bytes_transferred) { - if (ec) { - LOG(WARNING) << "Error on receive of header: " << ec.message(); + if (ec.value() == boost::asio::error::basic_errors::operation_aborted) { + LOG(DEBUG) << "Receive ancelled, giving up..."; + return; + } else if (ec) { + LOG(WARNING) << "Error on receive of header: " << ec.value() << " " + << ec.message(); goto exit; } @@ -153,8 +191,12 @@ void DataRx::rxSecondByteOculusId(const boost::system::error_code& ec, void DataRx::rxHeader(const boost::system::error_code& ec, std::size_t bytes_transferred) { - if (ec) { - LOG(WARNING) << "Error on receive of header: " << ec.message(); + if (ec.value() == boost::asio::error::basic_errors::operation_aborted) { + LOG(DEBUG) << "Receive cancelled, giving up..."; + return; + } else if (ec) { + LOG(WARNING) << "Error on receive of header: " << ec.value() << " " + << ec.message(); return; } @@ -185,8 +227,12 @@ void DataRx::rxPacket(const boost::system::error_code& ec, std::size_t bytes_transferred) { MessageHeader hdr(_buffer); - if (ec) { - LOG(WARNING) << "Error on receive of packet data: " << ec.message(); + if (ec.value() == boost::asio::error::basic_errors::operation_aborted) { + LOG(DEBUG) << "Receive cancelled, giving up..."; + return; + } else if (ec) { + LOG(WARNING) << "Error on receive of packet data: " << ec.value() << " " + << ec.message(); goto exit; } diff --git a/lib/IoServiceThread.cpp b/lib/IoServiceThread.cpp index 9c9ffdf..9196a8a 100644 --- a/lib/IoServiceThread.cpp +++ b/lib/IoServiceThread.cpp @@ -31,6 +31,7 @@ #include "liboculus/IoServiceThread.h" #include +#include namespace liboculus { @@ -67,6 +68,13 @@ void IoServiceThread::join() { _thread.reset(); } -void IoServiceThread::threadExec() { _context->run(); } +void IoServiceThread::threadExec() { + try { + _context->run(); + } catch (std::exception& ex) { + std::cerr << "!! Unhandled ASIO exception in IoServiceThread: " << ex.what() + << std::endl; + } +} } // namespace liboculus