Skip to content

Commit cd7e63e

Browse files
authored
feat: make avro and parquet reader writer more configurable (#315)
- Added WriterProperties and ReaderProperties with predefined keys
- Supported writing key-value metadata to file
- Allowed manifest writer to customize avro schema name

Fixes #306
1 parent 7f7f85b commit cd7e63e

15 files changed

+213
-64
lines changed

src/iceberg/avro/avro_reader.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class AvroReader::Impl {
8282
return InvalidArgument("Projected schema is required by Avro reader");
8383
}
8484

85-
batch_size_ = options.batch_size;
85+
batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
8686
read_schema_ = options.projection;
8787

8888
// Open the input stream and adapt to the avro interface.

src/iceberg/avro/avro_writer.cc

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,23 +66,30 @@ class AvroWriter::Impl {
6666

6767
::avro::NodePtr root;
6868
ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));
69+
if (const auto& schema_name =
70+
options.properties->Get(WriterProperties::kAvroSchemaName);
71+
!schema_name.empty()) {
72+
root->setName(::avro::Name(schema_name));
73+
}
6974

7075
avro_schema_ = std::make_shared<::avro::ValidSchema>(root);
7176

7277
// Open the output stream and adapt to the avro interface.
73-
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
74-
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
75-
CreateOutputStream(options, kDefaultBufferSize));
78+
ICEBERG_ASSIGN_OR_RAISE(
79+
auto output_stream,
80+
CreateOutputStream(options,
81+
options.properties->Get(WriterProperties::kAvroBufferSize)));
7682
arrow_output_stream_ = output_stream->arrow_output_stream();
7783
std::map<std::string, std::vector<uint8_t>> metadata;
78-
for (const auto& [key, value] : options.properties) {
84+
for (const auto& [key, value] : options.metadata) {
7985
std::vector<uint8_t> vec;
8086
vec.reserve(value.size());
8187
vec.assign(value.begin(), value.end());
8288
metadata.emplace(key, std::move(vec));
8389
}
8490
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
85-
std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
91+
std::move(output_stream), *avro_schema_,
92+
options.properties->Get(WriterProperties::kAvroSyncInterval),
8693
::avro::NULL_CODEC /*codec*/, metadata);
8794
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
8895
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));

src/iceberg/file_reader.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,15 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
5959
return reader;
6060
}
6161

62+
std::unique_ptr<ReaderProperties> ReaderProperties::default_properties() {
63+
return std::make_unique<ReaderProperties>();
64+
}
65+
66+
std::unique_ptr<ReaderProperties> ReaderProperties::FromMap(
67+
const std::unordered_map<std::string, std::string>& properties) {
68+
auto reader_properties = std::make_unique<ReaderProperties>();
69+
reader_properties->configs_ = properties;
70+
return reader_properties;
71+
}
72+
6273
} // namespace iceberg

src/iceberg/file_reader.h

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "iceberg/file_format.h"
3131
#include "iceberg/result.h"
3232
#include "iceberg/type_fwd.h"
33+
#include "iceberg/util/config.h"
3334

3435
namespace iceberg {
3536

@@ -42,7 +43,7 @@ class ICEBERG_EXPORT Reader {
4243
Reader& operator=(const Reader&) = delete;
4344

4445
/// \brief Open the reader.
45-
virtual Status Open(const struct ReaderOptions& options) = 0;
46+
virtual Status Open(const ReaderOptions& options) = 0;
4647

4748
/// \brief Close the reader.
4849
virtual Status Close() = 0;
@@ -67,19 +68,30 @@ struct ICEBERG_EXPORT Split {
6768
size_t length;
6869
};
6970

71+
class ReaderProperties : public ConfigBase<ReaderProperties> {
72+
public:
73+
template <typename T>
74+
using Entry = const ConfigBase<ReaderProperties>::Entry<T>;
75+
76+
/// \brief The batch size to read.
77+
inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
78+
79+
/// \brief Create a default ReaderProperties instance.
80+
static std::unique_ptr<ReaderProperties> default_properties();
81+
82+
/// \brief Create a ReaderProperties instance from a map of key-value pairs.
83+
static std::unique_ptr<ReaderProperties> FromMap(
84+
const std::unordered_map<std::string, std::string>& properties);
85+
};
86+
7087
/// \brief Options for creating a reader.
7188
struct ICEBERG_EXPORT ReaderOptions {
72-
static constexpr int64_t kDefaultBatchSize = 4096;
73-
7489
/// \brief The path to the file to read.
7590
std::string path;
7691
/// \brief The total length of the file.
7792
std::optional<size_t> length;
7893
/// \brief The split to read.
7994
std::optional<Split> split;
80-
/// \brief The batch size to read. Only applies to implementations that support
81-
/// batching.
82-
int64_t batch_size = kDefaultBatchSize;
8395
/// \brief FileIO instance to open the file. Reader implementations should down cast it
8496
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
8597
/// `ArrowFileSystemFileIO` as the default implementation.
@@ -93,7 +105,7 @@ struct ICEBERG_EXPORT ReaderOptions {
93105
/// that may have different field names than the current schema.
94106
std::shared_ptr<class NameMapping> name_mapping;
95107
/// \brief Format-specific or implementation-specific properties.
96-
std::unordered_map<std::string, std::string> properties;
108+
std::shared_ptr<ReaderProperties> properties = ReaderProperties::default_properties();
97109
};
98110

99111
/// \brief Factory function to create a reader of a specific file format.

src/iceberg/file_writer.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,15 @@ Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
5959
return writer;
6060
}
6161

62+
std::unique_ptr<WriterProperties> WriterProperties::default_properties() {
63+
return std::make_unique<WriterProperties>();
64+
}
65+
66+
std::unique_ptr<WriterProperties> WriterProperties::FromMap(
67+
const std::unordered_map<std::string, std::string>& properties) {
68+
auto writer_properties = std::make_unique<WriterProperties>();
69+
writer_properties->configs_ = properties;
70+
return writer_properties;
71+
}
72+
6273
} // namespace iceberg

src/iceberg/file_writer.h

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,34 @@
3131
#include "iceberg/metrics.h"
3232
#include "iceberg/result.h"
3333
#include "iceberg/type_fwd.h"
34+
#include "iceberg/util/config.h"
3435

3536
namespace iceberg {
3637

38+
class WriterProperties : public ConfigBase<WriterProperties> {
39+
public:
40+
template <typename T>
41+
using Entry = const ConfigBase<WriterProperties>::Entry<T>;
42+
43+
/// \brief The name of the Avro root node schema to write.
44+
inline static Entry<std::string> kAvroSchemaName{"write.avro.schema-name", ""};
45+
46+
/// \brief The buffer size used by Avro output stream.
47+
inline static Entry<int64_t> kAvroBufferSize{"write.avro.buffer-size", 1024 * 1024};
48+
49+
/// \brief The sync interval used by Avro writer.
50+
inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval", 16 * 1024};
51+
52+
/// TODO(gangwu): add more properties, like compression codec, compression level, etc.
53+
54+
/// \brief Create a default WriterProperties instance.
55+
static std::unique_ptr<WriterProperties> default_properties();
56+
57+
/// \brief Create a WriterProperties instance from a map of key-value pairs.
58+
static std::unique_ptr<WriterProperties> FromMap(
59+
const std::unordered_map<std::string, std::string>& properties);
60+
};
61+
3762
/// \brief Options for creating a writer.
3863
struct ICEBERG_EXPORT WriterOptions {
3964
/// \brief The path to the file to write.
@@ -44,8 +69,10 @@ struct ICEBERG_EXPORT WriterOptions {
4469
/// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses
4570
/// `ArrowFileSystemFileIO` as the default implementation.
4671
std::shared_ptr<class FileIO> io;
72+
/// \brief Metadata to write to the file.
73+
std::unordered_map<std::string, std::string> metadata;
4774
/// \brief Format-specific or implementation-specific properties.
48-
std::unordered_map<std::string, std::string> properties;
75+
std::shared_ptr<WriterProperties> properties = WriterProperties::default_properties();
4976
};
5077

5178
/// \brief Base writer class to write data from different file formats.
@@ -57,7 +84,7 @@ class ICEBERG_EXPORT Writer {
5784
Writer& operator=(const Writer&) = delete;
5885

5986
/// \brief Open the writer.
60-
virtual Status Open(const struct WriterOptions& options) = 0;
87+
virtual Status Open(const WriterOptions& options) = 0;
6188

6289
/// \brief Close the writer.
6390
virtual Status Close() = 0;

src/iceberg/manifest_reader.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,13 @@ namespace iceberg {
3636
class ICEBERG_EXPORT ManifestReader {
3737
public:
3838
virtual ~ManifestReader() = default;
39+
40+
/// \brief Read all manifest entries in the manifest file.
3941
virtual Result<std::vector<ManifestEntry>> Entries() const = 0;
4042

43+
/// \brief Get the metadata of the manifest file.
44+
virtual Result<std::unordered_map<std::string, std::string>> Metadata() const = 0;
45+
4146
/// \brief Creates a reader for a manifest file.
4247
/// \param manifest A ManifestFile object containing metadata about the manifest.
4348
/// \param file_io File IO implementation to use.
@@ -61,8 +66,13 @@ class ICEBERG_EXPORT ManifestReader {
6166
class ICEBERG_EXPORT ManifestListReader {
6267
public:
6368
virtual ~ManifestListReader() = default;
69+
70+
/// \brief Read all manifest files in the manifest list file.
6471
virtual Result<std::vector<ManifestFile>> Files() const = 0;
6572

73+
/// \brief Get the metadata of the manifest list file.
74+
virtual Result<std::unordered_map<std::string, std::string>> Metadata() const = 0;
75+
6676
/// \brief Creates a reader for the manifest list.
6777
/// \param manifest_list_location Path to the manifest list file.
6878
/// \param file_io File IO implementation to use.

src/iceberg/manifest_reader_internal.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,11 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
548548
return manifest_entries;
549549
}
550550

551+
Result<std::unordered_map<std::string, std::string>> ManifestReaderImpl::Metadata()
552+
const {
553+
return reader_->Metadata();
554+
}
555+
551556
Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
552557
std::vector<ManifestFile> manifest_files;
553558
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
@@ -569,6 +574,11 @@ Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
569574
return manifest_files;
570575
}
571576

577+
Result<std::unordered_map<std::string, std::string>> ManifestListReaderImpl::Metadata()
578+
const {
579+
return reader_->Metadata();
580+
}
581+
572582
Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
573583
if (index >= 0 && index < static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
574584
return static_cast<ManifestFileField>(index);

src/iceberg/manifest_reader_internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class ManifestReaderImpl : public ManifestReader {
4040

4141
Result<std::vector<ManifestEntry>> Entries() const override;
4242

43+
Result<std::unordered_map<std::string, std::string>> Metadata() const override;
44+
4345
private:
4446
std::shared_ptr<Schema> schema_;
4547
std::unique_ptr<Reader> reader_;
@@ -55,6 +57,8 @@ class ManifestListReaderImpl : public ManifestListReader {
5557

5658
Result<std::vector<ManifestFile>> Files() const override;
5759

60+
Result<std::unordered_map<std::string, std::string>> Metadata() const override;
61+
5862
private:
5963
std::shared_ptr<Schema> schema_;
6064
std::unique_ptr<Reader> reader_;

src/iceberg/manifest_writer.cc

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,20 @@ ManifestContent ManifestWriter::content() const { return adapter_->content(); }
5858
Result<std::unique_ptr<Writer>> OpenFileWriter(
5959
std::string_view location, std::shared_ptr<Schema> schema,
6060
std::shared_ptr<FileIO> file_io,
61-
std::unordered_map<std::string, std::string> properties) {
62-
ICEBERG_ASSIGN_OR_RAISE(
63-
auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
64-
{.path = std::string(location),
65-
.schema = std::move(schema),
66-
.io = std::move(file_io),
67-
.properties = std::move(properties)}));
61+
std::unordered_map<std::string, std::string> metadata, std::string_view schema_name) {
62+
auto writer_properties = WriterProperties::default_properties();
63+
if (!schema_name.empty()) {
64+
writer_properties->Set(WriterProperties::kAvroSchemaName, std::string(schema_name));
65+
}
66+
ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
67+
FileFormatType::kAvro,
68+
{
69+
.path = std::string(location),
70+
.schema = std::move(schema),
71+
.io = std::move(file_io),
72+
.metadata = std::move(metadata),
73+
.properties = std::move(writer_properties),
74+
}));
6875
return writer;
6976
}
7077

@@ -91,9 +98,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
9198
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
9299

93100
auto schema = adapter->schema();
94-
ICEBERG_ASSIGN_OR_RAISE(auto writer,
95-
OpenFileWriter(manifest_location, std::move(schema),
96-
std::move(file_io), adapter->metadata()));
101+
ICEBERG_ASSIGN_OR_RAISE(
102+
auto writer,
103+
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
104+
adapter->metadata(), "manifest_entry"));
97105
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
98106
}
99107

@@ -119,9 +127,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
119127
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
120128

121129
auto schema = adapter->schema();
122-
ICEBERG_ASSIGN_OR_RAISE(auto writer,
123-
OpenFileWriter(manifest_location, std::move(schema),
124-
std::move(file_io), adapter->metadata()));
130+
ICEBERG_ASSIGN_OR_RAISE(
131+
auto writer,
132+
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
133+
adapter->metadata(), "manifest_entry"));
125134
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
126135
}
127136

@@ -149,9 +158,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
149158
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
150159

151160
auto schema = adapter->schema();
152-
ICEBERG_ASSIGN_OR_RAISE(auto writer,
153-
OpenFileWriter(manifest_location, std::move(schema),
154-
std::move(file_io), adapter->metadata()));
161+
ICEBERG_ASSIGN_OR_RAISE(
162+
auto writer,
163+
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
164+
adapter->metadata(), "manifest_entry"));
155165
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
156166
}
157167

@@ -191,9 +201,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
191201
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
192202

193203
auto schema = adapter->schema();
194-
ICEBERG_ASSIGN_OR_RAISE(auto writer,
195-
OpenFileWriter(manifest_list_location, std::move(schema),
196-
std::move(file_io), adapter->metadata()));
204+
ICEBERG_ASSIGN_OR_RAISE(
205+
auto writer,
206+
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
207+
adapter->metadata(), "manifest_file"));
197208
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
198209
}
199210

@@ -207,9 +218,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
207218
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
208219

209220
auto schema = adapter->schema();
210-
ICEBERG_ASSIGN_OR_RAISE(auto writer,
211-
OpenFileWriter(manifest_list_location, std::move(schema),
212-
std::move(file_io), adapter->metadata()));
221+
ICEBERG_ASSIGN_OR_RAISE(
222+
auto writer,
223+
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
224+
adapter->metadata(), "manifest_file"));
213225

214226
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
215227
}
@@ -224,9 +236,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
224236
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
225237

226238
auto schema = adapter->schema();
227-
ICEBERG_ASSIGN_OR_RAISE(auto writer,
228-
OpenFileWriter(manifest_list_location, std::move(schema),
229-
std::move(file_io), adapter->metadata()));
239+
ICEBERG_ASSIGN_OR_RAISE(
240+
auto writer,
241+
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io),
242+
adapter->metadata(), "manifest_file"));
230243
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
231244
}
232245

0 commit comments

Comments
 (0)