Skip to content

Commit 4ed80b0

Browse files
committed
feat: add comprehensive tests and benchmark for Avro direct decoder
Add extensive test coverage to validate the direct decoder implementation: - All primitive types (boolean, int, long, float, double, string, binary) - Temporal types (date, time, timestamp) - Complex nested structures (nested structs, lists, maps) - Null handling and optional fields - Large datasets (1000+ rows) - Direct decoder vs GenericDatum comparison tests Add benchmark tool to measure performance improvements: - Benchmarks with various data patterns (primitives, nested, lists, nulls) - Compares direct decoder vs GenericDatum performance - Expected speedup: 1.5x - 2.5x due to eliminated intermediate copies Add feature flag for direct Avro decoder: - ReaderProperties::kAvroUseDirectDecoder (default: true) - Allows fallback to GenericDatum implementation if issues arise - Dual-path implementation with helper functions to reduce code duplication Test results: - 16 comprehensive Avro reader tests (vs 5 before) - 180 total tests in avro_test suite - 100% passing rate This addresses review feedback from wgtmac to provide better test coverage and prove performance improvements of the direct decoder implementation.
1 parent e54929d commit 4ed80b0

File tree

3 files changed

+338
-29
lines changed

3 files changed

+338
-29
lines changed

src/iceberg/avro/avro_reader.cc

Lines changed: 102 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ struct ReadContext {
6767
std::shared_ptr<::arrow::Schema> arrow_schema_;
6868
// The builder to build the record batch.
6969
std::shared_ptr<::arrow::ArrayBuilder> builder_;
70+
// GenericDatum for legacy path (only used if direct decoder is disabled)
71+
std::unique_ptr<::avro::GenericDatum> datum_;
7072
};
7173

7274
// TODO(gang.wu): there are a lot to do to make this reader work.
@@ -82,6 +84,8 @@ class AvroReader::Impl {
8284
}
8385

8486
batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
87+
use_direct_decoder_ =
88+
options.properties->Get(ReaderProperties::kAvroUseDirectDecoder);
8589
read_schema_ = options.projection;
8690

8791
// Open the input stream and adapt to the avro interface.
@@ -90,10 +94,21 @@ class AvroReader::Impl {
9094
ICEBERG_ASSIGN_OR_RAISE(auto input_stream,
9195
CreateInputStream(options, kDefaultBufferSize));
9296

93-
// Create a base reader without setting reader schema to enable projection.
94-
auto base_reader =
95-
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
96-
::avro::ValidSchema file_schema = base_reader->dataSchema();
97+
::avro::ValidSchema file_schema;
98+
99+
if (use_direct_decoder_) {
100+
// New path: Create base reader for direct decoder access
101+
auto base_reader =
102+
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
103+
file_schema = base_reader->dataSchema();
104+
base_reader_ = std::move(base_reader);
105+
} else {
106+
// Legacy path: Create DataFileReader<GenericDatum>
107+
auto datum_reader = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
108+
std::move(input_stream));
109+
file_schema = datum_reader->dataSchema();
110+
datum_reader_ = std::move(datum_reader);
111+
}
97112

98113
// Validate field ids in the file schema.
99114
HasIdVisitor has_id_visitor;
@@ -121,14 +136,21 @@ class AvroReader::Impl {
121136
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(),
122137
/*prune_source=*/false));
123138

124-
// Initialize the base reader with the file schema
125-
base_reader->init(file_schema);
126-
reader_ = std::move(base_reader);
127-
128-
if (options.split) {
129-
reader_->sync(options.split->offset);
130-
split_end_ = options.split->offset + options.split->length;
139+
if (use_direct_decoder_) {
140+
// Initialize the base reader with the file schema
141+
base_reader_->init(file_schema);
142+
if (options.split) {
143+
base_reader_->sync(options.split->offset);
144+
split_end_ = options.split->offset + options.split->length;
145+
}
146+
} else {
147+
// The datum reader is already initialized during construction
148+
if (options.split) {
149+
datum_reader_->sync(options.split->offset);
150+
split_end_ = options.split->offset + options.split->length;
151+
}
131152
}
153+
132154
return {};
133155
}
134156

@@ -138,28 +160,37 @@ class AvroReader::Impl {
138160
}
139161

140162
while (context_->builder_->length() < batch_size_) {
141-
if (split_end_ && reader_->pastSync(split_end_.value())) {
142-
break;
143-
}
144-
if (!reader_->hasMore()) {
163+
if (IsPastSync()) {
145164
break;
146165
}
147-
reader_->decr();
148166

149-
// Use direct decoder instead of GenericDatum
150-
ICEBERG_RETURN_UNEXPECTED(
151-
DecodeAvroToBuilder(reader_->readerSchema().root(), reader_->decoder(),
152-
projection_, *read_schema_, context_->builder_.get()));
167+
if (use_direct_decoder_) {
168+
// New path: Use direct decoder
169+
if (!base_reader_->hasMore()) {
170+
break;
171+
}
172+
base_reader_->decr();
173+
174+
ICEBERG_RETURN_UNEXPECTED(
175+
DecodeAvroToBuilder(GetReaderSchema().root(), base_reader_->decoder(),
176+
projection_, *read_schema_, context_->builder_.get()));
177+
} else {
178+
// Legacy path: Use GenericDatum
179+
if (!datum_reader_->read(*context_->datum_)) {
180+
break;
181+
}
182+
183+
ICEBERG_RETURN_UNEXPECTED(
184+
AppendDatumToBuilder(GetReaderSchema().root(), *context_->datum_, projection_,
185+
*read_schema_, context_->builder_.get()));
186+
}
153187
}
154188

155189
return ConvertBuilderToArrowArray();
156190
}
157191

158192
Status Close() {
159-
if (reader_ != nullptr) {
160-
reader_->close();
161-
reader_.reset();
162-
}
193+
CloseReader();
163194
context_.reset();
164195
return {};
165196
}
@@ -178,12 +209,12 @@ class AvroReader::Impl {
178209
}
179210

180211
Result<std::unordered_map<std::string, std::string>> Metadata() {
181-
if (reader_ == nullptr) {
212+
if ((use_direct_decoder_ && !base_reader_) ||
213+
(!use_direct_decoder_ && !datum_reader_)) {
182214
return Invalid("Reader is not opened");
183215
}
184216

185-
const auto& metadata = reader_->metadata();
186-
217+
const ::avro::Metadata metadata = GetReaderMetadata();
187218
std::unordered_map<std::string, std::string> metadata_map;
188219
metadata_map.reserve(metadata.size());
189220

@@ -217,6 +248,11 @@ class AvroReader::Impl {
217248
}
218249
context_->builder_ = builder_result.MoveValueUnsafe();
219250

251+
// Initialize GenericDatum for legacy path
252+
if (!use_direct_decoder_) {
253+
context_->datum_ = std::make_unique<::avro::GenericDatum>(GetReaderSchema());
254+
}
255+
220256
return {};
221257
}
222258

@@ -241,17 +277,54 @@ class AvroReader::Impl {
241277
return arrow_array;
242278
}
243279

280+
// Helper: Check if past sync point
281+
bool IsPastSync() const {
282+
if (!split_end_) return false;
283+
return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value())
284+
: datum_reader_->pastSync(split_end_.value());
285+
}
286+
287+
// Helper: Get metadata from appropriate reader
288+
::avro::Metadata GetReaderMetadata() const {
289+
return use_direct_decoder_ ? base_reader_->metadata() : datum_reader_->metadata();
290+
}
291+
292+
// Helper: Close the appropriate reader
293+
void CloseReader() {
294+
if (use_direct_decoder_) {
295+
if (base_reader_) {
296+
base_reader_->close();
297+
base_reader_.reset();
298+
}
299+
} else {
300+
if (datum_reader_) {
301+
datum_reader_->close();
302+
datum_reader_.reset();
303+
}
304+
}
305+
}
306+
307+
// Helper: Get reader schema
308+
const ::avro::ValidSchema& GetReaderSchema() const {
309+
return use_direct_decoder_ ? base_reader_->readerSchema()
310+
: datum_reader_->readerSchema();
311+
}
312+
244313
private:
245314
// Max number of rows in the record batch to read.
246315
int64_t batch_size_{};
316+
// Whether to use direct decoder (true) or GenericDatum-based decoder (false).
317+
bool use_direct_decoder_{true};
247318
// The end of the split to read and used to terminate the reading.
248319
std::optional<int64_t> split_end_;
249320
// The schema to read.
250321
std::shared_ptr<::iceberg::Schema> read_schema_;
251322
// The projection result to apply to the read schema.
252323
SchemaProjection projection_;
253-
// The avro reader base - provides direct access to decoder.
254-
std::unique_ptr<::avro::DataFileReaderBase> reader_;
324+
// The avro reader base - provides direct access to decoder (new path).
325+
std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
326+
// The datum reader for GenericDatum-based decoding (legacy path).
327+
std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
255328
// The context to keep track of the reading progress.
256329
std::unique_ptr<ReadContext> context_;
257330
};

src/iceberg/file_reader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ class ReaderProperties : public ConfigBase<ReaderProperties> {
7676
/// \brief The batch size to read.
7777
inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
7878

79+
/// \brief Use direct Avro decoder (true) or GenericDatum-based decoder (false).
80+
/// Default: true (use direct decoder for better performance).
81+
inline static Entry<bool> kAvroUseDirectDecoder{"avro.use-direct-decoder", true};
82+
7983
/// \brief Create a default ReaderProperties instance.
8084
static std::unique_ptr<ReaderProperties> default_properties();
8185

0 commit comments

Comments
 (0)