Skip to content

Commit 043701c

Browse files
dongxiao1198xiao.dong
andauthored
feat: support manifest reader (#150)
- add manifest reader - refactor of arrow array parser - add basic case --------- Co-authored-by: xiao.dong <[email protected]>
1 parent ab095b7 commit 043701c

14 files changed

+684
-87
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ if(ICEBERG_BUILD_BUNDLE)
108108
avro/avro_data_util.cc
109109
avro/avro_reader.cc
110110
avro/avro_schema_util.cc
111+
avro/avro_register.cc
111112
avro/avro_stream_internal.cc)
112113

113114
# Libraries to link with exported libiceberg_bundle.{so,a}.

src/iceberg/avro/avro_register.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/avro/avro_register.h"
21+
22+
#include "iceberg/avro/avro_schema_util_internal.h"
23+
24+
namespace iceberg::avro {
25+
26+
void RegisterLogicalTypes() {
27+
static std::once_flag flag{};
28+
std::call_once(flag, []() {
29+
// Register the map logical type with the avro custom logical type registry.
30+
// See https://github.com/apache/avro/pull/3326 for details.
31+
::avro::CustomLogicalTypeRegistry::instance().registerType(
32+
"map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
33+
});
34+
}
35+
36+
} // namespace iceberg::avro

src/iceberg/avro/avro_register.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/iceberg_bundle_export.h"
23+
24+
namespace iceberg::avro {
25+
26+
ICEBERG_BUNDLE_EXPORT void RegisterLogicalTypes();
27+
28+
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util.cc

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
#include <string_view>
2323

2424
#include <arrow/type.h>
25-
#include <arrow/util/decimal.h>
2625
#include <avro/CustomAttributes.hh>
2726
#include <avro/LogicalType.hh>
2827
#include <avro/NodeImpl.hh>
2928
#include <avro/Schema.hh>
3029
#include <avro/Types.hh>
3130
#include <avro/ValidSchema.hh>
3231

32+
#include "iceberg/avro/avro_register.h"
3333
#include "iceberg/avro/avro_schema_util_internal.h"
3434
#include "iceberg/metadata_columns.h"
3535
#include "iceberg/schema.h"
@@ -49,18 +49,8 @@ constexpr std::string_view kValueIdProp = "value-id";
4949
constexpr std::string_view kElementIdProp = "element-id";
5050
constexpr std::string_view kAdjustToUtcProp = "adjust-to-utc";
5151

52-
struct MapLogicalType : public ::avro::CustomLogicalType {
53-
MapLogicalType() : ::avro::CustomLogicalType("map") {}
54-
};
55-
5652
::avro::LogicalType GetMapLogicalType() {
57-
static std::once_flag flag{};
58-
std::call_once(flag, []() {
59-
// Register the map logical type with the avro custom logical type registry.
60-
// See https://github.com/apache/avro/pull/3326 for details.
61-
::avro::CustomLogicalTypeRegistry::instance().registerType(
62-
"map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
63-
});
53+
RegisterLogicalTypes();
6454
return ::avro::LogicalType(std::make_shared<MapLogicalType>());
6555
}
6656

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ class ValidSchema;
3434

3535
namespace iceberg::avro {
3636

37+
struct MapLogicalType : public ::avro::CustomLogicalType {
38+
MapLogicalType() : ::avro::CustomLogicalType("map") {}
39+
};
40+
3741
/// \brief A visitor that converts an Iceberg type to an Avro node.
3842
class ToAvroNodeVisitor {
3943
public:

src/iceberg/file_format.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
#include "iceberg/iceberg_export.h"
2929
#include "iceberg/result.h"
30+
#include "iceberg/util/string_utils.h"
3031

3132
namespace iceberg {
3233

@@ -54,12 +55,13 @@ ICEBERG_EXPORT inline std::string_view ToString(FileFormatType format_type) {
5455
}
5556

5657
/// \brief Convert a string to a FileFormatType
57-
ICEBERG_EXPORT constexpr Result<FileFormatType> FileFormatTypeFromString(
58+
ICEBERG_EXPORT inline Result<FileFormatType> FileFormatTypeFromString(
5859
std::string_view str) noexcept {
59-
if (str == "parquet") return FileFormatType::kParquet;
60-
if (str == "avro") return FileFormatType::kAvro;
61-
if (str == "orc") return FileFormatType::kOrc;
62-
if (str == "puffin") return FileFormatType::kPuffin;
60+
auto lower = StringUtils::ToLower(str);
61+
if (lower == "parquet") return FileFormatType::kParquet;
62+
if (lower == "avro") return FileFormatType::kAvro;
63+
if (lower == "orc") return FileFormatType::kOrc;
64+
if (lower == "puffin") return FileFormatType::kPuffin;
6365
return InvalidArgument("Invalid file format type: {}", str);
6466
}
6567

src/iceberg/manifest_entry.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition
4040
kContent,
4141
kFilePath,
4242
kFileFormat,
43-
SchemaField::MakeRequired(102, "partition", std::move(partition_type)),
43+
SchemaField::MakeRequired(102, kPartitionField, std::move(partition_type)),
4444
kRecordCount,
4545
kFileSize,
4646
kColumnSizes,
@@ -68,7 +68,7 @@ std::shared_ptr<StructType> ManifestEntry::TypeFromDataFileType(
6868
std::shared_ptr<StructType> datafile_type) {
6969
return std::make_shared<StructType>(std::vector<SchemaField>{
7070
kStatus, kSnapshotId, kSequenceNumber, kFileSequenceNumber,
71-
SchemaField::MakeRequired(2, "data_file", std::move(datafile_type))});
71+
SchemaField::MakeRequired(2, kDataFileField, std::move(datafile_type))});
7272
}
7373

7474
} // namespace iceberg

src/iceberg/manifest_entry.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,8 @@ struct ICEBERG_EXPORT DataFile {
182182
inline static const SchemaField kFilePath = SchemaField::MakeRequired(
183183
100, "file_path", iceberg::string(), "Location URI with FS scheme");
184184
inline static const SchemaField kFileFormat = SchemaField::MakeRequired(
185-
101, "file_format", iceberg::int32(), "File format name: avro, orc, or parquet");
185+
101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet");
186+
inline static const std::string kPartitionField = "partition";
186187
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
187188
103, "record_count", iceberg::int64(), "Number of records in the file");
188189
inline static const SchemaField kFileSize = SchemaField::MakeRequired(
@@ -299,6 +300,7 @@ struct ICEBERG_EXPORT ManifestEntry {
299300
SchemaField::MakeOptional(3, "sequence_number", iceberg::int64());
300301
inline static const SchemaField kFileSequenceNumber =
301302
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
303+
inline static const std::string kDataFileField = "data_file";
302304

303305
bool operator==(const ManifestEntry& other) const;
304306

0 commit comments

Comments
 (0)