Skip to content

Commit 8eaad5c

Browse files
author
nullccxsy
committed
fix comments
1 parent ef3f7af commit 8eaad5c

File tree

4 files changed

+51
-72
lines changed

4 files changed

+51
-72
lines changed

src/iceberg/avro/avro_stream_internal.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,8 @@ void AvroOutputStream::flush() {
145145
}
146146
}
147147

148+
std::shared_ptr<::arrow::io::OutputStream> AvroOutputStream::get_output_stream() const {
149+
return output_stream_;
150+
}
151+
148152
} // namespace iceberg::avro

src/iceberg/avro/avro_stream_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ class AvroOutputStream : public ::avro::OutputStream {
8787
/// store, if any.
8888
void flush() override;
8989

90+
std::shared_ptr<::arrow::io::OutputStream> get_output_stream() const;
91+
9092
private:
9193
std::shared_ptr<::arrow::io::OutputStream> output_stream_;
9294
const int64_t buffer_size_;

src/iceberg/avro/avro_writer.cc

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,35 +70,32 @@ class AvroWriter::Impl {
7070
constexpr int64_t kDefaultBufferSize = 1024 * 1024;
7171
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
7272
CreateOutputStream(options, kDefaultBufferSize));
73-
73+
arrow_output_stream_ = output_stream->get_output_stream();
7474
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
7575
std::move(output_stream), *avro_schema_);
76+
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
77+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
7678
return {};
7779
}
7880

7981
Status Write(ArrowArray data) {
80-
ArrowSchema arrow_schema;
81-
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema));
82-
auto import_result = ::arrow::ImportArray(&data, &arrow_schema);
83-
if (!import_result.ok()) {
84-
std::cout << import_result.status() << std::endl;
85-
return InvalidArgument("Failed to import ArrowArray: {}",
86-
import_result.status().ToString());
87-
}
82+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
83+
::arrow::ImportArray(&data, &arrow_schema_));
8884

89-
auto arrow_array = import_result.ValueOrDie();
90-
::avro::GenericDatum datum(*avro_schema_);
91-
for (int64_t i = 0; i < arrow_array->length(); i++) {
92-
ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*arrow_array, i, &datum));
93-
writer_->write(datum);
85+
for (int64_t i = 0; i < result->length(); i++) {
86+
ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get()));
87+
writer_->write(*datum_);
9488
}
89+
9590
return {};
9691
}
9792

9893
Status Close() {
9994
if (writer_ != nullptr) {
10095
writer_->close();
10196
writer_.reset();
97+
ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell());
98+
ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
10299
}
103100
return {};
104101
}
@@ -115,6 +112,12 @@ class AvroWriter::Impl {
115112
std::shared_ptr<::avro::ValidSchema> avro_schema_;
116113
// The avro writer to write the data into a datum.
117114
std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
115+
// Arrow schema for data conversion (C API format)
116+
ArrowSchema arrow_schema_;
117+
// Reusable Avro datum for writing individual records
118+
std::unique_ptr<::avro::GenericDatum> datum_;
119+
// Arrow output stream for writing data to filesystem
120+
std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_;
118121
};
119122

120123
AvroWriter::~AvroWriter() = default;

test/avro_test.cc

Lines changed: 28 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
#include <avro/Generic.hh>
2828
#include <avro/GenericDatum.hh>
2929
#include <gtest/gtest.h>
30-
#include <nanoarrow/nanoarrow.hpp>
3130

3231
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3332
#include "iceberg/avro/avro_register.h"
@@ -107,27 +106,41 @@ class AvroReaderTest : public TempFileTestBase {
107106
ASSERT_FALSE(data.value().has_value());
108107
}
109108

110-
void WriteAndVerify(std::shared_ptr<Schema> schema, const std::string& expected_string,
111-
ArrowArray array) {
112-
iceberg::WriterOptions options;
113-
options.schema = schema;
114-
options.path = temp_avro_file_;
115-
options.io = file_io_;
109+
void WriteAndVerify(std::shared_ptr<Schema> schema,
110+
const std::string& expected_string) {
111+
ArrowSchema arrow_c_schema;
112+
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
116113

117-
auto writer_result =
118-
iceberg::WriterFactoryRegistry::Open(iceberg::FileFormatType::kAvro, options);
114+
auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema);
115+
ASSERT_TRUE(arrow_schema_result.ok());
116+
auto arrow_schema = arrow_schema_result.ValueOrDie();
117+
118+
auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, expected_string);
119+
ASSERT_TRUE(array_result.ok());
120+
auto array = array_result.ValueOrDie();
121+
122+
struct ArrowArray arrow_array;
123+
auto export_result = ::arrow::ExportArray(*array, &arrow_array);
124+
ASSERT_TRUE(export_result.ok());
125+
126+
auto writer_result = WriterFactoryRegistry::Open(
127+
FileFormatType::kAvro,
128+
{.path = temp_avro_file_, .io = file_io_, .schema = schema});
119129
ASSERT_TRUE(writer_result.has_value());
120-
auto writer = std::move(writer_result).value();
121-
ASSERT_THAT(writer->Write(array), IsOk());
130+
auto writer = std::move(writer_result.value());
131+
ASSERT_THAT(writer->Write(arrow_array), IsOk());
122132
ASSERT_THAT(writer->Close(), IsOk());
123133

134+
auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_);
135+
ASSERT_TRUE(file_info_result.ok());
136+
ASSERT_EQ(file_info_result->size(), writer->length().value());
137+
124138
auto reader_result = ReaderFactoryRegistry::Open(
125139
FileFormatType::kAvro,
126140
{.path = temp_avro_file_, .io = file_io_, .projection = schema});
127141
ASSERT_THAT(reader_result, IsOk());
128142
auto reader = std::move(reader_result.value());
129143
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
130-
131144
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
132145
}
133146

@@ -191,67 +204,24 @@ TEST_F(AvroReaderTest, AvroWriterBasicType) {
191204
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
192205
SchemaField::MakeRequired(1, "name", std::make_shared<StringType>())});
193206

194-
ArrowSchema struct_schema;
195-
ASSERT_THAT(ToArrowSchema(*schema, &struct_schema), IsOk());
196-
197-
ArrowArray array;
198-
NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(&array, &struct_schema, nullptr));
199-
NANOARROW_THROW_NOT_OK(ArrowArrayStartAppending(&array));
200-
201-
std::vector<std::string> str_values{"Hello", "世界", "nanoarrow"};
202207
std::string expected_string = R"([["Hello"], ["世界"], ["nanoarrow"]])";
203208

204-
for (const auto& element : str_values) {
205-
NANOARROW_THROW_NOT_OK(
206-
ArrowArrayAppendString(array.children[0], ArrowCharView(element.c_str())));
207-
NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(&array));
208-
}
209-
NANOARROW_THROW_NOT_OK(ArrowArrayFinishBuildingDefault(&array, nullptr));
210-
211-
WriteAndVerify(schema, expected_string, array);
212-
213-
ArrowSchemaRelease(&struct_schema);
209+
WriteAndVerify(schema, expected_string);
214210
}
215211

216212
TEST_F(AvroReaderTest, AvroWriterNestedType) {
217-
auto nested_schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
213+
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
218214
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
219215
SchemaField::MakeRequired(
220216
2, "info",
221217
std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
222218
SchemaField::MakeRequired(3, "name", std::make_shared<StringType>()),
223219
SchemaField::MakeRequired(4, "age", std::make_shared<IntType>())}))});
224220

225-
ArrowSchema struct_schema;
226-
ASSERT_THAT(ToArrowSchema(*nested_schema, &struct_schema), IsOk());
227-
228-
ArrowArray array;
229-
NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(&array, &struct_schema, nullptr));
230-
231-
std::vector<int> int_array = {1, 2, 3};
232-
std::vector<std::pair<std::string, int>> info_array = {
233-
{"Alice", 25}, {"Bob", 30}, {"Ivy", 35}};
234-
235221
std::string expected_string =
236222
R"([[1, ["Alice", 25]], [2, ["Bob", 30]], [3, ["Ivy", 35]]])";
237223

238-
NANOARROW_THROW_NOT_OK(ArrowArrayStartAppending(&array));
239-
240-
for (int i = 0; i < int_array.size(); i++) {
241-
NANOARROW_THROW_NOT_OK(ArrowArrayAppendInt(array.children[0], int_array[i]));
242-
243-
NANOARROW_THROW_NOT_OK(ArrowArrayAppendString(
244-
array.children[1]->children[0], ArrowCharView(info_array[i].first.c_str())));
245-
NANOARROW_THROW_NOT_OK(
246-
ArrowArrayAppendInt(array.children[1]->children[1], info_array[i].second));
247-
248-
NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(array.children[1]));
249-
NANOARROW_THROW_NOT_OK(ArrowArrayFinishElement(&array));
250-
}
251-
252-
NANOARROW_THROW_NOT_OK(ArrowArrayFinishBuildingDefault(&array, nullptr));
253-
WriteAndVerify(nested_schema, expected_string, array);
254-
ArrowSchemaRelease(&struct_schema);
224+
WriteAndVerify(schema, expected_string);
255225
}
256226

257227
} // namespace iceberg::avro

0 commit comments

Comments
 (0)