Skip to content

Commit ef3f7af

Browse files
author
nullccxsy
committed
feat: implement AvroWriter Write method
1 parent 9f13bac commit ef3f7af

File tree

2 files changed

+113
-4
lines changed

2 files changed

+113
-4
lines changed

src/iceberg/avro/avro_writer.cc

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@
2626
#include <arrow/record_batch.h>
2727
#include <arrow/result.h>
2828
#include <avro/DataFile.hh>
29+
#include <avro/Generic.hh>
2930
#include <avro/GenericDatum.hh>
3031

3132
#include "iceberg/arrow/arrow_error_transform_internal.h"
3233
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
34+
#include "iceberg/avro/avro_data_util_internal.h"
3335
#include "iceberg/avro/avro_register.h"
3436
#include "iceberg/avro/avro_schema_util_internal.h"
3537
#include "iceberg/avro/avro_stream_internal.h"
3638
#include "iceberg/schema.h"
39+
#include "iceberg/schema_internal.h"
3740
#include "iceberg/util/checked_cast.h"
3841
#include "iceberg/util/macros.h"
3942

@@ -73,9 +76,22 @@ class AvroWriter::Impl {
7376
return {};
7477
}
7578

76-
Status Write(ArrowArray /*data*/) {
77-
// TODO(xiao.dong) convert data and write to avro
78-
// total_bytes_+= written_bytes;
79+
/// \brief Append every row of an Arrow C-data array to the Avro file.
///
/// The array is imported against an Arrow schema derived from
/// `write_schema_`; each row is then extracted into a single reusable Avro
/// datum and passed to the underlying `writer_`.
///
/// \param data Arrow C-data array holding the rows to write. It is handed to
///        `::arrow::ImportArray` together with the derived schema.
/// \return An empty Status on success; the error produced by schema
///         conversion or row extraction; or InvalidArgument when the Arrow
///         import fails.
Status Write(ArrowArray data) {
  ArrowSchema arrow_schema;
  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema));

  auto import_result = ::arrow::ImportArray(&data, &arrow_schema);
  if (!import_result.ok()) {
    // The failure is reported through the returned Status only; library code
    // should not print diagnostics to stdout.
    return InvalidArgument("Failed to import ArrowArray: {}",
                           import_result.status().ToString());
  }

  // The result was verified above, so take the value without the aborting
  // ValueOrDie() accessor.
  const auto arrow_array = import_result.MoveValueUnsafe();
  const int64_t num_rows = arrow_array->length();

  // One datum is reused for all rows to avoid rebuilding it per record.
  ::avro::GenericDatum datum(*avro_schema_);
  for (int64_t row = 0; row < num_rows; ++row) {
    ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*arrow_array, row, &datum));
    writer_->write(datum);
  }
  // NOTE(review): no bytes-written counter is updated here; confirm whether
  // total_bytes_ is expected to be maintained by this method.
  return {};
}
8197

test/avro_test.cc

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
#include <arrow/array/array_base.h>
2121
#include <arrow/c/bridge.h>
22-
#include <arrow/c/helpers.h>
2322
#include <arrow/filesystem/localfs.h>
2423
#include <arrow/io/file.h>
2524
#include <arrow/json/from_string.h>
@@ -28,11 +27,14 @@
2827
#include <avro/Generic.hh>
2928
#include <avro/GenericDatum.hh>
3029
#include <gtest/gtest.h>
30+
#include <nanoarrow/nanoarrow.hpp>
3131

3232
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3333
#include "iceberg/avro/avro_register.h"
34+
#include "iceberg/avro/avro_writer.h"
3435
#include "iceberg/file_reader.h"
3536
#include "iceberg/schema.h"
37+
#include "iceberg/schema_internal.h"
3638
#include "iceberg/type.h"
3739
#include "matchers.h"
3840
#include "temp_file_test_base.h"
@@ -105,6 +107,30 @@ class AvroReaderTest : public TempFileTestBase {
105107
ASSERT_FALSE(data.value().has_value());
106108
}
107109

110+
void WriteAndVerify(std::shared_ptr<Schema> schema, const std::string& expected_string,
111+
ArrowArray array) {
112+
iceberg::WriterOptions options;
113+
options.schema = schema;
114+
options.path = temp_avro_file_;
115+
options.io = file_io_;
116+
117+
auto writer_result =
118+
iceberg::WriterFactoryRegistry::Open(iceberg::FileFormatType::kAvro, options);
119+
ASSERT_TRUE(writer_result.has_value());
120+
auto writer = std::move(writer_result).value();
121+
ASSERT_THAT(writer->Write(array), IsOk());
122+
ASSERT_THAT(writer->Close(), IsOk());
123+
124+
auto reader_result = ReaderFactoryRegistry::Open(
125+
FileFormatType::kAvro,
126+
{.path = temp_avro_file_, .io = file_io_, .projection = schema});
127+
ASSERT_THAT(reader_result, IsOk());
128+
auto reader = std::move(reader_result.value());
129+
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
130+
131+
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
132+
}
133+
108134
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
109135
std::shared_ptr<FileIO> file_io_;
110136
std::string temp_avro_file_;
@@ -161,4 +187,71 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
161187
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
162188
}
163189

190+
// Writes a single required string column through the Avro writer and checks
// that reading the file back yields the same rows.
TEST_F(AvroReaderTest, AvroWriterBasicType) {
  const std::vector<std::string> names{"Hello", "世界", "nanoarrow"};
  const std::string expected_json = R"([["Hello"], ["世界"], ["nanoarrow"]])";

  auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "name", std::make_shared<StringType>())});

  ArrowSchema arrow_schema;
  ASSERT_THAT(ToArrowSchema(*schema, &arrow_schema), IsOk());

  ArrowArray array;
  NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(&array, &arrow_schema, nullptr));
  NANOARROW_THROW_NOT_OK(ArrowArrayStartAppending(&array));

  // Append one struct row per name: fill the child column first, then close
  // the enclosing struct element.
  for (const auto& name : names) {
    ArrowArray* name_column = array.children[0];
    NANOARROW_THROW_NOT_OK(
        ArrowArrayAppendString(name_column, ArrowCharView(name.c_str())));
    NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(&array));
  }
  NANOARROW_THROW_NOT_OK(ArrowArrayFinishBuildingDefault(&array, nullptr));

  WriteAndVerify(schema, expected_json, array);

  ArrowSchemaRelease(&arrow_schema);
}
215+
216+
// Writes rows of the shape {id: int, info: {name: string, age: int}} through
// the Avro writer and checks that reading the file back yields the same rows.
TEST_F(AvroReaderTest, AvroWriterNestedType) {
  auto nested_schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeRequired(
          2, "info",
          std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
              SchemaField::MakeRequired(3, "name", std::make_shared<StringType>()),
              SchemaField::MakeRequired(4, "age", std::make_shared<IntType>())}))});

  ArrowSchema struct_schema;
  ASSERT_THAT(ToArrowSchema(*nested_schema, &struct_schema), IsOk());

  ArrowArray array;
  NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(&array, &struct_schema, nullptr));

  std::vector<int> int_array = {1, 2, 3};
  std::vector<std::pair<std::string, int>> info_array = {
      {"Alice", 25}, {"Bob", 30}, {"Ivy", 35}};

  std::string expected_string =
      R"([[1, ["Alice", 25]], [2, ["Bob", 30]], [3, ["Ivy", 35]]])";

  NANOARROW_THROW_NOT_OK(ArrowArrayStartAppending(&array));

  // size_t index: avoids comparing a signed int against the unsigned size().
  for (size_t i = 0; i < int_array.size(); ++i) {
    NANOARROW_THROW_NOT_OK(ArrowArrayAppendInt(array.children[0], int_array[i]));

    // Fill the nested struct's children, then close the nested element before
    // closing the outer row.
    NANOARROW_THROW_NOT_OK(ArrowArrayAppendString(
        array.children[1]->children[0], ArrowCharView(info_array[i].first.c_str())));
    NANOARROW_THROW_NOT_OK(
        ArrowArrayAppendInt(array.children[1]->children[1], info_array[i].second));

    NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(array.children[1]));
    NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(&array));
  }

  NANOARROW_THROW_NOT_OK(ArrowArrayFinishBuildingDefault(&array, nullptr));
  WriteAndVerify(nested_schema, expected_string, array);
  ArrowSchemaRelease(&struct_schema);
}
256+
164257
} // namespace iceberg::avro

0 commit comments

Comments
 (0)