Skip to content

Commit 5201ade

Browse files
author
xiao.dong
committed
feat: add manifest list reader
- Add manifest list reader - Integrate with avro reader - Add simple ut
1 parent 0779a52 commit 5201ade

10 files changed

+519
-12
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ set(ICEBERG_SOURCES
4343
transform.cc
4444
transform_function.cc
4545
type.cc
46+
manifest_reader.cc
47+
manifest_reader_internal.cc
4648
util/murmurhash3_internal.cc
4749
util/timepoint.cc
4850
util/unreachable.cc

src/iceberg/arrow_c_data.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,45 @@ struct ArrowArray {
7373

7474
#endif // ARROW_C_DATA_INTERFACE
7575

76+
#ifndef ARROW_C_STREAM_INTERFACE
77+
# define ARROW_C_STREAM_INTERFACE
78+
79+
struct ArrowArrayStream {
  // Fetches the schema shared by every array produced by this stream.
  //
  // Returns 0 on success, otherwise an `errno`-compatible error code.
  //
  // On success the caller owns the ArrowSchema and must release it
  // separately from the stream.
  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);

  // Fetches the next array of the stream.
  // A successful call that yields a released array marks the end of the stream.
  //
  // Returns 0 on success, otherwise an `errno`-compatible error code.
  //
  // On success the caller owns the ArrowArray and must release it
  // separately from the stream.
  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);

  // Fetches optional details about the most recent failure.
  // Only call this when the previous stream operation returned a non-0 code.
  //
  // Returns a null-terminated description of the last error, or NULL when
  // no description is available.
  //
  // The pointer stays valid only until the next operation on this stream
  // (release included).
  const char* (*get_last_error)(struct ArrowArrayStream*);

  // Frees the resources owned by the stream itself.
  // Arrays handed out by `get_next` are not covered and must each be released
  // individually.
  void (*release)(struct ArrowArrayStream*);

  // Producer-specific state, opaque to consumers.
  void* private_data;
};
114+
115+
#endif // ARROW_C_STREAM_INTERFACE
116+
76117
} // extern "C"

src/iceberg/manifest_list.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919

2020
#include "iceberg/manifest_list.h"
2121

22-
#include <vector>
23-
24-
#include "iceberg/type.h"
22+
#include "iceberg/schema.h"
2523

2624
namespace iceberg {
2725

src/iceberg/manifest_reader.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest_reader.h"
21+
22+
#include "iceberg/manifest_reader_internal.h"
23+
24+
namespace iceberg {
25+
26+
std::shared_ptr<ManifestReader> ManifestReader::NewReader(
27+
std::unique_ptr<Reader> reader) {
28+
return std::make_shared<ManifestReaderImpl>(std::move(reader));
29+
}
30+
31+
std::shared_ptr<ManifestListReader> ManifestListReader::NewReader(
32+
std::unique_ptr<Reader> reader) {
33+
return std::make_shared<ManifestListReaderImpl>(std::move(reader));
34+
}
35+
36+
} // namespace iceberg

src/iceberg/manifest_reader.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
/// Data reader interface for manifest files.
2424

2525
#include <memory>
26-
#include <span>
26+
#include <vector>
2727

2828
#include "iceberg/file_reader.h"
2929
#include "iceberg/iceberg_export.h"
@@ -34,19 +34,17 @@ namespace iceberg {
3434
/// \brief Read manifest entries from a manifest file.
3535
class ICEBERG_EXPORT ManifestReader {
3636
public:
37-
virtual Result<std::span<std::unique_ptr<ManifestEntry>>> Entries() const = 0;
37+
virtual Result<std::vector<std::unique_ptr<ManifestEntry>>> Entries() const = 0;
3838

39-
private:
40-
std::unique_ptr<Reader> reader_;
39+
static std::shared_ptr<ManifestReader> NewReader(std::unique_ptr<Reader> reader);
4140
};
4241

4342
/// \brief Read manifest files from a manifest list file.
4443
class ICEBERG_EXPORT ManifestListReader {
4544
public:
46-
virtual Result<std::span<std::unique_ptr<ManifestFile>>> Files() const = 0;
45+
virtual Result<std::vector<std::unique_ptr<ManifestFile>>> Files() const = 0;
4746

48-
private:
49-
std::unique_ptr<Reader> reader_;
47+
static std::shared_ptr<ManifestListReader> NewReader(std::unique_ptr<Reader> reader);
5048
};
5149

5250
} // namespace iceberg
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "manifest_reader_internal.h"
21+
22+
#include <array>
23+
24+
#include <nanoarrow/nanoarrow.h>
25+
26+
#include "iceberg/manifest_entry.h"
27+
#include "iceberg/manifest_list.h"
28+
#include "iceberg/schema.h"
29+
#include "iceberg/schema_internal.h"
30+
#include "iceberg/type.h"
31+
32+
namespace iceberg {
33+
34+
// Returns an InvalidArrowData error built from `error.message` out of the
// enclosing function when `status` is not NANOARROW_OK.
//
// Wrapped in do { } while (0) so the expansion is a single statement that
// requires a trailing semicolon and cannot capture a following `else`.
// Arguments are parenthesized so expressions can be passed safely.
#define ARROW_RETURN_IF_NOT_OK(status, error)                            \
  do {                                                                   \
    if ((status) != NANOARROW_OK) {                                      \
      return InvalidArrowData("NanoArrow error: {}", (error).message);   \
    }                                                                    \
  } while (0)
38+
39+
/// \brief Convert one Arrow batch of a manifest list into ManifestFile objects.
///
/// Every top-level column of `array_in` is matched by name against the field at
/// the same index of `iceberg_schema`, and its values are copied row by row into
/// one ManifestFile per row of the batch.
///
/// \param schema Arrow schema describing `array_in`.
/// \param array_in Batch to parse. It is released before this function returns,
///        on success as well as on every error path.
/// \param iceberg_schema Iceberg schema whose fields correspond to the columns.
/// \return One ManifestFile per row, or an error if the column counts disagree,
///         nanoarrow rejects the data, a column has an unexpected type, or a
///         column name is not a known manifest list field.
Result<std::vector<std::unique_ptr<ManifestFile>>> ParseManifestListEntry(
    ArrowSchema* schema, ArrowArray* array_in, const Schema& iceberg_schema) {
  // Releases the batch on every exit path so that early error returns do not
  // leak it. The null check makes the guard a no-op for an already released array.
  struct ArrayReleaser {
    ArrowArray* array;
    ~ArrayReleaser() {
      if (array->release != nullptr) {
        ArrowArrayRelease(array);
      }
    }
  } array_releaser{array_in};

  if (schema->n_children != array_in->n_children) {
    return InvalidArgument("Columns size not match between schema:{} and array:{}",
                           schema->n_children, array_in->n_children);
  }
  if (static_cast<int64_t>(iceberg_schema.fields().size()) != array_in->n_children) {
    return InvalidArgument("Columns size not match between schema:{} and array:{}",
                           iceberg_schema.fields().size(), array_in->n_children);
  }

  ArrowError error;
  ArrowArrayView array_view;
  auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
  ARROW_RETURN_IF_NOT_OK(status, error);

  // Resets the view on every exit path. Installed only after a successful
  // initialization so it never touches an uninitialized view.
  struct ViewResetter {
    ArrowArrayView* view;
    ~ViewResetter() { ArrowArrayViewReset(view); }
  } view_resetter{&array_view};

  status = ArrowArrayViewSetArray(&array_view, array_in, &error);
  ARROW_RETURN_IF_NOT_OK(status, error);
  status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error);
  ARROW_RETURN_IF_NOT_OK(status, error);

  // One output object per row; columns are filled in one at a time below.
  std::vector<std::unique_ptr<ManifestFile>> manifest_files;
  manifest_files.reserve(array_in->length);
  for (int64_t row_idx = 0; row_idx < array_in->length; row_idx++) {
    manifest_files.push_back(std::make_unique<ManifestFile>());
  }

// Copies an integer column into the named ManifestFile member, skipping nulls.
#define PARSE_PRIMITIVE_FIELD(field_name)                                    \
  for (int64_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {   \
    if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {                    \
      auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx);      \
      manifest_files[row_idx]->field_name = value;                           \
    }                                                                        \
  }

  for (int64_t idx = 0; idx < array_in->n_children; idx++) {
    const auto& field = iceberg_schema.GetFieldByIndex(idx);
    if (!field.has_value()) {
      return InvalidArgument("Field not found in schema: {}", idx);
    }
    auto field_name = field.value().get().name();
    auto view_of_column = array_view.children[idx];

    if (field_name == ManifestFile::kManifestPath.name()) {
      for (int64_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
        if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
          auto value = ArrowArrayViewGetStringUnsafe(view_of_column, row_idx);
          manifest_files[row_idx]->manifest_path =
              std::string(value.data, value.size_bytes);
        }
      }
    } else if (field_name == ManifestFile::kManifestLength.name()) {
      PARSE_PRIMITIVE_FIELD(manifest_length);
    } else if (field_name == ManifestFile::kPartitionSpecId.name()) {
      PARSE_PRIMITIVE_FIELD(partition_spec_id);
    } else if (field_name == ManifestFile::kContent.name()) {
      for (int64_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
        if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
          auto value = ArrowArrayViewGetIntUnsafe(view_of_column, row_idx);
          manifest_files[row_idx]->content = static_cast<ManifestFile::Content>(value);
        }
      }
    } else if (field_name == ManifestFile::kSequenceNumber.name()) {
      PARSE_PRIMITIVE_FIELD(sequence_number);
    } else if (field_name == ManifestFile::kMinSequenceNumber.name()) {
      PARSE_PRIMITIVE_FIELD(min_sequence_number);
    } else if (field_name == ManifestFile::kAddedSnapshotId.name()) {
      PARSE_PRIMITIVE_FIELD(added_snapshot_id);
    } else if (field_name == ManifestFile::kAddedFilesCount.name()) {
      PARSE_PRIMITIVE_FIELD(added_files_count);
    } else if (field_name == ManifestFile::kExistingFilesCount.name()) {
      PARSE_PRIMITIVE_FIELD(existing_files_count);
    } else if (field_name == ManifestFile::kDeletedFilesCount.name()) {
      PARSE_PRIMITIVE_FIELD(deleted_files_count);
    } else if (field_name == ManifestFile::kAddedRowsCount.name()) {
      PARSE_PRIMITIVE_FIELD(added_rows_count);
    } else if (field_name == ManifestFile::kExistingRowsCount.name()) {
      PARSE_PRIMITIVE_FIELD(existing_rows_count);
    } else if (field_name == ManifestFile::kDeletedRowsCount.name()) {
      PARSE_PRIMITIVE_FIELD(deleted_rows_count);
    } else if (field_name == ManifestFile::kPartitions.name()) {
      // view_of_column is list<struct<PartitionFieldSummary>>
      auto manifest_count = view_of_column->length;
      if (view_of_column->storage_type != ArrowType::NANOARROW_TYPE_LIST) {
        return InvalidArgument("partitions field should be a list.");
      }
      auto view_of_list_item = view_of_column->children[0];
      // view_of_list_item is struct<PartitionFieldSummary>
      if (view_of_list_item->storage_type != ArrowType::NANOARROW_TYPE_STRUCT) {
        return InvalidArgument("partitions list item should be a struct.");
      }
      if (view_of_list_item->n_children != 4) {
        return InvalidArgument("PartitionFieldSummary should have 4 fields.");
      }
      if (view_of_list_item->children[0]->storage_type !=
          ArrowType::NANOARROW_TYPE_BOOL) {
        return InvalidArgument("contains_null should have be bool type column.");
      }
      auto contains_null = view_of_list_item->children[0];
      if (view_of_list_item->children[1]->storage_type !=
          ArrowType::NANOARROW_TYPE_BOOL) {
        return InvalidArgument("contains_nan should have be bool type column.");
      }
      auto contains_nan = view_of_list_item->children[1];
      if (view_of_list_item->children[2]->storage_type !=
          ArrowType::NANOARROW_TYPE_BINARY) {
        return InvalidArgument("lower_bound should have be binary type column.");
      }
      auto lower_bound_list = view_of_list_item->children[2];
      if (view_of_list_item->children[3]->storage_type !=
          ArrowType::NANOARROW_TYPE_BINARY) {
        return InvalidArgument("upper_bound should have be binary type column.");
      }
      auto upper_bound_list = view_of_list_item->children[3];
      for (int64_t manifest_idx = 0; manifest_idx < manifest_count; manifest_idx++) {
        auto offset = ArrowArrayViewListChildOffset(view_of_column, manifest_idx);
        auto next_offset =
            ArrowArrayViewListChildOffset(view_of_column, manifest_idx + 1);
        // partitions from offset to next_offset belong to manifest_idx
        auto& manifest_file = manifest_files[manifest_idx];
        for (int64_t partition_idx = offset; partition_idx < next_offset;
             partition_idx++) {
          PartitionFieldSummary partition_field_summary;
          if (!ArrowArrayViewIsNull(contains_null, partition_idx)) {
            partition_field_summary.contains_null =
                ArrowArrayViewGetIntUnsafe(contains_null, partition_idx);
          }
          if (!ArrowArrayViewIsNull(contains_nan, partition_idx)) {
            partition_field_summary.contains_nan =
                ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx);
          }
          if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) {
            auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx);
            partition_field_summary.lower_bound = std::vector<uint8_t>(
                buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
          }
          if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) {
            auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx);
            partition_field_summary.upper_bound = std::vector<uint8_t>(
                buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
          }

          manifest_file->partitions.emplace_back(std::move(partition_field_summary));
        }
      }
    } else if (field_name == ManifestFile::kKeyMetadata.name()) {
      // key_metadata is a byte blob per row: copy the bytes of each value rather
      // than reading the column as an integer.
      for (int64_t row_idx = 0; row_idx < view_of_column->length; row_idx++) {
        if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
          auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_column, row_idx);
          manifest_files[row_idx]->key_metadata.assign(
              buffer.data.as_uint8, buffer.data.as_uint8 + buffer.size_bytes);
        }
      }
    } else if (field_name == ManifestFile::kFirstRowId.name()) {
      PARSE_PRIMITIVE_FIELD(first_row_id);
    } else {
      return InvalidArgument("Unsupported type: {}", field_name);
    }
  }
#undef PARSE_PRIMITIVE_FIELD
  // array_in and array_view are cleaned up by the guards declared above.
  return manifest_files;
}
203+
204+
/// \brief Return the manifest entries.
///
/// Currently yields a value-initialized result without consulting the
/// underlying reader.
Result<std::vector<std::unique_ptr<ManifestEntry>>> ManifestReaderImpl::Entries() const {
  Result<std::vector<std::unique_ptr<ManifestEntry>>> no_entries{};
  return no_entries;
}
207+
208+
/// \brief Read every batch from the underlying reader and collect the manifest
/// files they describe.
///
/// The Arrow schema obtained from the reader is converted to an Iceberg schema,
/// then each batch returned by `Next()` is parsed with ParseManifestListEntry
/// until the reader reports no more data.
///
/// \return All parsed manifest files in read order, or an InvalidArgument error
///         if fetching the schema, converting it, reading a batch, or parsing a
///         batch fails.
Result<std::vector<std::unique_ptr<ManifestFile>>> ManifestListReaderImpl::Files() const {
  // The Arrow C data interface leaves releasing to the consumer; these guards
  // do so on every exit path. The null checks make them no-ops for structures
  // that have already been released.
  struct SchemaReleaser {
    ArrowSchema* schema;
    ~SchemaReleaser() {
      if (schema->release != nullptr) {
        schema->release(schema);
      }
    }
  };
  struct ArrayReleaser {
    ArrowArray* array;
    ~ArrayReleaser() {
      if (array->release != nullptr) {
        array->release(array);
      }
    }
  };

  std::vector<std::unique_ptr<ManifestFile>> manifest_files;
  auto arrow_schema = reader_->Schema();
  if (!arrow_schema.has_value()) {
    return InvalidArgument("Get schema failed in reader:{}",
                           arrow_schema.error().message);
  }
  SchemaReleaser schema_releaser{&arrow_schema.value()};

  auto schema = FromArrowSchema(arrow_schema.value(), std::nullopt);
  if (!schema.has_value()) {
    return InvalidArgument("Parse iceberg schema failed:{}", schema.error().message);
  }

  while (true) {
    auto result = reader_->Next();
    if (!result.has_value()) {
      return InvalidArgument("Failed to read manifest list entry:{}",
                             result.error().message);
    }
    if (!result.value().has_value()) {
      // No batch returned: the reader is exhausted.
      break;
    }

    // Covers the case where parsing fails before the batch has been released.
    ArrayReleaser batch_releaser{&result.value().value()};
    auto parse_result = ParseManifestListEntry(
        &arrow_schema.value(), &result.value().value(), *schema.value());
    if (!parse_result.has_value()) {
      return InvalidArgument("Failed to parse manifest list entry:{}",
                             parse_result.error().message);
    }
    manifest_files.insert(manifest_files.end(),
                          std::make_move_iterator(parse_result.value().begin()),
                          std::make_move_iterator(parse_result.value().end()));
  }
  return manifest_files;
}
241+
242+
} // namespace iceberg

0 commit comments

Comments
 (0)