Skip to content

Commit ab7d8f0

Browse files
authored
Merge 7503b94 into sapling-pr-archive-ktf
2 parents 6ccbee2 + 7503b94 commit ab7d8f0

File tree

12 files changed

+118
-38
lines changed

12 files changed

+118
-38
lines changed

Detectors/ITSMFT/ITS/tracking/GPU/cuda/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
if(CUDA_ENABLED)
1414
find_package(CUDAToolkit)
1515
message(STATUS "Building ITS CUDA tracker")
16-
add_compile_options(-O0 -g -lineinfo -fPIC)
16+
# add_compile_options(-O0 -g -lineinfo -fPIC)
1717
# add_compile_definitions(ITS_MEASURE_GPU_TIME)
1818
o2_add_library(ITStrackingCUDA
1919
SOURCES ClusterLinesGPU.cu

Detectors/MUON/MCH/Conditions/src/scan-hvlv-ccdb.cxx

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ using DPBMAP = std::map<uint64_t, uint64_t>;
5656
using ISSUE = std::tuple<uint64_t, uint64_t, double, double, std::string>;
5757
using ISSUELIST = std::vector<ISSUE>;
5858
using ISSUEMAP = std::map<std::string, ISSUELIST>;
59+
using ULL = unsigned long long;
5960

6061
//----------------------------------------------------------------------------
6162
bool containsAKey(std::string data, const std::set<std::string>& Keys)
@@ -238,14 +239,14 @@ void checkRunBoundaries(const RBMAP& runBoundaries)
238239
for (const auto& [run, boundaries] : runBoundaries) {
239240
if (boundaries.second <= boundaries.first) {
240241
printf("error: run %d EOR <= SOR: %llu - %llu (%s - %s)\n",
241-
run, boundaries.first, boundaries.second,
242+
run, (ULL)boundaries.first, (ULL)boundaries.second,
242243
getTime(boundaries.first).c_str(), getTime(boundaries.second).c_str());
243244
error = true;
244245
}
245246
if (boundaries.first <= endOfPreviousRun) {
246247
printf("error: SOR run %d <= EOR run %d: %llu (%s) <= %llu (%s)\n",
247-
run, previousRun, boundaries.first, getTime(boundaries.first).c_str(),
248-
endOfPreviousRun, getTime(endOfPreviousRun).c_str());
248+
run, previousRun, (ULL)boundaries.first, getTime(boundaries.first).c_str(),
249+
(ULL)endOfPreviousRun, getTime(endOfPreviousRun).c_str());
249250
error = true;
250251
}
251252
previousRun = run;
@@ -266,7 +267,7 @@ void printRunBoundaries(const RBMAP& runBoundaries)
266267
printf("------------------------------------\n");
267268

268269
for (const auto& [run, boundaries] : runBoundaries) {
269-
printf("%d: %llu - %llu (%s - %s)\n", run, boundaries.first, boundaries.second,
270+
printf("%d: %llu - %llu (%s - %s)\n", run, (ULL)boundaries.first, (ULL)boundaries.second,
270271
getTime(boundaries.first).c_str(), getTime(boundaries.second).c_str());
271272
}
272273

@@ -324,7 +325,7 @@ DPBMAP getDPBoundaries(ccdb::CcdbApi const& api, std::string what,
324325

325326
if (dpBoundaries.empty()) {
326327
printf("\e[0;31merror: no file found in %s in time range %llu - %llu (%s - %s) --> use the default one\e[0m\n",
327-
what.c_str(), tStart, tStop, getTime(tStart).c_str(), getTime(tStop).c_str());
328+
what.c_str(), (ULL)tStart, (ULL)tStop, getTime(tStart).c_str(), getTime(tStop).c_str());
328329
dpBoundaries.emplace(1, 9999999999999);
329330
}
330331

@@ -340,13 +341,13 @@ void checkDPBoundaries(const DPBMAP& dpBoundaries, bool scanHV, uint64_t tStart,
340341

341342
if (dpBoundaries.begin()->first > tStart) {
342343
printf("error: the beginning of the time range is not covered: %llu > %llu (%s > %s)\n",
343-
dpBoundaries.begin()->first, tStart,
344+
(ULL)dpBoundaries.begin()->first, (ULL)tStart,
344345
getTime(dpBoundaries.begin()->first).c_str(), getTime(tStart).c_str());
345346
error = true;
346347
}
347348
if (dpBoundaries.rbegin()->second < tStop) {
348349
printf("error: the end of the time range is not covered: %llu < %llu (%s < %s)\n",
349-
dpBoundaries.rbegin()->second, tStop,
350+
(ULL)dpBoundaries.rbegin()->second, (ULL)tStop,
350351
getTime(dpBoundaries.rbegin()->second).c_str(), getTime(tStop).c_str());
351352
error = true;
352353
}
@@ -355,13 +356,13 @@ void checkDPBoundaries(const DPBMAP& dpBoundaries, bool scanHV, uint64_t tStart,
355356
for (auto [tStart, tStop] : dpBoundaries) {
356357
if (tStop <= tStart) {
357358
printf("error: EOF <= SOF: %llu - %llu (%s - %s)\n",
358-
tStart, tStop, getTime(tStart).c_str(), getTime(tStop).c_str());
359+
(ULL)tStart, (ULL)tStop, getTime(tStart).c_str(), getTime(tStop).c_str());
359360
error = true;
360361
}
361362
if (tStart != previousTStop) {
362363
printf("error: end of %s file != start of next %s file: %llu (%s) != %llu (%s))\n",
363364
scanHV ? "HV" : "LV", scanHV ? "HV" : "LV",
364-
previousTStop, getTime(previousTStop).c_str(), tStart, getTime(tStart).c_str());
365+
(ULL)previousTStop, getTime(previousTStop).c_str(), (ULL)tStart, getTime(tStart).c_str());
365366
error = true;
366367
}
367368
previousTStop = tStop;
@@ -381,10 +382,10 @@ void printDPBoundaries(const DPBMAP& dpBoundaries, bool scanHV, uint64_t timeInt
381382
printf("------------------------------------\n");
382383

383384
for (auto [tStart, tStop] : dpBoundaries) {
384-
printf("%llu - %llu (%s - %s)", tStart, tStop, getTime(tStart).c_str(), getTime(tStop).c_str());
385+
printf("%llu - %llu (%s - %s)", (ULL)tStart, (ULL)tStop, getTime(tStart).c_str(), getTime(tStop).c_str());
385386
if (tStop - tStart < 60000 * (timeInterval - 1) || tStop - tStart > 60000 * (timeInterval + 1)) {
386387
printf("\e[0;31m ! warning: validity range %s != %llu±1 min\e[0m\n",
387-
getDuration(tStart, tStop).c_str(), timeInterval);
388+
getDuration(tStart, tStop).c_str(), (ULL)timeInterval);
388389
} else {
389390
printf("\n");
390391
}
@@ -478,14 +479,15 @@ void fillDataPoints(const std::vector<DPVAL>& dps, std::map<uint64_t, double>& d
478479
auto previousTS = dps2.rbegin()->first;
479480
if (ts != previousTS || getValue(*itDP) != dps2.rbegin()->second) {
480481
if (ts <= previousTS) {
481-
printf("error: wrong data point order (%llu <= %llu)\n", ts, previousTS);
482+
printf("error: wrong data point order (%llu <= %llu)\n", (ULL)ts, (ULL)previousTS);
482483
exit(1);
483484
}
484485
if (printWarning) {
485486
printf("%s%s missing the previous data point (dt = %s%llu ms)", color.c_str(), header.c_str(),
486-
(previousTS < tMin) ? "-" : "+", (previousTS < tMin) ? tMin - previousTS : previousTS - tMin);
487+
(previousTS < tMin) ? "-" : "+",
488+
(ULL)((previousTS < tMin) ? tMin - previousTS : previousTS - tMin));
487489
if (ts <= tMin) {
488-
printf(" but get one at dt = -%llu ms\e[0m\n", tMin - ts);
490+
printf(" but get one at dt = -%llu ms\e[0m\n", (ULL)(tMin - ts));
489491
} else {
490492
printf("\e[0m\n");
491493
}
@@ -496,11 +498,11 @@ void fillDataPoints(const std::vector<DPVAL>& dps, std::map<uint64_t, double>& d
496498

497499
// add the first data point (should be before the start of validity of the file)
498500
if (ts >= tMax) {
499-
printf("error: first data point exceeding file validity range (dt = +%llu ms)\n", ts - tMax);
501+
printf("error: first data point exceeding file validity range (dt = +%llu ms)\n", (ULL)(ts - tMax));
500502
exit(1);
501503
} else if (ts > tMin && printWarning) {
502504
printf("%s%s missing data point prior file start of validity (dt = +%llu ms)\e[0m\n",
503-
color.c_str(), header.c_str(), ts - tMin);
505+
color.c_str(), header.c_str(), (ULL)(ts - tMin));
504506
header = " ";
505507
}
506508
dps2.emplace(ts, getValue(*itDP));
@@ -510,15 +512,15 @@ void fillDataPoints(const std::vector<DPVAL>& dps, std::map<uint64_t, double>& d
510512
for (++itDP; itDP < dps.end(); ++itDP) {
511513
ts = itDP->get_epoch_time();
512514
if (ts <= previousTS) {
513-
printf("error: wrong data point order (%llu <= %llu)\n", ts, previousTS);
515+
printf("error: wrong data point order (%llu <= %llu)\n", (ULL)ts, (ULL)previousTS);
514516
exit(1);
515517
}
516518
if (ts < tMin && (warningLevel > 1 || (warningLevel == 1 && ts + tolerance < tMin))) {
517519
printf("%s%s data point outside of file validity range (dt = -%llu ms)\e[0m\n",
518-
(ts + tolerance < tMin) ? "\e[0;31m" : "\e[0;34m", header.c_str(), tMin - ts);
520+
(ts + tolerance < tMin) ? "\e[0;31m" : "\e[0;34m", header.c_str(), (ULL)(tMin - ts));
519521
} else if (ts >= tMax && warningLevel >= 1) {
520522
printf("\e[0;31m%s data point outside of file validity range (dt = +%llu ms)\e[0m\n",
521-
header.c_str(), ts - tMax);
523+
header.c_str(), (ULL)(ts - tMax));
522524
}
523525
dps2.emplace(ts, getValue(*itDP));
524526
previousTS = ts;
@@ -580,22 +582,22 @@ void printDataPoints(const DPMAP2 dpsMapsPerCh[10], std::string hvlvFormat, bool
580582

581583
for (const auto& [alias, dps] : dpsMapsPerCh[ch]) {
582584

583-
printf("- %s: %lu values", alias.c_str(), dps.size());
585+
printf("- %s: %zu values", alias.c_str(), dps.size());
584586

585587
if (all) {
586588

587589
printf("\n");
588590
for (const auto& [ts, val] : dps) {
589-
printf(format1.c_str(), ts, getTime(ts).c_str(), val);
591+
printf(format1.c_str(), (ULL)ts, getTime(ts).c_str(), val);
590592
}
591593

592594
} else if (!dps.empty()) {
593595

594596
const auto firstdt = dps.begin();
595597
const auto lastdt = dps.rbegin();
596598
printf(format2.c_str(),
597-
firstdt->first, getTime(firstdt->first).c_str(), firstdt->second,
598-
lastdt->first, getTime(lastdt->first).c_str(), lastdt->second);
599+
(ULL)firstdt->first, getTime(firstdt->first).c_str(), firstdt->second,
600+
(ULL)lastdt->first, getTime(lastdt->first).c_str(), lastdt->second);
599601

600602
} else {
601603
printf("\n");
@@ -719,22 +721,22 @@ void fillO2Issues(const std::vector<mch::HVStatusCreator::TimeRange>& o2issues,
719721
// exclude issues fully outside of the DP file boudaries
720722
if (itIssue->end <= tMin || itIssue->begin >= tMax) {
721723
printf("\e[0;35mwarning: skipping O2 issue outside of file boundaries (%llu - %llu)\e[0m\n",
722-
itIssue->begin, itIssue->end);
724+
(ULL)itIssue->begin, (ULL)itIssue->end);
723725
continue;
724726
}
725727

726728
// only the first issue could in principle extend before the start of the DP file, to O
727729
if (itIssue->begin < tMin - mch::StatusMapCreatorParam::Instance().timeMargin &&
728730
(itIssue != o2issues.begin() || itIssue->begin != 0)) {
729731
printf("\e[0;35mwarning: O2 returns an issue with uncommon start time (%llu < %llu)\e[0m\n",
730-
itIssue->begin, tMin - mch::StatusMapCreatorParam::Instance().timeMargin);
732+
(ULL)itIssue->begin, (ULL)(tMin - mch::StatusMapCreatorParam::Instance().timeMargin));
731733
}
732734

733735
// only the last issue could in principle extend beyond the end of the DP file, to infinity
734736
if (itIssue->end >= tMax + mch::StatusMapCreatorParam::Instance().timeMargin &&
735737
(itIssue != std::prev(o2issues.end()) || itIssue->end != std::numeric_limits<uint64_t>::max())) {
736738
printf("\e[0;35mwarning: O2 returns an issue with uncommon end time (%llu >= %llu)\e[0m\n",
737-
itIssue->end, tMax + mch::StatusMapCreatorParam::Instance().timeMargin);
739+
(ULL)itIssue->end, (ULL)(tMax + mch::StatusMapCreatorParam::Instance().timeMargin));
738740
}
739741

740742
// extend the last issue in case of continuity accross the DP files or add a new one,
@@ -897,7 +899,7 @@ void printIssues(const ISSUEMAP issuesPerCh[10], const ISSUEMAP o2IssuesPerCh[10
897899
auto printIssue = [&format](ISSUE issue, std::string color) {
898900
const auto& [tStart, tStop, min, mean, runs] = issue;
899901
printf("%s", color.c_str());
900-
printf(format.c_str(), tStart, tStop,
902+
printf(format.c_str(), (ULL)tStart, (ULL)tStop,
901903
getTime(tStart).c_str(), getDuration(tStart, tStop).c_str(), min, mean, runs.c_str());
902904
printf("\e[0m");
903905
};

Framework/Core/include/Framework/CompletionPolicyHelpers.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ struct CompletionPolicyHelpers {
4343

4444
/// When any of the parts of the record have been received, consume them.
4545
static CompletionPolicy consumeWhenAny(const char* name, CompletionPolicy::Matcher matcher);
46+
47+
#if __has_include(<fairmq/shmem/Message.h>)
48+
/// When any of the parts which has arrived has a refcount of 1.
49+
static CompletionPolicy consumeWhenAnyZeroCount(const char* name, CompletionPolicy::Matcher matcher);
50+
#endif
4651
/// Default matcher applies for all devices
4752
static CompletionPolicy consumeWhenAny(CompletionPolicy::Matcher matcher = [](auto const&) -> bool { return true; })
4853
{

Framework/Core/include/Framework/InputSpan.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class InputSpan
4646
/// index and the buffer associated.
4747
/// @nofPartsGetter is the getter for the number of parts associated with an index
4848
/// @a size is the number of elements in the span.
49-
InputSpan(std::function<DataRef(size_t, size_t)> getter, std::function<size_t(size_t)> nofPartsGetter, size_t size);
49+
InputSpan(std::function<DataRef(size_t, size_t)> getter, std::function<size_t(size_t)> nofPartsGetter, std::function<int(size_t)> refCountGetter, size_t size);
5050

5151
/// @a i-th element of the InputSpan
5252
[[nodiscard]] DataRef get(size_t i, size_t partidx = 0) const
@@ -66,6 +66,18 @@ class InputSpan
6666
return mNofPartsGetter(i);
6767
}
6868

69+
// Get the refcount for a given part
70+
[[nodiscard]] int getRefCount(size_t i) const
71+
{
72+
if (i >= mSize) {
73+
return 0;
74+
}
75+
if (!mRefCountGetter) {
76+
return -1;
77+
}
78+
return mRefCountGetter(i);
79+
}
80+
6981
/// Number of elements in the InputSpan
7082
[[nodiscard]] size_t size() const
7183
{
@@ -236,6 +248,7 @@ class InputSpan
236248
private:
237249
std::function<DataRef(size_t, size_t)> mGetter;
238250
std::function<size_t(size_t)> mNofPartsGetter;
251+
std::function<int(size_t)> mRefCountGetter;
239252
size_t mSize;
240253
};
241254

Framework/Core/src/CompletionPolicy.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ std::vector<CompletionPolicy>
2626
{
2727
return {
2828
CompletionPolicyHelpers::consumeWhenAllOrdered("internal-dpl-aod-writer"),
29+
#if __has_include(<fairmq/shmem/Message.h>)
30+
CompletionPolicyHelpers::consumeWhenAnyZeroCount("internal-dpl-injected-dummy-sink", [](DeviceSpec const& s) { return s.name.find("internal-dpl-injected-dummy-sink") != std::string::npos; }),
31+
#else
2932
CompletionPolicyHelpers::consumeWhenAny("internal-dpl-injected-dummy-sink", [](DeviceSpec const& s) { return s.name.find("internal-dpl-injected-dummy-sink") != std::string::npos; }),
33+
#endif
3034
CompletionPolicyHelpers::consumeWhenAll()};
3135
}
3236

Framework/Core/src/CompletionPolicyHelpers.cxx

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
#include "Framework/TimingInfo.h"
2020
#include "DecongestionService.h"
2121
#include "Framework/Signpost.h"
22+
#if __has_include(<fairmq/shmem/Message.h>)
23+
#include <fairmq/shmem/Message.h>
24+
#endif
2225

2326
#include <cassert>
2427
#include <regex>
@@ -249,6 +252,21 @@ CompletionPolicy CompletionPolicyHelpers::consumeExistingWhenAny(const char* nam
249252
}};
250253
}
251254

255+
#if __has_include(<fairmq/shmem/Message.h>)
256+
CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyZeroCount(const char* name, CompletionPolicy::Matcher matcher)
257+
{
258+
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
259+
for (size_t i = 0; i < inputs.size(); ++i) {
260+
if (inputs.get(i).header != nullptr && inputs.getRefCount(i) == 1) {
261+
return CompletionPolicy::CompletionOp::Consume;
262+
}
263+
}
264+
return CompletionPolicy::CompletionOp::Wait;
265+
};
266+
return CompletionPolicy{name, matcher, callback, false};
267+
}
268+
#endif
269+
252270
CompletionPolicy CompletionPolicyHelpers::consumeWhenAny(const char* name, CompletionPolicy::Matcher matcher)
253271
{
254272
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@
5757
#include <fairmq/Parts.h>
5858
#include <fairmq/Socket.h>
5959
#include <fairmq/ProgOptions.h>
60+
#if __has_include(<fairmq/shmem/Message.h>)
61+
#include <fairmq/shmem/Message.h>
62+
#endif
6063
#include <Configuration/ConfigurationInterface.h>
6164
#include <Configuration/ConfigurationFactory.h>
6265
#include <Monitoring/Monitoring.h>
@@ -1214,12 +1217,14 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
12141217
if (forwarded.matcher.lifetime != Lifetime::Condition) {
12151218
onlyConditions = false;
12161219
}
1220+
#if __has_include(<fairmq/shmem/Message.h>)
12171221
if (strncmp(DataSpecUtils::asConcreteOrigin(forwarded.matcher).str, "AOD", 3) == 0) {
12181222
context.canForwardEarly = false;
12191223
overriddenEarlyForward = true;
12201224
LOG(detail) << "Cannot forward early because of AOD input: " << DataSpecUtils::describe(forwarded.matcher);
12211225
break;
12221226
}
1227+
#endif
12231228
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
12241229
context.canForwardEarly = false;
12251230
overriddenEarlyForward = true;
@@ -2231,7 +2236,15 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22312236
auto nofPartsGetter = [&currentSetOfInputs](size_t i) -> size_t {
22322237
return currentSetOfInputs[i].getNumberOfPairs();
22332238
};
2234-
return InputSpan{getter, nofPartsGetter, currentSetOfInputs.size()};
2239+
#if __has_include(<fairmq/shmem/Message.h>)
2240+
auto refCountGetter = [&currentSetOfInputs](size_t idx) -> int {
2241+
auto& header = static_cast<const fair::mq::shmem::Message&>(*currentSetOfInputs[idx].header(0));
2242+
return header.GetRefCount();
2243+
};
2244+
#else
2245+
std::function<int(size_t)> refCountGetter = nullptr;
2246+
#endif
2247+
return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()};
22352248
};
22362249

22372250
auto markInputsAsDone = [ref](TimesliceSlot slot) -> void {

Framework/Core/src/DataRelayer.cxx

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@
4646

4747
#include <fairlogger/Logger.h>
4848
#include <fairmq/Channel.h>
49+
#include <functional>
50+
#if __has_include(<fairmq/shmem/Message.h>)
51+
#include <fairmq/shmem/Message.h>
52+
#endif
4953
#include <fmt/format.h>
5054
#include <fmt/ostream.h>
5155
#include <gsl/span>
@@ -211,7 +215,15 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
211215
auto nPartsGetter = [&partial](size_t idx) {
212216
return partial[idx].size();
213217
};
214-
InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
218+
#if __has_include(<fairmq/shmem/Message.h>)
219+
auto refCountGetter = [&partial](size_t idx) -> int {
220+
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
221+
return header.GetRefCount();
222+
};
223+
#else
224+
std::function<int(size_t)> refCountGetter = nullptr;
225+
#endif
226+
InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
215227
// Setup the input span
216228

217229
if (expirator.checker(services, timestamp.value, span) == false) {
@@ -761,7 +773,15 @@ void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& comp
761773
auto nPartsGetter = [&partial](size_t idx) {
762774
return partial[idx].size();
763775
};
764-
InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
776+
#if __has_include(<fairmq/shmem/Message.h>)
777+
auto refCountGetter = [&partial](size_t idx) -> int {
778+
auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
779+
return header.GetRefCount();
780+
};
781+
#else
782+
std::function<int(size_t)> refCountGetter = nullptr;
783+
#endif
784+
InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
765785
CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
766786

767787
auto& variables = mTimesliceIndex.getVariablesForSlot(slot);

0 commit comments

Comments
 (0)