Skip to content

Commit 4a12ace

Browse files
author
xiao.dong
committed
fix comments
1 parent 598ba40 commit 4a12ace

13 files changed

+166
-144
lines changed

src/iceberg/arrow/nanoarrow_error_transform_internal.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,13 @@
1919

2020
#pragma once
2121

22-
#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error) \
23-
if (status != NANOARROW_OK) [[unlikely]] { \
24-
return InvalidArrowData("nanoarrow error: {}", error.message); \
22+
#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status) \
23+
if (status != NANOARROW_OK) [[unlikely]] { \
24+
return iceberg::InvalidArrowData("nanoarrow error: {}", status); \
25+
}
26+
27+
#define ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error) \
28+
if (status != NANOARROW_OK) [[unlikely]] { \
29+
return iceberg::InvalidArrowData("nanoarrow error: {} msg:{}", status, \
30+
error.message); \
2531
}

src/iceberg/manifest_adapter.cc

Lines changed: 80 additions & 87 deletions
Large diffs are not rendered by default.

src/iceberg/manifest_adapter.h

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ class ICEBERG_EXPORT ManifestAdapter {
5252
static Status AppendField(ArrowArray* arrowArray, uint64_t value);
5353
static Status AppendField(ArrowArray* arrowArray, double value);
5454
static Status AppendField(ArrowArray* arrowArray, std::string_view value);
55-
static Status AppendField(ArrowArray* arrowArray,
56-
const std::span<const uint8_t>& value);
55+
static Status AppendField(ArrowArray* arrowArray, std::span<const uint8_t> value);
5756

5857
protected:
5958
ArrowArray array_;
6059
ArrowSchema schema_; // converted from manifest_schema_ or manifest_list_schema_
6160
int64_t size_ = 0;
61+
std::unordered_map<std::string, std::string> metadata_;
6262
};
6363

6464
// \brief Implemented by different versions with different schemas to
@@ -98,19 +98,18 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
9898
const std::map<int32_t, std::vector<uint8_t>>& map_value);
9999

100100
virtual Result<std::optional<int64_t>> GetSequenceNumber(const ManifestEntry& entry);
101-
virtual Result<std::optional<std::string>> GetWrappedReferenceDataFile(
101+
virtual Result<std::optional<std::string>> GetReferenceDataFile(
102102
const std::shared_ptr<DataFile>& file);
103-
virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(
103+
virtual Result<std::optional<int64_t>> GetFirstRowId(
104104
const std::shared_ptr<DataFile>& file);
105-
virtual Result<std::optional<int64_t>> GetWrappedContentOffset(
105+
virtual Result<std::optional<int64_t>> GetContentOffset(
106106
const std::shared_ptr<DataFile>& file);
107-
virtual Result<std::optional<int64_t>> GetWrappedContentSizeInBytes(
107+
virtual Result<std::optional<int64_t>> GetContentSizeInBytes(
108108
const std::shared_ptr<DataFile>& file);
109109

110110
protected:
111111
std::shared_ptr<PartitionSpec> partition_spec_;
112112
std::shared_ptr<Schema> manifest_schema_;
113-
std::unordered_map<std::string, std::string> metadata_;
114113
};
115114

116115
// \brief Implemented by different versions with different schemas to
@@ -131,17 +130,16 @@ class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
131130
/// schema based on the fields_ids.
132131
Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
133132
Status AppendInternal(const ManifestFile& file);
134-
static Status AppendPartitions(ArrowArray* arrow_array,
135-
const std::shared_ptr<ListType>& partition_type,
136-
const std::vector<PartitionFieldSummary>& partitions);
133+
static Status AppendPartitionSummary(
134+
ArrowArray* arrow_array, const std::shared_ptr<ListType>& summary_type,
135+
const std::vector<PartitionFieldSummary>& summaries);
137136

138137
virtual Result<int64_t> GetSequenceNumber(const ManifestFile& file);
139-
virtual Result<int64_t> GetWrappedMinSequenceNumber(const ManifestFile& file);
140-
virtual Result<std::optional<int64_t>> GetWrappedFirstRowId(const ManifestFile& file);
138+
virtual Result<int64_t> GetMinSequenceNumber(const ManifestFile& file);
139+
virtual Result<std::optional<int64_t>> GetFirstRowId(const ManifestFile& file);
141140

142141
protected:
143142
std::shared_ptr<Schema> manifest_list_schema_;
144-
std::unordered_map<std::string, std::string> metadata_;
145143
};
146144

147145
} // namespace iceberg

src/iceberg/manifest_reader_internal.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,12 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
204204
ArrowError error;
205205
ArrowArrayView array_view;
206206
auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
207-
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error);
207+
ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error);
208208
internal::ArrowArrayViewGuard view_guard(&array_view);
209209
status = ArrowArrayViewSetArray(&array_view, array_in, &error);
210-
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error);
210+
ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error);
211211
status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error);
212-
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error);
212+
ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error);
213213

214214
std::vector<ManifestFile> manifest_files;
215215
manifest_files.resize(array_in->length);
@@ -467,12 +467,12 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
467467
ArrowError error;
468468
ArrowArrayView array_view;
469469
auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
470-
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error);
470+
ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error);
471471
internal::ArrowArrayViewGuard view_guard(&array_view);
472472
status = ArrowArrayViewSetArray(&array_view, array_in, &error);
473-
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error);
473+
ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error);
474474
status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error);
475-
ICEBERG_NANOARROW_RETURN_IF_NOT_OK(status, error);
475+
ICEBERG_NANOARROW_RETURN_IF_NOT_OK_WITH_ERROR(status, error);
476476

477477
std::vector<ManifestEntry> manifest_entries;
478478
manifest_entries.resize(array_in->length);

src/iceberg/partition_spec.cc

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,14 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; }
6060

6161
std::span<const PartitionField> PartitionSpec::fields() const { return fields_; }
6262

63-
Result<std::shared_ptr<Schema>> PartitionSpec::GetPartitionSchema() {
63+
Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType() {
6464
if (fields_.empty()) {
6565
return nullptr;
6666
}
6767
{
6868
std::scoped_lock<std::mutex> lock(mutex_);
69-
if (partition_schema_ != nullptr) {
70-
return partition_schema_;
69+
if (partition_type_ != nullptr) {
70+
return partition_type_;
7171
}
7272
}
7373

@@ -77,6 +77,8 @@ Result<std::shared_ptr<Schema>> PartitionSpec::GetPartitionSchema() {
7777
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
7878
schema_->FindFieldById(partition_field.source_id()));
7979
if (!source_field.has_value()) {
80+
// TODO(xiao.dong) when source field is missing,
81+
// should return an error or just use UNKNOWN type
8082
return InvalidSchema("Cannot find source field for partition field:{}",
8183
partition_field.field_id());
8284
}
@@ -96,10 +98,10 @@ Result<std::shared_ptr<Schema>> PartitionSpec::GetPartitionSchema() {
9698
}
9799

98100
std::scoped_lock<std::mutex> lock(mutex_);
99-
if (partition_schema_ == nullptr) {
100-
partition_schema_ = std::make_shared<Schema>(std::move(partition_fields));
101+
if (partition_type_ == nullptr) {
102+
partition_type_ = std::make_shared<StructType>(std::move(partition_fields));
101103
}
102-
return partition_schema_;
104+
return partition_type_;
103105
}
104106

105107
std::string PartitionSpec::ToString() const {

src/iceberg/partition_spec.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
6969
/// \brief Get a view of the partition fields.
7070
std::span<const PartitionField> fields() const;
7171

72-
Result<std::shared_ptr<Schema>> GetPartitionSchema();
72+
Result<std::shared_ptr<StructType>> PartitionType();
7373

7474
std::string ToString() const override;
7575

@@ -88,7 +88,7 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
8888
std::vector<PartitionField> fields_;
8989
int32_t last_assigned_field_id_;
9090
std::mutex mutex_;
91-
std::shared_ptr<Schema> partition_schema_;
91+
std::shared_ptr<StructType> partition_type_;
9292
};
9393

9494
} // namespace iceberg

src/iceberg/test/manifest_reader_writer_test.cc

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ class ManifestReaderV1Test : public ManifestReaderTestBase {
154154
void TestWriteManifest(const std::string& manifest_list_path,
155155
std::shared_ptr<PartitionSpec> partition_spec,
156156
const std::vector<ManifestEntry>& manifest_entries) {
157-
std::cout << "Writing manifest list to " << manifest_list_path << std::endl;
158157
auto result = ManifestWriter::MakeV1Writer(1, manifest_list_path, file_io_,
159158
std::move(partition_spec));
160159
ASSERT_TRUE(result.has_value()) << result.error().message;
@@ -167,6 +166,7 @@ class ManifestReaderV1Test : public ManifestReaderTestBase {
167166
};
168167

169168
TEST_F(ManifestReaderV1Test, PartitionedTest) {
169+
// TODO(xiao.dong) we need to add more cases for different partition types
170170
iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true);
171171
auto partition_schema =
172172
std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
@@ -176,13 +176,15 @@ TEST_F(ManifestReaderV1Test, PartitionedTest) {
176176
}
177177

178178
TEST_F(ManifestReaderV1Test, WritePartitionedTest) {
179+
iceberg::SchemaField table_field(1, "order_ts_hour_source", iceberg::int32(), true);
179180
iceberg::SchemaField partition_field(1000, "order_ts_hour", iceberg::int32(), true);
181+
auto table_schema = std::make_shared<Schema>(std::vector<SchemaField>({table_field}));
180182
auto partition_schema =
181183
std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
182184
auto identity_transform = Transform::Identity();
183185
std::vector<PartitionField> fields{
184-
PartitionField(1000, 1000, "order_ts_hour", identity_transform)};
185-
auto partition_spec = std::make_shared<PartitionSpec>(partition_schema, 1, fields);
186+
PartitionField(1, 1000, "order_ts_hour", identity_transform)};
187+
auto partition_spec = std::make_shared<PartitionSpec>(table_schema, 1, fields);
186188

187189
auto expected_entries = PreparePartitionedTestData();
188190
auto write_manifest_path = CreateNewTempFilePath();
@@ -259,7 +261,6 @@ class ManifestReaderV2Test : public ManifestReaderTestBase {
259261
void TestWriteManifest(int64_t snapshot_id, const std::string& manifest_list_path,
260262
std::shared_ptr<PartitionSpec> partition_spec,
261263
const std::vector<ManifestEntry>& manifest_entries) {
262-
std::cout << "Writing manifest list to " << manifest_list_path << std::endl;
263264
auto result = ManifestWriter::MakeV2Writer(snapshot_id, manifest_list_path, file_io_,
264265
std::move(partition_spec));
265266
ASSERT_TRUE(result.has_value()) << result.error().message;
@@ -301,10 +302,15 @@ TEST_F(ManifestReaderV2Test, WriteInheritancePartitionedTest) {
301302
auto expected_entries = PrepareMetadataInheritanceTestData();
302303
auto write_manifest_path = CreateNewTempFilePath();
303304
TestWriteManifest(679879563479918846LL, write_manifest_path, nullptr, expected_entries);
304-
for (auto& entry : expected_entries) {
305-
entry.data_file->partition_spec_id = PartitionSpec::kInitialSpecId;
306-
}
307-
TestManifestReadingByPath(write_manifest_path, expected_entries);
305+
ManifestFile manifest_file{
306+
.manifest_path = write_manifest_path,
307+
.manifest_length = 100,
308+
.partition_spec_id = 12,
309+
.content = ManifestFile::Content::kData,
310+
.sequence_number = 15,
311+
.added_snapshot_id = 679879563479918846LL,
312+
};
313+
TestManifestReadingWithManifestFile(manifest_file, expected_entries);
308314
}
309315

310316
} // namespace iceberg

src/iceberg/test/partition_spec_test.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,23 @@ TEST(PartitionSpecTest, Equality) {
8787
ASSERT_NE(schema1, schema6);
8888
ASSERT_NE(schema6, schema1);
8989
}
90+
91+
TEST(PartitionSpecTest, PartitionSchemaTest) {
92+
SchemaField field1(5, "ts", iceberg::timestamp(), true);
93+
SchemaField field2(7, "bar", iceberg::string(), true);
94+
auto const schema =
95+
std::make_shared<Schema>(std::vector<SchemaField>{field1, field2}, 100);
96+
auto identity_transform = Transform::Identity();
97+
PartitionField pt_field1(5, 1000, "day", identity_transform);
98+
PartitionField pt_field2(7, 1001, "hour", identity_transform);
99+
PartitionSpec spec(schema, 100, {pt_field1, pt_field2});
100+
101+
auto partition_schema = spec.PartitionType();
102+
ASSERT_TRUE(partition_schema.has_value());
103+
ASSERT_EQ(2, partition_schema.value()->fields().size());
104+
EXPECT_EQ(pt_field1.name(), partition_schema.value()->fields()[0].name());
105+
EXPECT_EQ(pt_field1.field_id(), partition_schema.value()->fields()[0].field_id());
106+
EXPECT_EQ(pt_field2.name(), partition_schema.value()->fields()[1].name());
107+
EXPECT_EQ(pt_field2.field_id(), partition_schema.value()->fields()[1].field_id());
108+
}
90109
} // namespace iceberg

src/iceberg/v1_metadata.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ Result<std::shared_ptr<StructType>> ManifestEntryAdapterV1::GetManifestEntryStru
6868
// Deprecated. Always write a default in v1. Do not write in v2 or v3.
6969
static const SchemaField kBlockSizeInBytes = SchemaField::MakeRequired(
7070
105, "block_size_in_bytes", int64(), "Block size in bytes");
71-
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->GetPartitionSchema());
71+
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType());
7272
if (!partition_type) {
7373
partition_type = PartitionSpec::Unpartitioned()->schema();
7474
}

src/iceberg/v2_metadata.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ Result<std::optional<int64_t>> ManifestEntryAdapterV2::GetSequenceNumber(
9090
return entry.sequence_number;
9191
}
9292

93-
Result<std::optional<std::string>> ManifestEntryAdapterV2::GetWrappedReferenceDataFile(
93+
Result<std::optional<std::string>> ManifestEntryAdapterV2::GetReferenceDataFile(
9494
const std::shared_ptr<DataFile>& file) {
9595
if (file->content == DataFile::Content::kPositionDeletes) {
9696
return file->referenced_data_file;
@@ -141,8 +141,7 @@ Result<int64_t> ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& fil
141141
return file.sequence_number;
142142
}
143143

144-
Result<int64_t> ManifestFileAdapterV2::GetWrappedMinSequenceNumber(
145-
const ManifestFile& file) {
144+
Result<int64_t> ManifestFileAdapterV2::GetMinSequenceNumber(const ManifestFile& file) {
146145
if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) {
147146
if (snapshot_id_ != file.added_snapshot_id) {
148147
return InvalidManifestList(

0 commit comments

Comments
 (0)