Skip to content

Commit 4d688f6

Browse files
authored
Merge pull request #36 from DUNE-DAQ/eflumerf/DMCRConfiguration
Configuration objects for DataMoveCallbackRegistry
2 parents 06fa772 + c1bc348 commit 4d688f6

File tree

6 files changed

+48
-243
lines changed

6 files changed

+48
-243
lines changed

plugins/SocketReaderModule.cpp

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -125,34 +125,11 @@ SocketReaderModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcf
125125
}
126126
}
127127

128-
if (mdal->get_outputs().empty()) {
129-
auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE,
130-
"No outputs defined for socket reader in configuration.");
131-
ers::fatal(err);
132-
throw err;
133-
}
134-
135-
for (auto* con : mdal->get_outputs()) {
136-
auto* queue = con->cast<confmodel::QueueWithSourceId>();
137-
if (queue == nullptr) {
138-
auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId.");
139-
ers::fatal(err);
140-
throw err;
141-
}
142-
143-
// Check for CB prefix indicating Callback use
144-
const char delim = '_';
145-
const std::string target = queue->UID();
146-
std::vector<std::string> words;
147-
tokenize(target, delim, words);
148-
149-
bool callback_mode = false;
150-
if (words.front() == "cb") {
151-
callback_mode = true;
152-
}
128+
auto callback_confs = mdal->get_raw_data_callbacks();
129+
for (auto* callback_conf : callback_confs) {
153130

154-
auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode);
155-
register_node(queue->UID(), ptr);
131+
auto ptr = m_sources[callback_conf->get_source_id()] = createSourceModel(callback_conf);
132+
register_node(callback_conf->UID(), ptr);
156133
}
157134
}
158135

plugins/SocketWriterModule.cpp

Lines changed: 11 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -42,64 +42,14 @@ SocketWriterModule::SocketWriterModule(const std::string& name)
4242
register_command("stop_trigger_sources", &SocketWriterModule::do_stop);
4343
}
4444

45-
void
46-
SocketWriterModule::get_dal_inputs(const dunedaq::appmodel::SocketDataWriterModule* mdal)
47-
{
48-
if (mdal->get_inputs().empty()) {
49-
auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE,
50-
"No inputs defined for socket writer in configuration.");
51-
ers::fatal(err);
52-
throw err;
53-
}
54-
55-
for (auto* input : mdal->get_inputs()) {
56-
m_raw_data_receiver_connection_name = input->UID();
57-
// Parse for prefix
58-
std::string conn_name = input->UID();
59-
const char delim = '_';
60-
std::vector<std::string> words;
61-
std::size_t start;
62-
std::size_t end = 0;
63-
while ((start = conn_name.find_first_not_of(delim, end)) != std::string::npos) {
64-
end = conn_name.find(delim, start);
65-
words.push_back(conn_name.substr(start, end - start));
66-
}
67-
68-
TLOG_DEBUG() << "Initialize connection based on uid: "
69-
<< m_raw_data_receiver_connection_name << " front word: " << words.front();
70-
71-
std::string cb_prefix("cb");
72-
if (words.front() == cb_prefix) {
73-
m_callback_mode = true;
74-
}
75-
76-
if (!m_callback_mode) {
77-
const auto recv_timeout_ms = input->get_recv_timeout_ms();
78-
if (recv_timeout_ms == 0) {
79-
ers::warning(InvalidRawReceiverTimeout(ERS_HERE, m_raw_receiver_timeout_ms.count()));
80-
} else {
81-
m_raw_receiver_timeout_ms = std::chrono::milliseconds(recv_timeout_ms);
82-
}
83-
}
84-
85-
auto* queue = input->cast<confmodel::QueueWithSourceId>();
86-
if (queue == nullptr) {
87-
auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "Inputs are not of type QueueWithGeoId.");
88-
ers::fatal(err);
89-
throw err;
90-
}
91-
92-
m_raw_data_receiver = createGenericReceiver(queue->UID(), m_raw_data_receiver_connection_name); // FIXME (DTE): Overwriting doesn't make sense
93-
}
94-
}
95-
9645
void
9746
SocketWriterModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcfg)
9847
{
9948
m_cfg = mcfg;
10049
auto* mdal = m_cfg->get_dal<appmodel::SocketDataWriterModule>(get_name());
10150
auto* module_conf = mdal->get_configuration()->cast<appmodel::SocketWriterConf>();
10251

52+
m_callback_conf = mdal->get_raw_data_callback();
10353
const auto remote_ip = module_conf->get_remote_ip();
10454

10555
m_socket_type = string_to_socket_type(module_conf->get_socket_type());
@@ -113,7 +63,7 @@ SocketWriterModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcf
11363
}
11464

11565
for (auto* nw_sender : d2d_conn->get_net_senders()) {
116-
66+
11767
if (nw_sender->is_disabled(*(m_cfg->get_session()))) {
11868
continue;
11969
}
@@ -148,13 +98,11 @@ SocketWriterModule::init(const std::shared_ptr<appfwk::ConfigurationManager> mcf
14898
}
14999
}
150100

151-
get_dal_inputs(mdal);
152-
153101
// Raw input connection sensibility check
154-
if (!m_callback_mode && m_raw_data_receiver == nullptr) {
155-
TLOG() << "Non callback mode, and receiver is unset!";
156-
//ers::error(ConfigurationError(ERS_HERE, m_sourceid, "Non callback mode, and receiver is unset!"));
157-
}
102+
if (m_callback_conf == nullptr) {
103+
TLOG() << "No callback configuration given!";
104+
//ers::error(ConfigurationError(ERS_HERE, m_sourceid, No callback configuration given!"));
105+
}
158106
}
159107

160108
SocketWriterModule::SocketType
@@ -168,26 +116,6 @@ SocketWriterModule::string_to_socket_type(const std::string& socket_type) const
168116
return SocketWriterModule::SocketType::INVALID;
169117
}
170118

171-
void
172-
SocketWriterModule::run_consume()
173-
{
174-
TLOG() << "Consumer thread started..."; // TODO (DTE): Make debug logs
175-
176-
while (m_run_marker.load()) {
177-
// Try to acquire data
178-
179-
if (auto opt_payload = m_raw_data_receiver->try_receive(m_raw_receiver_timeout_ms)) {
180-
consume_payload(std::move(*opt_payload));
181-
} else {
182-
for (const auto& writer_config : m_writer_configs) {
183-
++writer_config.socket_stats->rawq_timeout_count;
184-
}
185-
}
186-
}
187-
188-
TLOG() << "Consumer thread joins... ";
189-
}
190-
191119
void
192120
SocketWriterModule::consume_payload(GenericReceiverConcept::TypeErasedPayload payload)
193121
{
@@ -201,15 +129,12 @@ SocketWriterModule::consume_payload(GenericReceiverConcept::TypeErasedPayload pa
201129
void
202130
SocketWriterModule::do_configure(const CommandData_t&)
203131
{
204-
// Register callbacks if operating in that mode.
205-
if (m_callback_mode) {
206132
// Configure and register consume callback
207133
m_consume_callback = std::bind(&SocketWriterModule::consume_payload, this, std::placeholders::_1);
208-
134+
209135
// Register callback
210136
auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get();
211-
dmcbr->register_callback<GenericReceiverConcept::TypeErasedPayload>(m_raw_data_receiver_connection_name, m_consume_callback);
212-
}
137+
dmcbr->register_callback<GenericReceiverConcept::TypeErasedPayload>(m_callback_conf, m_consume_callback);
213138

214139
for (std::size_t i = 0; i < m_writers.size(); ++i) {
215140
const auto& writer_config = m_writer_configs[i];
@@ -229,26 +154,14 @@ SocketWriterModule::do_start(const CommandData_t&)
229154
writer_config.socket_stats->stats_packet_count = 0;
230155
}
231156

232-
m_t0 = std::chrono::high_resolution_clock::now();
233-
234-
if (!m_callback_mode) {
235-
m_run_marker.store(true);
236-
m_consumer_thread.set_work(&SocketWriterModule::run_consume, this);
237-
}
157+
m_t0 = std::chrono::steady_clock::now();
238158

239159
m_io_thread = std::jthread([this] { m_io_context.run(); });
240160
}
241161

242162
void
243163
SocketWriterModule::do_stop(const CommandData_t&)
244164
{
245-
if (!m_callback_mode) {
246-
m_run_marker.store(false);
247-
while (!m_consumer_thread.get_readiness()) {
248-
std::this_thread::sleep_for(std::chrono::milliseconds(10));
249-
}
250-
}
251-
252165
for (auto& writer : m_writers) {
253166
std::visit([](auto& writer) { writer.stop(); }, writer);
254167
}
@@ -266,7 +179,7 @@ SocketWriterModule::generate_opmon_data()
266179
stats.set_sum_bytes(writer_config.socket_stats->sum_bytes.load());
267180
stats.set_num_data_input_timeouts(writer_config.socket_stats->rawq_timeout_count.exchange(0));
268181

269-
auto now = std::chrono::high_resolution_clock::now();
182+
auto now = std::chrono::steady_clock::now();
270183
int new_packets = writer_config.socket_stats->stats_packet_count.exchange(0);
271184
double seconds = std::chrono::duration_cast<std::chrono::microseconds>(now - m_t0).count() / 1000000.;
272185
m_t0 = now;
@@ -341,7 +254,7 @@ SocketWriterModule::UDPWriter::start(GenericReceiverConcept::TypeErasedPayload p
341254
++m_writer_config.socket_stats->num_payloads;
342255
++m_writer_config.socket_stats->sum_payloads;
343256
m_writer_config.socket_stats->sum_bytes.fetch_add(bytes_sent);
344-
++m_writer_config.socket_stats->stats_packet_count;
257+
++m_writer_config.socket_stats->stats_packet_count;
345258
}
346259

347260
void

plugins/SocketWriterModule.hpp

Lines changed: 17 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -70,29 +70,29 @@ class SocketWriterModule : public dunedaq::appfwk::DAQModule
7070
std::atomic<uint64_t> sum_payloads{ 0 };
7171

7272
/**
73-
* @brief Incremental number of received payloads
73+
* @brief Incremental number of received payloads
7474
*/
75-
std::atomic<uint64_t> num_payloads{ 0 };
75+
std::atomic<uint64_t> num_payloads{ 0 };
7676

7777
/**
78-
* @brief Total number of received bytes
78+
* @brief Total number of received bytes
7979
*/
8080
std::atomic<uint64_t> sum_bytes{ 0 };
8181

8282
/**
83-
* @brief Timeout on data inputs
83+
* @brief Timeout on data inputs
8484
*/
85-
std::atomic<uint64_t> rawq_timeout_count{ 0 };
85+
std::atomic<uint64_t> rawq_timeout_count{ 0 };
8686

8787
/**
88-
* @brief Rate of consumed packets
88+
* @brief Rate of consumed packets
8989
*/
9090
std::atomic<double> rate_payloads_consumed{ 0 };
91-
91+
9292
/**
9393
* @brief Counts packets since last opmon data generation
9494
*/
95-
std::atomic<int> stats_packet_count{ 0 };
95+
std::atomic<int> stats_packet_count{ 0 };
9696
};
9797

9898
struct WriterConfig
@@ -196,22 +196,11 @@ class SocketWriterModule : public dunedaq::appfwk::DAQModule
196196
*/
197197
SocketType string_to_socket_type(const std::string& socket_type) const;
198198

199-
/**
200-
* @brief Gets dal inputs
201-
* @param mdal SocketDataWriterModule dal
202-
*/
203-
void get_dal_inputs(const dunedaq::appmodel::SocketDataWriterModule* mdal);
204-
205-
/**
206-
* @brief Raw data consume thread function
207-
*/
208-
void run_consume();
209-
210199
/**
211200
* @brief Raw data consume callback function
212201
* @param payload Consumed data
213-
*/
214-
void consume_payload(GenericReceiverConcept::TypeErasedPayload payload);
202+
*/
203+
void consume_payload(GenericReceiverConcept::TypeErasedPayload payload);
215204

216205
/**
217206
* @brief I/O context for socket operations
@@ -249,48 +238,21 @@ class SocketWriterModule : public dunedaq::appfwk::DAQModule
249238
std::shared_ptr<appfwk::ConfigurationManager> m_cfg;
250239

251240
/**
252-
* @brief Whether callback mode is configured
253-
*/
254-
bool m_callback_mode{ false };
255-
256-
// RAW RECEIVER
257-
/**
258-
* @brief Generic raw data receiver
259-
*/
260-
std::shared_ptr<GenericReceiverConcept> m_raw_data_receiver;
261-
262-
/**
263-
* @brief Raw data receiver timeout
264-
*/
265-
std::chrono::milliseconds m_raw_receiver_timeout_ms{ raw_receiver_timeout_ms };
266-
267-
/**
268-
* @brief Raw data receiver UID
269-
*/
270-
std::string m_raw_data_receiver_connection_name;
271-
272-
// CONSUMER
273-
/**
274-
* @brief Raw data consume thread
275-
*/
276-
utilities::ReusableThread m_consumer_thread;
277-
278-
/**
279-
* @brief Whether consumer thread should continue
280-
*/
281-
std::atomic<bool> m_run_marker { false };
241+
* @brief Configuration object for the callbacks
242+
*/
243+
const appmodel::DataMoveCallbackConf* m_callback_conf;
282244

283245
// Consume callback
284246
/**
285247
* @brief Raw data consume callback
286-
*/
287-
std::function<void(GenericReceiverConcept::TypeErasedPayload payload)> m_consume_callback;
248+
*/
249+
std::function<void(GenericReceiverConcept::TypeErasedPayload payload)> m_consume_callback;
288250

289251
// RUN START T0
290252
/**
291253
* @brief Timestamp used to measure time between opmon reports
292-
*/
293-
std::chrono::time_point<std::chrono::high_resolution_clock> m_t0;
254+
*/
255+
std::chrono::time_point<std::chrono::steady_clock> m_t0;
294256
};
295257

296258
} // namespace dunedaq::asiolibs

0 commit comments

Comments
 (0)