Skip to content

Commit 977466d

Browse files
author
xiao.dong
committed
fix comment and conflict
1 parent d0556df commit 977466d

16 files changed

+55
-47
lines changed

src/iceberg/arrow/nanoarrow_error_transform_internal.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
#pragma once
2121

22-
#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error) \
23-
if (status != NANOARROW_OK) [[unlikely]] { \
24-
return InvalidArrowData("Nanoarrow error msg: {}", error.message); \
22+
#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error) \
23+
if (status != NANOARROW_OK) [[unlikely]] { \
24+
return InvalidArrowData("nanoarrow error: {}", error.message); \
2525
}

src/iceberg/avro/avro_writer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ class AvroWriter::Impl {
7575
return {};
7676
}
7777

78-
Status Write(ArrowArray& data) {
78+
Status Write(ArrowArray* data) {
7979
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
80-
::arrow::ImportArray(&data, &arrow_schema_));
80+
::arrow::ImportArray(data, &arrow_schema_));
8181

8282
for (int64_t i = 0; i < result->length(); i++) {
8383
ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get()));
@@ -119,7 +119,7 @@ class AvroWriter::Impl {
119119

120120
AvroWriter::~AvroWriter() = default;
121121

122-
Status AvroWriter::Write(ArrowArray& data) { return impl_->Write(data); }
122+
Status AvroWriter::Write(ArrowArray* data) { return impl_->Write(data); }
123123

124124
Status AvroWriter::Open(const WriterOptions& options) {
125125
impl_ = std::make_unique<Impl>();

src/iceberg/avro/avro_writer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
3535

3636
Status Close() final;
3737

38-
Status Write(ArrowArray& data) final;
38+
Status Write(ArrowArray* data) final;
3939

4040
std::optional<Metrics> metrics() final;
4141

src/iceberg/file_writer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class ICEBERG_EXPORT Writer {
6565
/// \brief Write arrow data to the file.
6666
///
6767
/// \return Status of write results.
68-
virtual Status Write(ArrowArray& data) = 0;
68+
virtual Status Write(ArrowArray* data) = 0;
6969

7070
/// \brief Get the file statistics.
7171
/// Only valid after the file is closed.

src/iceberg/manifest_adapter.cc

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,19 @@
1919

2020
#include "iceberg/manifest_adapter.h"
2121

22+
#include <nanoarrow/nanoarrow.h>
23+
2224
#include "iceberg/arrow/nanoarrow_error_transform_internal.h"
2325
#include "iceberg/manifest_entry.h"
2426
#include "iceberg/manifest_list.h"
2527
#include "iceberg/schema.h"
2628
#include "iceberg/schema_internal.h"
2729
#include "iceberg/util/checked_cast.h"
2830
#include "iceberg/util/macros.h"
29-
#include "nanoarrow/nanoarrow.h"
3031

31-
#define NANOARROW_RETURN_IF_FAILED(status) \
32-
if (status != NANOARROW_OK) [[unlikely]] { \
33-
return InvalidArrowData("Nanoarrow error code: {}", status); \
32+
#define NANOARROW_RETURN_IF_FAILED(status) \
33+
if (status != NANOARROW_OK) [[unlikely]] { \
34+
return InvalidArrowData("nanoarrow error: {}", status); \
3435
}
3536

3637
namespace {
@@ -43,20 +44,20 @@ Status ManifestAdapter::StartAppending() {
4344
if (size_ > 0) {
4445
return InvalidArgument("Adapter buffer not empty, cannot start appending.");
4546
}
46-
array_ = std::make_shared<ArrowArray>();
47+
array_ = {};
4748
size_ = 0;
4849
ArrowError error;
49-
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
50-
ArrowArrayInitFromSchema(array_.get(), &schema_, &error), error);
51-
NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(array_.get()));
50+
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayInitFromSchema(&array_, &schema_, &error),
51+
error);
52+
NANOARROW_RETURN_IF_FAILED(ArrowArrayStartAppending(&array_));
5253
return {};
5354
}
5455

55-
Result<std::shared_ptr<ArrowArray>> ManifestAdapter::FinishAppending() {
56+
Result<ArrowArray*> ManifestAdapter::FinishAppending() {
5657
ArrowError error;
57-
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(
58-
ArrowArrayFinishBuildingDefault(array_.get(), &error), error);
59-
return array_;
58+
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(ArrowArrayFinishBuildingDefault(&array_, &error),
59+
error);
60+
return &array_;
6061
}
6162

6263
Status ManifestAdapter::AppendField(ArrowArray* arrowArray, int64_t value) {
@@ -90,8 +91,8 @@ Status ManifestAdapter::AppendField(ArrowArray* arrowArray,
9091
}
9192

9293
ManifestEntryAdapter::~ManifestEntryAdapter() {
93-
if (array_ != nullptr && array_->release != nullptr) {
94-
ArrowArrayRelease(array_.get());
94+
if (array_.release != nullptr) {
95+
ArrowArrayRelease(&array_);
9596
}
9697
if (schema_.release != nullptr) {
9798
ArrowSchemaRelease(&schema_);
@@ -102,7 +103,7 @@ Result<std::shared_ptr<StructType>> ManifestEntryAdapter::GetManifestEntryStruct
102103
if (partition_spec_ == nullptr) {
103104
return ManifestEntry::TypeFromPartitionType(nullptr);
104105
}
105-
ICEBERG_ASSIGN_OR_RAISE(auto partition_schema, partition_spec_->partition_schema());
106+
ICEBERG_ASSIGN_OR_RAISE(auto partition_schema, partition_spec_->GetPartitionSchema());
106107
return ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
107108
}
108109

@@ -215,6 +216,7 @@ Status ManifestEntryAdapter::AppendMap(ArrowArray* arrow_array,
215216
auto value_array = map_array->children[1];
216217
ICEBERG_RETURN_UNEXPECTED(AppendField(key_array, static_cast<int64_t>(key)));
217218
ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
219+
NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(map_array));
218220
}
219221
NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
220222
return {};
@@ -231,6 +233,7 @@ Status ManifestEntryAdapter::AppendMap(
231233
auto value_array = map_array->children[1];
232234
ICEBERG_RETURN_UNEXPECTED(AppendField(key_array, static_cast<int64_t>(key)));
233235
ICEBERG_RETURN_UNEXPECTED(AppendField(value_array, value));
236+
NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(map_array));
234237
}
235238
NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(arrow_array));
236239
return {};
@@ -380,7 +383,7 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
380383
const auto& fields = manifest_schema_->fields();
381384
for (int32_t i = 0; i < fields.size(); i++) {
382385
const auto& field = fields[i];
383-
auto array = array_->children[i];
386+
auto array = array_.children[i];
384387

385388
switch (field.field_id()) {
386389
case 0: // status (required int32)
@@ -427,7 +430,7 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
427430
}
428431
}
429432

430-
NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(array_.get()));
433+
NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(&array_));
431434
size_++;
432435
return {};
433436
}
@@ -464,8 +467,8 @@ Status ManifestEntryAdapter::InitSchema(const std::unordered_set<int32_t>& field
464467
}
465468

466469
ManifestFileAdapter::~ManifestFileAdapter() {
467-
if (array_ != nullptr && array_->release != nullptr) {
468-
ArrowArrayRelease(array_.get());
470+
if (array_.release != nullptr) {
471+
ArrowArrayRelease(&array_);
469472
}
470473
if (schema_.release != nullptr) {
471474
ArrowSchemaRelease(&schema_);
@@ -553,7 +556,7 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) {
553556
const auto& fields = manifest_list_schema_->fields();
554557
for (int32_t i = 0; i < fields.size(); i++) {
555558
const auto& field = fields[i];
556-
auto array = array_->children[i];
559+
auto array = array_.children[i];
557560
switch (field.field_id()) {
558561
case 500: // manifest_path
559562
ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_path));
@@ -662,7 +665,7 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) {
662665
return InvalidManifestList("Unknown field id: {}", field.field_id());
663666
}
664667
}
665-
NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(array_.get()));
668+
NANOARROW_RETURN_IF_FAILED(ArrowArrayFinishElement(&array_));
666669
size_++;
667670
return {};
668671
}

src/iceberg/manifest_adapter.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class ICEBERG_EXPORT ManifestAdapter {
4444
virtual Status Init() = 0;
4545

4646
Status StartAppending();
47-
Result<std::shared_ptr<ArrowArray>> FinishAppending();
47+
Result<ArrowArray*> FinishAppending();
4848
int64_t size() const { return size_; }
4949

5050
protected:
@@ -56,7 +56,7 @@ class ICEBERG_EXPORT ManifestAdapter {
5656
const std::span<const uint8_t>& value);
5757

5858
protected:
59-
std::shared_ptr<ArrowArray> array_;
59+
ArrowArray array_;
6060
ArrowSchema schema_; // converted from manifest_schema_ or manifest_list_schema_
6161
int64_t size_ = 0;
6262
};

src/iceberg/manifest_writer.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ namespace iceberg {
3232
Status ManifestWriter::Add(const ManifestEntry& entry) {
3333
if (adapter_->size() >= kBatchSize) {
3434
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
35-
ICEBERG_RETURN_UNEXPECTED(writer_->Write(*array));
35+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
3636
ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
3737
}
3838
return adapter_->Append(entry);
@@ -48,7 +48,7 @@ Status ManifestWriter::AddAll(const std::vector<ManifestEntry>& entries) {
4848
Status ManifestWriter::Close() {
4949
if (adapter_->size() > 0) {
5050
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
51-
ICEBERG_RETURN_UNEXPECTED(writer_->Write(*array));
51+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
5252
}
5353
return writer_->Close();
5454
}
@@ -113,7 +113,7 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
113113
Status ManifestListWriter::Add(const ManifestFile& file) {
114114
if (adapter_->size() >= kBatchSize) {
115115
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
116-
ICEBERG_RETURN_UNEXPECTED(writer_->Write(*array));
116+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
117117
ICEBERG_RETURN_UNEXPECTED(adapter_->StartAppending());
118118
}
119119
return adapter_->Append(file);
@@ -129,7 +129,7 @@ Status ManifestListWriter::AddAll(const std::vector<ManifestFile>& files) {
129129
Status ManifestListWriter::Close() {
130130
if (adapter_->size() > 0) {
131131
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
132-
ICEBERG_RETURN_UNEXPECTED(writer_->Write(*array));
132+
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
133133
}
134134
return writer_->Close();
135135
}

src/iceberg/parquet/parquet_writer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ class ParquetWriter::Impl {
7676
return {};
7777
}
7878

79-
Status Write(ArrowArray& array) {
79+
Status Write(ArrowArray* array) {
8080
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch,
81-
::arrow::ImportRecordBatch(&array, arrow_schema_));
81+
::arrow::ImportRecordBatch(array, arrow_schema_));
8282

8383
ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
8484

@@ -132,7 +132,7 @@ Status ParquetWriter::Open(const WriterOptions& options) {
132132
return impl_->Open(options);
133133
}
134134

135-
Status ParquetWriter::Write(ArrowArray& array) { return impl_->Write(array); }
135+
Status ParquetWriter::Write(ArrowArray* array) { return impl_->Write(array); }
136136

137137
Status ParquetWriter::Close() { return impl_->Close(); }
138138

src/iceberg/parquet/parquet_writer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer {
3535

3636
Status Close() final;
3737

38-
Status Write(ArrowArray& array) final;
38+
Status Write(ArrowArray* array) final;
3939

4040
std::optional<Metrics> metrics() final;
4141

src/iceberg/partition_spec.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; }
6060

6161
std::span<const PartitionField> PartitionSpec::fields() const { return fields_; }
6262

63-
Result<std::shared_ptr<Schema>> PartitionSpec::partition_schema() {
63+
Result<std::shared_ptr<Schema>> PartitionSpec::GetPartitionSchema() {
6464
if (fields_.empty()) {
6565
return nullptr;
6666
}
@@ -92,8 +92,7 @@ Result<std::shared_ptr<Schema>> PartitionSpec::partition_schema() {
9292
partition_fields.emplace_back(partition_field.field_id(),
9393
std::string(partition_field.name()),
9494
std::move(result_type),
95-
true // optional
96-
);
95+
/*optional=*/true);
9796
}
9897

9998
std::scoped_lock<std::mutex> lock(mutex_);

0 commit comments

Comments
 (0)