Skip to content

Commit a44b537

Browse files
authored
Merge pull request #47068 from smorovic/15_0_X-source-improvements-2
[DAQ] Input source improvements and initial Phase-2 DTH format support
2 parents c57823d + 06fdce0 commit a44b537

29 files changed

+2211
-723
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
2+
# DTH orbit/event unpacker for DAQSource
3+
4+
https://github.com/smorovic/cmssw/tree/15_0_0_pre1-source-improvements
5+
<br>
6+
This patch implements unpacking of the the DTH data format by `DAQSource` into `FedRawDataCollection`.
7+
8+
It is rebased over CMSSW master (compatible with 15_0_0_pre1 at the time this file is commited), but it builds and runs in 14_2_0 as well. All changes are contained in `EventFilter/Utilities`.
9+
10+
## Fetching the code
11+
12+
```
13+
scram project CMSSW_15_0_0_pre1 #or CMSSW_14_2_0 (currently it compiles and runs also in 14_X releases)
14+
git cms-addpkg EventFilter/Utilities
15+
git remote add smorovic https://github.com/smorovic/cmssw.git
16+
git fetch smorovic 15_0_0_pre1-source-improvements:15_0_0_pre1-source-improvements
17+
git checkout 15_0_0_pre1-source-improvements
18+
scram b
19+
```
20+
21+
Run the unit test (generates and consumes files with DTH format):
22+
```
23+
cmsenv
24+
cd src/EventFilter/Utilities/test
25+
./RunBUFU.sh
26+
```
27+
28+
## Important code and scripts in `EventFilter/Utilities`:
29+
30+
Definition of DTH orbit header, fragment trailer and SLinkRocket header/trailer (could potentially be moved to DataFormats or another package in the future):
31+
<br>
32+
[interface/DTHHeaders.h](../interface/DTHHeaders.h)
33+
34+
Plugin for DAQSource (input source) which parses the DTH format:
35+
<br>
36+
[src/DAQSourceModelsDTH.cc](../src/DAQSourceModelsDTH.cc)
37+
38+
Generator of dummy DTH payload for the fake "BU" process used in unit tests:
39+
<br>
40+
[plugins/DTHFakeReader.cc](../plugins/DTHFakeReader.cc)
41+
42+
Script which runs the unit test with "fakeBU" process generating payload from multiple DTH sources (per orbit) and "FU" CMSSW job consuming it:
43+
<br>
44+
[test/testDTH.sh](../test/testDTH.sh)
45+
46+
FU cmsRun configuration used in above tests:
47+
<br>
48+
[test/unittest_FU_daqsource.py](../test/unittest_FU_daqsource.py)
49+
50+
## Running on custom input files
51+
`unittest_FU_daqsource.py` script can be used as a starting point to create a custom runner with inputs such as DTH dumps (not generated as in the unit test). DAQSource should be set to `dataMode = cms.untracked.string("DTH")` to process DTH format. Change `fileListMode` to `True` and fill in `fileList` parameter with file paths to run with custom files, however they should be named similarly and could also be placed in similar directory structure, `ramdisk/runXX`, to provide initial run and lumisection to the source. Run number is also passed to the source via the command line as well as the working directory (see `testDTH.sh` script).
52+
53+
Note on the file format: apart of parsing single DTH orbit dump, input source plugin is capable also of building events from multiple DTH orbit blocks, but for the same orbit they must come sequentially in the file. Source scans the file and will find all blocks with orbit headers from the same orbit number, until a different orbit number is found or EOF, then it proceeds to build events from them by starting from last DTH event fragment trailer in each of the orbits found. This is then iterated for the next set of orbit blocks with the same orbit number in the file until file is processed.
54+
55+
It is possible that another DAQ-specific header will be added to both file and per-orbit to better encapsulate data similar is done for Run2/3 RAW data), to provide additional metadata to improve integrity and completeness checks after aggregation of data in DAQ. At present, only RAW DTH is supported by "DTH" format.

EventFilter/Utilities/interface/DAQSource.h

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <memory>
88
#include <mutex>
99
#include <thread>
10+
#include <queue>
1011

1112
#include "oneapi/tbb/concurrent_queue.h"
1213
#include "oneapi/tbb/concurrent_vector.h"
@@ -20,7 +21,8 @@
2021
#include "EventFilter/Utilities/interface/EvFDaqDirector.h"
2122

2223
//import InputChunk
23-
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
24+
#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
25+
#include "EventFilter/Utilities/interface/SourceRawFile.h"
2426

2527
class FEDRawDataCollection;
2628
class InputSourceDescription;
@@ -72,6 +74,7 @@ class DAQSource : public edm::RawInputSource {
7274
void maybeOpenNewLumiSection(const uint32_t lumiSection);
7375

7476
void readSupervisor();
77+
void fileDeleter();
7578
void dataArranger();
7679
void readWorker(unsigned int tid);
7780
void threadError();
@@ -92,10 +95,11 @@ class DAQSource : public edm::RawInputSource {
9295
uint64_t maxChunkSize_; // for buffered read-ahead
9396
uint64_t eventChunkBlock_; // how much read(2) asks at the time
9497
unsigned int readBlocks_;
98+
int numConcurrentReads_;
9599
unsigned int numBuffers_;
96100
unsigned int maxBufferedFiles_;
97-
unsigned int numConcurrentReads_;
98101
std::atomic<unsigned int> readingFilesCount_;
102+
std::atomic<unsigned int> heldFilesCount_;
99103

100104
// get LS from filename instead of event header
101105
const bool alwaysStartFromFirstLS_;
@@ -136,6 +140,7 @@ class DAQSource : public edm::RawInputSource {
136140

137141
bool startedSupervisorThread_ = false;
138142
std::unique_ptr<std::thread> readSupervisorThread_;
143+
std::unique_ptr<std::thread> fileDeleterThread_;
139144
std::unique_ptr<std::thread> dataArrangerThread_;
140145
std::vector<std::thread*> workerThreads_;
141146

@@ -164,6 +169,7 @@ class DAQSource : public edm::RawInputSource {
164169
//supervisor thread wakeup
165170
std::mutex mWakeup_;
166171
std::condition_variable cvWakeup_;
172+
std::condition_variable cvWakeupAll_;
167173

168174
//variables for the single buffered mode
169175
int fileDescriptor_ = -1;
@@ -176,28 +182,4 @@ class DAQSource : public edm::RawInputSource {
176182
std::shared_ptr<DataMode> dataMode_;
177183
};
178184

179-
class RawInputFile : public InputFile {
180-
public:
181-
RawInputFile(evf::EvFDaqDirector::FileStatus status,
182-
unsigned int lumi = 0,
183-
std::string const& name = std::string(),
184-
bool deleteFile = true,
185-
int rawFd = -1,
186-
uint64_t fileSize = 0,
187-
uint16_t rawHeaderSize = 0,
188-
uint32_t nChunks = 0,
189-
int nEvents = 0,
190-
DAQSource* parent = nullptr)
191-
: InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
192-
sourceParent_(parent) {}
193-
bool advance(unsigned char*& dataPosition, const size_t size);
194-
void advance(const size_t size) {
195-
chunkPosition_ += size;
196-
bufferPosition_ += size;
197-
}
198-
199-
private:
200-
DAQSource* sourceParent_;
201-
};
202-
203185
#endif // EventFilter_Utilities_DAQSource_h

EventFilter/Utilities/interface/DAQSourceModels.h

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
2525

2626
//import InputChunk
27-
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
27+
#include "EventFilter/Utilities/interface/SourceRawFile.h"
2828

29+
class RawInputFile;
30+
class UnpackedRawEventWrapper;
2931
class DAQSource;
3032

3133
//evf?
@@ -40,18 +42,16 @@ class DataMode {
4042
virtual uint32_t headerSize() const = 0;
4143
virtual bool versionCheck() const = 0;
4244
virtual uint64_t dataBlockSize() const = 0;
43-
virtual void makeDataBlockView(unsigned char* addr,
44-
size_t maxSize,
45-
std::vector<uint64_t> const& fileSizes,
46-
size_t fileHeaderSize) = 0;
47-
virtual bool nextEventView() = 0;
45+
virtual void makeDataBlockView(unsigned char* addr, RawInputFile* rawFile) = 0;
46+
virtual bool nextEventView(RawInputFile*) = 0;
47+
virtual bool blockChecksumValid() = 0;
4848
virtual bool checksumValid() = 0;
4949
virtual std::string getChecksumError() const = 0;
50-
virtual bool isRealData() const = 0;
5150
virtual uint32_t run() const = 0;
5251
virtual bool dataBlockCompleted() const = 0;
5352
virtual bool requireHeader() const = 0;
5453
virtual bool fitToBuffer() const = 0;
54+
virtual void unpackFile(RawInputFile* file) = 0;
5555

5656
virtual bool dataBlockInitialized() const = 0;
5757
virtual void setDataBlockInitialized(bool) = 0;
@@ -66,9 +66,12 @@ class DataMode {
6666
std::string const& runDir) = 0;
6767
void setTesting(bool testing) { testing_ = testing; }
6868

69+
bool errorDetected() { return errorDetected_; }
70+
6971
protected:
7072
DAQSource* daqSource_;
7173
bool testing_ = false;
74+
bool errorDetected_ = false;
7275
};
7376

7477
#endif // EventFilter_Utilities_DAQSourceModels_h
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#ifndef EventFilter_Utilities_DAQSourceModelsDTH_h
2+
#define EventFilter_Utilities_DAQSourceModelsDTH_h
3+
4+
#include <filesystem>
5+
#include <queue>
6+
7+
#include "EventFilter/Utilities/interface/DAQSourceModels.h"
8+
#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
9+
#include "EventFilter/Utilities/interface/DTHHeaders.h"
10+
11+
class FEDRawDataCollection;
12+
13+
class DataModeDTH : public DataMode {
14+
public:
15+
DataModeDTH(DAQSource* daqSource, bool verifyChecksum) : DataMode(daqSource), verifyChecksum_(verifyChecksum) {}
16+
~DataModeDTH() override {}
17+
std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& makeDaqProvenanceHelpers() override;
18+
void readEvent(edm::EventPrincipal& eventPrincipal) override;
19+
20+
//non-virtual
21+
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData);
22+
23+
int dataVersion() const override { return detectedDTHversion_; }
24+
void detectVersion(unsigned char* fileBuf, uint32_t fileHeaderOffset) override {
25+
detectedDTHversion_ = 1; //TODO: read version
26+
}
27+
28+
uint32_t headerSize() const override { return sizeof(evf::DTHOrbitHeader_v1); }
29+
30+
bool versionCheck() const override { return detectedDTHversion_ == 1; }
31+
32+
uint64_t dataBlockSize() const override { return dataBlockSize_; }
33+
34+
void makeDataBlockView(unsigned char* addr, RawInputFile* rawFile) override;
35+
36+
bool nextEventView(RawInputFile*) override;
37+
bool blockChecksumValid() override { return checksumValid_; }
38+
bool checksumValid() override { return checksumValid_; }
39+
std::string getChecksumError() const override { return checksumError_; }
40+
41+
bool isRealData() const { return true; } //this flag could be added to RU/BU-generated index
42+
43+
uint32_t run() const override { return firstOrbitHeader_->runNumber(); }
44+
45+
bool dataBlockCompleted() const override { return blockCompleted_; }
46+
47+
bool requireHeader() const override { return false; }
48+
49+
bool fitToBuffer() const override { return true; }
50+
51+
void unpackFile(RawInputFile*) override {}
52+
53+
bool dataBlockInitialized() const override { return dataBlockInitialized_; }
54+
55+
void setDataBlockInitialized(bool val) override { dataBlockInitialized_ = val; }
56+
57+
void setTCDSSearchRange(uint16_t MINTCDSuTCAFEDID, uint16_t MAXTCDSuTCAFEDID) override {}
58+
59+
void makeDirectoryEntries(std::vector<std::string> const& baseDirs,
60+
std::vector<int> const& numSources,
61+
std::string const& runDir) override {}
62+
63+
std::pair<bool, std::vector<std::string>> defineAdditionalFiles(std::string const& primaryName, bool) const override {
64+
return std::make_pair(true, std::vector<std::string>());
65+
}
66+
67+
private:
68+
bool verifyChecksum_;
69+
std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>> daqProvenanceHelpers_;
70+
uint16_t detectedDTHversion_ = 0;
71+
evf::DTHOrbitHeader_v1* firstOrbitHeader_ = nullptr;
72+
uint64_t nextEventID_ = 0;
73+
std::vector<evf::DTHFragmentTrailer_v1*> eventFragments_; //events in block (DTH trailer)
74+
bool dataBlockInitialized_ = false;
75+
bool blockCompleted_ = true;
76+
77+
std::vector<uint8_t*> addrsStart_; //start of orbit payloads per source
78+
std::vector<uint8_t*> addrsEnd_; //dth trailers per source (go through events from the end)
79+
80+
bool checksumValid_ = false;
81+
std::string checksumError_;
82+
//total
83+
size_t dataBlockSize_ = 0;
84+
//uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID;
85+
//uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID;
86+
bool eventCached_ = false;
87+
};
88+
89+
#endif // EventFilter_Utilities_DAQSourceModelsDTH_h

0 commit comments

Comments
 (0)