Skip to content

Commit e9cf891

Browse files
authored
Merge be4cb11 into sapling-pr-archive-ktf
2 parents a5b3ff3 + be4cb11 commit e9cf891

File tree

13 files changed

+293
-8
lines changed

13 files changed

+293
-8
lines changed

Framework/CCDBSupport/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019-2020 CERN and copyright holders of ALICE O2.
1+
# Copyright 2019-2025 CERN and copyright holders of ALICE O2.
22
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
33
# All rights not expressly granted are reserved.
44
#
@@ -9,9 +9,10 @@
99
# granted to it by virtue of its status as an Intergovernmental Organization
1010
# or submit itself to any jurisdiction.
1111
o2_add_library(FrameworkCCDBSupport
12-
SOURCES
12+
SOURCES
1313
src/Plugin.cxx
1414
src/CCDBHelpers.cxx
15+
src/AnalysisCCDBHelpers.cxx
1516
PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src
1617
PUBLIC_LINK_LIBRARIES O2::Framework O2::CCDB)
1718

Framework/CCDBSupport/src/Plugin.cxx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
// or submit itself to any jurisdiction.
1111
#include "Framework/Plugins.h"
1212
#include "Framework/AlgorithmSpec.h"
13+
#include "AnalysisCCDBHelpers.h"
1314
#include "CCDBHelpers.h"
1415

1516
struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin {
@@ -19,6 +20,14 @@ struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin {
1920
}
2021
};
2122

23+
struct AnalysisCCDBFetcherPlugin : o2::framework::AlgorithmPlugin {
24+
o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const&) final
25+
{
26+
return o2::framework::AnalysisCCDBHelpers::fetchFromCCDB();
27+
}
28+
};
29+
2230
DEFINE_DPL_PLUGINS_BEGIN
2331
DEFINE_DPL_PLUGIN_INSTANCE(CCDBFetcherPlugin, CustomAlgorithm);
32+
DEFINE_DPL_PLUGIN_INSTANCE(AnalysisCCDBFetcherPlugin, CustomAlgorithm);
2433
DEFINE_DPL_PLUGINS_END

Framework/Core/include/Framework/ASoA.h

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2248,11 +2248,14 @@ ColumnGetterFunction<R, typename T::iterator> getColumnGetterByLabel(const std::
22482248

22492249
namespace o2::aod
22502250
{
2251+
// If you get an error about not satisfying is_origin_hash, you need to add
2252+
// an entry here.
22512253
O2ORIGIN("AOD");
22522254
O2ORIGIN("AOD1");
22532255
O2ORIGIN("AOD2");
22542256
O2ORIGIN("DYN");
22552257
O2ORIGIN("IDX");
2258+
O2ORIGIN("TIM");
22562259
O2ORIGIN("JOIN");
22572260
O2HASH("JOIN/0");
22582261
O2ORIGIN("CONC");
@@ -2313,6 +2316,43 @@ consteval static std::string_view namespace_prefix()
23132316
}; \
23142317
[[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() }
23152318

2319+
#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
2320+
struct _Name_ : o2::soa::Column<std::span<std::byte>, _Name_> { \
2321+
static constexpr const char* mLabel = _Label_; \
2322+
static constexpr const char* query = _CCDBQuery_; \
2323+
static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \
2324+
using base = o2::soa::Column<std::span<std::byte>, _Name_>; \
2325+
using type = std::span<std::byte>; \
2326+
using column_t = _Name_; \
2327+
_Name_(arrow::ChunkedArray const* column) \
2328+
: o2::soa::Column<std::span<std::byte>, _Name_>(o2::soa::ColumnIterator<std::span<std::byte>>(column)) \
2329+
{ \
2330+
} \
2331+
\
2332+
_Name_() = default; \
2333+
_Name_(_Name_ const& other) = default; \
2334+
_Name_& operator=(_Name_ const& other) = default; \
2335+
\
2336+
decltype(auto) _Getter_() const \
2337+
{ \
2338+
static std::byte* payload = nullptr; \
2339+
static _ConcreteType_* deserialised = nullptr; \
2340+
auto span = *mColumnIterator; \
2341+
if (payload != (std::byte*)span.data()) { \
2342+
payload = (std::byte*)span.data(); \
2343+
TBufferFile f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \
2344+
deserialised = (_ConcreteType_*)f.ReadObjectAny(TClass::GetClass(#_ConcreteType_)); \
2345+
} \
2346+
return *deserialised; \
2347+
} \
2348+
\
2349+
decltype(auto) \
2350+
get() const \
2351+
{ \
2352+
return _Getter_(); \
2353+
} \
2354+
};
2355+
23162356
#define DECLARE_SOA_COLUMN(_Name_, _Getter_, _Type_) \
23172357
DECLARE_SOA_COLUMN_FULL(_Name_, _Getter_, _Type_, "f" #_Name_)
23182358

@@ -3188,6 +3228,36 @@ consteval auto getIndexTargets()
31883228
using metadata = _Name_##Metadata; \
31893229
};
31903230

3231+
// Declare were each row is associated to a timestamp column of an _TimestampSource_
3232+
// table.
3233+
//
3234+
// The columns of this table have to be CCDB_COLUMNS so that for each timestamp, we get a row
3235+
// which points to the specified CCDB objectes described by those columns.
3236+
#define DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, _Label_, _TimestampSource_, _TimestampColumn_, _Origin_, _Version_, _Desc_, ...) \
3237+
O2HASH(_Desc_ "/" #_Version_); \
3238+
template <typename O> \
3239+
using _Name_##TimestampFrom = soa::Table<o2::aod::Hash<_Label_ ""_h>, o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, O>; \
3240+
using _Name_##Timestamp = _Name_##TimestampFrom<o2::aod::Hash<_Origin_ ""_h>>; \
3241+
template <typename O = o2::aod::Hash<_Origin_ ""_h>> \
3242+
struct _Name_##TimestampMetadataFrom : TableMetadata<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, __VA_ARGS__> { \
3243+
using base_table_t = _TimestampSource_; \
3244+
using extension_table_t = _Name_##TimestampFrom<O>; \
3245+
using ccdb_pack_t = framework::pack<__VA_ARGS__>; \
3246+
/*static constexpr auto timestampColumn = _TimestampColumn_;*/ \
3247+
}; \
3248+
using _Name_##TimestampMetadata = _Name_##TimestampMetadataFrom<o2::aod::Hash<_Origin_ ""_h>>; \
3249+
template <> \
3250+
struct MetadataTrait<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>> { \
3251+
using metadata = _Name_##TimestampMetadata; \
3252+
}; \
3253+
template <typename O> \
3254+
using _Name_##From = o2::soa::JoinFull<o2::aod::Hash<_Desc_ "/" #_Version_ ""_h>, _TimestampSource_, _Name_##TimestampFrom<O>>; \
3255+
using _Name_ = _Name_##From<o2::aod::Hash<_Origin_ ""_h>>;
3256+
3257+
#define DECLARE_SOA_TIMESTAMPED_TABLE(_Name_, _TimestampSource_, _TimestampColumn_, _Version_, _Desc_, ...) \
3258+
O2HASH(#_Name_ "Timestamped"); \
3259+
DECLARE_SOA_TIMESTAMPED_TABLE_FULL(_Name_, #_Name_ "Timestamped", _TimestampSource_, _TimestampColumn_, "TIM", _Version_, _Desc_, __VA_ARGS__)
3260+
31913261
#define DECLARE_SOA_INDEX_TABLE(_Name_, _Key_, _Description_, ...) \
31923262
DECLARE_SOA_INDEX_TABLE_FULL(_Name_, _Key_, "IDX", 0, _Description_, false, __VA_ARGS__)
31933263

Framework/Core/include/Framework/AnalysisContext.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,24 @@ struct OutputObjectInfo {
2929
std::vector<std::string> bindings;
3030
};
3131

32-
//
32+
// This will keep track of the inputs which have
33+
// been requested and for which we will need to inject
34+
// some source device.
3335
struct AnalysisContext {
3436
std::vector<InputSpec> requestedAODs;
3537
std::vector<OutputSpec> providedAODs;
3638
std::vector<InputSpec> requestedDYNs;
3739
std::vector<OutputSpec> providedDYNs;
3840
std::vector<InputSpec> requestedIDXs;
41+
std::vector<OutputSpec> providedTIMs;
42+
std::vector<InputSpec> requestedTIMs;
3943
std::vector<OutputSpec> providedOutputObjHist;
4044
std::vector<InputSpec> spawnerInputs;
4145

46+
// These are the timestamped tables which are required to
47+
// inject the the CCDB objecs.
48+
std::vector<InputSpec> analysisCCDBInputs;
49+
4250
// Needed to created the hist writer
4351
std::vector<OutputTaskInfo> outTskMap;
4452
std::vector<OutputObjectInfo> outObjHistMap;

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,9 @@ const char* controlOption()
159159
}
160160

161161
template <typename T>
162-
requires(is_spawns<T> || is_builds<T> || is_defines<T>)
162+
concept with_base_table = requires(T const& t) { t.base_specs(); };
163+
164+
template <with_base_table T>
163165
bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
164166
{
165167
auto base_specs = entity.base_specs();

Framework/Core/include/Framework/AnalysisSupportHelpers.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,13 @@ struct AnalysisSupportHelpers {
4343
std::vector<InputSpec>& requestedAODs,
4444
std::vector<InputSpec>& requestedDYNs,
4545
DataProcessorSpec& publisher);
46+
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector<OutputSpec> const& providedTimestampedCCDBObjecs,
47+
std::vector<InputSpec> const& requestedTimestampedCCDBObjects,
48+
DataProcessorSpec& publisher);
4649

47-
/// Match all inputs of kind ATSK and write them to a ROOT file,
48-
/// one root file per originating task.
49-
static DataProcessorSpec getOutputObjHistSink(ConfigContext const&);
50+
/// Match all inputs of kind ATSK and write them to a ROOT file,
51+
/// one root file per originating task.
52+
static DataProcessorSpec getOutputObjHistSink(ConfigContext const&);
5053
/// writes inputs of kind AOD to file
5154
static DataProcessorSpec getGlobalAODSink(ConfigContext const&);
5255
/// Get the data director

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "Framework/TypeIdHelpers.h"
2727
#include "Framework/ArrowTableSlicingCache.h"
2828
#include "Framework/AnalysisDataModel.h"
29+
#include <iostream>
2930

3031
#include <arrow/compute/kernel.h>
3132
#include <arrow/table.h>
@@ -133,6 +134,7 @@ struct AnalysisDataProcessorBuilder {
133134
template <soa::is_table... As>
134135
static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
135136
{
137+
std::cout << "name: " << name << std::endl;
136138
int ai = -1;
137139
([&ai, &hash, &eInfos, &name, &value, &inputs]() mutable {
138140
++ai;

Framework/Core/include/Framework/DataSpecViews.h

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,50 @@ namespace o2::framework::views
1818
{
1919
static auto partial_match_filter(auto what)
2020
{
21-
return std::views::filter([&what](auto const& t) -> bool { return DataSpecUtils::partialMatch(t, what); });
21+
return std::views::filter([what](auto const& t) -> bool { return DataSpecUtils::partialMatch(t, what); });
2222
}
23+
24+
static auto exclude_by_name(std::string name)
25+
{
26+
return std::views::filter([name](auto const& t) -> bool { return t.name != name; });
27+
}
28+
29+
static auto filter_not_matching(auto const& provided)
30+
{
31+
return std::views::filter([&provided](auto const& input) { return std::none_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); });
32+
}
33+
2334
} // namespace o2::framework::views
35+
//
36+
namespace o2::framework::sinks
37+
{
38+
template <class Container>
39+
struct append_to {
40+
Container& c;
41+
// ends the pipeline, returns the container
42+
template <std::ranges::input_range R>
43+
friend Container& operator|(R&& r, append_to self)
44+
{
45+
std::ranges::copy(r, std::back_inserter(self.c));
46+
return self.c;
47+
}
48+
};
49+
50+
template <class Container>
51+
struct update_input_list {
52+
Container& c;
53+
// ends the pipeline, returns the container
54+
template <std::ranges::input_range R>
55+
friend Container& operator|(R&& r, update_input_list self)
56+
{
57+
for (auto& item : r) {
58+
auto copy = item;
59+
DataSpecUtils::updateInputList(self.c, std::move(copy));
60+
}
61+
return self.c;
62+
}
63+
};
64+
65+
} // namespace o2::framework::sinks
2466

2567
#endif // O2_FRAMEWORK_DATASPECVIEWS_H_

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,34 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> c
207207
}
208208
}
209209

210+
void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher(
211+
std::vector<OutputSpec> const& providedTimestampedCDBObjects,
212+
std::vector<InputSpec> const& requestedTimestampedCCDBObjects,
213+
DataProcessorSpec& publisher)
214+
{
215+
for (auto& input : requestedTimestampedCCDBObjects) {
216+
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
217+
publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec);
218+
// FIXME: good enough for now...
219+
for (auto& i : input.metadata) {
220+
if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) {
221+
auto value = i.defaultValue.get<std::string>();
222+
std::cout << "XXX " << value << std::endl;
223+
// auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get<std::string>());
224+
// auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; });
225+
// if (j == publisher.inputs.end()) {
226+
// publisher.inputs.push_back(spec);
227+
// }
228+
// if (DataSpecUtils::partialMatch(spec, AODOrigins)) {
229+
// DataSpecUtils::updateInputList(requestedAODs, std::move(spec));
230+
// } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) {
231+
// DataSpecUtils::updateInputList(requestedDYNs, std::move(spec));
232+
// }
233+
}
234+
}
235+
}
236+
}
237+
210238
// =============================================================================
211239
DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx)
212240
{

Framework/Core/src/ArrowSupport.cxx

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#include "Framework/ServiceRegistry.h"
2020
#include "Framework/ConfigContext.h"
2121
#include "Framework/CommonDataProcessors.h"
22+
#include "Framework/DataSpecUtils.h"
23+
#include "Framework/DataSpecViews.h"
2224
#include "Framework/DeviceSpec.h"
2325
#include "Framework/EndOfStreamContext.h"
2426
#include "Framework/Tracing.h"
@@ -27,6 +29,7 @@
2729
#include "Framework/DeviceInfo.h"
2830
#include "Framework/DevicesManager.h"
2931
#include "Framework/DeviceConfig.h"
32+
#include "Framework/PluginManager.h"
3033
#include "Framework/ServiceMetricsInfo.h"
3134
#include "WorkflowHelpers.h"
3235
#include "Framework/WorkflowSpecNode.h"
@@ -441,13 +444,16 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
441444
.adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
442445
auto& workflow = node.specs;
443446
auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
447+
auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; });
444448
auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
445449
auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
446450
auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
447451
auto &ac = ctx.services().get<AnalysisContext>();
448452
ac.requestedAODs.clear();
449453
ac.requestedDYNs.clear();
450454
ac.providedDYNs.clear();
455+
ac.providedTIMs.clear();
456+
ac.requestedTIMs.clear();
451457

452458

453459
auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
@@ -511,6 +517,26 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
511517
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
512518
}
513519

520+
if (analysisCCDB != workflow.end()) {
521+
for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
522+
d.inputs | views::partial_match_filter(header::DataOrigin{"TIM"}) | sinks::update_input_list(ac.requestedTIMs);
523+
d.outputs | views::partial_match_filter(header::DataOrigin{"TIM"}) | sinks::append_to(ac.providedTIMs);
524+
}
525+
std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
526+
std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
527+
// Use ranges::to<std::vector<>> in C++23...
528+
ac.analysisCCDBInputs.clear();
529+
ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to(ac.analysisCCDBInputs);
530+
531+
// recreate inputs and outputs
532+
analysisCCDB->outputs.clear();
533+
analysisCCDB->inputs.clear();
534+
// replace AlgorithmSpec
535+
// FIXME: it should be made more generic, so it does not need replacement...
536+
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
537+
AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher(ac.providedTIMs, ac.requestedTIMs, *analysisCCDB);
538+
}
539+
514540
if (writer != workflow.end()) {
515541
workflow.erase(writer);
516542
}
@@ -538,6 +564,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
538564
}
539565
}
540566

567+
568+
541569
// replace writer as some outputs may have become dangling and some are now consumed
542570
auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
543571

0 commit comments

Comments
 (0)