Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ find_package(utilities REQUIRED)
find_package(cetlib REQUIRED)
find_package(cppzmq REQUIRED)
find_package(ers REQUIRED)
find_package(nlohmann_json REQUIRED)
find_package(Boost COMPONENTS unit_test_framework program_options REQUIRED)
find_package(opmonlib REQUIRED)

Expand Down
1 change: 0 additions & 1 deletion cmake/ipmConfig.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ include(CMakeFindDependencyMacro)

find_dependency(ers)
find_dependency(cetlib)
find_dependency(nlohmann_json)
find_dependency(cppzmq)
find_dependency(logging)
find_dependency(utilities)
Expand Down
18 changes: 12 additions & 6 deletions include/ipm/Receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "cetlib/compiler_macros.h"
#include "ers/Issue.hpp"
#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
#include "nlohmann/json.hpp"
#include "opmonlib/MonitorableObject.hpp"

#include <atomic>
Expand All @@ -40,12 +39,12 @@ namespace dunedaq {
ERS_DECLARE_ISSUE(ipm, KnownStateForbidsReceive, "Receiver not in a state to receive data", )
ERS_DECLARE_ISSUE(ipm,
UnexpectedNumberOfBytes,
"Expected " << bytes1 << " bytes in message but received " << bytes2,
((int)bytes1)((int)bytes2)) // NOLINT
connection_name << ": Expected " << bytes1 << " bytes in message but received " << bytes2,
((std::string)connection_name)((int)bytes1)((int)bytes2)) // NOLINT
ERS_DECLARE_ISSUE(ipm,
ReceiveTimeoutExpired,
"Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)",
((int)timeout)) // NOLINT
connection_name << ": Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)",
((std::string)connection_name)((int)timeout)) // NOLINT
// Reenable coverage collection LCOV_EXCL_STOP
} // namespace dunedaq

Expand Down Expand Up @@ -75,6 +74,12 @@ class Receiver : public opmonlib::MonitorableObject
{

public:
struct ConnectionInfo
{
std::string connection_name{ "" };
std::string connection_string{ "" };
std::vector<std::string> connection_strings{};
};
using duration_t = std::chrono::milliseconds;
static constexpr duration_t s_block = duration_t::max();
static constexpr duration_t s_no_block = duration_t::zero();
Expand All @@ -86,7 +91,7 @@ class Receiver : public opmonlib::MonitorableObject
Receiver() = default;
virtual ~Receiver() = default;

virtual std::string connect_for_receives(const nlohmann::json& connection_info) = 0;
virtual std::string connect_for_receives(const ConnectionInfo& connection_info) = 0;

virtual bool can_receive() const noexcept = 0;

Expand All @@ -113,6 +118,7 @@ class Receiver : public opmonlib::MonitorableObject
Receiver& operator=(Receiver&&) = delete;

protected:
ConnectionInfo m_connection_info;
void generate_opmon_data() override;

virtual Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) = 0;
Expand Down
19 changes: 14 additions & 5 deletions include/ipm/Sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "cetlib/compiler_macros.h"
#include "ers/Issue.hpp"
#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
#include "nlohmann/json.hpp"
#include "opmonlib/MonitorableObject.hpp"

#include <atomic>
Expand All @@ -38,11 +37,14 @@
namespace dunedaq {
// Disable coverage collection LCOV_EXCL_START
ERS_DECLARE_ISSUE(ipm, KnownStateForbidsSend, "Sender not in a state to send data", )
ERS_DECLARE_ISSUE(ipm, NullPointerPassedToSend, "An null pointer to memory was passed to Sender::send", )
ERS_DECLARE_ISSUE(ipm,
NullPointerPassedToSend,
connection_name << ": An null pointer to memory was passed to Sender::send",
((std::string)connection_name))
ERS_DECLARE_ISSUE(ipm,
SendTimeoutExpired,
"Unable to send within timeout period (timeout period was " << timeout << " milliseconds)",
((int)timeout)) // NOLINT
connection_name << ": Unable to send within timeout period (timeout period was " << timeout << " milliseconds)",
((std::string)connection_name)((int)timeout)) // NOLINT

// Reenable coverage collection LCOV_EXCL_STOP
} // namespace dunedaq
Expand Down Expand Up @@ -73,6 +75,12 @@ class Sender : public opmonlib::MonitorableObject
{

public:
struct ConnectionInfo
{
std::string connection_name{ "" };
std::string connection_string{ "inproc://default" };
int capacity{ 0 };
};
using duration_t = std::chrono::milliseconds;
static constexpr duration_t s_block = duration_t::max();
static constexpr duration_t s_no_block = duration_t::zero();
Expand All @@ -82,7 +90,7 @@ class Sender : public opmonlib::MonitorableObject
Sender() = default;
virtual ~Sender() = default;

virtual std::string connect_for_sends(const nlohmann::json& connection_info) = 0;
virtual std::string connect_for_sends(const ConnectionInfo& connection_info) = 0;

virtual bool can_send() const noexcept = 0;

Expand All @@ -104,6 +112,7 @@ class Sender : public opmonlib::MonitorableObject
Sender& operator=(Sender&&) = delete;

protected:
ConnectionInfo m_connection_info;
void generate_opmon_data() override;

virtual bool send_(const void* message,
Expand Down
1 change: 0 additions & 1 deletion include/ipm/Subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "cetlib/BasicPluginFactory.h"
#include "cetlib/compiler_macros.h"
#include "ers/Issue.hpp"
#include "nlohmann/json.hpp"

#include <memory>
#include <string>
Expand Down
40 changes: 21 additions & 19 deletions include/ipm/ZmqContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ namespace dunedaq {
*/
ERS_DECLARE_ISSUE(ipm,
ZmqOperationError,
"An exception occured while calling " << operation << " on the ZMQ " << direction << " socket: "
<< what << " (connection_string: " << connection_string << ")",
((std::string)operation)((std::string)direction)((const char*)what)(
connection_name << ": An exception occured while calling " << operation << " on the ZMQ " << direction
<< " socket: " << what << " (connection_string: " << connection_string << ")",
((std::string)connection_name)((std::string)operation)((std::string)direction)((const char*)what)(
(std::string)connection_string)) // NOLINT
/// @endcond LCOV_EXCL_STOP

Expand All @@ -63,11 +63,12 @@ ERS_DECLARE_ISSUE(ipm,
* @param topic Send topic
* @cond Doxygen doesn't like ERS macros LCOV_EXCL_START
*/
ERS_DECLARE_ISSUE(ipm,
ZmqSendError,
"An exception occurred while sending " << N << " bytes to " << topic << ": " << what,
((const char*)what)((int)N)((std::string)topic)) // NOLINT
/// @endcond LCOV_EXCL_STOP
ERS_DECLARE_ISSUE(
ipm,
ZmqSendError,
connection_name << ": An exception occurred while sending " << N << " bytes to " << topic << ": " << what,
((std::string)connection_name)((const char*)what)((int)N)((std::string)topic)) // NOLINT
/// @endcond LCOV_EXCL_STOP

/**
* @brief An ERS Error indicating that an exception was thrown from ZMQ while receiving
Expand All @@ -77,9 +78,9 @@ ERS_DECLARE_ISSUE(ipm,
*/
ERS_DECLARE_ISSUE(ipm,
ZmqReceiveError,
"An exception occured while receiving " << which << ": " << what,
((const char*)what)((const char*)which)) // NOLINT
/// @endcond LCOV_EXCL_STOP
connection_name << ": An exception occured while receiving " << which << ": " << what,
((std::string)connection_name)((const char*)what)((const char*)which)) // NOLINT
/// @endcond LCOV_EXCL_STOP

/**
* @brief An ERS Error indicating that an exception was thrown from ZMQ during a subscribe
Expand All @@ -89,8 +90,8 @@ ERS_DECLARE_ISSUE(ipm,
*/
ERS_DECLARE_ISSUE(ipm,
ZmqSubscribeError,
"An execption occured while subscribing to " << topic << ": " << what,
((const char*)what)((std::string)topic)) // NOLINT
connection_name << ": An execption occured while subscribing to " << topic << ": " << what,
((std::string)connection_name)((const char*)what)((std::string)topic)) // NOLINT
/// @endcond LCOV_EXCL_STOP

/**
Expand All @@ -101,8 +102,8 @@ ERS_DECLARE_ISSUE(ipm,
*/
ERS_DECLARE_ISSUE(ipm,
ZmqUnsubscribeError,
"An execption occured while unsubscribing from " << topic << ": " << what,
((const char*)what)((std::string)topic)) // NOLINT
connection_name << ": An execption occured while unsubscribing from " << topic << ": " << what,
((std::string)connection_name)((const char*)what)((std::string)topic)) // NOLINT
/// @endcond LCOV_EXCL_STOP

namespace ipm {
Expand Down Expand Up @@ -152,10 +153,11 @@ class ZmqContext
set_context_maxsockets(s_minimum_sockets);
}
}
~ZmqContext() {
TLOG_DEBUG(TLVL_ZMQCONTEXT) << "Closing ZMQ Context";
m_context.close();
TLOG_DEBUG(TLVL_ZMQCONTEXT) << "ZMQ Context closed";
~ZmqContext()
{
TLOG_DEBUG(TLVL_ZMQCONTEXT) << "Closing ZMQ Context";
m_context.close();
TLOG_DEBUG(TLVL_ZMQCONTEXT) << "ZMQ Context closed";
}
zmq::context_t m_context;
static constexpr int s_minimum_sockets = 16636;
Expand Down
Loading