Skip to content

Commit ed49d1e

Browse files
authored
feat: implement Apache Avro file reader (#113)
- Refactor a little bit of Reader and ReaderFactory interfaces. - Implement the skeleton of Avro reader to read data into ArrowArray. - `AppendDatumToBuilder` is not implemented yet.
1 parent 33f2a8a commit ed49d1e

File tree

14 files changed

+401
-24
lines changed

14 files changed

+401
-24
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ if(ICEBERG_BUILD_BUNDLE)
9595
arrow/demo_arrow.cc
9696
arrow/arrow_fs_file_io.cc
9797
avro/demo_avro.cc
98+
avro/avro_data_util.cc
99+
avro/avro_reader.cc
98100
avro/avro_schema_util.cc
99101
avro/avro_stream_internal.cc)
100102

src/iceberg/arrow/arrow_fs_file_io.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
4646
/// \brief Delete a file at the given location.
4747
Status DeleteFile(const std::string& file_location) override;
4848

49+
/// \brief Get the Arrow file system.
50+
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; }
51+
4952
private:
5053
std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
5154
};

src/iceberg/avro/avro_data_util.cc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/avro/avro_data_util_internal.h"
21+
22+
namespace iceberg::avro {
23+
24+
Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
25+
const ::avro::GenericDatum& avro_datum,
26+
const SchemaProjection& projection,
27+
const Schema& arrow_schema,
28+
::arrow::ArrayBuilder* array_builder) {
29+
return NotImplemented("AppendDatumToBuilder is not yet implemented");
30+
}
31+
32+
} // namespace iceberg::avro
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <arrow/array/builder_base.h>
23+
#include <avro/GenericDatum.hh>
24+
25+
#include "iceberg/schema_util.h"
26+
27+
namespace iceberg::avro {
28+
29+
Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node,
30+
const ::avro::GenericDatum& avro_datum,
31+
const SchemaProjection& projection,
32+
const Schema& arrow_schema,
33+
::arrow::ArrayBuilder* array_builder);
34+
35+
} // namespace iceberg::avro

src/iceberg/avro/avro_reader.cc

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/avro/avro_reader.h"
21+
22+
#include <memory>
23+
24+
#include <arrow/array/builder_base.h>
25+
#include <arrow/c/bridge.h>
26+
#include <arrow/filesystem/filesystem.h>
27+
#include <arrow/record_batch.h>
28+
#include <arrow/result.h>
29+
#include <arrow/type.h>
30+
#include <avro/DataFile.hh>
31+
#include <avro/Generic.hh>
32+
#include <avro/GenericDatum.hh>
33+
34+
#include "iceberg/arrow/arrow_fs_file_io.h"
35+
#include "iceberg/avro/avro_data_util_internal.h"
36+
#include "iceberg/avro/avro_schema_util_internal.h"
37+
#include "iceberg/avro/avro_stream_internal.h"
38+
#include "iceberg/schema_internal.h"
39+
#include "iceberg/util/checked_cast.h"
40+
#include "iceberg/util/macros.h"
41+
42+
namespace iceberg::avro {
43+
44+
namespace {
45+
46+
Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions& options,
47+
int64_t buffer_size) {
48+
::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
49+
if (options.length) {
50+
file_info.set_size(options.length.value());
51+
}
52+
53+
auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
54+
auto result = io->fs()->OpenInputFile(file_info);
55+
if (!result.ok()) {
56+
return IOError("Failed to open file {} for {}", options.path,
57+
result.status().message());
58+
}
59+
60+
return std::make_unique<AvroInputStream>(result.MoveValueUnsafe(), buffer_size);
61+
}
62+
63+
} // namespace
64+
65+
// A stateful context to keep track of the reading progress.
66+
struct ReadContext {
67+
// The datum to reuse for reading the data.
68+
std::unique_ptr<::avro::GenericDatum> datum_;
69+
// The arrow schema to build the record batch.
70+
std::shared_ptr<::arrow::Schema> arrow_schema_;
71+
// The builder to build the record batch.
72+
std::shared_ptr<::arrow::ArrayBuilder> builder_;
73+
};
74+
75+
// TODO(gang.wu): there are a lot to do to make this reader work.
76+
// 1. prune the reader schema based on the projection
77+
// 2. read key-value metadata from the avro file
78+
// 3. collect basic reader metrics
79+
class AvroBatchReader::Impl {
80+
public:
81+
Status Open(const ReaderOptions& options) {
82+
batch_size_ = options.batch_size;
83+
read_schema_ = options.projection;
84+
85+
// Open the input stream and adapt to the avro interface.
86+
// TODO(gangwu): make this configurable
87+
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
88+
ICEBERG_ASSIGN_OR_RAISE(auto input_stream,
89+
CreateInputStream(options, kDefaultBufferSize));
90+
91+
// Create a base reader without setting reader schema to enable projection.
92+
auto base_reader =
93+
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
94+
const ::avro::ValidSchema& file_schema = base_reader->dataSchema();
95+
96+
// Validate field ids in the file schema.
97+
HasIdVisitor has_id_visitor;
98+
ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));
99+
if (has_id_visitor.HasNoIds()) {
100+
// TODO(gangwu): support applying field-ids based on name mapping
101+
return NotImplemented("Avro file schema has no field IDs");
102+
}
103+
if (!has_id_visitor.AllHaveIds()) {
104+
return InvalidSchema("Not all fields in the Avro file schema have field IDs");
105+
}
106+
107+
// Project the read schema on top of the file schema.
108+
// TODO(gangwu): support pruning source fields
109+
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*options.projection, file_schema.root(),
110+
/*prune_source=*/false));
111+
base_reader->init(file_schema);
112+
reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
113+
std::move(base_reader));
114+
115+
if (options.split) {
116+
reader_->sync(options.split->offset);
117+
split_end_ = options.split->offset + options.split->length;
118+
}
119+
return {};
120+
}
121+
122+
Result<Data> Next() {
123+
if (!context_) {
124+
ICEBERG_RETURN_UNEXPECTED(InitReadContext());
125+
}
126+
127+
while (context_->builder_->length() < batch_size_) {
128+
if (split_end_ && reader_->pastSync(split_end_.value())) {
129+
break;
130+
}
131+
if (!reader_->read(*context_->datum_)) {
132+
break;
133+
}
134+
ICEBERG_RETURN_UNEXPECTED(
135+
AppendDatumToBuilder(reader_->readerSchema().root(), *context_->datum_,
136+
projection_, *read_schema_, context_->builder_.get()));
137+
}
138+
139+
return ConvertBuilderToArrowArray();
140+
}
141+
142+
Status Close() {
143+
if (reader_ != nullptr) {
144+
reader_->close();
145+
reader_.reset();
146+
}
147+
context_.reset();
148+
return {};
149+
}
150+
151+
private:
152+
Status InitReadContext() {
153+
context_ = std::make_unique<ReadContext>();
154+
context_->datum_ = std::make_unique<::avro::GenericDatum>(reader_->readerSchema());
155+
156+
ArrowSchema arrow_schema;
157+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
158+
auto import_result = ::arrow::ImportSchema(&arrow_schema);
159+
if (!import_result.ok()) {
160+
return InvalidSchema("Failed to import the arrow schema: {}",
161+
import_result.status().message());
162+
}
163+
context_->arrow_schema_ = import_result.MoveValueUnsafe();
164+
165+
auto arrow_struct_type =
166+
std::make_shared<::arrow::StructType>(context_->arrow_schema_->fields());
167+
auto builder_result = ::arrow::MakeBuilder(arrow_struct_type);
168+
if (!builder_result.ok()) {
169+
return InvalidSchema("Failed to make the arrow builder: {}",
170+
builder_result.status().message());
171+
}
172+
context_->builder_ = builder_result.MoveValueUnsafe();
173+
174+
return {};
175+
}
176+
177+
Result<Data> ConvertBuilderToArrowArray() {
178+
if (context_->builder_->length() == 0) {
179+
return {};
180+
}
181+
182+
auto builder_result = context_->builder_->Finish();
183+
if (!builder_result.ok()) {
184+
return InvalidArrowData("Failed to finish the arrow array builder: {}",
185+
builder_result.status().message());
186+
}
187+
188+
auto array = builder_result.MoveValueUnsafe();
189+
ArrowArray arrow_array;
190+
auto export_result = ::arrow::ExportArray(*array, &arrow_array);
191+
if (!export_result.ok()) {
192+
return InvalidArrowData("Failed to export the arrow array: {}",
193+
export_result.message());
194+
}
195+
return arrow_array;
196+
}
197+
198+
private:
199+
// Max number of rows in the record batch to read.
200+
int64_t batch_size_{};
201+
// The end of the split to read and used to terminate the reading.
202+
std::optional<int64_t> split_end_;
203+
// The schema to read.
204+
std::shared_ptr<Schema> read_schema_;
205+
// The projection result to apply to the read schema.
206+
SchemaProjection projection_;
207+
// The avro reader to read the data into a datum.
208+
std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
209+
// The context to keep track of the reading progress.
210+
std::unique_ptr<ReadContext> context_;
211+
};
212+
213+
Result<Reader::Data> AvroBatchReader::Next() { return impl_->Next(); }
214+
215+
Status AvroBatchReader::Open(const ReaderOptions& options) {
216+
impl_ = std::make_unique<Impl>();
217+
return impl_->Open(options);
218+
}
219+
220+
Status AvroBatchReader::Close() { return impl_->Close(); }
221+
222+
} // namespace iceberg::avro

src/iceberg/avro/avro_reader.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/file_reader.h"
23+
#include "iceberg/iceberg_bundle_export.h"
24+
25+
namespace iceberg::avro {
26+
27+
/// \brief A reader that reads ArrowArray from Avro files.
28+
class ICEBERG_BUNDLE_EXPORT AvroBatchReader : public Reader {
29+
public:
30+
AvroBatchReader() = default;
31+
32+
~AvroBatchReader() override = default;
33+
34+
Status Open(const ReaderOptions& options) final;
35+
36+
Status Close() final;
37+
38+
Result<Data> Next() final;
39+
40+
DataLayout data_layout() const final { return DataLayout::kArrowArray; }
41+
42+
private:
43+
class Impl;
44+
std::unique_ptr<Impl> impl_;
45+
};
46+
47+
} // namespace iceberg::avro

src/iceberg/avro/demo_avro.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,12 @@ Reader::DataLayout DemoAvroReader::data_layout() const {
5555
return Reader::DataLayout::kStructLike;
5656
}
5757

58-
ICEBERG_REGISTER_READER_FACTORY(
59-
Avro, [](const ReaderOptions& options) -> Result<std::unique_ptr<Reader>> {
60-
return std::make_unique<DemoAvroReader>();
61-
});
58+
Status DemoAvroReader::Open(const ReaderOptions& options) { return {}; }
59+
60+
Status DemoAvroReader::Close() { return {}; }
61+
62+
ICEBERG_REGISTER_READER_FACTORY(Avro, []() -> Result<std::unique_ptr<Reader>> {
63+
return std::make_unique<DemoAvroReader>();
64+
});
6265

6366
} // namespace iceberg::avro

src/iceberg/avro/demo_avro.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader {
3838
public:
3939
DemoAvroReader() = default;
4040
~DemoAvroReader() override = default;
41+
Status Open(const ReaderOptions& options) override;
42+
Status Close() override;
4143
Result<Data> Next() override;
4244
DataLayout data_layout() const override;
4345
};

0 commit comments

Comments
 (0)