Skip to content

Commit 9019955

Browse files
committed
extend opt. max-tf-per-file to raw-tf reader, move it to device
1 parent 0c01d1b commit 9019955

File tree

5 files changed

+26
-20
lines changed

5 files changed

+26
-20
lines changed

Detectors/CTF/workflow/src/CTFReaderSpec.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ void CTFReaderSpec::init(InitContext& ic)
148148
mUseLocalTFCounter = ic.options().get<bool>("local-tf-counter");
149149
mImposeRunStartMS = ic.options().get<int64_t>("impose-run-start-timstamp");
150150
mInput.checkTFLimitBeforeReading = ic.options().get<bool>("limit-tf-before-reading");
151+
mInput.maxTFs = ic.options().get<int>("max-tf");
152+
mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
153+
mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
154+
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
151155
mRunning = true;
152156
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
153157
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
@@ -474,6 +478,9 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp)
474478
options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}});
475479
options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
476480
options.emplace_back(ConfigParamSpec{"limit-tf-before-reading", VariantType::Bool, false, {"Check TF limiting before reading new TF, otherwhise before injecting it"}});
481+
options.emplace_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}});
482+
options.emplace_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}});
483+
477484
if (!inp.metricChannel.empty()) {
478485
options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, inp.metricChannel, {"Out-of-band channel config for TF throttling"}});
479486
}

Detectors/CTF/workflow/src/ctf-reader-workflow.cxx

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
5454
options.push_back(ConfigParamSpec{"ctf-input", VariantType::String, "none", {"comma-separated list CTF input files"}});
5555
options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, std::string{DetID::ALL}, {"comma-separated list of detectors to accept. Overrides skipDet"}});
5656
options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}});
57-
options.push_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}});
58-
options.push_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}});
5957
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}});
6058
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
6159
options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
@@ -117,11 +115,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
117115
if (ctfInput.delay_us < 0) {
118116
ctfInput.delay_us = 0;
119117
}
120-
int n = configcontext.options().get<int>("max-tf");
121-
ctfInput.maxTFs = n > 0 ? n : 0x7fffffff;
122-
123-
n = configcontext.options().get<int>("max-tf-per-file");
124-
ctfInput.maxTFsPerFile = n > 0 ? n : 0x7fffffff;
125118

126119
ctfInput.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));
127120

Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ TFReaderSpec::TFReaderSpec(const TFReaderInp& rinp) : mInput(rinp)
9999
void TFReaderSpec::init(o2f::InitContext& ic)
100100
{
101101
mInput.tfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-tf-ids"));
102+
mInput.maxTFs = ic.options().get<int>("max-tf");
103+
mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
104+
mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
105+
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
106+
mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
107+
mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
102108
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
103109
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
104110
mFileFetcher->setMaxLoops(mInput.maxLoops);
@@ -417,15 +423,17 @@ void TFReaderSpec::TFBuilder()
417423
}
418424
mTFBuilderCounter++;
419425
}
420-
if (!acceptTF) {
421-
continue;
422-
}
423426
if (mRunning && tf) {
424-
mWaitSendingLast = true;
425-
mTFQueue.push(std::move(tf));
427+
if (acceptTF) {
428+
mWaitSendingLast = true;
429+
mTFQueue.push(std::move(tf));
430+
}
426431
} else {
427432
break;
428433
}
434+
if (mInput.maxTFsPerFile > 0 && mInput.maxTFsPerFile >= locID) { // go to next file
435+
break;
436+
}
429437
}
430438
// remove already processed file from the queue, unless they are needed for further looping
431439
if (mFileFetcher) {
@@ -527,6 +535,11 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp)
527535
}
528536
spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
529537
spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
538+
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
539+
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
540+
spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});
541+
spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-files", o2f::VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});
542+
530543
spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);
531544

532545
return spec;

Detectors/Raw/TFReaderDD/src/TFReaderSpec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct TFReaderInp {
4343
int64_t delay_us = 0;
4444
int maxLoops = 0;
4545
int maxTFs = -1;
46+
int maxTFsPerFile = -1;
4647
bool sendDummyForMissing = true;
4748
bool sup0xccdb = false;
4849
std::vector<o2::header::DataHeader> hdVec;

Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,16 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
2828
options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, "all", {"list of dectors"}});
2929
options.push_back(ConfigParamSpec{"raw-only-det", VariantType::String, "none", {"do not open non-raw channel for these detectors"}});
3030
options.push_back(ConfigParamSpec{"non-raw-only-det", VariantType::String, "none", {"do not open raw channel for these detectors"}});
31-
options.push_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
3231
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (-1 = infinite)"}});
3332
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
3433
options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
3534
options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}});
3635
options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
37-
options.push_back(ConfigParamSpec{"max-cached-tf", VariantType::Int, 3, {"max TFs to cache in memory"}});
38-
options.push_back(ConfigParamSpec{"max-cached-files", VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});
3936
options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH)"}});
4037
options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}});
4138
options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
4239
options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}});
4340
options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}});
44-
4541
options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
4642
options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});
4743

@@ -59,8 +55,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
5955
o2::rawdd::TFReaderInp rinp;
6056
rinp.inpdata = configcontext.options().get<std::string>("input-data");
6157
rinp.maxLoops = configcontext.options().get<int>("loop");
62-
int n = configcontext.options().get<int>("max-tf");
63-
rinp.maxTFs = n > 0 ? n : 0x7fffffff;
6458
auto detlistSelect = configcontext.options().get<std::string>("onlyDet");
6559
if (detlistSelect == "all") {
6660
// Exclude FOCAL from default detlist (must be selected on request)
@@ -74,8 +68,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
7468
rinp.rawChannelConfig = configcontext.options().get<std::string>("raw-channel-config");
7569
rinp.delay_us = uint64_t(1e6 * configcontext.options().get<float>("delay")); // delay in microseconds
7670
rinp.verbosity = configcontext.options().get<int>("tf-reader-verbosity");
77-
rinp.maxTFCache = std::max(1, configcontext.options().get<int>("max-cached-tf"));
78-
rinp.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));
7971
rinp.copyCmd = configcontext.options().get<std::string>("copy-cmd");
8072
rinp.tffileRegex = configcontext.options().get<std::string>("tf-file-regex");
8173
rinp.remoteRegex = configcontext.options().get<std::string>("remote-regex");

0 commit comments

Comments
 (0)