@@ -67,6 +67,8 @@ struct ReadContext {
6767 std::shared_ptr<::arrow::Schema> arrow_schema_;
6868 // The builder to build the record batch.
6969 std::shared_ptr<::arrow::ArrayBuilder> builder_;
70+ // GenericDatum for legacy path (only used if direct decoder is disabled)
71+ std::unique_ptr<::avro::GenericDatum> datum_;
7072};
7173
7274// TODO(gang.wu): there are a lot to do to make this reader work.
@@ -82,6 +84,8 @@ class AvroReader::Impl {
8284 }
8385
8486 batch_size_ = options.properties ->Get (ReaderProperties::kBatchSize );
87+ use_direct_decoder_ =
88+ options.properties ->Get (ReaderProperties::kAvroUseDirectDecoder );
8589 read_schema_ = options.projection ;
8690
8791 // Open the input stream and adapt to the avro interface.
@@ -90,10 +94,21 @@ class AvroReader::Impl {
9094 ICEBERG_ASSIGN_OR_RAISE (auto input_stream,
9195 CreateInputStream (options, kDefaultBufferSize ));
9296
93- // Create a base reader without setting reader schema to enable projection.
94- auto base_reader =
95- std::make_unique<::avro::DataFileReaderBase>(std::move (input_stream));
96- ::avro::ValidSchema file_schema = base_reader->dataSchema ();
97+ ::avro::ValidSchema file_schema;
98+
99+ if (use_direct_decoder_) {
100+ // New path: Create base reader for direct decoder access
101+ auto base_reader =
102+ std::make_unique<::avro::DataFileReaderBase>(std::move (input_stream));
103+ file_schema = base_reader->dataSchema ();
104+ base_reader_ = std::move (base_reader);
105+ } else {
106+ // Legacy path: Create DataFileReader<GenericDatum>
107+ auto datum_reader = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
108+ std::move (input_stream));
109+ file_schema = datum_reader->dataSchema ();
110+ datum_reader_ = std::move (datum_reader);
111+ }
97112
98113 // Validate field ids in the file schema.
99114 HasIdVisitor has_id_visitor;
@@ -121,14 +136,21 @@ class AvroReader::Impl {
121136 ICEBERG_ASSIGN_OR_RAISE (projection_, Project (*read_schema_, file_schema.root (),
122137 /* prune_source=*/ false ));
123138
124- // Initialize the base reader with the file schema
125- base_reader->init (file_schema);
126- reader_ = std::move (base_reader);
127-
128- if (options.split ) {
129- reader_->sync (options.split ->offset );
130- split_end_ = options.split ->offset + options.split ->length ;
139+ if (use_direct_decoder_) {
140+ // Initialize the base reader with the file schema
141+ base_reader_->init (file_schema);
142+ if (options.split ) {
143+ base_reader_->sync (options.split ->offset );
144+ split_end_ = options.split ->offset + options.split ->length ;
145+ }
146+ } else {
147+ // The datum reader is already initialized during construction
148+ if (options.split ) {
149+ datum_reader_->sync (options.split ->offset );
150+ split_end_ = options.split ->offset + options.split ->length ;
151+ }
131152 }
153+
132154 return {};
133155 }
134156
@@ -138,28 +160,37 @@ class AvroReader::Impl {
138160 }
139161
140162 while (context_->builder_ ->length () < batch_size_) {
141- if (split_end_ && reader_->pastSync (split_end_.value ())) {
142- break ;
143- }
144- if (!reader_->hasMore ()) {
163+ if (IsPastSync ()) {
145164 break ;
146165 }
147- reader_->decr ();
148166
149- // Use direct decoder instead of GenericDatum
150- ICEBERG_RETURN_UNEXPECTED (
151- DecodeAvroToBuilder (reader_->readerSchema ().root (), reader_->decoder (),
152- projection_, *read_schema_, context_->builder_ .get ()));
167+ if (use_direct_decoder_) {
168+ // New path: Use direct decoder
169+ if (!base_reader_->hasMore ()) {
170+ break ;
171+ }
172+ base_reader_->decr ();
173+
174+ ICEBERG_RETURN_UNEXPECTED (
175+ DecodeAvroToBuilder (GetReaderSchema ().root (), base_reader_->decoder (),
176+ projection_, *read_schema_, context_->builder_ .get ()));
177+ } else {
178+ // Legacy path: Use GenericDatum
179+ if (!datum_reader_->read (*context_->datum_ )) {
180+ break ;
181+ }
182+
183+ ICEBERG_RETURN_UNEXPECTED (
184+ AppendDatumToBuilder (GetReaderSchema ().root (), *context_->datum_ , projection_,
185+ *read_schema_, context_->builder_ .get ()));
186+ }
153187 }
154188
155189 return ConvertBuilderToArrowArray ();
156190 }
157191
158192 Status Close () {
159- if (reader_ != nullptr ) {
160- reader_->close ();
161- reader_.reset ();
162- }
193+ CloseReader ();
163194 context_.reset ();
164195 return {};
165196 }
@@ -178,12 +209,12 @@ class AvroReader::Impl {
178209 }
179210
180211 Result<std::unordered_map<std::string, std::string>> Metadata () {
181- if (reader_ == nullptr ) {
212+ if ((use_direct_decoder_ && !base_reader_) ||
213+ (!use_direct_decoder_ && !datum_reader_)) {
182214 return Invalid (" Reader is not opened" );
183215 }
184216
185- const auto & metadata = reader_->metadata ();
186-
217+ const ::avro::Metadata metadata = GetReaderMetadata ();
187218 std::unordered_map<std::string, std::string> metadata_map;
188219 metadata_map.reserve (metadata.size ());
189220
@@ -217,6 +248,11 @@ class AvroReader::Impl {
217248 }
218249 context_->builder_ = builder_result.MoveValueUnsafe ();
219250
251+ // Initialize GenericDatum for legacy path
252+ if (!use_direct_decoder_) {
253+ context_->datum_ = std::make_unique<::avro::GenericDatum>(GetReaderSchema ());
254+ }
255+
220256 return {};
221257 }
222258
@@ -241,17 +277,54 @@ class AvroReader::Impl {
241277 return arrow_array;
242278 }
243279
280+ // Helper: Check if past sync point
281+ bool IsPastSync () const {
282+ if (!split_end_) return false ;
283+ return use_direct_decoder_ ? base_reader_->pastSync (split_end_.value ())
284+ : datum_reader_->pastSync (split_end_.value ());
285+ }
286+
287+ // Helper: Get metadata from appropriate reader
288+ ::avro::Metadata GetReaderMetadata () const {
289+ return use_direct_decoder_ ? base_reader_->metadata () : datum_reader_->metadata ();
290+ }
291+
292+ // Helper: Close the appropriate reader
293+ void CloseReader () {
294+ if (use_direct_decoder_) {
295+ if (base_reader_) {
296+ base_reader_->close ();
297+ base_reader_.reset ();
298+ }
299+ } else {
300+ if (datum_reader_) {
301+ datum_reader_->close ();
302+ datum_reader_.reset ();
303+ }
304+ }
305+ }
306+
307+ // Helper: Get reader schema
308+ const ::avro::ValidSchema& GetReaderSchema () const {
309+ return use_direct_decoder_ ? base_reader_->readerSchema ()
310+ : datum_reader_->readerSchema ();
311+ }
312+
244313 private:
245314 // Max number of rows in the record batch to read.
246315 int64_t batch_size_{};
316+ // Whether to use direct decoder (true) or GenericDatum-based decoder (false).
317+ bool use_direct_decoder_{true };
247318 // The end of the split to read and used to terminate the reading.
248319 std::optional<int64_t > split_end_;
249320 // The schema to read.
250321 std::shared_ptr<::iceberg::Schema> read_schema_;
251322 // The projection result to apply to the read schema.
252323 SchemaProjection projection_;
253- // The avro reader base - provides direct access to decoder.
254- std::unique_ptr<::avro::DataFileReaderBase> reader_;
324+ // The avro reader base - provides direct access to decoder (new path).
325+ std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
326+ // The datum reader for GenericDatum-based decoding (legacy path).
327+ std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
255328 // The context to keep track of the reading progress.
256329 std::unique_ptr<ReadContext> context_;
257330};
0 commit comments