Skip to content

Commit 475d25f

Browse files
committed
DPL: improve arrow::Dataset integration
- Modularise filesystem to allow easier navigation and support for multiple formats. - Add initial support to multiplex multiple tables on top of the same tree. - Improve support for writing boolean fields.
1 parent c3ffb66 commit 475d25f

File tree

3 files changed

+252
-81
lines changed

3 files changed

+252
-81
lines changed

Framework/Core/include/Framework/RootArrowFilesystem.h

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#include <arrow/type_fwd.h>
1818
#include <memory>
1919

20+
class TFile;
21+
class TBranch;
2022
class TTree;
2123
class TBufferFile;
2224
class TDirectoryFile;
@@ -227,11 +229,38 @@ class TTreeFileFormat : public arrow::dataset::FileFormat
227229
const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const override;
228230
};
229231

230-
// An arrow outputstream which allows to write to a ttree
232+
// An arrow outputstream which allows to write to a TDirectoryFile.
233+
// This will point to the location of the file itself. You can
234+
// specify the location of the actual object inside it by passing the
235+
// associated path to the Write() API.
236+
class TDirectoryFileOutputStream : public arrow::io::OutputStream
237+
{
238+
public:
239+
TDirectoryFileOutputStream(TDirectoryFile*);
240+
241+
arrow::Status Close() override;
242+
243+
arrow::Result<int64_t> Tell() const override;
244+
245+
arrow::Status Write(const void* data, int64_t nbytes) override;
246+
247+
bool closed() const override;
248+
249+
TDirectoryFile* GetDirectory()
250+
{
251+
return mDirectory;
252+
}
253+
254+
private:
255+
TDirectoryFile* mDirectory;
256+
};
257+
258+
// An arrow outputstream which allows to write to a TTree, optionally
259+
// with a prefix for the branches.
231260
class TTreeOutputStream : public arrow::io::OutputStream
232261
{
233262
public:
234-
TTreeOutputStream(TTree* t);
263+
TTreeOutputStream(TTree*, std::string branchPrefix);
235264

236265
arrow::Status Close() override;
237266

@@ -241,13 +270,16 @@ class TTreeOutputStream : public arrow::io::OutputStream
241270

242271
bool closed() const override;
243272

273+
TBranch* CreateBranch(char const* branchName, char const* sizeBranch);
274+
244275
TTree* GetTree()
245276
{
246277
return mTree;
247278
}
248279

249280
private:
250281
TTree* mTree;
282+
std::string mBranchPrefix;
251283
};
252284

253285
} // namespace o2::framework

Framework/Core/src/RootArrowFilesystem.cxx

Lines changed: 116 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#include <arrow/array/builder_nested.h>
1818
#include <arrow/array/builder_primitive.h>
1919
#include <memory>
20-
#include <stdexcept>
2120
#include <TFile.h>
2221
#include <TLeaf.h>
2322
#include <TBufferFile.h>
@@ -28,8 +27,11 @@
2827
#include <arrow/dataset/file_base.h>
2928
#include <arrow/result.h>
3029
#include <arrow/status.h>
30+
#include <arrow/util/key_value_metadata.h>
3131
#include <fmt/format.h>
3232

33+
#include <stdexcept>
34+
#include <utility>
3335

3436
O2_DECLARE_DYNAMIC_LOG(root_arrow_fs);
3537

@@ -129,8 +131,15 @@ arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TFileFileSystem::OpenOut
129131
const std::string& path,
130132
const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
131133
{
132-
auto* t = new TTree(path.c_str(), "should put a name here");
133-
auto stream = std::make_shared<TTreeOutputStream>(t);
134+
if (path == "/") {
135+
return std::make_shared<TDirectoryFileOutputStream>(this->GetFile());
136+
}
137+
138+
auto* dir = dynamic_cast<TDirectoryFile*>(this->GetFile()->Get(path.c_str()));
139+
if (!dir) {
140+
throw runtime_error_f("Unable to open directory %s in file %s", path.c_str(), GetFile()->GetName());
141+
}
142+
auto stream = std::make_shared<TDirectoryFileOutputStream>(dir);
134143
return stream;
135144
}
136145

@@ -285,14 +294,48 @@ arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> TTreeFileFormat::Ma
285294
return std::dynamic_pointer_cast<arrow::dataset::FileFragment>(fragment);
286295
}
287296

297+
298+
// An arrow outputstream which allows to write to a TDirectoryFile
299+
TDirectoryFileOutputStream::TDirectoryFileOutputStream(TDirectoryFile* f)
300+
: mDirectory(f)
301+
{
302+
}
303+
304+
// Close the file hosting the wrapped directory.
//
// Returns Status::Invalid (instead of dereferencing a null pointer) when
// there is no directory or the directory is not attached to a file, in line
// with what TTreeOutputStream::Close() does for a tree without a file.
arrow::Status TDirectoryFileOutputStream::Close()
{
  auto* file = mDirectory != nullptr ? mDirectory->GetFile() : nullptr;
  if (file == nullptr) {
    return arrow::Status::Invalid("Cannot close a directory not attached to a file");
  }
  file->Close();
  return arrow::Status::OK();
}
309+
310+
// Positions are not tracked for this stream: always reports NotImplemented.
arrow::Result<int64_t> TDirectoryFileOutputStream::Tell() const
{
  // A Status converts implicitly into the error state of an arrow::Result.
  return arrow::Status::NotImplemented("Cannot move");
}
314+
315+
// Raw byte writes are not supported by this stream: both arguments are
// ignored and NotImplemented is always returned.
arrow::Status TDirectoryFileOutputStream::Write(const void* data, int64_t nbytes)
{
  // The message names the actual backend (it used to mention a TTree).
  return arrow::Status::NotImplemented("Cannot write raw bytes to a TDirectoryFile");
}
319+
320+
bool TDirectoryFileOutputStream::closed() const
321+
{
322+
return mDirectory->GetFile()->IsOpen() == false;
323+
}
324+
288325
// An arrow outputstream which allows to write to a ttree
289-
TTreeOutputStream::TTreeOutputStream(TTree* t)
290-
: mTree(t)
326+
// @a branchPrefix is used to identify the set of branches which all belong to
327+
// the same table.
328+
TTreeOutputStream::TTreeOutputStream(TTree* f, std::string branchPrefix)
329+
: mTree(f),
330+
mBranchPrefix(std::move(branchPrefix))
291331
{
292332
}
293333

294334
// Close the file the tree is currently attached to.
// Returns Status::Invalid when the tree has no current file.
arrow::Status TTreeOutputStream::Close()
{
  auto* currentFile = mTree->GetCurrentFile();
  if (!currentFile) {
    return arrow::Status::Invalid("Cannot close a tree not attached to a file");
  }
  currentFile->Close();
  return arrow::Status::OK();
}
}
@@ -309,9 +352,18 @@ arrow::Status TTreeOutputStream::Write(const void* data, int64_t nbytes)
309352

310353
bool TTreeOutputStream::closed() const
311354
{
355+
// A standalone tree is never closed.
356+
if (mTree->GetCurrentFile() == nullptr) {
357+
return false;
358+
}
312359
return mTree->GetCurrentFile()->IsOpen() == false;
313360
}
314361

362+
// Create a branch on the wrapped tree, applying the branch prefix if any.
//
// @a branchName is the unprefixed name of the branch.
// @a sizeBranch is forwarded as the third argument of TTree::Branch; the
//    callers in this file pass a ROOT leaf list string there.
//
// With an empty prefix both strings are forwarded untouched, so the branch is
// called "name" rather than "/name". With a non-empty prefix the branch is
// called "<prefix>/<name>" and the prefix is prepended to the leaf list.
TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch)
{
  // The explicit char* null selects the same TTree::Branch overload the
  // unprefixed code path has always used.
  auto* noAddress = static_cast<char*>(nullptr);
  if (mBranchPrefix.empty()) {
    return mTree->Branch(branchName, noAddress, sizeBranch);
  }
  std::string const prefixedName = mBranchPrefix + "/" + branchName;
  std::string const prefixedLeafList = mBranchPrefix + sizeBranch;
  return mTree->Branch(prefixedName.c_str(), noAddress, prefixedLeafList.c_str());
}
366+
315367
char const* rootSuffixFromArrow(arrow::Type::type id)
316368
{
317369
switch (id) {
@@ -411,8 +463,24 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
411463
: FileWriter(schema, options, destination, destination_locator)
412464
{
413465
// Batches have the same number of entries for each column.
466+
auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
414467
auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
415-
TTree* tree = treeStream->GetTree();
468+
469+
if (directoryStream.get()) {
470+
TDirectoryFile* dir = directoryStream->GetDirectory();
471+
dir->cd();
472+
auto* tree = new TTree(destination_locator_.path.c_str(), "");
473+
treeStream = std::make_shared<TTreeOutputStream>(tree, "");
474+
} else if (treeStream.get()) {
475+
// We already have a tree stream, let's derive a new one
476+
// with the destination_locator_.path as prefix for the branches
477+
// This way we can multiplex multiple tables in the same tree.
478+
auto tree = treeStream->GetTree();
479+
treeStream = std::make_shared<TTreeOutputStream>(tree, destination_locator_.path);
480+
} else {
481+
// I could simply set a prefix here to merge to an already existing tree.
482+
throw std::runtime_error("Unsupported backend.");
483+
}
416484

417485
for (auto i = 0u; i < schema->fields().size(); ++i) {
418486
auto& field = schema->field(i);
@@ -427,23 +495,23 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
427495
valueTypes.push_back(field->type()->field(0)->type());
428496
sizesBranches.push_back(nullptr);
429497
std::string leafList = fmt::format("{}[{}]{}", field->name(), listSizes.back(), rootSuffixFromArrow(valueTypes.back()->id()));
430-
branches.push_back(tree->Branch(field->name().c_str(), (char*)nullptr, leafList.c_str()));
498+
branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
431499
} break;
432500
case arrow::Type::LIST: {
433501
valueTypes.push_back(field->type()->field(0)->type());
434502
listSizes.back() = 0; // VLA, we need to calculate it on the fly;
435503
std::string leafList = fmt::format("{}[{}_size]{}", field->name(), field->name(), rootSuffixFromArrow(valueTypes.back()->id()));
436504
std::string sizeLeafList = field->name() + "_size/I";
437-
sizesBranches.push_back(tree->Branch((field->name() + "_size").c_str(), (char*)nullptr, sizeLeafList.c_str()));
438-
branches.push_back(tree->Branch(field->name().c_str(), (char*)nullptr, leafList.c_str()));
505+
sizesBranches.push_back(treeStream->CreateBranch((field->name() + "_size").c_str(), sizeLeafList.c_str()));
506+
branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
439507
// Notice that this could be replaced by a better guess of the
440508
// average size of the list elements, but this is not trivial.
441509
} break;
442510
default: {
443511
valueTypes.push_back(field->type());
444512
std::string leafList = field->name() + rootSuffixFromArrow(valueTypes.back()->id());
445513
sizesBranches.push_back(nullptr);
446-
branches.push_back(tree->Branch(field->name().c_str(), (char*)nullptr, leafList.c_str()));
514+
branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
447515
} break;
448516
}
449517
}
@@ -463,11 +531,18 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
463531
}
464532

465533
// Batches have the same number of entries for each column.
534+
auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
535+
TTree* tree = nullptr;
536+
if (directoryStream.get()) {
537+
TDirectoryFile* dir = directoryStream->GetDirectory();
538+
tree = (TTree*)dir->Get(destination_locator_.path.c_str());
539+
}
466540
auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
467-
TTree* tree = treeStream->GetTree();
468541

469-
// Caches for the vectors of bools.
470-
std::vector<std::shared_ptr<arrow::UInt8Array>> caches;
542+
if (!tree) {
543+
// I could simply set a prefix here to merge to an already existing tree.
544+
throw std::runtime_error("Unsupported backend.");
545+
}
471546

472547
for (auto i = 0u; i < batch->columns().size(); ++i) {
473548
auto column = batch->column(i);
@@ -484,24 +559,12 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
484559
auto list = std::static_pointer_cast<arrow::ListArray>(column);
485560
valueArrays.back() = list;
486561
} break;
487-
default:
488-
valueArrays.back() = column;
489-
}
490-
}
491-
492-
int64_t pos = 0;
493-
while (pos < batch->num_rows()) {
494-
for (size_t bi = 0; bi < branches.size(); ++bi) {
495-
auto* branch = branches[bi];
496-
auto* sizeBranch = sizesBranches[bi];
497-
auto array = batch->column(bi);
498-
auto& field = batch->schema()->field(bi);
499-
auto& listSize = listSizes[bi];
500-
auto valueType = valueTypes[bi];
501-
auto valueArray = valueArrays[bi];
562+
case arrow::Type::BOOL: {
563+
// In case of arrays of booleans, we need to go back to their
564+
// char based representation for ROOT to save them.
565+
valueArrays.back() = std::make_shared<arrow::UInt8Array>();
566+
auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
502567

503-
if (field->type()->id() == arrow::Type::BOOL) {
504-
auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(array);
505568
int64_t length = boolArray->length();
506569
arrow::UInt8Builder builder;
507570
auto ok = builder.Reserve(length);
@@ -516,11 +579,24 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
516579
auto ok = builder.AppendNull();
517580
}
518581
}
519-
520-
ok = builder.Finish(&caches[bi]);
521-
branch->SetAddress((void*)(caches[bi]->values()->data()));
522-
continue;
582+
ok = builder.Finish(&valueArrays.back());
523583
}
584+
default:
585+
valueArrays.back() = column;
586+
}
587+
}
588+
589+
int64_t pos = 0;
590+
while (pos < batch->num_rows()) {
591+
for (size_t bi = 0; bi < branches.size(); ++bi) {
592+
auto* branch = branches[bi];
593+
auto* sizeBranch = sizesBranches[bi];
594+
auto array = batch->column(bi);
595+
auto& field = batch->schema()->field(bi);
596+
auto& listSize = listSizes[bi];
597+
auto valueType = valueTypes[bi];
598+
auto valueArray = valueArrays[bi];
599+
524600
switch (field->type()->id()) {
525601
case arrow::Type::LIST: {
526602
auto list = std::static_pointer_cast<arrow::ListArray>(array);
@@ -769,8 +845,12 @@ arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TTreeFileSystem::OpenOut
769845
const std::string& path,
770846
const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
771847
{
772-
auto stream = std::make_shared<TTreeOutputStream>(GetTree({path, shared_from_this()}));
773-
return stream;
848+
arrow::dataset::FileSource source{path, shared_from_this()};
849+
auto prefix = metadata->Get("branch_prefix");
850+
if (prefix.ok()) {
851+
return std::make_shared<TTreeOutputStream>(GetTree(source), *prefix);
852+
}
853+
return std::make_shared<TTreeOutputStream>(GetTree(source), "");
774854
}
775855

776856
TBufferFileFS::TBufferFileFS(TBufferFile* f)

0 commit comments

Comments
 (0)