Skip to content

Commit b696713

Browse files
nullccxsy and nullccxsy authored
feat: implement AvroWriter Write method (#204)
- Complete AvroWriter::Write implementation using ExtractDatumFromArray

---------

Co-authored-by: nullccxsy <[email protected]>
1 parent 1002270 commit b696713

File tree

4 files changed

+97
-9
lines changed

4 files changed

+97
-9
lines changed

src/iceberg/avro/avro_stream_internal.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,9 @@ void AvroOutputStream::flush() {
145145
}
146146
}
147147

148+
// Accessor added by this commit: hands out the wrapped Arrow output stream
// (the output_stream_ member) by const reference, without copying the shared_ptr.
const std::shared_ptr<::arrow::io::OutputStream>& AvroOutputStream::arrow_output_stream()
149+
const {
150+
return output_stream_;
151+
}
152+
148153
} // namespace iceberg::avro

src/iceberg/avro/avro_stream_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ class AvroOutputStream : public ::avro::OutputStream {
8787
/// store, if any.
8888
void flush() override;
8989

90+
/// Returns a const reference to the Arrow output stream that this
/// AvroOutputStream writes to (the output_stream_ member).
const std::shared_ptr<::arrow::io::OutputStream>& arrow_output_stream() const;
91+
9092
private:
9193
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
9294
const int64_t buffer_size_;

src/iceberg/avro/avro_writer.cc

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@
2626
#include <arrow/record_batch.h>
2727
#include <arrow/result.h>
2828
#include <avro/DataFile.hh>
29+
#include <avro/Generic.hh>
2930
#include <avro/GenericDatum.hh>
3031

3132
#include "iceberg/arrow/arrow_error_transform_internal.h"
3233
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
34+
#include "iceberg/avro/avro_data_util_internal.h"
3335
#include "iceberg/avro/avro_register.h"
3436
#include "iceberg/avro/avro_schema_util_internal.h"
3537
#include "iceberg/avro/avro_stream_internal.h"
3638
#include "iceberg/schema.h"
39+
#include "iceberg/schema_internal.h"
3740
#include "iceberg/util/checked_cast.h"
3841
#include "iceberg/util/macros.h"
3942

@@ -50,9 +53,6 @@ Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions
5053

5154
} // namespace
5255

53-
// A stateful context to keep track of the writing progress.
54-
struct WriteContext {};
55-
5656
class AvroWriter::Impl {
5757
public:
5858
Status Open(const WriterOptions& options) {
@@ -67,22 +67,32 @@ class AvroWriter::Impl {
6767
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
6868
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
6969
CreateOutputStream(options, kDefaultBufferSize));
70-
70+
arrow_output_stream_ = output_stream->arrow_output_stream();
7171
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
7272
std::move(output_stream), *avro_schema_);
73+
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
74+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
7375
return {};
7476
}
7577

76-
Status Write(ArrowArray /*data*/) {
77-
// TODO(xiao.dong) convert data and write to avro
78-
// total_bytes_+= written_bytes;
78+
// Writes every row of `data` to the Avro file.
//
// `data` is imported through the Arrow C data interface into an Arrow array,
// then each row is converted into the reusable datum_ and appended via writer_.
// Returns an error status if the import or any row conversion fails.
//
// NOTE(review): Arrow documents ImportArray(ArrowArray*, ArrowSchema*) as
// releasing the ArrowSchema it is given. arrow_schema_ is only populated once
// in Open(), so a second call to Write() would presumably pass an
// already-released schema -- confirm, and consider rebuilding the schema (or
// keeping an imported arrow::DataType) so Write() can be called repeatedly.
Status Write(ArrowArray data) {
79+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
80+
::arrow::ImportArray(&data, &arrow_schema_));
81+
82+
// Row-by-row: fill datum_ from row i, then hand it to the Avro file writer.
for (int64_t i = 0; i < result->length(); i++) {
83+
ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get()));
84+
writer_->write(*datum_);
85+
}
86+
7987
return {};
8088
}
8189

8290
// Finishes the file: closes and drops the Avro writer, records the final
// position of the Arrow stream in total_bytes_, then closes the Arrow stream.
// Does nothing (and returns success) when writer_ is already null, so calling
// Close() again after a successful close is a no-op.
//
// NOTE(review): writer_ is reset before Tell()/Close() run; if Tell() fails the
// macro presumably returns early and the Arrow stream is left unclosed, with no
// way to retry through this method -- confirm whether that is acceptable.
Status Close() {
8391
if (writer_ != nullptr) {
8492
writer_->close();
8593
writer_.reset();
94+
// Capture the byte count before closing the stream; length() reports it.
ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell());
95+
ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
8696
}
8797
return {};
8898
}
@@ -92,13 +102,19 @@ class AvroWriter::Impl {
92102
int64_t length() { return total_bytes_; }
93103

94104
private:
95-
int64_t total_bytes_ = 0;
96105
// The schema to write.
97106
std::shared_ptr<::iceberg::Schema> write_schema_;
98107
// The avro schema to write.
99108
std::shared_ptr<::avro::ValidSchema> avro_schema_;
109+
// Arrow output stream of the Avro file to write
110+
std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_;
100111
// The avro writer to write the data into a datum.
101112
std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
113+
// Reusable Avro datum for writing individual records.
114+
std::unique_ptr<::avro::GenericDatum> datum_;
115+
// Arrow schema to write data.
116+
ArrowSchema arrow_schema_;
117+
int64_t total_bytes_ = 0;
102118
};
103119

104120
AvroWriter::~AvroWriter() = default;

test/avro_test.cc

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
#include <arrow/array/array_base.h>
2121
#include <arrow/c/bridge.h>
22-
#include <arrow/c/helpers.h>
2322
#include <arrow/filesystem/localfs.h>
2423
#include <arrow/io/file.h>
2524
#include <arrow/json/from_string.h>
@@ -31,8 +30,10 @@
3130

3231
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3332
#include "iceberg/avro/avro_register.h"
33+
#include "iceberg/avro/avro_writer.h"
3434
#include "iceberg/file_reader.h"
3535
#include "iceberg/schema.h"
36+
#include "iceberg/schema_internal.h"
3637
#include "iceberg/type.h"
3738
#include "matchers.h"
3839
#include "temp_file_test_base.h"
@@ -105,6 +106,46 @@ class AvroReaderTest : public TempFileTestBase {
105106
ASSERT_FALSE(data.value().has_value());
106107
}
107108

109+
// Round-trip test helper.
//
// Builds an Arrow array from `expected_string` (JSON) typed according to
// `schema`, writes it to temp_avro_file_ through an Avro writer obtained from
// WriterFactoryRegistry, and closes the writer. It then checks that the file
// size on disk equals writer->length(), reopens the file with an Avro reader
// projecting the same schema, and checks via VerifyNextBatch / VerifyExhausted
// that the data read back matches `expected_string` and nothing else follows.
void WriteAndVerify(std::shared_ptr<Schema> schema,
110+
const std::string& expected_string) {
111+
// Iceberg schema -> Arrow C schema -> Arrow C++ type.
ArrowSchema arrow_c_schema;
112+
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
113+
114+
auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema);
115+
ASSERT_TRUE(arrow_schema_result.ok());
116+
auto arrow_schema = arrow_schema_result.ValueOrDie();
117+
118+
// Materialize the expected rows as an Arrow array.
auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, expected_string);
119+
ASSERT_TRUE(array_result.ok());
120+
auto array = array_result.ValueOrDie();
121+
122+
// Export to the C data interface, which is what Writer::Write consumes.
struct ArrowArray arrow_array;
123+
auto export_result = ::arrow::ExportArray(*array, &arrow_array);
124+
ASSERT_TRUE(export_result.ok());
125+
126+
auto writer_result = WriterFactoryRegistry::Open(
127+
FileFormatType::kAvro,
128+
{.path = temp_avro_file_, .schema = schema, .io = file_io_});
129+
ASSERT_TRUE(writer_result.has_value());
130+
auto writer = std::move(writer_result.value());
131+
ASSERT_THAT(writer->Write(arrow_array), IsOk());
132+
ASSERT_THAT(writer->Close(), IsOk());
133+
134+
// The length reported by the writer must match the actual file size.
auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_);
135+
ASSERT_TRUE(file_info_result.ok());
136+
ASSERT_EQ(file_info_result->size(), writer->length().value());
137+
138+
// Read the file back with the same schema as the projection.
auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro,
139+
{.path = temp_avro_file_,
140+
.length = file_info_result->size(),
141+
.io = file_io_,
142+
.projection = schema});
143+
ASSERT_THAT(reader_result, IsOk());
144+
auto reader = std::move(reader_result.value());
145+
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
146+
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
147+
}
148+
108149
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
109150
std::shared_ptr<FileIO> file_io_;
110151
std::string temp_avro_file_;
@@ -161,4 +202,28 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
161202
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
162203
}
163204

205+
// Write/read round trip for a schema with a single required string column;
// the sample rows include a non-ASCII value.
TEST_F(AvroReaderTest, AvroWriterBasicType) {
206+
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
207+
SchemaField::MakeRequired(1, "name", std::make_shared<StringType>())});
208+
209+
std::string expected_string = R"([["Hello"], ["世界"], ["nanoarrow"]])";
210+
211+
WriteAndVerify(schema, expected_string);
212+
}
213+
214+
// Write/read round trip for a nested schema: a required int column "id" plus a
// required struct column "info" with fields (name: string, age: int).
TEST_F(AvroReaderTest, AvroWriterNestedType) {
215+
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
216+
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
217+
SchemaField::MakeRequired(
218+
2, "info",
219+
std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
220+
SchemaField::MakeRequired(3, "name", std::make_shared<StringType>()),
221+
SchemaField::MakeRequired(4, "age", std::make_shared<IntType>())}))});
222+
223+
std::string expected_string =
224+
R"([[1, ["Alice", 25]], [2, ["Bob", 30]], [3, ["Ivy", 35]]])";
225+
226+
WriteAndVerify(schema, expected_string);
227+
}
228+
164229
} // namespace iceberg::avro

0 commit comments

Comments
 (0)