Skip to content

Commit 95f780c

Browse files
authored
Merge 6064fb9 into sapling-pr-archive-ktf
2 parents 071b329 + 6064fb9 commit 95f780c

File tree

13 files changed

+766
-301
lines changed

13 files changed

+766
-301
lines changed

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include "AODJAlienReaderHelpers.h"
13+
#include <memory>
1314
#include "Framework/TableTreeHelpers.h"
1415
#include "Framework/AnalysisHelpers.h"
1516
#include "Framework/DataProcessingStats.h"
1617
#include "Framework/RootTableBuilderHelpers.h"
18+
#include "Framework/RootArrowFilesystem.h"
1719
#include "Framework/AlgorithmSpec.h"
1820
#include "Framework/ConfigParamRegistry.h"
1921
#include "Framework/ControlService.h"
@@ -41,6 +43,8 @@
4143
#include <arrow/io/interfaces.h>
4244
#include <arrow/table.h>
4345
#include <arrow/util/key_value_metadata.h>
46+
#include <arrow/dataset/dataset.h>
47+
#include <arrow/dataset/file_base.h>
4448

4549
using namespace o2;
4650
using namespace o2::aod;
@@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
272276
// Origin file name for derived output map
273277
auto o2 = Output(TFFileNameHeader);
274278
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
275-
std::string currentFilename(fileAndFolder.file->GetName());
276-
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
279+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
280+
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
281+
std::string currentFilename(f->GetFile()->GetName());
282+
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
277283
// This is not an absolute local path. Make it absolute.
278284
static std::string pwd = gSystem->pwd() + std::string("/");
279-
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
285+
currentFilename = pwd + std::string(f->GetName());
280286
}
281287
outputs.make<std::string>(o2) = currentFilename;
282288
}
@@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
312318
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
313319
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
314320
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
315-
if (!fileAndFolder.file) {
321+
322+
// In case the filesource is empty, move to the next one.
323+
if (fileAndFolder.path().empty()) {
316324
fcnt += 1;
317325
ntf = 0;
318326
if (didir->atEnd(fcnt)) {

Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 129 additions & 64 deletions
Large diffs are not rendered by default.

Framework/AnalysisSupport/src/DataInputDirector.h

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515

1616
#include "Framework/DataDescriptorMatcher.h"
1717
#include "Framework/DataAllocator.h"
18+
#include "Framework/RootArrowFilesystem.h"
19+
20+
#include <arrow/filesystem/filesystem.h>
21+
#include <arrow/dataset/dataset.h>
1822

1923
#include <regex>
2024
#include "rapidjson/fwd.h"
@@ -31,16 +35,10 @@ struct FileNameHolder {
3135
std::string fileName;
3236
int numberOfTimeFrames = 0;
3337
std::vector<uint64_t> listOfTimeFrameNumbers;
34-
std::vector<std::string> listOfTimeFrameKeys;
3538
std::vector<bool> alreadyRead;
3639
};
3740
FileNameHolder* makeFileNameHolder(std::string fileName);
3841

39-
struct FileAndFolder {
40-
TFile* file = nullptr;
41-
std::string folderName = "";
42-
};
43-
4442
class DataInputDescriptor
4543
{
4644
/// Holds information concerning the reading of an aod table.
@@ -52,7 +50,6 @@ class DataInputDescriptor
5250
std::string treename = "";
5351
std::unique_ptr<data_matcher::DataDescriptorMatcher> matcher;
5452

55-
DataInputDescriptor() = default;
5653
DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = "");
5754

5855
void printOut();
@@ -78,7 +75,7 @@ class DataInputDescriptor
7875
int findDFNumber(int file, std::string dfName);
7976

8077
uint64_t getTimeFrameNumber(int counter, int numTF);
81-
FileAndFolder getFileFolder(int counter, int numTF);
78+
arrow::dataset::FileSource getFileFolder(int counter, int numTF);
8279
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename);
8380
int getTimeFramesInFile(int counter);
8481
int getReadTimeFramesInFile(int counter);
@@ -90,6 +87,7 @@ class DataInputDescriptor
9087
bool isAlienSupportOn() { return mAlienSupport; }
9188

9289
private:
90+
o2::framework::RootObjectReadingFactory mFactory;
9391
std::string minputfilesFile = "";
9492
std::string* minputfilesFilePtr = nullptr;
9593
std::string mFilenameRegex = "";
@@ -98,7 +96,7 @@ class DataInputDescriptor
9896
std::string mParentFileReplacement;
9997
std::vector<FileNameHolder*> mfilenames;
10098
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
101-
TFile* mcurrentFile = nullptr;
99+
std::shared_ptr<arrow::fs::FileSystem> mCurrentFilesystem;
102100
int mCurrentFileID = -1;
103101
bool mAlienSupport = false;
104102

@@ -127,7 +125,6 @@ class DataInputDirector
127125
~DataInputDirector();
128126

129127
void reset();
130-
void createDefaultDataInputDescriptor();
131128
void printOut();
132129
bool atEnd(int counter);
133130

@@ -140,10 +137,11 @@ class DataInputDirector
140137
// getters
141138
DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh);
142139
int getNumberInputDescriptors() { return mdataInputDescriptors.size(); }
140+
void createDefaultDataInputDescriptor();
143141

144142
bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed);
145143
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
146-
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF);
144+
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF);
147145
int getTimeFramesInFile(header::DataHeader dh, int counter);
148146

149147
uint64_t getTotalSizeCompressed();

Framework/AnalysisSupport/src/Plugin.cxx

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,20 +85,48 @@ std::vector<std::string> getListOfTables(std::unique_ptr<TFile>& f)
8585
{
8686
std::vector<std::string> r;
8787
TList* keyList = f->GetListOfKeys();
88+
// We should handle two cases: one where the list of tables is in a TDirectory,
89+
// the other one where the dataframe number is just a prefix
90+
std::string first = "";
8891

8992
for (auto key : *keyList) {
90-
if (!std::string_view(key->GetName()).starts_with("DF_")) {
93+
if (!std::string_view(key->GetName()).starts_with("DF_") && !std::string_view(key->GetName()).starts_with("/DF_")) {
9194
continue;
9295
}
93-
auto* d = (TDirectory*)f->Get(key->GetName());
94-
TList* branchList = d->GetListOfKeys();
95-
for (auto b : *branchList) {
96-
r.emplace_back(b->GetName());
96+
auto* d = (TDirectory*)f->GetObjectChecked(key->GetName(), TClass::GetClass("TDirectory"));
97+
// Objects are in a folder, list it.
98+
if (d) {
99+
TList* branchList = d->GetListOfKeys();
100+
for (auto b : *branchList) {
101+
r.emplace_back(b->GetName());
102+
}
103+
break;
104+
}
105+
106+
void* v = f->GetObjectChecked(key->GetName(), TClass::GetClass("ROOT::Experimental::RNTuple"));
107+
if (v) {
108+
std::string s = key->GetName();
109+
size_t pos = s.find('-');
110+
// Check if '-' is found
111+
// Skip metaData and parentFiles
112+
if (pos == std::string::npos) {
113+
continue;
114+
}
115+
std::string t = s.substr(pos + 1);
116+
// If we find a duplicate table name, it means we are in the next DF and we can stop.
117+
if (t == first) {
118+
break;
119+
}
120+
if (first.empty()) {
121+
first = t;
122+
}
123+
// Record the table name, i.e. the part of the key name after the '-'
124+
r.emplace_back(t);
97125
}
98-
break;
99126
}
100127
return r;
101128
}
129+
102130
auto readMetadata(std::unique_ptr<TFile>& currentFile) -> std::vector<ConfigParamSpec>
103131
{
104132
// Get the metadata, if any

Framework/AnalysisSupport/src/RNTuplePlugin.cxx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "Framework/RuntimeError.h"
1313
#include "Framework/RootArrowFilesystem.h"
1414
#include "Framework/Plugins.h"
15+
#include "Framework/FairMQResizableBuffer.h"
1516
#include <ROOT/RNTupleModel.hxx>
1617
#include <ROOT/RNTupleWriteOptions.hxx>
1718
#include <ROOT/RNTupleWriter.hxx>
@@ -852,7 +853,10 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin {
852853
return new RootArrowFactory{
853854
.options = [context]() { return context->format->DefaultWriteOptions(); },
854855
.format = [context]() { return context->format; },
855-
};
856+
.deferredOutputStreamer = [](std::shared_ptr<arrow::dataset::FileFragment> fragment, const std::shared_ptr<arrow::ResizableBuffer>& buffer) -> std::shared_ptr<arrow::io::OutputStream> {
857+
auto treeFragment = std::dynamic_pointer_cast<RNTupleFileFragment>(fragment);
858+
return std::make_shared<FairMQOutputStream>(buffer);
859+
}};
856860
}
857861
};
858862

0 commit comments

Comments
 (0)