Skip to content

Commit 2249241

Browse files
authored
DPL: example on how to use the Arrow Dataset API (AliceO2Group#13646)
1 parent 7c0b93e commit 2249241

File tree

5 files changed

+862
-0
lines changed

5 files changed

+862
-0
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ o2_add_library(Framework
117117
src/ResourcesMonitoringHelper.cxx
118118
src/ResourcePolicy.cxx
119119
src/ResourcePolicyHelpers.cxx
120+
src/RootArrowFilesystem.cxx
120121
src/SendingPolicy.cxx
121122
src/ServiceRegistry.cxx
122123
src/ServiceSpec.cxx
@@ -166,6 +167,7 @@ o2_add_library(Framework
166167
O2::X9
167168
RapidJSON::RapidJSON
168169
Arrow::arrow_shared
170+
ArrowDataset::arrow_dataset_shared
169171
Microsoft.GSL::GSL
170172
O2::FrameworkLogger
171173
Gandiva::gandiva_shared
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#ifndef O2_FRAMEWORK_ROOT_ARROW_FILESYSTEM_H_
12+
#define O2_FRAMEWORK_ROOT_ARROW_FILESYSTEM_H_
13+
14+
#include <arrow/dataset/type_fwd.h>
15+
#include <arrow/dataset/file_base.h>
16+
#include <arrow/filesystem/type_fwd.h>
17+
#include <arrow/type_fwd.h>
18+
19+
class TTree;
20+
class TBufferFile;
21+
class TDirectoryFile;
22+
23+
namespace o2::framework
24+
{
25+
26+
class TTreeFileWriteOptions : public arrow::dataset::FileWriteOptions
27+
{
28+
public:
29+
TTreeFileWriteOptions(std::shared_ptr<arrow::dataset::FileFormat> format)
30+
: FileWriteOptions(format)
31+
{
32+
}
33+
};
34+
35+
// This is a virtual filesystem based on a ttree, where branches with the
36+
// same prefix get grouped into a fragment
37+
class TTreeFileSystem : public arrow::fs::FileSystem
38+
{
39+
public:
40+
arrow::Result<arrow::fs::FileInfo> GetFileInfo(const std::string& path) override;
41+
arrow::Result<arrow::fs::FileInfoVector> GetFileInfo(const arrow::fs::FileSelector& select) override;
42+
43+
bool Equals(const FileSystem& other) const override
44+
{
45+
return this->type_name() == other.type_name();
46+
}
47+
48+
arrow::Status CreateDir(const std::string& path, bool recursive) override;
49+
50+
arrow::Status DeleteDir(const std::string& path) override;
51+
52+
arrow::Status CopyFile(const std::string& src, const std::string& dest) override;
53+
54+
arrow::Status Move(const std::string& src, const std::string& dest) override;
55+
56+
arrow::Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override;
57+
58+
arrow::Status DeleteRootDirContents() override;
59+
60+
arrow::Status DeleteFile(const std::string& path) override;
61+
62+
arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenInputStream(const std::string& path) override;
63+
64+
arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> OpenInputFile(const std::string& path) override;
65+
66+
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
67+
const std::string& path,
68+
const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;
69+
70+
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenAppendStream(
71+
const std::string& path,
72+
const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;
73+
74+
virtual TTree* GetTree(arrow::dataset::FileSource) = 0;
75+
};
76+
77+
class SingleTreeFileSystem : public TTreeFileSystem
78+
{
79+
public:
80+
SingleTreeFileSystem(TTree* tree)
81+
: TTreeFileSystem(),
82+
mTree(tree)
83+
{
84+
}
85+
86+
std::string type_name() const override
87+
{
88+
return "ttree";
89+
}
90+
91+
TTree* GetTree(arrow::dataset::FileSource) override
92+
{
93+
// Simply return the only TTree we have
94+
return mTree;
95+
}
96+
97+
private:
98+
TTree* mTree;
99+
};
100+
101+
class TFileFileSystem : public TTreeFileSystem
102+
{
103+
public:
104+
TFileFileSystem(TDirectoryFile* f, size_t readahead);
105+
106+
std::string type_name() const override
107+
{
108+
return "TDirectoryFile";
109+
}
110+
111+
TTree* GetTree(arrow::dataset::FileSource source) override;
112+
113+
// We can go back to the TFile in case this is needed.
114+
TDirectoryFile* GetFile()
115+
{
116+
return mFile;
117+
}
118+
119+
private:
120+
TDirectoryFile* mFile;
121+
};
122+
123+
class TBufferFileFS : public TTreeFileSystem
124+
{
125+
public:
126+
TBufferFileFS(TBufferFile* f);
127+
128+
std::string type_name() const override
129+
{
130+
return "tbufferfile";
131+
}
132+
133+
TTree* GetTree(arrow::dataset::FileSource) override
134+
{
135+
// Simply return the only TTree we have
136+
return mTree;
137+
}
138+
139+
private:
140+
TTree* mTree;
141+
};
142+
143+
class TTreeFileFragment : public arrow::dataset::FileFragment
144+
{
145+
public:
146+
TTreeFileFragment(arrow::dataset::FileSource source,
147+
std::shared_ptr<arrow::dataset::FileFormat> format,
148+
arrow::compute::Expression partition_expression,
149+
std::shared_ptr<arrow::Schema> physical_schema)
150+
: FileFragment(std::move(source), std::move(format), std::move(partition_expression), std::move(physical_schema))
151+
{
152+
}
153+
};
154+
155+
class TTreeFileFormat : public arrow::dataset::FileFormat
156+
{
157+
size_t& mTotCompressedSize;
158+
size_t& mTotUncompressedSize;
159+
160+
public:
161+
TTreeFileFormat(size_t& totalCompressedSize, size_t& totalUncompressedSize)
162+
: FileFormat({}),
163+
mTotCompressedSize(totalCompressedSize),
164+
mTotUncompressedSize(totalUncompressedSize)
165+
{
166+
}
167+
168+
~TTreeFileFormat() override = default;
169+
170+
std::string type_name() const override
171+
{
172+
return "ttree";
173+
}
174+
175+
bool Equals(const FileFormat& other) const override
176+
{
177+
return other.type_name() == this->type_name();
178+
}
179+
180+
arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
181+
{
182+
auto fs = std::dynamic_pointer_cast<TTreeFileSystem>(source.filesystem());
183+
return fs != nullptr;
184+
}
185+
186+
arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;
187+
/// \brief Create a FileFragment for a FileSource.
188+
arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> MakeFragment(
189+
arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
190+
std::shared_ptr<arrow::Schema> physical_schema) override;
191+
192+
arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options, arrow::fs::FileLocator destination_locator) const override;
193+
194+
std::shared_ptr<arrow::dataset::FileWriteOptions> DefaultWriteOptions() override;
195+
196+
arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
197+
const std::shared_ptr<arrow::dataset::ScanOptions>& options,
198+
const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const override;
199+
};
200+
201+
} // namespace o2::framework
202+
203+
#endif // O2_FRAMEWORK_ROOT_ARROW_FILESYSTEM_H_

0 commit comments

Comments
 (0)