Skip to content

Commit cbaa72f

Browse files
authored
Merge pull request #116 from DUNE-DAQ/eflumerf/ConnectionNameToIPM
Configure connection name in ipm
2 parents 81b902d + 1a2bbfe commit cbaa72f

23 files changed

+290
-199
lines changed

CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ find_package(utilities REQUIRED)
88
find_package(cetlib REQUIRED)
99
find_package(cppzmq REQUIRED)
1010
find_package(ers REQUIRED)
11-
find_package(nlohmann_json REQUIRED)
1211
find_package(Boost COMPONENTS unit_test_framework program_options REQUIRED)
1312
find_package(opmonlib REQUIRED)
1413

cmake/ipmConfig.cmake.in

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ include(CMakeFindDependencyMacro)
55

66
find_dependency(ers)
77
find_dependency(cetlib)
8-
find_dependency(nlohmann_json)
98
find_dependency(cppzmq)
109
find_dependency(logging)
1110
find_dependency(utilities)

include/ipm/Receiver.hpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
#include "cetlib/compiler_macros.h"
2828
#include "ers/Issue.hpp"
2929
#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
30-
#include "nlohmann/json.hpp"
3130
#include "opmonlib/MonitorableObject.hpp"
3231

3332
#include <atomic>
@@ -40,12 +39,12 @@ namespace dunedaq {
4039
ERS_DECLARE_ISSUE(ipm, KnownStateForbidsReceive, "Receiver not in a state to receive data", )
4140
ERS_DECLARE_ISSUE(ipm,
4241
UnexpectedNumberOfBytes,
43-
"Expected " << bytes1 << " bytes in message but received " << bytes2,
44-
((int)bytes1)((int)bytes2)) // NOLINT
42+
connection_name << ": Expected " << bytes1 << " bytes in message but received " << bytes2,
43+
((std::string)connection_name)((int)bytes1)((int)bytes2)) // NOLINT
4544
ERS_DECLARE_ISSUE(ipm,
4645
ReceiveTimeoutExpired,
47-
"Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)",
48-
((int)timeout)) // NOLINT
46+
connection_name << ": Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)",
47+
((std::string)connection_name)((int)timeout)) // NOLINT
4948
// Reenable coverage collection LCOV_EXCL_STOP
5049
} // namespace dunedaq
5150

@@ -75,6 +74,12 @@ class Receiver : public opmonlib::MonitorableObject
7574
{
7675

7776
public:
77+
struct ConnectionInfo
78+
{
79+
std::string connection_name{ "" };
80+
std::string connection_string{ "" };
81+
std::vector<std::string> connection_strings{};
82+
};
7883
using duration_t = std::chrono::milliseconds;
7984
static constexpr duration_t s_block = duration_t::max();
8085
static constexpr duration_t s_no_block = duration_t::zero();
@@ -86,7 +91,7 @@ class Receiver : public opmonlib::MonitorableObject
8691
Receiver() = default;
8792
virtual ~Receiver() = default;
8893

89-
virtual std::string connect_for_receives(const nlohmann::json& connection_info) = 0;
94+
virtual std::string connect_for_receives(const ConnectionInfo& connection_info) = 0;
9095

9196
virtual bool can_receive() const noexcept = 0;
9297

@@ -113,6 +118,7 @@ class Receiver : public opmonlib::MonitorableObject
113118
Receiver& operator=(Receiver&&) = delete;
114119

115120
protected:
121+
ConnectionInfo m_connection_info;
116122
void generate_opmon_data() override;
117123

118124
virtual Response receive_(const duration_t& timeout, bool no_tmoexcept_mode) = 0;

include/ipm/Sender.hpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
#include "cetlib/compiler_macros.h"
2828
#include "ers/Issue.hpp"
2929
#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG<<issue wont work.
30-
#include "nlohmann/json.hpp"
3130
#include "opmonlib/MonitorableObject.hpp"
3231

3332
#include <atomic>
@@ -38,11 +37,14 @@
3837
namespace dunedaq {
3938
// Disable coverage collection LCOV_EXCL_START
4039
ERS_DECLARE_ISSUE(ipm, KnownStateForbidsSend, "Sender not in a state to send data", )
41-
ERS_DECLARE_ISSUE(ipm, NullPointerPassedToSend, "An null pointer to memory was passed to Sender::send", )
40+
ERS_DECLARE_ISSUE(ipm,
41+
NullPointerPassedToSend,
42+
connection_name << ": An null pointer to memory was passed to Sender::send",
43+
((std::string)connection_name))
4244
ERS_DECLARE_ISSUE(ipm,
4345
SendTimeoutExpired,
44-
"Unable to send within timeout period (timeout period was " << timeout << " milliseconds)",
45-
((int)timeout)) // NOLINT
46+
connection_name << ": Unable to send within timeout period (timeout period was " << timeout << " milliseconds)",
47+
((std::string)connection_name)((int)timeout)) // NOLINT
4648

4749
// Reenable coverage collection LCOV_EXCL_STOP
4850
} // namespace dunedaq
@@ -73,6 +75,12 @@ class Sender : public opmonlib::MonitorableObject
7375
{
7476

7577
public:
78+
struct ConnectionInfo
79+
{
80+
std::string connection_name{ "" };
81+
std::string connection_string{ "inproc://default" };
82+
int capacity{ 0 };
83+
};
7684
using duration_t = std::chrono::milliseconds;
7785
static constexpr duration_t s_block = duration_t::max();
7886
static constexpr duration_t s_no_block = duration_t::zero();
@@ -82,7 +90,7 @@ class Sender : public opmonlib::MonitorableObject
8290
Sender() = default;
8391
virtual ~Sender() = default;
8492

85-
virtual std::string connect_for_sends(const nlohmann::json& connection_info) = 0;
93+
virtual std::string connect_for_sends(const ConnectionInfo& connection_info) = 0;
8694

8795
virtual bool can_send() const noexcept = 0;
8896

@@ -104,6 +112,7 @@ class Sender : public opmonlib::MonitorableObject
104112
Sender& operator=(Sender&&) = delete;
105113

106114
protected:
115+
ConnectionInfo m_connection_info;
107116
void generate_opmon_data() override;
108117

109118
virtual bool send_(const void* message,

include/ipm/Subscriber.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
#include "cetlib/BasicPluginFactory.h"
3232
#include "cetlib/compiler_macros.h"
3333
#include "ers/Issue.hpp"
34-
#include "nlohmann/json.hpp"
3534

3635
#include <memory>
3736
#include <string>

include/ipm/ZmqContext.hpp

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ namespace dunedaq {
5050
*/
5151
ERS_DECLARE_ISSUE(ipm,
5252
ZmqOperationError,
53-
"An exception occured while calling " << operation << " on the ZMQ " << direction << " socket: "
54-
<< what << " (connection_string: " << connection_string << ")",
55-
((std::string)operation)((std::string)direction)((const char*)what)(
53+
connection_name << ": An exception occured while calling " << operation << " on the ZMQ " << direction
54+
<< " socket: " << what << " (connection_string: " << connection_string << ")",
55+
((std::string)connection_name)((std::string)operation)((std::string)direction)((const char*)what)(
5656
(std::string)connection_string)) // NOLINT
5757
/// @endcond LCOV_EXCL_STOP
5858

@@ -63,11 +63,12 @@ ERS_DECLARE_ISSUE(ipm,
6363
* @param topic Send topic
6464
* @cond Doxygen doesn't like ERS macros LCOV_EXCL_START
6565
*/
66-
ERS_DECLARE_ISSUE(ipm,
67-
ZmqSendError,
68-
"An exception occurred while sending " << N << " bytes to " << topic << ": " << what,
69-
((const char*)what)((int)N)((std::string)topic)) // NOLINT
70-
/// @endcond LCOV_EXCL_STOP
66+
ERS_DECLARE_ISSUE(
67+
ipm,
68+
ZmqSendError,
69+
connection_name << ": An exception occurred while sending " << N << " bytes to " << topic << ": " << what,
70+
((std::string)connection_name)((const char*)what)((int)N)((std::string)topic)) // NOLINT
71+
/// @endcond LCOV_EXCL_STOP
7172

7273
/**
7374
* @brief An ERS Error indicating that an exception was thrown from ZMQ while receiving
@@ -77,9 +78,9 @@ ERS_DECLARE_ISSUE(ipm,
7778
*/
7879
ERS_DECLARE_ISSUE(ipm,
7980
ZmqReceiveError,
80-
"An exception occured while receiving " << which << ": " << what,
81-
((const char*)what)((const char*)which)) // NOLINT
82-
/// @endcond LCOV_EXCL_STOP
81+
connection_name << ": An exception occured while receiving " << which << ": " << what,
82+
((std::string)connection_name)((const char*)what)((const char*)which)) // NOLINT
83+
/// @endcond LCOV_EXCL_STOP
8384

8485
/**
8586
* @brief An ERS Error indicating that an exception was thrown from ZMQ during a subscribe
@@ -89,8 +90,8 @@ ERS_DECLARE_ISSUE(ipm,
8990
*/
9091
ERS_DECLARE_ISSUE(ipm,
9192
ZmqSubscribeError,
92-
"An execption occured while subscribing to " << topic << ": " << what,
93-
((const char*)what)((std::string)topic)) // NOLINT
93+
connection_name << ": An execption occured while subscribing to " << topic << ": " << what,
94+
((std::string)connection_name)((const char*)what)((std::string)topic)) // NOLINT
9495
/// @endcond LCOV_EXCL_STOP
9596

9697
/**
@@ -101,8 +102,8 @@ ERS_DECLARE_ISSUE(ipm,
101102
*/
102103
ERS_DECLARE_ISSUE(ipm,
103104
ZmqUnsubscribeError,
104-
"An execption occured while unsubscribing from " << topic << ": " << what,
105-
((const char*)what)((std::string)topic)) // NOLINT
105+
connection_name << ": An execption occured while unsubscribing from " << topic << ": " << what,
106+
((std::string)connection_name)((const char*)what)((std::string)topic)) // NOLINT
106107
/// @endcond LCOV_EXCL_STOP
107108

108109
namespace ipm {
@@ -152,10 +153,11 @@ class ZmqContext
152153
set_context_maxsockets(s_minimum_sockets);
153154
}
154155
}
155-
~ZmqContext() {
156-
TLOG_DEBUG(TLVL_ZMQCONTEXT) << "Closing ZMQ Context";
157-
m_context.close();
158-
TLOG_DEBUG(TLVL_ZMQCONTEXT) << "ZMQ Context closed";
156+
~ZmqContext()
157+
{
158+
TLOG_DEBUG(TLVL_ZMQCONTEXT) << "Closing ZMQ Context";
159+
m_context.close();
160+
TLOG_DEBUG(TLVL_ZMQCONTEXT) << "ZMQ Context closed";
159161
}
160162
zmq::context_t m_context;
161163
static constexpr int s_minimum_sockets = 16636;

0 commit comments

Comments
 (0)