|
| 1 | +// This file reads out DTCs that are NOT in HW Event-building mode |
| 2 | +// It can be used as an exmaple for developing more specific functionality. |
| 3 | + |
| 4 | +#include "artdaq-core-mu2e/Overlays/FragmentType.hh" |
| 5 | +#include "dtcInterfaceLib/DTC.h" |
| 6 | +#include "cfoInterfaceLib/CFO.h" |
| 7 | +#include "dtcInterfaceLib/DTCSoftwareCFO.h" |
| 8 | + |
| 9 | +#include "artdaq-core/Data/ContainerFragmentLoader.hh" |
| 10 | +#include "artdaq-core/Data/MetadataFragment.hh" |
| 11 | +#include "artdaq/DAQdata/Globals.hh" |
| 12 | +#include "artdaq/Generators/GeneratorMacros.hh" |
| 13 | + |
| 14 | +#include <fstream> |
| 15 | + |
| 16 | +#include "trace.h" |
| 17 | +#define TRACE_NAME "CFODataReceiver" |
| 18 | + |
| 19 | +namespace mu2e { |
| 20 | + class CFODataReceiver : public artdaq::CommandableFragmentGenerator |
| 21 | + { |
| 22 | + public: |
| 23 | + explicit CFODataReceiver(fhicl::ParameterSet const& ps); |
| 24 | + virtual ~CFODataReceiver(); |
| 25 | + |
| 26 | + DTCLib::DTC_SimMode GetMode() { return mode_; } |
| 27 | + |
| 28 | + private: |
| 29 | + bool getNextDTCFragment(artdaq::FragmentPtrs& output, DTCLib::DTC_EventWindowTag ts); |
| 30 | + |
| 31 | + void start() override; |
| 32 | + |
| 33 | + void stopNoMutex() override {} |
| 34 | + |
| 35 | + void stop() override; |
| 36 | + |
| 37 | + size_t getCurrentSequenceID(); |
| 38 | + |
| 39 | + // Like "getNext_", "fragmentIDs_" is a mandatory override; it |
| 40 | + // returns a vector of the fragment IDs an instance of this class |
| 41 | + // is responsible for (in the case of CFODataReceiver, this is just |
| 42 | + // the fragment_id_ variable declared in the parent |
| 43 | + // CommandableFragmentGenerator class) |
| 44 | + |
| 45 | + std::vector<artdaq::Fragment::fragment_id_t> fragmentIDs_() { return fragment_ids_; } |
| 46 | + |
| 47 | + std::vector<artdaq::Fragment::fragment_id_t> fragment_ids_; |
| 48 | + |
| 49 | + // State |
| 50 | + DTCLib::DTC_SimMode mode_; //!=0 is simulation mode |
| 51 | + const bool skip_cfo_init_; |
| 52 | + bool print_packets_; |
| 53 | + |
| 54 | + size_t first_timestamp_seen_{size_t(-1)}, last_fragment_timestamp_{size_t(-1)}; |
| 55 | + |
| 56 | + std::unique_ptr<CFOLib::CFO> theCFO_; |
| 57 | + // std::unique_ptr<DTCLib::DTCSoftwareCFO> theCFO_; |
| 58 | + |
| 59 | + std::size_t const throttle_usecs_; |
| 60 | + std::size_t const rollover_subrun_interval_; |
| 61 | + std::condition_variable throttle_cv_; |
| 62 | + std::mutex throttle_mutex_; |
| 63 | + int diagLevel_; |
| 64 | + // The "getNext_" function is used to implement user-specific |
| 65 | + // functionality; it's a mandatory override of the pure virtual |
| 66 | + // getNext_ function declared in CommandableFragmentGenerator |
| 67 | + |
| 68 | + bool getNext_(artdaq::FragmentPtrs& output) override; |
| 69 | + DTCLib::DTC_EventWindowTag getCurrentEventWindowTag(); |
| 70 | + }; |
| 71 | +} // namespace mu2e |
| 72 | + |
| 73 | +mu2e::CFODataReceiver::~CFODataReceiver() |
| 74 | +{ |
| 75 | +} |
| 76 | + |
| 77 | +bool mu2e::CFODataReceiver::getNext_(artdaq::FragmentPtrs& frags) |
| 78 | +{ |
| 79 | + TLOG(TLVL_TRACE + 30) << "getNext_"; |
| 80 | + while (!should_stop()) |
| 81 | + { |
| 82 | + TLOG(TLVL_TRACE + 31) << "Sleeping..."; |
| 83 | + usleep(5000); |
| 84 | + } |
| 85 | + |
| 86 | + if(throttle_usecs_ > 0) |
| 87 | + { |
| 88 | + TLOG(TLVL_TRACE + 32) << "Throttling... " << throttle_usecs_; |
| 89 | + std::unique_lock<std::mutex> throttle_lock(throttle_mutex_); |
| 90 | + throttle_cv_.wait_for(throttle_lock, std::chrono::microseconds(throttle_usecs_), [&]() { return should_stop(); }); |
| 91 | + } |
| 92 | + |
| 93 | + if (should_stop()) |
| 94 | + { |
| 95 | + TLOG(TLVL_TRACE + 33) << "Stopping."; |
| 96 | + return false; |
| 97 | + } |
| 98 | + |
| 99 | + uint64_t z = 0; |
| 100 | + DTCLib::DTC_EventWindowTag zero(z); |
| 101 | + |
| 102 | + //-------------------------------------------------------------------------------- |
| 103 | + // temporary sub-run transition |
| 104 | + //-------------------------------------------------------------------------------- |
| 105 | + if (rollover_subrun_interval_ > 0 && ev_counter() % rollover_subrun_interval_ == 0 && fragment_id() ==0 ) |
| 106 | + { |
| 107 | + auto endOfSubrunFrag = artdaq::MetadataFragment::CreateEndOfSubrunFragment(my_rank, ev_counter() + 1, 1 + (ev_counter() / rollover_subrun_interval_), 0); |
| 108 | + frags.emplace_back(std::move(endOfSubrunFrag)); |
| 109 | + } |
| 110 | + |
| 111 | + TLOG(TLVL_TRACE + 34) << "getNext_ req"; |
| 112 | + auto start_time = std::chrono::steady_clock::now(); |
| 113 | + bool retVal = true; |
| 114 | + do |
| 115 | + { |
| 116 | + retVal = getNextDTCFragment(frags, zero); |
| 117 | + TLOG(TLVL_TRACE + 35) << "getNext_ req retry? " << retVal << " " << frags.size(); |
| 118 | + } while (1 && retVal && frags.size() < 900 && |
| 119 | + artdaq::TimeUtils::GetElapsedTimeMicroseconds(start_time) < 100000 /* 100 ms */); |
| 120 | + TLOG(TLVL_TRACE + 36) << "getNext_ req done" << retVal << " " << frags.size(); |
| 121 | + |
| 122 | + return retVal; |
| 123 | +} //end getNext_() |
| 124 | + |
| 125 | +DTCLib::DTC_EventWindowTag mu2e::CFODataReceiver::getCurrentEventWindowTag() |
| 126 | +{ |
| 127 | + if (first_timestamp_seen_ != size_t(-1)) |
| 128 | + { |
| 129 | + return DTCLib::DTC_EventWindowTag(getCurrentSequenceID() + first_timestamp_seen_); |
| 130 | + } |
| 131 | + |
| 132 | + return DTCLib::DTC_EventWindowTag(uint64_t(0)); |
| 133 | +} |
| 134 | + |
| 135 | +mu2e::CFODataReceiver::CFODataReceiver(fhicl::ParameterSet const& ps) |
| 136 | + : CommandableFragmentGenerator(ps) |
| 137 | + , fragment_ids_{static_cast<artdaq::Fragment::fragment_id_t>(fragment_id())} |
| 138 | + , mode_ (DTCLib::DTC_SimModeConverter::ConvertToSimMode(ps.get<std::string>("sim_mode", "Disabled"))) |
| 139 | + , skip_cfo_init_ (ps.get<bool> ("skip_cfo_init", false)) |
| 140 | + , print_packets_ (ps.get<bool> ("debug_print", false)) |
| 141 | + , throttle_usecs_ (ps.get<size_t> ("throttle_usecs", 0)) // in units of us |
| 142 | + , rollover_subrun_interval_(ps.get<size_t> ("rollover_subrun_interval", 20000)) |
| 143 | + , diagLevel_ (ps.get<int> ("diagLevel", 0)) |
| 144 | +{ |
| 145 | + // mode_ can still be overridden by environment! |
| 146 | + theCFO_ = std::make_unique<CFOLib::CFO>(mode_, |
| 147 | + ps.get<int>("cfo", -1), |
| 148 | + ps.get<std::string>("expectedDesignVersion", ""), |
| 149 | + skip_cfo_init_, |
| 150 | + ps.get<std::string>("uid", "")); |
| 151 | + |
| 152 | + mode_ = theCFO_->GetSimMode(); |
| 153 | + TLOG(TLVL_DEBUG) << "CFODataReceiver Initialized with mode " << mode_; |
| 154 | + |
| 155 | + if (skip_cfo_init_) return; // skip any control of DTC |
| 156 | + |
| 157 | + theCFO_->ReleaseAllBuffers(DTC_DMA_Engine_DAQ); |
| 158 | + |
| 159 | +} |
| 160 | + |
| 161 | + |
| 162 | +void mu2e::CFODataReceiver::stop() |
| 163 | +{ |
| 164 | + |
| 165 | + //if (skip_cfo_init_) return; // skip any control of DTC |
| 166 | + |
| 167 | +} |
| 168 | + |
| 169 | +void mu2e::CFODataReceiver::start() |
| 170 | +{ |
| 171 | + theCFO_->ReleaseAllBuffers(DTC_DMA_Engine_DAQ); |
| 172 | +} |
| 173 | + |
| 174 | +bool mu2e::CFODataReceiver::getNextDTCFragment(artdaq::FragmentPtrs& frags, DTCLib::DTC_EventWindowTag ts_in) |
| 175 | +{ |
| 176 | + auto before_read = std::chrono::steady_clock::now(); |
| 177 | + int retryCount = 5; |
| 178 | + std::vector<std::unique_ptr<CFOLib::CFO_Event>> data; |
| 179 | + while (data.size() == 0 && retryCount >= 0) |
| 180 | + { |
| 181 | + try |
| 182 | + { |
| 183 | + theCFO_->GetData(data, ts_in /* not used when not matching */);// do we need to set matchEventWindowTag = true?? |
| 184 | + TLOG(TLVL_TRACE + 25) << "Done calling theCFO->GetData() data.size()=" << data.size() << ", retryCount=" << retryCount; |
| 185 | + } |
| 186 | + catch (std::exception const& ex) |
| 187 | + { |
| 188 | + TLOG(TLVL_ERROR) << "There was an error in the DTC Library: " << ex.what(); |
| 189 | + } |
| 190 | + retryCount--; |
| 191 | + } |
| 192 | + if (retryCount < 0 && data.size() == 0) |
| 193 | + { |
| 194 | + // Return true if no data in external CFO mode, otherwise false |
| 195 | + return mode_ == 0; |
| 196 | + } |
| 197 | + auto after_read = std::chrono::steady_clock::now(); |
| 198 | + |
| 199 | + DTCLib::DTC_EventWindowTag ts_out = data[0]->GetEventWindowTag(); |
| 200 | + TLOG(TLVL_TRACE) << "Received data with timestamp " << ts_out.GetEventWindowTag(true); |
| 201 | + |
| 202 | + // GetSubEventData can return multiple EWTs, and we can assume that there is ONE DTC_SubEvent per EWT! |
| 203 | + for (auto& cfoevt : data) |
| 204 | + { |
| 205 | + TLOG(TLVL_TRACE + 20) << "Initializing a CFO_Event "; |
| 206 | + auto evt = std::make_unique<CFOLib::CFO_Event>(&cfoevt); |
| 207 | + //TLOG(TLVL_TRACE + 23) << "Setting Eventmode to " << (uint64_t) evt->GetEventMode(); |
| 208 | + |
| 209 | + auto ptr = reinterpret_cast<const uint8_t*>(evt->GetRawBufferPointer()); |
| 210 | + |
| 211 | + TLOG(TLVL_TRACE + 21) << "Calling memcpy(" << (const void*)ptr << ", " << (void*)cfoevt->GetRawBufferPointer() << ", " << cfoevt->GetEventByteCount() << ")"; |
| 212 | + memcpy(const_cast<uint8_t*>(ptr), cfoevt->GetRawBufferPointer(), cfoevt->GetEventByteCount()); |
| 213 | + ptr += cfoevt->GetEventByteCount(); |
| 214 | + |
| 215 | + TLOG(TLVL_TRACE + 23) << "Setting EventWindowTag to " << ts_out.GetEventWindowTag(true); |
| 216 | + evt->SetEventWindowTag(ts_out); |
| 217 | + |
| 218 | + // auto after_print = std::chrono::steady_clock::now(); |
| 219 | + |
| 220 | + auto fragment_timestamp = ts_out.GetEventWindowTag(true); |
| 221 | + TLOG(TLVL_TRACE + 24) << "fragment_timestamp=" << fragment_timestamp; // << " while timestamp_to_use=" << timestamp_to_use; |
| 222 | + |
| 223 | + |
| 224 | + //frags.emplace_back(new artdaq::Fragment(getCurrentSequenceID(), fragment_ids_[0], FragmentType::DTCEVT, fragment_timestamp)); |
| 225 | + TLOG(TLVL_TRACE + 25) << "Creating Fragment, sz=" << evt->GetEventByteCount() << ", seqid=" << getCurrentSequenceID(); |
| 226 | + frags.emplace_back(new artdaq::Fragment(fragment_timestamp, fragment_ids_[0], FragmentType::DTCEVT, fragment_timestamp)); |
| 227 | + frags.back()->resizeBytes(evt->GetEventByteCount()); |
| 228 | + memcpy(frags.back()->dataBegin(), evt->GetRawBufferPointer(), evt->GetEventByteCount()); |
| 229 | + metricMan->sendMetric("Average Event Size", evt->GetEventByteCount(), "Bytes", 3, artdaq::MetricMode::Average); |
| 230 | + TLOG(TLVL_TRACE + 26) << "Incrementing event counter"; |
| 231 | + ev_counter_inc(); |
| 232 | + } |
| 233 | + |
| 234 | + auto after_copy = std::chrono::steady_clock::now(); |
| 235 | + TLOG(TLVL_TRACE + 27) << "Reporting Metrics"; |
| 236 | + auto hwTime = theCFO_->GetDevice()->GetDeviceTime(); |
| 237 | + |
| 238 | + double hw_timestamp_rate = 1 / hwTime; |
| 239 | + |
| 240 | + metricMan->sendMetric("CFO Read Time" , artdaq::TimeUtils::GetElapsedTime(after_read, after_copy) , "s", 3, artdaq::MetricMode::Average); |
| 241 | + metricMan->sendMetric("Fragment Prep Time", artdaq::TimeUtils::GetElapsedTime(before_read, after_read), "s", 3, artdaq::MetricMode::Average); |
| 242 | + metricMan->sendMetric("HW Timestamp Rate" , hw_timestamp_rate, "timestamps/s", 1, artdaq::MetricMode::Average); |
| 243 | + |
| 244 | + TLOG(TLVL_TRACE + 28) << "Returning true"; |
| 245 | + |
| 246 | + return true; |
| 247 | +} |
| 248 | + |
| 249 | +size_t mu2e::CFODataReceiver::getCurrentSequenceID() |
| 250 | +{ |
| 251 | + return ev_counter(); |
| 252 | +} |
| 253 | + |
| 254 | +// The following macro is defined in artdaq's GeneratorMacros.hh header |
| 255 | +DEFINE_ARTDAQ_COMMANDABLE_GENERATOR(mu2e::CFODataReceiver) |
0 commit comments