Skip to content

Commit 9712fd3

Browse files
author
xiao.dong
committed
refactor some impl
1 parent a9f4525 commit 9712fd3

File tree

6 files changed

+165
-143
lines changed

6 files changed

+165
-143
lines changed

src/iceberg/manifest_reader.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
namespace iceberg {
2929

30-
Result<std::shared_ptr<ManifestReader>> ManifestReader::MakeReader(
30+
Result<std::unique_ptr<ManifestReader>> ManifestReader::MakeReader(
3131
const std::string& manifest_location, std::shared_ptr<FileIO> file_io,
3232
std::shared_ptr<Schema> partition_schema) {
3333
auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema);
@@ -39,10 +39,10 @@ Result<std::shared_ptr<ManifestReader>> ManifestReader::MakeReader(
3939
ReaderFactoryRegistry::Open(
4040
FileFormatType::kAvro,
4141
{.path = manifest_location, .io = std::move(file_io), .projection = schema}));
42-
return std::make_shared<ManifestReaderImpl>(std::move(reader), std::move(schema));
42+
return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema));
4343
}
4444

45-
Result<std::shared_ptr<ManifestListReader>> ManifestListReader::MakeReader(
45+
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::MakeReader(
4646
const std::string& manifest_list_location, std::shared_ptr<FileIO> file_io) {
4747
std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
4848
ManifestFile::Type().fields().end());
@@ -52,7 +52,7 @@ Result<std::shared_ptr<ManifestListReader>> ManifestListReader::MakeReader(
5252
ReaderFactoryRegistry::Open(FileFormatType::kAvro, {.path = manifest_list_location,
5353
.io = std::move(file_io),
5454
.projection = schema}));
55-
return std::make_shared<ManifestListReaderImpl>(std::move(reader), std::move(schema));
55+
return std::make_unique<ManifestListReaderImpl>(std::move(reader), std::move(schema));
5656
}
5757

5858
} // namespace iceberg

src/iceberg/manifest_reader.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class ICEBERG_EXPORT ManifestReader {
3636
public:
3737
virtual Result<std::vector<ManifestEntry>> Entries() const = 0;
3838

39-
static Result<std::shared_ptr<ManifestReader>> MakeReader(
39+
static Result<std::unique_ptr<ManifestReader>> MakeReader(
4040
const std::string& manifest_location, std::shared_ptr<FileIO> file_io,
4141
std::shared_ptr<Schema> partition_schema);
4242
};
@@ -46,7 +46,7 @@ class ICEBERG_EXPORT ManifestListReader {
4646
public:
4747
virtual Result<std::vector<ManifestFile>> Files() const = 0;
4848

49-
static Result<std::shared_ptr<ManifestListReader>> MakeReader(
49+
static Result<std::unique_ptr<ManifestListReader>> MakeReader(
5050
const std::string& manifest_list_location, std::shared_ptr<FileIO> file_io);
5151
};
5252

src/iceberg/manifest_reader_internal.cc

Lines changed: 153 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -33,169 +33,190 @@
3333

3434
namespace iceberg {
3535

36-
#define ARROW_RETURN_IF_NOT_OK(status, error) \
36+
#define NANOARROW_RETURN_IF_NOT_OK(status, error) \
3737
if (status != NANOARROW_OK) [[unlikely]] { \
3838
return InvalidArrowData("Nanoarrow error: {}", error.message); \
3939
}
4040

41+
Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column,
42+
std::vector<ManifestFile>& manifest_files) {
43+
auto manifest_count = view_of_column->length;
44+
// view_of_column is list<struct<PartitionFieldSummary>>
45+
if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) {
46+
return InvalidManifestList("partitions field should be a list.");
47+
}
48+
auto view_of_list_iterm = view_of_column->children[0];
49+
// view_of_list_iterm is struct<PartitionFieldSummary>
50+
if (view_of_list_iterm->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
51+
return InvalidManifestList("partitions list field should be a list.");
52+
}
53+
if (view_of_list_iterm->n_children != 4) {
54+
return InvalidManifestList("PartitionFieldSummary should have 4 fields.");
55+
}
56+
if (view_of_list_iterm->children[0]->storage_type != ArrowType::NANOARROW_TYPE_BOOL) {
57+
return InvalidManifestList("contains_null should have be bool type column.");
58+
}
59+
auto contains_null = view_of_list_iterm->children[0];
60+
if (view_of_list_iterm->children[1]->storage_type != ArrowType::NANOARROW_TYPE_BOOL) {
61+
return InvalidManifestList("contains_nan should have be bool type column.");
62+
}
63+
auto contains_nan = view_of_list_iterm->children[1];
64+
if (view_of_list_iterm->children[2]->storage_type != ArrowType::NANOARROW_TYPE_BINARY) {
65+
return InvalidManifestList("lower_bound should have be binary type column.");
66+
}
67+
auto lower_bound_list = view_of_list_iterm->children[2];
68+
if (view_of_list_iterm->children[3]->storage_type != ArrowType::NANOARROW_TYPE_BINARY) {
69+
return InvalidManifestList("upper_bound should have be binary type column.");
70+
}
71+
auto upper_bound_list = view_of_list_iterm->children[3];
72+
for (int64_t manifest_idx = 0; manifest_idx < manifest_count; manifest_idx++) {
73+
auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx);
74+
auto next_offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx + 1);
75+
// partitions from offset to next_offset belongs to manifest_idx
76+
auto& manifest_file = manifest_files[manifest_idx];
77+
for (int64_t partition_idx = offset; partition_idx < next_offset; partition_idx++) {
78+
PartitionFieldSummary partition_field_summary;
79+
if (!ArrowArrayViewIsNull(contains_null, partition_idx)) {
80+
partition_field_summary.contains_null =
81+
ArrowArrayViewGetIntUnsafe(contains_null, partition_idx);
82+
}
83+
if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) {
84+
partition_field_summary.contains_nan =
85+
ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx);
86+
}
87+
if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) {
88+
auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx);
89+
partition_field_summary.lower_bound = std::vector<uint8_t>(
90+
buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
91+
}
92+
if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) {
93+
auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx);
94+
partition_field_summary.upper_bound = std::vector<uint8_t>(
95+
buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
96+
}
97+
98+
manifest_file.partitions.emplace_back(partition_field_summary);
99+
}
100+
}
101+
return {};
102+
}
103+
41104
Result<std::vector<ManifestFile>> ParseManifestListEntry(ArrowSchema* schema,
42105
ArrowArray* array_in,
43106
const Schema& iceberg_schema) {
44107
if (schema->n_children != array_in->n_children) {
45-
return InvalidArgument("Columns size not match between schema:{} and array:{}",
46-
schema->n_children, array_in->n_children);
108+
return InvalidManifestList("Columns size not match between schema:{} and array:{}",
109+
schema->n_children, array_in->n_children);
47110
}
48111
if (iceberg_schema.fields().size() != array_in->n_children) {
49-
return InvalidArgument("Columns size not match between schema:{} and array:{}",
50-
iceberg_schema.fields().size(), array_in->n_children);
112+
return InvalidManifestList("Columns size not match between schema:{} and array:{}",
113+
iceberg_schema.fields().size(), array_in->n_children);
51114
}
52115

53116
ArrowError error;
54117
ArrowArrayView array_view;
55118
auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
56-
ARROW_RETURN_IF_NOT_OK(status, error);
119+
NANOARROW_RETURN_IF_NOT_OK(status, error);
57120
internal::ArrowArrayViewGuard view_guard(&array_view);
58121
status = ArrowArrayViewSetArray(&array_view, array_in, &error);
59-
ARROW_RETURN_IF_NOT_OK(status, error);
122+
NANOARROW_RETURN_IF_NOT_OK(status, error);
60123
status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error);
61-
ARROW_RETURN_IF_NOT_OK(status, error);
124+
NANOARROW_RETURN_IF_NOT_OK(status, error);
62125

63126
std::vector<ManifestFile> manifest_files;
64127
manifest_files.resize(array_in->length);
65128

66129
for (int64_t idx = 0; idx < array_in->n_children; idx++) {
67130
const auto& field = iceberg_schema.GetFieldByIndex(idx);
68131
if (!field.has_value()) {
69-
return InvalidArgument("Field not found in schema: {}", idx);
132+
return InvalidSchema("Field index {} is not found in schema", idx);
70133
}
71134
auto field_name = field.value().get().name();
135+
bool required = !field.value().get().optional();
72136
auto view_of_column = array_view.children[idx];
73137

74-
#define PARSE_PRIMITIVE_FIELD(field_name, type) \
75-
for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \
76-
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \
77-
auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \
78-
manifest_files[row_idx].field_name = static_cast<type>(value); \
79-
} \
138+
#define PARSE_PRIMITIVE_FIELD(item, type) \
139+
for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) { \
140+
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) { \
141+
auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx); \
142+
manifest_files[row_idx].item = static_cast<type>(value); \
143+
} else if (required) { \
144+
return InvalidManifestList("Field {} is required but null at row {}", field_name, \
145+
row_idx); \
146+
} \
80147
}
81148

82-
if (field_name == ManifestFile::kManifestPath.name()) {
83-
for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
84-
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
85-
auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx);
86-
std::string path_str(value.data, value.size_bytes);
87-
manifest_files[row_idx].manifest_path = path_str;
88-
}
89-
}
90-
} else if (field_name == ManifestFile::kManifestLength.name()) {
91-
PARSE_PRIMITIVE_FIELD(manifest_length, int64_t);
92-
} else if (field_name == ManifestFile::kPartitionSpecId.name()) {
93-
PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t);
94-
} else if (field_name == ManifestFile::kContent.name()) {
95-
for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
96-
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
97-
auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx);
98-
manifest_files[row_idx].content = static_cast<ManifestFile::Content>(value);
99-
}
100-
}
101-
} else if (field_name == ManifestFile::kSequenceNumber.name()) {
102-
PARSE_PRIMITIVE_FIELD(sequence_number, int64_t);
103-
} else if (field_name == ManifestFile::kMinSequenceNumber.name()) {
104-
PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t);
105-
} else if (field_name == ManifestFile::kAddedSnapshotId.name()) {
106-
PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t);
107-
} else if (field_name == ManifestFile::kAddedFilesCount.name()) {
108-
PARSE_PRIMITIVE_FIELD(added_files_count, int32_t);
109-
} else if (field_name == ManifestFile::kExistingFilesCount.name()) {
110-
PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t);
111-
} else if (field_name == ManifestFile::kDeletedFilesCount.name()) {
112-
PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t);
113-
} else if (field_name == ManifestFile::kAddedRowsCount.name()) {
114-
PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t);
115-
} else if (field_name == ManifestFile::kExistingRowsCount.name()) {
116-
PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t);
117-
} else if (field_name == ManifestFile::kDeletedRowsCount.name()) {
118-
PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t);
119-
} else if (field_name == ManifestFile::kPartitions.name()) {
120-
// view_of_column is list<struct<PartitionFieldSummary>>
121-
auto manifest_count = view_of_column->length;
122-
if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) {
123-
return InvalidArgument("partitions field should be a list.");
124-
}
125-
auto view_of_list_iterm = view_of_column->children[0];
126-
// view_of_list_iterm is struct<PartitionFieldSummary>
127-
if (view_of_list_iterm->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
128-
return InvalidArgument("partitions list field should be a list.");
129-
}
130-
if (view_of_list_iterm->n_children != 4) {
131-
return InvalidArgument("PartitionFieldSummary should have 4 fields.");
132-
}
133-
if (view_of_list_iterm->children[0]->storage_type !=
134-
ArrowType::NANOARROW_TYPE_BOOL) {
135-
return InvalidArgument("contains_null should have be bool type column.");
136-
}
137-
auto contains_null = view_of_list_iterm->children[0];
138-
if (view_of_list_iterm->children[1]->storage_type !=
139-
ArrowType::NANOARROW_TYPE_BOOL) {
140-
return InvalidArgument("contains_nan should have be bool type column.");
141-
}
142-
auto contains_nan = view_of_list_iterm->children[1];
143-
if (view_of_list_iterm->children[2]->storage_type !=
144-
ArrowType::NANOARROW_TYPE_BINARY) {
145-
return InvalidArgument("lower_bound should have be binary type column.");
146-
}
147-
auto lower_bound_list = view_of_list_iterm->children[2];
148-
if (view_of_list_iterm->children[3]->storage_type !=
149-
ArrowType::NANOARROW_TYPE_BINARY) {
150-
return InvalidArgument("upper_bound should have be binary type column.");
151-
}
152-
auto upper_bound_list = view_of_list_iterm->children[3];
153-
for (int64_t manifest_idx = 0; manifest_idx < manifest_count; manifest_idx++) {
154-
auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx);
155-
auto next_offset =
156-
ArrowArrayViewListChildOffset(view_of_column, manifest_idx + 1);
157-
// partitions from offset to next_offset belongs to manifest_idx
158-
auto& manifest_file = manifest_files[manifest_idx];
159-
for (int64_t partition_idx = offset; partition_idx < next_offset;
160-
partition_idx++) {
161-
PartitionFieldSummary partition_field_summary;
162-
if (!ArrowArrayViewIsNull(contains_null, partition_idx)) {
163-
partition_field_summary.contains_null =
164-
ArrowArrayViewGetIntUnsafe(contains_null, partition_idx);
165-
}
166-
if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) {
167-
partition_field_summary.contains_nan =
168-
ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx);
149+
switch (idx) {
150+
case 0:
151+
for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
152+
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
153+
auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx);
154+
std::string path_str(value.data, value.size_bytes);
155+
manifest_files[row_idx].manifest_path = path_str;
169156
}
170-
if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) {
171-
auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx);
172-
partition_field_summary.lower_bound = std::vector<uint8_t>(
173-
buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
157+
}
158+
break;
159+
case 1:
160+
PARSE_PRIMITIVE_FIELD(manifest_length, int64_t);
161+
break;
162+
case 2:
163+
PARSE_PRIMITIVE_FIELD(partition_spec_id, int32_t);
164+
break;
165+
case 3:
166+
for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
167+
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
168+
auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx);
169+
manifest_files[row_idx].content = static_cast<ManifestFile::Content>(value);
174170
}
175-
if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) {
176-
auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx);
177-
partition_field_summary.upper_bound = std::vector<uint8_t>(
171+
}
172+
break;
173+
case 4:
174+
PARSE_PRIMITIVE_FIELD(sequence_number, int64_t);
175+
break;
176+
case 5:
177+
PARSE_PRIMITIVE_FIELD(min_sequence_number, int64_t);
178+
break;
179+
case 6:
180+
PARSE_PRIMITIVE_FIELD(added_snapshot_id, int64_t);
181+
break;
182+
case 7:
183+
PARSE_PRIMITIVE_FIELD(added_files_count, int32_t);
184+
break;
185+
case 8:
186+
PARSE_PRIMITIVE_FIELD(existing_files_count, int32_t);
187+
break;
188+
case 9:
189+
PARSE_PRIMITIVE_FIELD(deleted_files_count, int32_t);
190+
break;
191+
case 10:
192+
PARSE_PRIMITIVE_FIELD(added_rows_count, int64_t);
193+
break;
194+
case 11:
195+
PARSE_PRIMITIVE_FIELD(existing_rows_count, int64_t);
196+
break;
197+
case 12:
198+
PARSE_PRIMITIVE_FIELD(deleted_rows_count, int64_t);
199+
break;
200+
case 13:
201+
ICEBERG_RETURN_UNEXPECTED(
202+
ParsePartitionFieldSummaryList(view_of_column, manifest_files));
203+
break;
204+
case 14:
205+
for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
206+
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
207+
auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx);
208+
manifest_files[row_idx].key_metadata = std::vector<uint8_t>(
178209
buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
179210
}
180-
181-
manifest_file.partitions.emplace_back(partition_field_summary);
182211
}
183-
}
184-
} else if (field_name == ManifestFile::kKeyMetadata.name()) {
185-
for (size_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
186-
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
187-
auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx);
188-
manifest_files[row_idx].key_metadata = std::vector<uint8_t>(
189-
buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
190-
}
191-
}
192-
} else if (field_name == ManifestFile::kFirstRowId.name()) {
193-
PARSE_PRIMITIVE_FIELD(first_row_id, int64_t);
194-
} else {
195-
return InvalidArgument("Unsupported type: {}", field_name);
212+
break;
213+
case 15:
214+
PARSE_PRIMITIVE_FIELD(first_row_id, int64_t);
215+
break;
216+
default:
217+
return InvalidManifestList("Unsupported type: {}", field_name);
196218
}
197219
}
198-
#undef PARSE_PRIMITIVE_FIELD
199220
return manifest_files;
200221
} // namespace iceberg
201222

@@ -208,20 +229,17 @@ Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
208229
while (true) {
209230
auto result = reader_->Next();
210231
if (!result.has_value()) {
211-
return InvalidArgument("Failed to read manifest list entry:{}",
212-
result.error().message);
232+
return InvalidManifestList("Failed to read manifest list entry:{}",
233+
result.error().message);
213234
}
214235
if (result.value().has_value()) {
215236
internal::ArrowArrayGuard array_guard(&result.value().value());
216-
auto parse_result =
217-
ParseManifestListEntry(&arrow_schema, &result.value().value(), *schema_);
218-
if (!parse_result.has_value()) {
219-
return InvalidArgument("Failed to parse manifest list entry:{}",
220-
parse_result.error().message);
221-
}
237+
ICEBERG_ASSIGN_OR_RAISE(
238+
auto entries,
239+
ParseManifestListEntry(&arrow_schema, &result.value().value(), *schema_));
222240
manifest_files.insert(manifest_files.end(),
223-
std::make_move_iterator(parse_result.value().begin()),
224-
std::make_move_iterator(parse_result.value().end()));
241+
std::make_move_iterator(entries.begin()),
242+
std::make_move_iterator(entries.end()));
225243
} else {
226244
// eof
227245
break;

0 commit comments

Comments
 (0)