Skip to content

Commit 84565b5

Browse files
dongxiao1198 and xiao.dong authored
feat: add manifest list reader (#143)
- Add manifest list reader - Integrate with avro reader - Add simple ut --------- Co-authored-by: xiao.dong <[email protected]>
1 parent b07c0f9 commit 84565b5

14 files changed

+690
-35
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ set(ICEBERG_SOURCES
4444
transform.cc
4545
transform_function.cc
4646
type.cc
47+
manifest_reader.cc
48+
manifest_reader_internal.cc
49+
arrow_c_data_guard_internal.cc
4750
util/murmurhash3_internal.cc
4851
util/timepoint.cc
4952
util/gzip_internal.cc)

src/iceberg/arrow_c_data.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,45 @@ struct ArrowArray {
7373

7474
#endif // ARROW_C_DATA_INTERFACE
7575

76+
#ifndef ARROW_C_STREAM_INTERFACE
77+
# define ARROW_C_STREAM_INTERFACE
78+
79+
// Callback table describing a stream of Arrow arrays (the C stream interface
// guarded by ARROW_C_STREAM_INTERFACE). All members are function pointers
// supplied by the producer, plus one opaque data pointer.
struct ArrowArrayStream {
80+
// Callback to get the stream type
81+
// (will be the same for all arrays in the stream).
82+
//
83+
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
84+
//
85+
// If successful, the ArrowSchema must be released independently from the stream.
86+
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
87+
88+
// Callback to get the next array
89+
// (if no error and the array is released, the stream has ended)
90+
//
91+
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
92+
//
93+
// If successful, the ArrowArray must be released independently from the stream.
94+
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
95+
96+
// Callback to get optional detailed error information.
97+
// This must only be called if the last stream operation failed
98+
// with a non-0 return code.
99+
//
100+
// Return value: pointer to a null-terminated character array describing
101+
// the last error, or NULL if no description is available.
102+
//
103+
// The returned pointer is only valid until the next operation on this stream
104+
// (including release).
105+
const char* (*get_last_error)(struct ArrowArrayStream*);
106+
107+
// Release callback: release the stream's own resources.
108+
// Note that arrays returned by `get_next` must be individually released.
109+
void (*release)(struct ArrowArrayStream*);
110+
111+
// Opaque producer-specific data
112+
void* private_data;
113+
};
114+
115+
#endif // ARROW_C_STREAM_INTERFACE
116+
76117
} // extern "C"
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/arrow_c_data_guard_internal.h"
21+
22+
namespace iceberg::internal {
23+
24+
/// \brief Releases the guarded ArrowArray, if there is still something to release.
///
/// Nothing happens when the guard holds a null pointer or when the array has
/// already been released or moved elsewhere.
ArrowArrayGuard::~ArrowArrayGuard() {
  // In the Arrow C data interface a null `release` callback marks a released
  // (or moved-from) struct. ArrowArrayRelease would call through that null
  // function pointer, so skip it.
  if (array_ != nullptr && array_->release != nullptr) {
    ArrowArrayRelease(array_);
  }
}
29+
30+
/// \brief Releases the guarded ArrowSchema, if there is still something to release.
///
/// Nothing happens when the guard holds a null pointer or when the schema has
/// already been released or moved elsewhere.
ArrowSchemaGuard::~ArrowSchemaGuard() {
  // In the Arrow C data interface a null `release` callback marks a released
  // (or moved-from) struct. ArrowSchemaRelease would call through that null
  // function pointer, so skip it.
  if (schema_ != nullptr && schema_->release != nullptr) {
    ArrowSchemaRelease(schema_);
  }
}
35+
36+
/// \brief Resets the guarded ArrowArrayView unless the guard holds a null pointer.
ArrowArrayViewGuard::~ArrowArrayViewGuard() {
  if (view_ == nullptr) {
    return;
  }
  ArrowArrayViewReset(view_);
}
41+
42+
/// \brief Resets the guarded ArrowBuffer unless the guard holds a null pointer.
ArrowArrayBufferGuard::~ArrowArrayBufferGuard() {
  if (buffer_ == nullptr) {
    return;
  }
  ArrowBufferReset(buffer_);
}
47+
48+
} // namespace iceberg::internal
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <nanoarrow/nanoarrow.h>
23+
24+
#include "iceberg/arrow_c_data.h"
25+
26+
namespace iceberg::internal {
27+
28+
class ArrowArrayGuard {
29+
public:
30+
explicit ArrowArrayGuard(ArrowArray* array) : array_(array) {}
31+
~ArrowArrayGuard();
32+
33+
private:
34+
ArrowArray* array_;
35+
};
36+
37+
class ArrowSchemaGuard {
38+
public:
39+
explicit ArrowSchemaGuard(ArrowSchema* schema) : schema_(schema) {}
40+
~ArrowSchemaGuard();
41+
42+
private:
43+
ArrowSchema* schema_;
44+
};
45+
46+
class ArrowArrayViewGuard {
47+
public:
48+
explicit ArrowArrayViewGuard(ArrowArrayView* view) : view_(view) {}
49+
~ArrowArrayViewGuard();
50+
51+
private:
52+
ArrowArrayView* view_;
53+
};
54+
55+
class ArrowArrayBufferGuard {
56+
public:
57+
explicit ArrowArrayBufferGuard(ArrowBuffer* buffer) : buffer_(buffer) {}
58+
~ArrowArrayBufferGuard();
59+
60+
private:
61+
ArrowBuffer* buffer_;
62+
};
63+
64+
} // namespace iceberg::internal

src/iceberg/manifest_list.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919

2020
#include "iceberg/manifest_list.h"
2121

22-
#include <vector>
23-
24-
#include "iceberg/type.h"
22+
#include "iceberg/schema.h"
2523

2624
namespace iceberg {
2725

src/iceberg/manifest_reader.cc

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest_reader.h"
21+
22+
#include "iceberg/manifest_entry.h"
23+
#include "iceberg/manifest_list.h"
24+
#include "iceberg/manifest_reader_internal.h"
25+
#include "iceberg/schema.h"
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg {
29+
30+
/// \brief Creates a reader for a manifest file stored as Avro.
///
/// The manifest entry type is derived from the partition schema, turned into a
/// Schema, and passed to the Avro reader as its projection.
///
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation handed to the underlying reader.
/// \param partition_schema Partition schema from which the manifest entry type
///        is derived.
/// \return A Result containing the reader or an error.
Result<std::unique_ptr<ManifestReader>> ManifestReader::MakeReader(
    std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
    std::shared_ptr<Schema> partition_schema) {
  auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema);
  auto entry_fields = manifest_entry_schema->fields();
  std::vector<SchemaField> fields(entry_fields.begin(), entry_fields.end());
  // `fields` is not used again, so move it into the Schema instead of copying.
  auto schema = std::make_shared<Schema>(std::move(fields));
  // `schema` is copied into the options because the reader implementation
  // below also takes ownership of it.
  ICEBERG_ASSIGN_OR_RAISE(
      auto reader, ReaderFactoryRegistry::Open(FileFormatType::kAvro,
                                               {.path = std::string(manifest_location),
                                                .io = std::move(file_io),
                                                .projection = schema}));
  return std::make_unique<ManifestReaderImpl>(std::move(reader), std::move(schema));
}
44+
45+
/// \brief Creates a reader for a manifest list file stored as Avro.
///
/// The fields of ManifestFile::Type() are turned into a Schema that is passed
/// to the Avro reader as its projection.
///
/// \param manifest_list_location Path to the manifest list file.
/// \param file_io File IO implementation handed to the underlying reader.
/// \return A Result containing the reader or an error.
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::MakeReader(
    std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
  // Bind the type and its field range once so that begin() and end() are
  // guaranteed to refer to the same underlying sequence.
  const auto& manifest_file_type = ManifestFile::Type();
  const auto& type_fields = manifest_file_type.fields();
  std::vector<SchemaField> fields(type_fields.begin(), type_fields.end());
  // `fields` is not used again, so move it into the Schema instead of copying.
  auto schema = std::make_shared<Schema>(std::move(fields));
  // `schema` is copied into the options because the reader implementation
  // below also takes ownership of it.
  ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open(
                                           FileFormatType::kAvro,
                                           {.path = std::string(manifest_list_location),
                                            .io = std::move(file_io),
                                            .projection = schema}));
  return std::make_unique<ManifestListReaderImpl>(std::move(reader), std::move(schema));
}
57+
58+
} // namespace iceberg

src/iceberg/manifest_reader.h

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
/// Data reader interface for manifest files.
2424

2525
#include <memory>
26-
#include <span>
26+
#include <vector>
2727

2828
#include "iceberg/file_reader.h"
2929
#include "iceberg/iceberg_export.h"
@@ -35,35 +35,29 @@ namespace iceberg {
3535
class ICEBERG_EXPORT ManifestReader {
3636
public:
3737
virtual ~ManifestReader() = default;
38-
virtual Result<std::span<std::unique_ptr<ManifestEntry>>> Entries() const = 0;
39-
40-
private:
41-
std::unique_ptr<Reader> reader_;
38+
virtual Result<std::vector<ManifestEntry>> Entries() const = 0;
39+
40+
/// \brief Creates a reader for a manifest file.
41+
/// \param manifest_location Path to the manifest file.
42+
/// \param file_io File IO implementation to use.
/// \param partition_schema Partition schema used to derive the manifest entry schema.
43+
/// \return A Result containing the reader or an error.
44+
static Result<std::unique_ptr<ManifestReader>> MakeReader(
45+
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
46+
std::shared_ptr<Schema> partition_schema);
4247
};
4348

4449
/// \brief Read manifest files from a manifest list file.
4550
class ICEBERG_EXPORT ManifestListReader {
4651
public:
4752
virtual ~ManifestListReader() = default;
48-
virtual Result<std::span<std::unique_ptr<ManifestFile>>> Files() const = 0;
49-
50-
private:
51-
std::unique_ptr<Reader> reader_;
53+
virtual Result<std::vector<ManifestFile>> Files() const = 0;
54+
55+
/// \brief Creates a reader for the manifest list.
56+
/// \param manifest_list_location Path to the manifest list file.
57+
/// \param file_io File IO implementation to use.
58+
/// \return A Result containing the reader or an error.
59+
static Result<std::unique_ptr<ManifestListReader>> MakeReader(
60+
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io);
5261
};
5362

54-
/// \brief Creates a reader for the manifest list.
55-
/// \param file_path Path to the manifest list file.
56-
/// \return A Result containing the reader or an error.
57-
Result<std::unique_ptr<ManifestListReader>> CreateManifestListReader(
58-
std::string_view file_path) {
59-
return NotImplemented("CreateManifestListReader is not implemented yet.");
60-
}
61-
62-
/// \brief Creates a reader for a manifest file.
63-
/// \param file_path Path to the manifest file.
64-
/// \return A Result containing the reader or an error.
65-
Result<std::unique_ptr<ManifestReader>> CreateManifestReader(std::string_view file_path) {
66-
return NotImplemented("CreateManifestReader is not implemented yet.");
67-
}
68-
6963
} // namespace iceberg

0 commit comments

Comments
 (0)