Skip to content

Commit 18c24b3

Browse files
committed
feat: implement avro file reader
1 parent 5aec271 commit 18c24b3

File tree

10 files changed

+327
-12
lines changed

10 files changed

+327
-12
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ if(ICEBERG_BUILD_BUNDLE)
8888
arrow/demo_arrow.cc
8989
arrow/arrow_fs_file_io.cc
9090
avro/demo_avro.cc
91+
avro/avro_data_util.cc
92+
avro/avro_reader.cc
9193
avro/avro_schema_util.cc
9294
avro/avro_stream_internal.cc)
9395

src/iceberg/arrow/arrow_fs_file_io.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
4646
/// \brief Delete a file at the given location.
4747
Status DeleteFile(const std::string& file_location) override;
4848

49+
/// \brief Get the Arrow file system.
50+
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; }
51+
4952
private:
5053
std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_;
5154
};

src/iceberg/avro/avro_data_util.cc

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/avro/avro_data_util_internal.h"
21+
22+
namespace iceberg::avro {} // namespace iceberg::avro
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
namespace iceberg::avro {} // namespace iceberg::avro

src/iceberg/avro/avro_reader.cc

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/avro/avro_reader.h"
21+
22+
#include <memory>
23+
24+
#include <arrow/array/builder_base.h>
25+
#include <arrow/c/bridge.h>
26+
#include <arrow/filesystem/filesystem.h>
27+
#include <arrow/result.h>
28+
#include <arrow/type.h>
29+
#include <avro/DataFile.hh>
30+
#include <avro/GenericDatum.hh>
31+
32+
#include "iceberg/arrow/arrow_fs_file_io.h"
33+
#include "iceberg/avro/avro_schema_util_internal.h"
34+
#include "iceberg/avro/avro_stream_internal.h"
35+
#include "iceberg/schema_internal.h"
36+
#include "iceberg/util/checked_cast.h"
37+
#include "iceberg/util/macros.h"
38+
39+
namespace iceberg::avro {
40+
41+
namespace {
42+
43+
Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions& options,
44+
int64_t buffer_size) {
45+
::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
46+
if (options.length) {
47+
file_info.set_size(options.length.value());
48+
}
49+
50+
auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
51+
auto result = io->fs()->OpenInputFile(file_info);
52+
if (!result.ok()) {
53+
return IOError("Failed to open file {} for {}", options.path,
54+
result.status().message());
55+
}
56+
57+
return std::make_unique<AvroInputStream>(result.MoveValueUnsafe(), buffer_size);
58+
}
59+
60+
} // namespace
61+
62+
// A stateful context to keep track of the reading progress.
63+
struct ReadContext {
64+
// The datum to reuse for reading the data.
65+
std::unique_ptr<::avro::GenericDatum> datum_;
66+
// The arrow schema to build the record batch.
67+
std::shared_ptr<::arrow::Schema> arrow_schema_;
68+
// The builder to build the record batch.
69+
std::shared_ptr<::arrow::ArrayBuilder> builder_;
70+
};
71+
72+
// TODO(gang.wu): there are a lot to do to make this reader work.
73+
// 1. read the datum from the avro file
74+
// 2. append the datum to the builder until it reaches the batch size
75+
// 3. convert the builder to the record batch
76+
// 4. check if the reader has reached the split end
77+
// 5. prune the reader schema based on the projection
78+
// 6. read key-value metadata from the avro file
79+
// 7. collect basic reader metrics
80+
class AvroBatchReader::Impl {
81+
public:
82+
Result<Data> Next();
83+
84+
Status Open(const ReaderOptions& options) {
85+
batch_size_ = options.batch_size;
86+
if (options.split) {
87+
split_end_ = options.split->offset + options.split->length;
88+
if (options.length) {
89+
split_end_ =
90+
std::min(split_end_.value(), static_cast<int64_t>(options.length.value()));
91+
}
92+
}
93+
read_schema_ = options.projection;
94+
95+
// Open the input stream and adapt to the avro interface.
96+
// TODO(gangwu): make this configurable
97+
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
98+
ICEBERG_ASSIGN_OR_RAISE(auto input_stream,
99+
CreateInputStream(options, kDefaultBufferSize));
100+
101+
// Create a base reader without setting reader schema to enable projection.
102+
auto base_reader =
103+
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
104+
const ::avro::ValidSchema& file_schema = base_reader->dataSchema();
105+
106+
// Validate field ids in the file schema.
107+
HasIdVisitor has_id_visitor;
108+
ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));
109+
if (has_id_visitor.HasNoIds()) {
110+
// TODO(gangwu): support applying field-ids based on name mapping
111+
return NotImplemented("Avro file schema has no field IDs");
112+
}
113+
if (!has_id_visitor.AllHaveIds()) {
114+
return InvalidSchema("Not all fields in the Avro file schema have field IDs");
115+
}
116+
117+
// Project the read schema on top of the file schema.
118+
// TODO(gangwu): support pruning source fields
119+
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*options.projection, file_schema.root(),
120+
/*prune_source=*/false));
121+
base_reader->init(file_schema);
122+
reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
123+
std::move(base_reader));
124+
return {};
125+
}
126+
127+
Status Close() {
128+
if (reader_ != nullptr) {
129+
reader_->close();
130+
reader_.reset();
131+
}
132+
return {};
133+
}
134+
135+
private:
136+
Status InitReadContext() {
137+
context_ = std::make_unique<ReadContext>();
138+
context_->datum_ = std::make_unique<::avro::GenericDatum>(reader_->readerSchema());
139+
140+
ArrowSchema arrow_schema;
141+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
142+
auto import_result = ::arrow::ImportSchema(&arrow_schema);
143+
if (!import_result.ok()) {
144+
return InvalidSchema("Failed to import the arrow schema: {}",
145+
import_result.status().message());
146+
}
147+
context_->arrow_schema_ = import_result.MoveValueUnsafe();
148+
149+
auto arrow_struct_type =
150+
std::make_shared<::arrow::StructType>(context_->arrow_schema_->fields());
151+
auto builder_result = ::arrow::MakeBuilder(arrow_struct_type);
152+
if (!builder_result.ok()) {
153+
return InvalidSchema("Failed to make the arrow builder: {}",
154+
builder_result.status().message());
155+
}
156+
context_->builder_ = builder_result.MoveValueUnsafe();
157+
158+
return {};
159+
}
160+
161+
private:
162+
// Max number of rows in the record batch to read.
163+
int64_t batch_size_{};
164+
// The end of the split to read and used to terminate the reading.
165+
std::optional<int64_t> split_end_;
166+
// The schema to read.
167+
std::shared_ptr<Schema> read_schema_;
168+
// The projection result to apply to the read schema.
169+
SchemaProjection projection_;
170+
// The avro reader to read the data into a datum.
171+
std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
172+
// The context to keep track of the reading progress.
173+
std::unique_ptr<ReadContext> context_;
174+
};
175+
176+
Result<Reader::Data> AvroBatchReader::Next() { return impl_->Next(); }
177+
178+
Status AvroBatchReader::Open(const ReaderOptions& options) {
179+
impl_ = std::make_unique<Impl>();
180+
return impl_->Open(options);
181+
}
182+
183+
Status AvroBatchReader::Close() { return impl_->Close(); }
184+
185+
} // namespace iceberg::avro

src/iceberg/avro/avro_reader.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/file_reader.h"
23+
#include "iceberg/iceberg_bundle_export.h"
24+
25+
namespace iceberg::avro {
26+
27+
/// \brief A reader that reads ArrowArray from Avro files.
28+
class ICEBERG_BUNDLE_EXPORT AvroBatchReader : public Reader {
29+
public:
30+
AvroBatchReader() = default;
31+
32+
~AvroBatchReader() override = default;
33+
34+
Status Open(const ReaderOptions& options) final;
35+
36+
Status Close() final;
37+
38+
Result<Data> Next() final;
39+
40+
DataLayout data_layout() const final { return DataLayout::kArrowArray; }
41+
42+
private:
43+
class Impl;
44+
std::unique_ptr<Impl> impl_;
45+
};
46+
47+
} // namespace iceberg::avro

src/iceberg/avro/demo_avro.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,12 @@ Reader::DataLayout DemoAvroReader::data_layout() const {
5555
return Reader::DataLayout::kStructLike;
5656
}
5757

58-
ICEBERG_REGISTER_READER_FACTORY(
59-
Avro, [](const ReaderOptions& options) -> Result<std::unique_ptr<Reader>> {
60-
return std::make_unique<DemoAvroReader>();
61-
});
58+
Status DemoAvroReader::Open(const ReaderOptions& options) { return {}; }
59+
60+
Status DemoAvroReader::Close() { return {}; }
61+
62+
ICEBERG_REGISTER_READER_FACTORY(Avro, []() -> Result<std::unique_ptr<Reader>> {
63+
return std::make_unique<DemoAvroReader>();
64+
});
6265

6366
} // namespace iceberg::avro

src/iceberg/avro/demo_avro.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader {
3838
public:
3939
DemoAvroReader() = default;
4040
~DemoAvroReader() override = default;
41+
Status Open(const ReaderOptions& options) override;
42+
Status Close() override;
4143
Result<Data> Next() override;
4244
DataLayout data_layout() const override;
4345
};

src/iceberg/file_reader.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323

2424
#include "iceberg/expected.h"
2525
#include "iceberg/util/formatter.h"
26+
#include "iceberg/util/macros.h"
2627

2728
namespace iceberg {
2829

2930
namespace {
3031

3132
ReaderFactory GetNotImplementedFactory(FileFormatType format_type) {
32-
return [format_type](const ReaderOptions& options) -> Result<std::unique_ptr<Reader>> {
33+
return [format_type]() -> Result<std::unique_ptr<Reader>> {
3334
return NotImplemented("Missing reader factory for file format: {}", format_type);
3435
};
3536
}
@@ -51,9 +52,11 @@ ReaderFactoryRegistry::ReaderFactoryRegistry(FileFormatType format_type,
5152
GetFactory(format_type) = std::move(factory);
5253
}
5354

54-
Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Create(
55+
Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
5556
FileFormatType format_type, const ReaderOptions& options) {
56-
return GetFactory(format_type)(options);
57+
ICEBERG_ASSIGN_OR_RAISE(auto reader, GetFactory(format_type)());
58+
ICEBERG_RETURN_UNEXPECTED(reader->Open(options));
59+
return reader;
5760
}
5861

5962
StructLikeReader::StructLikeReader(std::unique_ptr<Reader> reader)

0 commit comments

Comments
 (0)