Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 89 additions & 8 deletions plugins/FragmentAggregatorModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@

#include "FragmentAggregatorModule.hpp"
#include "dfmodules/CommonIssues.hpp"
#include "dfmodules/opmon/FragmentAggregatorModule.pb.h"

#include "appmodel/FragmentAggregatorModule.hpp"
#include "confmodel/Connection.hpp"
#include "confmodel/QueueWithSourceId.hpp"
#include "daqdataformats/FragmentHeader.hpp"
#include "dfmessages/Fragment_serialization.hpp"
#include "logging/Logging.hpp"

Expand Down Expand Up @@ -64,19 +66,60 @@ FragmentAggregatorModule::init(std::shared_ptr<appfwk::ConfigurationManager> mcf
iom->get_receiver<dfmessages::DataRequest>(m_data_req_input);
}

// void
// FragmentAggregatorModule::get_info(opmonlib::InfoCollector& /*ci*/, int /* level */)
// {
// // dummyconsumerinfo::Info info;
// // info.packets_processed = m_packets_processed;
void
FragmentAggregatorModule::generate_opmon_data()
{
if (m_data_requests_processed > 0) {
opmon::FragmentAggregatorTimeInfo dr_times;
dr_times.set_min_us(m_data_requests_time_min_us.exchange(std::numeric_limits<uint64_t>::max()));
dr_times.set_max_us(m_data_requests_time_max_us.exchange(0));
dr_times.set_average_us(m_data_requests_time_average_us.exchange(0) / m_data_requests_processed);
this->publish(std::move(dr_times), { { "data", "DataRequest" } });
}

if (m_fragments_processed > 0) {
opmon::FragmentAggregatorTimeInfo frag_times;
frag_times.set_min_us(m_fragments_time_min_us.exchange(std::numeric_limits<uint64_t>::max()));
frag_times.set_max_us(m_fragments_time_max_us.exchange(0));
frag_times.set_average_us(m_fragments_time_average_us.exchange(0) / m_fragments_processed);
this->publish(std::move(frag_times), { { "data", "Fragment" } });
}

opmon::FADataRequestsCounterInfo dr_info;
dr_info.set_data_requests_received(m_data_requests_received.exchange(0));
dr_info.set_data_requests_processed(m_data_requests_processed.exchange(0));
dr_info.set_data_requests_failed(m_data_requests_failed.load()); //the failed counters are meant NOT to reset
this->publish(std::move(dr_info));

// // ci.add(info);
// }
opmon::FAFragmentsCounterInfo frag_info;
frag_info.set_fragments_received(m_fragments_received.exchange(0));
frag_info.set_fragments_processed(m_fragments_processed.exchange(0));
frag_info.set_fragments_failed(m_fragments_failed.load());
frag_info.set_fragments_empty(m_fragments_empty.exchange(0));
frag_info.set_fragments_incomplete(m_fragments_incomplete.exchange(0));
frag_info.set_fragments_invalid(m_fragments_invalid.exchange(0));
this->publish(std::move(frag_info));
}

void
FragmentAggregatorModule::do_start(const data_t& /* args */)
{
m_packets_processed = 0;

m_data_requests_received.store(0);
m_data_requests_processed.store(0);
m_data_requests_failed.store(0);
m_fragments_received.store(0);
m_fragments_processed.store(0);
m_fragments_failed.store(0);
m_fragments_empty.store(0);
m_fragments_incomplete.store(0);
m_fragments_invalid.store(0);
m_fragments_time_average_us.store(0);
m_fragments_time_min_us.store(std::numeric_limits<uint64_t>::max());
m_fragments_time_max_us.store(0);
m_data_requests_time_average_us.store(0);
m_data_requests_time_min_us.store(std::numeric_limits<uint64_t>::max());
m_data_requests_time_max_us.store(0);

// 19-Dec-2024, KAB: check that Fragment senders are ready to send. This is done so
// that the IOManager infrastructure fetches the necessary connection details from
Expand Down Expand Up @@ -113,6 +156,10 @@ FragmentAggregatorModule::process_data_request(dfmessages::DataRequest& data_req

{
std::scoped_lock lock(m_mutex);

m_timestamp_before_dr = get_current_time_us();
m_data_requests_received++;

std::tuple<dfmessages::trigger_number_t, dfmessages::sequence_number_t, daqdataformats::SourceID> triplet = {
data_request.trigger_number, data_request.sequence_number, data_request.request_information.component
};
Expand All @@ -133,9 +180,16 @@ FragmentAggregatorModule::process_data_request(dfmessages::DataRequest& data_req
auto sender = get_iom_sender<dfmessages::DataRequest>(uid_elem->second);
data_request.data_destination = m_fragment_input;
sender->send(std::move(data_request), iomanager::Sender::s_no_block);

m_data_requests_processed++;
auto timestamp_total = get_current_time_us() - m_timestamp_before_dr;
if (timestamp_total < m_data_requests_time_min_us) { m_data_requests_time_min_us = timestamp_total; }
if (timestamp_total > m_data_requests_time_max_us) { m_data_requests_time_max_us = timestamp_total; }
m_data_requests_time_average_us += timestamp_total;
}
} catch (const ers::Issue& excpt) {
ers::warning(excpt);
m_data_requests_failed++;
}
}

Expand All @@ -146,6 +200,18 @@ FragmentAggregatorModule::process_fragment(std::unique_ptr<daqdataformats::Fragm
std::string trb_identifier;
{
std::scoped_lock lock(m_mutex);

m_timestamp_before_frag = get_current_time_us();
m_fragments_received++;

std::bitset<32> error_bits = fragment->get_error_bits();
if (error_bits[static_cast<size_t>(dunedaq::daqdataformats::FragmentErrorBits::kDataNotFound)])
m_fragments_empty++;
if (error_bits[static_cast<size_t>(dunedaq::daqdataformats::FragmentErrorBits::kIncomplete)])
m_fragments_incomplete++;
if (error_bits[static_cast<size_t>(dunedaq::daqdataformats::FragmentErrorBits::kInvalidWindow)])
m_fragments_invalid++;

auto dr_iter = m_data_req_map.find(
std::make_tuple<dfmessages::trigger_number_t, dfmessages::sequence_number_t, daqdataformats::SourceID>(
fragment->get_trigger_number(), fragment->get_sequence_number(), fragment->get_element_id()));
Expand All @@ -166,16 +232,31 @@ FragmentAggregatorModule::process_fragment(std::unique_ptr<daqdataformats::Fragm
<< trb_identifier;
auto sender = get_iom_sender<std::unique_ptr<daqdataformats::Fragment>>(trb_identifier);
sender->send(std::move(fragment), iomanager::Sender::s_no_block);

m_fragments_processed++;
auto timestamp_total = get_current_time_us() - m_timestamp_before_frag;
if (timestamp_total < m_fragments_time_min_us) { m_fragments_time_min_us = timestamp_total; }
if (timestamp_total > m_fragments_time_max_us) { m_fragments_time_max_us = timestamp_total; }
m_fragments_time_average_us += timestamp_total;

} catch (const ers::Issue& excpt) {
ers::error(AbandonedFragment(ERS_HERE,
fragment->get_run_number(),
fragment->get_trigger_number(),
fragment->get_sequence_number(),
fragment->get_element_id(),
excpt));
m_fragments_failed++;
}
}

uint64_t
FragmentAggregatorModule::get_current_time_us()
{
return std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}

} // namespace dfmodules
} // namespace dunedaq

Expand Down
26 changes: 22 additions & 4 deletions plugins/FragmentAggregatorModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class FragmentAggregatorModule : public dunedaq::appfwk::DAQModule
FragmentAggregatorModule& operator=(FragmentAggregatorModule&&) = delete;

void init(std::shared_ptr<appfwk::ConfigurationManager> mcfg) override;
// void get_info(opmonlib::InfoCollector& ci, int level) override;
void generate_opmon_data() override;

private:
// Commands
Expand All @@ -72,14 +72,32 @@ class FragmentAggregatorModule : public dunedaq::appfwk::DAQModule
void process_data_request(dfmessages::DataRequest&);
void process_fragment(std::unique_ptr<daqdataformats::Fragment>&);

// Input and Output Connection namess
// Input and Output Connection names
std::string m_data_req_input;
std::string m_fragment_input;
std::map<int, std::string> m_producer_conn_ids;
std::vector<std::string> m_trb_conn_ids;

// Stats
std::atomic<int> m_packets_processed{ 0 };
// Opmon
uint64_t get_current_time_us();
uint64_t m_timestamp_before_dr;
uint64_t m_timestamp_before_frag;
using metric_counter_type = uint64_t;
std::atomic<metric_counter_type> m_data_requests_received{ 0 };
std::atomic<metric_counter_type> m_data_requests_processed{ 0 };
std::atomic<metric_counter_type> m_data_requests_failed{ 0 };
std::atomic<metric_counter_type> m_fragments_received{ 0 };
std::atomic<metric_counter_type> m_fragments_processed{ 0 };
std::atomic<metric_counter_type> m_fragments_failed{ 0 };
std::atomic<metric_counter_type> m_fragments_empty{ 0 };
std::atomic<metric_counter_type> m_fragments_incomplete{ 0 };
std::atomic<metric_counter_type> m_fragments_invalid{ 0 };
std::atomic<metric_counter_type> m_fragments_time_average_us{ 0 };
std::atomic<metric_counter_type> m_fragments_time_min_us{ 0 };
std::atomic<metric_counter_type> m_fragments_time_max_us{ 0 };
std::atomic<metric_counter_type> m_data_requests_time_average_us{ 0 };
std::atomic<metric_counter_type> m_data_requests_time_min_us{ 0 };
std::atomic<metric_counter_type> m_data_requests_time_max_us{ 0 };

// TRB tracking
std::map<std::tuple<dfmessages::trigger_number_t, dfmessages::sequence_number_t, daqdataformats::SourceID>,
Expand Down
30 changes: 30 additions & 0 deletions schema/dfmodules/opmon/FragmentAggregatorModule.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
syntax = "proto3";

package dunedaq.dfmodules.opmon;

message FADataRequestsCounterInfo {

uint64 data_requests_received = 1;
uint64 data_requests_processed = 2;
uint64 data_requests_failed = 3;

}

message FAFragmentsCounterInfo {

uint64 fragments_received = 1;
uint64 fragments_processed = 2;
uint64 fragments_failed = 3;
uint64 fragments_empty = 4;
uint64 fragments_incomplete = 5;
uint64 fragments_invalid = 6;

}

message FragmentAggregatorTimeInfo {

uint64 average_us = 1;
uint64 min_us = 2;
uint64 max_us = 3;

}