Skip to content

Commit d4dc829

Browse files
author
xiao.dong
committed
merge same macros and separate avro type register
1 parent a116bcf commit d4dc829

File tree

10 files changed

+117
-88
lines changed

10 files changed

+117
-88
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ if(ICEBERG_BUILD_BUNDLE)
108108
avro/avro_data_util.cc
109109
avro/avro_reader.cc
110110
avro/avro_schema_util.cc
111+
avro/avro_register.cc
111112
avro/avro_stream_internal.cc)
112113

113114
# Libraries to link with exported libiceberg_bundle.{so,a}.

src/iceberg/avro/avro_register.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "avro_register.h"
21+
22+
namespace iceberg::avro {
23+
24+
/// \brief Register the custom Avro logical types used by this library.
///
/// Adds a factory for the logical type named "map" to the process-wide
/// ::avro::CustomLogicalTypeRegistry. The factory always yields a fresh
/// MapLogicalType.
void RegisterLogicalTypes() {
  // The registry passes a string to the factory; it is deliberately ignored
  // because MapLogicalType carries no per-schema state.
  auto make_map_logical_type = [](const std::string&) {
    return std::make_shared<MapLogicalType>();
  };
  ::avro::CustomLogicalTypeRegistry::instance().registerType("map",
                                                             make_map_logical_type);
}
28+
29+
} // namespace iceberg::avro

src/iceberg/avro/avro_register.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <avro/LogicalType.hh>
23+
24+
namespace iceberg::avro {
25+
26+
/// \brief Custom Avro logical type whose registered name is "map".
///
/// The type adds no members of its own; it only fixes the name handed to the
/// ::avro::CustomLogicalType base. NOTE(review): presumably used to tag Avro
/// nodes that encode an Iceberg map -- confirm against HasMapLogicalType().
struct MapLogicalType : ::avro::CustomLogicalType {  // struct => public base
  MapLogicalType() : ::avro::CustomLogicalType("map") {}
};
29+
30+
void RegisterLogicalTypes();
31+
32+
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util.cc

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
#include <string_view>
2323

2424
#include <arrow/type.h>
25-
#include <arrow/util/decimal.h>
2625
#include <avro/CustomAttributes.hh>
2726
#include <avro/LogicalType.hh>
2827
#include <avro/NodeImpl.hh>
2928
#include <avro/Schema.hh>
3029
#include <avro/Types.hh>
3130
#include <avro/ValidSchema.hh>
3231

32+
#include "iceberg/avro/avro_register.h"
3333
#include "iceberg/avro/avro_schema_util_internal.h"
3434
#include "iceberg/metadata_columns.h"
3535
#include "iceberg/schema.h"
@@ -49,10 +49,6 @@ constexpr std::string_view kValueIdProp = "value-id";
4949
constexpr std::string_view kElementIdProp = "element-id";
5050
constexpr std::string_view kAdjustToUtcProp = "adjust-to-utc";
5151

52-
struct MapLogicalType : public ::avro::CustomLogicalType {
53-
MapLogicalType() : ::avro::CustomLogicalType("map") {}
54-
};
55-
5652
::avro::LogicalType GetMapLogicalType() {
5753
static std::once_flag flag{};
5854
std::call_once(flag, []() {
@@ -72,11 +68,6 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
7268

7369
} // namespace
7470

75-
void RegisterLogicalTypes() {
76-
::avro::CustomLogicalTypeRegistry::instance().registerType(
77-
"map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
78-
}
79-
8071
std::string ToString(const ::avro::NodePtr& node) {
8172
std::stringstream ss;
8273
ss << *node;

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,4 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
144144
/// \return True if the node has a map logical type, false otherwise.
145145
bool HasMapLogicalType(const ::avro::NodePtr& node);
146146

147-
void RegisterLogicalTypes();
148-
149147
} // namespace iceberg::avro

src/iceberg/file_format.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
#include "iceberg/iceberg_export.h"
2929
#include "iceberg/result.h"
30-
#include "iceberg/util/string_utils.h"
30+
#include "iceberg/util/string_utils_internal.h"
3131

3232
namespace iceberg {
3333

src/iceberg/manifest_reader_internal.cc

Lines changed: 47 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -49,23 +49,11 @@ namespace iceberg {
4949
} \
5050
}
5151

52-
#define PARSE_ENUM_FIELD(item, array_view, type) \
53-
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
54-
if (!ArrowArrayViewIsNull(array_view, row_idx)) { \
55-
auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx); \
56-
item = static_cast<type>(value); \
57-
} else if (required) { \
58-
return InvalidManifestList("Field {} is required but null at row {}", field_name, \
59-
row_idx); \
60-
} \
61-
}
62-
6352
#define PARSE_STRING_FIELD(item, array_view) \
6453
for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
6554
if (!ArrowArrayViewIsNull(array_view, row_idx)) { \
6655
auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx); \
67-
std::string path_str(value.data, value.size_bytes); \
68-
item = path_str; \
56+
item = std::string(value.data, value.size_bytes); \
6957
} else if (required) { \
7058
return InvalidManifestList("Field {} is required but null at row {}", field_name, \
7159
row_idx); \
@@ -75,16 +63,14 @@ namespace iceberg {
7563
/// Copies every non-null element of a binary column into `item`.
///
/// \param item        Assignable expression evaluated once per row; it may (and at
///                    the call sites does) refer to the loop variable `row_idx`.
/// \param array_view  ArrowArrayView* of the binary column being read.
///
/// The expansion site must have `required` and `field_name` in scope and must be
/// inside a function whose return type accepts InvalidManifestList(...): a null
/// element in a required field makes the enclosing function return that error.
///
/// The validity check is done on `array_view` itself. It previously referenced a
/// caller-scope variable named `view_of_column`, so nullness was tested on a
/// different array than the one the bytes were read from.
#define PARSE_BINARY_FIELD(item, array_view)                                            \
  for (size_t row_idx = 0; row_idx < (array_view)->length; row_idx++) {                 \
    if (!ArrowArrayViewIsNull((array_view), row_idx)) {                                 \
      item = ArrowArrayViewGetInt8Vector((array_view), row_idx);                        \
    } else if (required) {                                                              \
      return InvalidManifestList("Field {} is required but null at row {}", field_name, \
                                 row_idx);                                              \
    }                                                                                   \
  }
8672

87-
#define PARSE_PRIMITIVE_VECTOR_FIELD(item, count, array_view, type) \
73+
#define PARSE_INTEGER_VECTOR_FIELD(item, count, array_view, type) \
8874
for (int64_t manifest_idx = 0; manifest_idx < count; manifest_idx++) { \
8975
auto offset = ArrowArrayViewListChildOffset(array_view, manifest_idx); \
9076
auto next_offset = ArrowArrayViewListChildOffset(array_view, manifest_idx + 1); \
@@ -94,50 +80,36 @@ namespace iceberg {
9480
} \
9581
}
9682

97-
#define PARSE_PRIMITIVE_MAP_FIELD(item, count, array_view) \
83+
/// Generic reader for a map-typed column whose entries are stored as a
/// two-child struct (key, value).
///
/// \param item        Map-like lvalue expression indexed with the decoded key; it is
///                    evaluated per entry and may refer to the loop variable `row_idx`.
/// \param count       Number of rows to walk.
/// \param array_view  ArrowArrayView* of the map column.
/// \param key_type    Expected storage type of the key child.
/// \param value_type  Expected storage type of the value child.
/// \param assignment  Expression producing the value stored for the current entry.
///                    It is evaluated inside the inner loop and may use the locals
///                    `view_of_map_value` and `offset_idx` declared by this macro.
///
/// Requires `field_name` in scope at the expansion site. On a type mismatch the
/// enclosing function returns an error (directly via InvalidManifest, or through
/// the ASSERT_VIEW_TYPE* helpers). Entry offsets are read straight from
/// buffer_views[1] rather than through ArrowArrayViewListChildOffset.
#define PARSE_MAP_FIELD(item, count, array_view, key_type, value_type, assignment)      \
  do {                                                                                  \
    if (ArrowType::NANOARROW_TYPE_MAP != (array_view)->storage_type) {                  \
      return InvalidManifest("Field:{} should be a map.", field_name);                  \
    }                                                                                   \
    auto view_of_map = (array_view)->children[0];                                       \
    ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map, ArrowType::NANOARROW_TYPE_STRUCT, 2);    \
    auto view_of_map_key = view_of_map->children[0];                                    \
    auto view_of_map_value = view_of_map->children[1];                                  \
    ASSERT_VIEW_TYPE(view_of_map_key, key_type);                                        \
    ASSERT_VIEW_TYPE(view_of_map_value, value_type);                                    \
    const auto* map_offsets = (array_view)->buffer_views[1].data.as_int32;              \
    for (int64_t row_idx = 0; row_idx < (count); ++row_idx) {                           \
      const int32_t entries_end = map_offsets[row_idx + 1];                             \
      for (int32_t offset_idx = map_offsets[row_idx]; offset_idx < entries_end;         \
           ++offset_idx) {                                                              \
        auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx);             \
        (item)[key] = (assignment);                                                     \
      }                                                                                 \
    }                                                                                   \
  } while (0)
118103

119-
#define PARSE_BINARY_MAP_FIELD(item, count, array_view) \
120-
do { \
121-
if (array_view->storage_type != ArrowType::NANOARROW_TYPE_MAP) { \
122-
return InvalidManifest("Field:{} should be a map.", field_name); \
123-
} \
124-
auto view_of_map = array_view->children[0]; \
125-
ASSERT_VIEW_TYPE_AND_CHILDREN(view_of_map, ArrowType::NANOARROW_TYPE_STRUCT, 2); \
126-
auto view_of_map_key = view_of_map->children[0]; \
127-
ASSERT_VIEW_TYPE(view_of_map_key, ArrowType::NANOARROW_TYPE_INT32); \
128-
auto view_of_map_value = view_of_map->children[1]; \
129-
ASSERT_VIEW_TYPE(view_of_map_value, ArrowType::NANOARROW_TYPE_BINARY); \
130-
for (int64_t row_idx = 0; row_idx < count; row_idx++) { \
131-
auto offset = array_view->buffer_views[1].data.as_int32[row_idx]; \
132-
auto next_offset = array_view->buffer_views[1].data.as_int32[row_idx + 1]; \
133-
for (int32_t offset_idx = offset; offset_idx < next_offset; offset_idx++) { \
134-
auto key = ArrowArrayViewGetIntUnsafe(view_of_map_key, offset_idx); \
135-
auto buffer = ArrowArrayViewGetBytesUnsafe(view_of_map_value, offset_idx); \
136-
item[key] = std::vector<uint8_t>(buffer.data.as_char, \
137-
buffer.data.as_char + buffer.size_bytes); \
138-
} \
139-
} \
140-
} while (0)
104+
/// Reads a map<int32, int64> column into `item` via PARSE_MAP_FIELD.
///
/// The value expression refers to `view_of_map_value` and `offset_idx`, which are
/// locals declared inside PARSE_MAP_FIELD's expansion.
///
/// No trailing semicolon here: the caller supplies it, so the `do { } while (0)`
/// produced by PARSE_MAP_FIELD stays a single statement (safe in unbraced if/else).
#define PARSE_INT_LONG_MAP_FIELD(item, count, array_view)                     \
  PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32,   \
                  ArrowType::NANOARROW_TYPE_INT64,                            \
                  ArrowArrayViewGetIntUnsafe(view_of_map_value, offset_idx))
108+
109+
/// Reads a map<int32, binary> column into `item` via PARSE_MAP_FIELD, copying each
/// value's bytes with ArrowArrayViewGetInt8Vector.
///
/// The value expression refers to `view_of_map_value` and `offset_idx`, which are
/// locals declared inside PARSE_MAP_FIELD's expansion.
///
/// No trailing semicolon here: the caller supplies it, so the `do { } while (0)`
/// produced by PARSE_MAP_FIELD stays a single statement (safe in unbraced if/else).
#define PARSE_INT_BINARY_MAP_FIELD(item, count, array_view)                   \
  PARSE_MAP_FIELD(item, count, array_view, ArrowType::NANOARROW_TYPE_INT32,   \
                  ArrowType::NANOARROW_TYPE_BINARY,                           \
                  ArrowArrayViewGetInt8Vector(view_of_map_value, offset_idx))
141113

142114
#define ASSERT_VIEW_TYPE(view, type) \
143115
if (view->storage_type != type) { \
@@ -153,6 +125,12 @@ namespace iceberg {
153125
field_name, n_child); \
154126
}
155127

128+
std::vector<uint8_t> ArrowArrayViewGetInt8Vector(const ArrowArrayView* view,
129+
int32_t offset_idx) {
130+
auto buffer = ArrowArrayViewGetBytesUnsafe(view, offset_idx);
131+
return {buffer.data.as_char, buffer.data.as_char + buffer.size_bytes};
132+
}
133+
156134
Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column,
157135
std::vector<ManifestFile>& manifest_files) {
158136
auto manifest_count = view_of_column->length;
@@ -202,14 +180,12 @@ Status ParsePartitionFieldSummaryList(ArrowArrayView* view_of_column,
202180
ArrowArrayViewGetIntUnsafe(contains_nan, partition_idx);
203181
}
204182
if (!ArrowArrayViewIsNull(lower_bound_list, partition_idx)) {
205-
auto buffer = ArrowArrayViewGetBytesUnsafe(lower_bound_list, partition_idx);
206-
partition_field_summary.lower_bound = std::vector<uint8_t>(
207-
buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
183+
partition_field_summary.lower_bound =
184+
ArrowArrayViewGetInt8Vector(lower_bound_list, partition_idx);
208185
}
209186
if (!ArrowArrayViewIsNull(upper_bound_list, partition_idx)) {
210-
auto buffer = ArrowArrayViewGetBytesUnsafe(upper_bound_list, partition_idx);
211-
partition_field_summary.upper_bound = std::vector<uint8_t>(
212-
buffer.data.as_char, buffer.data.as_char + buffer.size_bytes);
187+
partition_field_summary.upper_bound =
188+
ArrowArrayViewGetInt8Vector(upper_bound_list, partition_idx);
213189
}
214190

215191
manifest_file.partitions.emplace_back(partition_field_summary);
@@ -264,8 +240,8 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
264240
int32_t);
265241
break;
266242
case 3:
267-
PARSE_ENUM_FIELD(manifest_files[row_idx].content, view_of_column,
268-
ManifestFile::Content);
243+
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
244+
ManifestFile::Content);
269245
break;
270246
case 4:
271247
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column,
@@ -373,8 +349,8 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
373349

374350
switch (col_idx) {
375351
case 0:
376-
PARSE_ENUM_FIELD(manifest_entries[row_idx].data_file->content, view_of_file_field,
377-
DataFile::Content);
352+
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->content,
353+
view_of_file_field, DataFile::Content);
378354
break;
379355
case 1:
380356
PARSE_STRING_FIELD(manifest_entries[row_idx].data_file->file_path,
@@ -415,42 +391,41 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
415391
// key&value should have the same offset
416392
// HACK(xiao.dong) workaround for arrow bug:
417393
// ArrowArrayViewListChildOffset can not get the correct offset for map
418-
PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes,
419-
manifest_entry_count, view_of_file_field);
394+
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->column_sizes,
395+
manifest_entry_count, view_of_file_field);
420396
break;
421397
case 7:
422-
PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts,
423-
manifest_entry_count, view_of_file_field);
398+
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->value_counts,
399+
manifest_entry_count, view_of_file_field);
424400
break;
425401
case 8:
426-
PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts,
427-
manifest_entry_count, view_of_file_field);
402+
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->null_value_counts,
403+
manifest_entry_count, view_of_file_field);
428404
break;
429405
case 9:
430-
PARSE_PRIMITIVE_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts,
431-
manifest_entry_count, view_of_file_field);
406+
PARSE_INT_LONG_MAP_FIELD(manifest_entries[row_idx].data_file->nan_value_counts,
407+
manifest_entry_count, view_of_file_field);
432408
break;
433409
case 10:
434-
PARSE_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds,
435-
manifest_entry_count, view_of_file_field);
410+
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->lower_bounds,
411+
manifest_entry_count, view_of_file_field);
436412
break;
437413
case 11:
438-
PARSE_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds,
439-
manifest_entry_count, view_of_file_field);
414+
PARSE_INT_BINARY_MAP_FIELD(manifest_entries[row_idx].data_file->upper_bounds,
415+
manifest_entry_count, view_of_file_field);
440416
break;
441417
case 12:
442418
PARSE_BINARY_FIELD(manifest_entries[row_idx].data_file->key_metadata,
443419
view_of_file_field);
444420
break;
445421
case 13:
446-
PARSE_PRIMITIVE_VECTOR_FIELD(
422+
PARSE_INTEGER_VECTOR_FIELD(
447423
manifest_entries[manifest_idx].data_file->split_offsets, manifest_entry_count,
448424
view_of_file_field, int64_t);
449425
break;
450426
case 14:
451-
PARSE_PRIMITIVE_VECTOR_FIELD(
452-
manifest_entries[manifest_idx].data_file->equality_ids, manifest_entry_count,
453-
view_of_file_field, int32_t);
427+
PARSE_INTEGER_VECTOR_FIELD(manifest_entries[manifest_idx].data_file->equality_ids,
428+
manifest_entry_count, view_of_file_field, int32_t);
454429
break;
455430
case 15:
456431
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].data_file->sort_order_id,
@@ -518,8 +493,8 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
518493

519494
switch (idx) {
520495
case 0:
521-
PARSE_ENUM_FIELD(manifest_entries[row_idx].status, view_of_column,
522-
ManifestStatus);
496+
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].status, view_of_column,
497+
ManifestStatus);
523498
break;
524499
case 1:
525500
PARSE_PRIMITIVE_FIELD(manifest_entries[row_idx].snapshot_id, view_of_column,

src/iceberg/util/string_utils.h renamed to src/iceberg/util/string_utils_internal.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <ranges>
2424
#include <string>
2525

26+
#include "iceberg/iceberg_export.h"
27+
2628
namespace iceberg::internal {
2729

2830
class StringUtils {
@@ -33,7 +35,7 @@ class StringUtils {
3335
std::transform(input.begin(), input.end(), input.begin(), // NOLINT
3436
[](char c) { return std::tolower(c); }); // NOLINT
3537
return input;
36-
}
38+
}
3739

3840
static std::string ToUpper(std::string_view str) {
3941
std::string input(str);

test/manifest_reader_test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "iceberg/arrow/arrow_fs_file_io.h"
2626
#include "iceberg/avro/avro_reader.h"
27+
#include "iceberg/avro/avro_register.h"
2728
#include "iceberg/avro/avro_schema_util_internal.h"
2829
#include "iceberg/manifest_entry.h"
2930
#include "iceberg/schema.h"

test/string_utils_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
* under the License.
1818
*/
1919

20-
#include "iceberg/util/string_utils.h"
21-
2220
#include <gtest/gtest.h>
2321

22+
#include "iceberg/util/string_utils_internal.h"
23+
2424
namespace iceberg {
2525

2626
TEST(StringUtilsTest, ToLower) {

0 commit comments

Comments
 (0)