Skip to content

Commit 5b36b06

Browse files
authored
Merge branch 'develop' into revert-249-revert-243-eflumerf/RemoveUPS
2 parents 85f15f0 + d8b092e commit 5b36b06

File tree

12 files changed

+205
-224
lines changed

12 files changed

+205
-224
lines changed

artdaq/Application/DispatcherCore.cc

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ bool artdaq::DispatcherCore::initialize(fhicl::ParameterSet const& pset)
6666
}
6767

6868
broadcast_mode_ = agg_pset.get<bool>("broadcast_mode", true);
69+
allow_label_overwrites_ = agg_pset.get<bool>("allow_label_overwrites", true);
6970
if (broadcast_mode_ && !agg_pset.has_key("broadcast_mode"))
7071
{
7172
agg_pset.put<bool>("broadcast_mode", true);
@@ -107,19 +108,40 @@ std::string artdaq::DispatcherCore::register_monitor(fhicl::ParameterSet const&
107108
TLOG(TLVL_DEBUG + 32) << "Getting unique_label from input ParameterSet";
108109
auto const& label = pset.get<std::string>("unique_label");
109110
TLOG(TLVL_DEBUG + 32) << "Unique label is " << label;
111+
TLOG(TLVL_INFO) << "Registering monitor with unique_label \"" << label << "\"";
112+
113+
bool overwriting_label = false;
114+
110115
{
111116
std::lock_guard<std::mutex> lock(dispatcher_transfers_mutex_);
112117
if (registered_monitors_.count(label) != 0u)
113118
{
114-
throw cet::exception("DispatcherCore") << "Unique label already exists!"; // NOLINT(cert-err60-cpp)
119+
if (allow_label_overwrites_)
120+
{
121+
TLOG(TLVL_WARNING) << "Restarting companion art process with label " << label << "!";
122+
overwriting_label = true;
123+
}
124+
else
125+
{
126+
throw cet::exception("DispatcherCore") << "Unique label already exists!"; // NOLINT(cert-err60-cpp)
127+
}
115128
}
116129

117130
registered_monitors_[label] = pset;
118131
}
119132

120-
// ELF, Jul 21, 2020: This can take a long time, and we don't want to block the XMLRPC thread
121-
boost::thread thread([this, label] { start_art_process_(label); });
122-
thread.detach();
133+
if (overwriting_label)
134+
{
135+
// ELF, Jul 21, 2020: This can take a long time, and we don't want to block the XMLRPC thread
136+
boost::thread thread([this, label] { restart_art_process_(label); });
137+
thread.detach();
138+
}
139+
else
140+
{
141+
// ELF, Jul 21, 2020: This can take a long time, and we don't want to block the XMLRPC thread
142+
boost::thread thread([this, label] { start_art_process_(label); });
143+
thread.detach();
144+
}
123145
}
124146
catch (const cet::exception& e)
125147
{
@@ -159,7 +181,7 @@ std::string artdaq::DispatcherCore::register_monitor(fhicl::ParameterSet const&
159181

160182
std::string artdaq::DispatcherCore::unregister_monitor(std::string const& label)
161183
{
162-
TLOG(TLVL_DEBUG + 32) << "DispatcherCore::unregister_monitor called with argument \"" << label << "\"";
184+
TLOG(TLVL_INFO) << "DispatcherCore::unregister_monitor called for unique_label \"" << label << "\"";
163185
check_filters_();
164186

165187
try
@@ -425,6 +447,12 @@ void artdaq::DispatcherCore::check_filters_()
425447
}
426448
}
427449

450+
void artdaq::DispatcherCore::restart_art_process_(std::string const& label)
451+
{
452+
stop_art_process_(label);
453+
start_art_process_(label);
454+
}
455+
428456
void artdaq::DispatcherCore::start_art_process_(std::string const& label)
429457
{
430458
if (event_store_ptr_ != nullptr)

artdaq/Application/DispatcherCore.hh

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,26 +51,8 @@ public:
5151
* \param pset ParameterSet used to configure the DispatcherCore
5252
* \return Whether the initialize attempt succeeded
5353
*
54-
* \verbatim
55-
* DispatcherCore accepts the following Parameters:
56-
* "daq" (REQUIRED): FHiCL table containing DAQ configuration
57-
*   "Dispatcher" (REQUIRED): FHiCL table containing Dispatcher parameters
58-
* "expected_events_per_bunch" (REQUIRED): Number of events to collect before sending them to art
59-
* "enq_timeout" (Default: 5.0): Maximum amount of time to wait while enqueueing events to the ConcurrentQueue
60-
* "is_data_logger": True if the Dispatcher is a Data Logger
61-
* "is_online_monitor": True if the Dispatcher is an Online Monitor. is_data_logger takes precedence
62-
* "is_dispatcher": True if the Dispatcher is a Dispatcher. is_data_logger and is_online_monitor take precedence
63-
* NOTE: At least ONE of these three parameters must be specified.
64-
* "inrun_recv_timeout_usec" (Default: 100000): Amount of time to wait for new events while running
65-
* "endrun_recv_timeout_usec" (Default: 20000000): Amount of time to wait for additional events at EndOfRun
66-
* "pause_recv_timeout_usec" (Default: 3000000): Amount of time to wait for additional events at PauseRun
67-
* "onmon_event_prescale" (Default: 1): Only send 1/N events to art for online monitoring (requires is_data_logger: true)
68-
* "verbose" (Default: true): Whether to print transition messages
69-
* "metrics": FHiCL table containing configuration for MetricManager
70-
* "outputs" (REQUIRED): FHiCL table containing output parameters
71-
* "normalOutput" (REQUIRED): FHiCL table containing default output parameters
72-
* "fileName" (Default: ""): Name template of the output file. Used to determine output directory
73-
* \endverbatim
54+
* Configuration Parameters unique to the Dispatcher:
55+
* "allow_label_overwrites" (default: true): Allow a new process to start with the same unique_label as an old one, stopping the appropriate art process and restarting with the new configuration.
7456
* Note that the "Dispatcher" ParameterSet is also used to configure the EventStore. See that class' documentation for more information.
7557
*/
7658
bool initialize(fhicl::ParameterSet const& pset) override;
@@ -98,12 +80,14 @@ private:
9880

9981
void start_art_process_(std::string const& label);
10082
void stop_art_process_(std::string const& label);
83+
void restart_art_process_(std::string const& label);
10184

10285
std::mutex dispatcher_transfers_mutex_;
10386
std::unordered_map<std::string, fhicl::ParameterSet> registered_monitors_;
10487
std::unordered_map<std::string, pid_t> registered_monitor_pids_;
10588
fhicl::ParameterSet pset_; // The ParameterSet initially passed to the Dispatcher (contains input info)
10689
bool broadcast_mode_;
90+
bool allow_label_overwrites_;
10791
};
10892

10993
#endif

artdaq/ArtModules/ArtdaqInputHelper.hh

Lines changed: 36 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,10 @@ void art::ArtdaqInputHelper<U>::readAndConstructPrincipal(std::unique_ptr<TBuffe
492492
<< "Unable to fetch a high-resolution time with clock_gettime for art::SubRun Timestamp. ";
493493
}
494494

495+
TLOG(TLVL_DEBUG + 37, "ArtdaqInputHelper") << "inR: " << static_cast<void*>(inR) << " run " << (inR ? std::to_string(inR->run()) : "invalid")
496+
<< ", inSR: " << static_cast<void*>(inSR) << " run " << (inSR ? std::to_string(inSR->run()) : "invalid")
497+
<< ", subrun " << (inSR ? std::to_string(inSR->subRun()) : "invalid");
498+
495499
// Process Run Aux
496500
TLOG(TLVL_DEBUG + 37, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
497501
<< "processing Run auxiliary ...";
@@ -515,7 +519,7 @@ void art::ArtdaqInputHelper<U>::readAndConstructPrincipal(std::unique_ptr<TBuffe
515519
TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
516520
<< "finished processing Run auxiliary.";
517521

518-
if (msg_type_code != artdaq::NetMonHeader::MessageType::Run)
522+
if (msg_type_code != artdaq::NetMonHeader::MessageType::Run) // SubRun or Event
519523
{
520524
TLOG(TLVL_DEBUG + 38, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
521525
<< "processing SubRun auxiliary ...";
@@ -548,13 +552,6 @@ void art::ArtdaqInputHelper<U>::readAndConstructPrincipal(std::unique_ptr<TBuffe
548552
TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
549553
<< "finished processing SubRun auxiliary.";
550554
}
551-
else if (inSR == nullptr || !inSR->subRunID().isValid())
552-
{
553-
TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: Faking Subrun 1 because there was no input Subrun for Run message";
554-
555-
art::SubRunID subrun_guess(outR->runID(), 1);
556-
outSR = pm_.makeSubRunPrincipal(subrun_guess, currentTime);
557-
}
558555

559556
if (msg_type_code == artdaq::NetMonHeader::MessageType::Event)
560557
{ // Event message.
@@ -584,23 +581,23 @@ void art::ArtdaqInputHelper<U>::readAndConstructPrincipal(std::unique_ptr<TBuffe
584581
TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: "
585582
<< "finished processing Event auxiliary.";
586583
}
587-
588-
if (!outR && !outSR && !outE)
584+
else if (msg_type_code == artdaq::NetMonHeader::MessageType::Subrun)
589585
{
590-
TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "No principals created, making Flush Event based on whether there was an existing SubRun";
591-
592-
if (!inSR || !inSR->subRunID().isValid())
586+
if (outSR == nullptr)
593587
{
594-
TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: Making run flush event";
595-
art::EventID const flush_evid(art::EventID::flushEvent(inR->runID()));
596-
outSR = pm_.makeSubRunPrincipal(flush_evid.subRunID(), currentTime);
597-
outE = pm_.makeEventPrincipal(flush_evid, currentTime);
588+
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "SubrunDataFragment for current Subrun received, returning Flush event";
589+
art::EventID const evid(art::EventID::flushEvent(inSR->subRunID()));
590+
outE = pm_.makeEventPrincipal(evid, currentTime);
598591
}
599-
else
592+
}
593+
else if (msg_type_code == artdaq::NetMonHeader::MessageType::Run)
594+
{
595+
if (outR == nullptr)
600596
{
601-
TLOG(TLVL_DEBUG + 39, "ArtdaqInputHelper") << "readAndConstructPrincipal: Making subrun flush event";
602-
art::EventID const flush_evid(art::EventID::flushEvent(inSR->subRunID()));
603-
outE = pm_.makeEventPrincipal(flush_evid, currentTime);
597+
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "RunDataFragment for current Run received, returning Flush subrun/event";
598+
art::EventID const evid(art::EventID::flushEvent(inR->runID()));
599+
outSR = pm_.makeSubRunPrincipal(evid.subRunID(), currentTime);
600+
outE = pm_.makeEventPrincipal(evid, currentTime);
604601
}
605602
}
606603
}
@@ -646,64 +643,22 @@ bool art::ArtdaqInputHelper<U>::constructPrincipal(std::shared_ptr<ArtdaqEvent>
646643
<< "The art::Event Timestamp will be zero for event " << eventPtr->header->event_id;
647644
}
648645

649-
if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfRunFragmentType)
650-
{
651-
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "EndOfRunFragment received, returning Flush event";
652-
art::EventID const evid(art::EventID::flushEvent());
653-
outR = pm_.makeRunPrincipal(evid.runID(), currentTime);
654-
outSR = pm_.makeSubRunPrincipal(evid.subRunID(), currentTime);
655-
outE = pm_.makeEventPrincipal(evid, currentTime);
656-
return true;
657-
}
658-
659-
if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfSubrunFragmentType)
660-
{
661-
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "EndOfSubrunFragment received, creating new Subrun Principal";
662-
// Check if inR == 0 or is a new run
663-
if (inR == nullptr || !inR->runID().isValid() || inR->run() != eventPtr->header->run_id)
664-
{
665-
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making subrun principal with subrun_id " << eventPtr->header->subrun_id;
666-
if (outSR) delete outSR;
667-
outSR = pm_.makeSubRunPrincipal(eventPtr->header->run_id, eventPtr->header->subrun_id, currentTime);
668-
art::EventID const evid(art::EventID::flushEvent(outSR->subRunID()));
669-
outE = pm_.makeEventPrincipal(evid, currentTime);
670-
}
671-
else
672-
{
673-
// If the previous subrun was neither 0 nor flush and was identical with the current
674-
// subrun, then it must have been associated with a data event. In that case, we need
675-
// to generate a flush event with a valid run but flush subrun and event number in order
676-
// to end the subrun.
677-
if (inSR != nullptr && !inSR->subRunID().isFlush() && inSR->subRun() == eventPtr->header->subrun_id)
678-
{
679-
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Flushing old run id " << inR->runID();
680-
art::EventID const evid(art::EventID::flushEvent(inR->runID()));
681-
outSR = pm_.makeSubRunPrincipal(evid.subRunID(), currentTime);
682-
outE = pm_.makeEventPrincipal(evid, currentTime);
683-
// If this is either a new or another empty subrun, then generate a flush event with
684-
// valid run and subrun numbers but flush event number
685-
//} else if(inSR==0 || inSR->id().isFlush()){
686-
}
687-
else
688-
{
689-
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making subrun principal with subrun_id " << eventPtr->header->subrun_id;
690-
outSR = pm_.makeSubRunPrincipal(eventPtr->header->run_id, eventPtr->header->subrun_id, currentTime);
691-
art::EventID const evid(art::EventID::flushEvent(outSR->subRunID()));
692-
outE = pm_.makeEventPrincipal(evid, currentTime);
693-
// Possible error condition
694-
//} else {
695-
}
696-
}
697-
return true;
698-
}
699-
700646
// make new run if inR is 0 or if the run has changed
701647
if (inR == nullptr || !inR->runID().isValid() || inR->run() != eventPtr->header->run_id)
702648
{
703649
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making run principal with run_id " << eventPtr->header->run_id;
704650
outR = pm_.makeRunPrincipal(eventPtr->header->run_id, currentTime);
705651
}
706652

653+
if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfRunFragmentType)
654+
{
655+
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "EndOfRunFragment received, returning Flush subrun/event";
656+
art::EventID const evid(art::EventID::flushEvent(outR != nullptr ? outR->runID() : inR->runID()));
657+
outSR = pm_.makeSubRunPrincipal(evid.subRunID(), currentTime);
658+
outE = pm_.makeEventPrincipal(evid, currentTime);
659+
return true;
660+
}
661+
707662
// make new subrun if inSR is 0 or if the subrun has changed
708663
art::SubRunID subrun_check(eventPtr->header->run_id, eventPtr->header->subrun_id);
709664
if (inSR == nullptr || !inSR->subRunID().isValid() || subrun_check != inSR->subRunID())
@@ -712,6 +667,14 @@ bool art::ArtdaqInputHelper<U>::constructPrincipal(std::shared_ptr<ArtdaqEvent>
712667
outSR = pm_.makeSubRunPrincipal(eventPtr->header->run_id, eventPtr->header->subrun_id, currentTime);
713668
}
714669

670+
if (eventPtr->FirstFragmentType() == artdaq::Fragment::EndOfSubrunFragmentType)
671+
{
672+
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "EndOfSubrunFragment received, returning Flush event";
673+
art::EventID const evid(art::EventID::flushEvent(outSR != nullptr ? outSR->subRunID() : inSR->subRunID()));
674+
outE = pm_.makeEventPrincipal(evid, currentTime);
675+
return true;
676+
}
677+
715678
TLOG(TLVL_DEBUG + 43, "ArtdaqInputHelper") << "Making event principal with event_id " << eventPtr->header->event_id;
716679
outE = pm_.makeEventPrincipal(eventPtr->header->run_id, eventPtr->header->subrun_id, eventPtr->header->event_id, currentTime);
717680
return true;
@@ -1002,28 +965,7 @@ bool art::ArtdaqInputHelper<U>::readNext(art::RunPrincipal* const inR, art::SubR
1002965
TLOG(TLVL_DEBUG + 32, "ArtdaqInputHelper") << "First Fragment type is " << static_cast<int>(firstFragmentType);
1003966
if (constructPrincipal(eventMap, inR, inSR, outR, outSR, outE))
1004967
{
1005-
auto rfret = readFragments(eventMap->fragments, outR ? outR : inR, outSR ? outSR : inSR, outE);
1006-
1007-
// No event data
1008-
if (!rfret.second)
1009-
{
1010-
delete outE;
1011-
outE = nullptr;
1012-
1013-
if (inSR != nullptr && outSR != nullptr)
1014-
{
1015-
// No products added to subrun
1016-
if (!rfret.first)
1017-
{
1018-
// New subrun is identical to old
1019-
if (inSR->subRunID() == outSR->subRunID())
1020-
{
1021-
delete outSR;
1022-
outSR = nullptr;
1023-
}
1024-
}
1025-
}
1026-
}
968+
readFragments(eventMap->fragments, outR ? outR : inR, outSR ? outSR : inSR, outE);
1027969
}
1028970
}
1029971
else
@@ -1091,25 +1033,6 @@ bool art::ArtdaqInputHelper<U>::readNext(art::RunPrincipal* const inR, art::SubR
10911033
}
10921034
else if (msg_type_code == artdaq::NetMonHeader::MessageType::Subrun)
10931035
{
1094-
// EndSubRun message.
1095-
// From the code above, EndRun and EndSubRun messages cause
1096-
// the construction of principals that have:
1097-
// Run:Subrun:Event=flush:flush:flush.
1098-
// This is a problem when you have two neighboring EndSubRuns
1099-
// which are both associated with empty subruns because art will
1100-
// complain that you have a new subrun with a subrun number identical
1101-
// to that of the previous subrun. So the solution is to not
1102-
// return new principals.
1103-
if (inR != nullptr && inSR != nullptr && outR != nullptr && outSR != nullptr)
1104-
{
1105-
if (inR->runID().isFlush() && inSR->subRunID().isFlush() && outR->runID().isFlush() &&
1106-
outSR->subRunID().isFlush())
1107-
{
1108-
outR = nullptr;
1109-
outSR = nullptr;
1110-
return outE ? true : false;
1111-
}
1112-
}
11131036
// FIXME: We need to merge these into the input SubRunPrincipal.
11141037
readDataProducts(msgs, outSR ? outSR : inSR);
11151038
}
@@ -1122,8 +1045,6 @@ bool art::ArtdaqInputHelper<U>::readNext(art::RunPrincipal* const inR, art::SubR
11221045
{
11231046
readFragments(eventMap->fragments, outR ? outR : inR, outSR ? outSR : inSR, outE);
11241047
}
1125-
1126-
TLOG(TLVL_DEBUG + 47, "ArtdaqInputHelper") << "readNext: returning true on Event message.";
11271048
}
11281049
else
11291050
{

artdaq/ArtModules/ArtdaqOutput.hh

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ inline void art::ArtdaqOutput::write(EventPrincipal& ep)
558558

559559
// Subrun number starts at 1
560560
TLOG(TLVL_WRITE, "ArtdaqOutput") << "ArtdaqOutput::write: Setting Output Fragment Header Fields";
561-
auto seqID = (static_cast<uint64_t>(ep.eventID().subRun() - 1) << 32) + ep.eventID().event();
561+
auto seqID = (static_cast<uint64_t>(ep.eventID().subRun()) << 32) + ep.eventID().event();
562562

563563
art::ProcessTag tag("", processName());
564564
auto res = ep.getMany(art::ModuleContext::invalid(), art::WrappedTypeID::make<artdaq::detail::RawEventHeader>(), art::MatchAllSelector(), tag);
@@ -637,8 +637,10 @@ inline void art::ArtdaqOutput::writeSubRun(SubRunPrincipal& srp)
637637
TLOG(TLVL_WRITESUBRUN, "ArtdaqOutput") << "Begin: ArtdaqOutput::writeSubRun(SubRunPrincipal& srp)";
638638
if (!initMsgSent_)
639639
{
640-
send_init_message();
641-
initMsgSent_ = true;
640+
TLOG(TLVL_WARNING, "ArtdaqOutput") << "Not sending Subrun message before Event!";
641+
return;
642+
// send_init_message();
643+
// initMsgSent_ = true;
642644
}
643645

644646
//
@@ -659,7 +661,7 @@ inline void art::ArtdaqOutput::writeSubRun(SubRunPrincipal& srp)
659661
//
660662
// Begin preparing message.
661663
//
662-
auto msg = prepareMessage(last_sequence_id_ + 1, srp.subRun() + 1, artdaq::Fragment::SubrunDataFragmentType);
664+
auto msg = prepareMessage(static_cast<uint64_t>(srp.subRun()) << 32, srp.subRun() + 1, artdaq::Fragment::SubrunDataFragmentType);
663665
//
664666
// Write message type code.
665667
//
@@ -752,8 +754,10 @@ inline void art::ArtdaqOutput::writeRun(RunPrincipal& rp)
752754
(void)rp;
753755
if (!initMsgSent_)
754756
{
755-
send_init_message();
756-
initMsgSent_ = true;
757+
TLOG(TLVL_WARNING, "ArtdaqOutput") << "Not sending Run message before Event!";
758+
return;
759+
// send_init_message();
760+
// initMsgSent_ = true;
757761
}
758762

759763
//
@@ -765,7 +769,7 @@ inline void art::ArtdaqOutput::writeRun(RunPrincipal& rp)
765769
//
766770
// Begin preparing message.
767771
//
768-
auto msg = prepareMessage(last_sequence_id_ + 1, rp.run() + 1, artdaq::Fragment::RunDataFragmentType);
772+
auto msg = prepareMessage(static_cast<uint64_t>(rp.run()) << 32, rp.run() + 1, artdaq::Fragment::RunDataFragmentType);
769773
//
770774
// Write message type code.
771775
//

0 commit comments

Comments
 (0)