Skip to content

Commit faf9cc8

Browse files
authored
feat: convert iceberg schema to arrow schema (#53)
1 parent 829dc65 commit faf9cc8

File tree

10 files changed

+676
-9
lines changed

10 files changed

+676
-9
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ set(ICEBERG_ARROW_INSTALL_INTERFACE_LIBS)
2323
# ----------------------------------------------------------------------
2424
# Versions and URLs for toolchain builds
2525

26-
set(ICEBERG_ARROW_BUILD_VERSION "18.1.0")
26+
set(ICEBERG_ARROW_BUILD_VERSION "19.0.1")
2727
set(ICEBERG_ARROW_BUILD_SHA256_CHECKSUM
28-
"2dc8da5f8796afe213ecc5e5aba85bb82d91520eff3cf315784a52d0fa61d7fc")
28+
"acb76266e8b0c2fbb7eb15d542fbb462a73b3fd1e32b80fad6c2fafd95a51160")
2929

3030
if(DEFINED ENV{ICEBERG_ARROW_URL})
3131
set(ARROW_SOURCE_URL "$ENV{ICEBERG_ARROW_URL}")

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ set(ICEBERG_SOURCES
2222
demo.cc
2323
schema.cc
2424
schema_field.cc
25+
schema_internal.cc
2526
type.cc)
2627

2728
set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)

src/iceberg/arrow_c_data.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@
2929

3030
#include <cstdint>
3131

32+
extern "C" {
33+
3234
#ifndef ARROW_C_DATA_INTERFACE
3335
# define ARROW_C_DATA_INTERFACE
3436

35-
extern "C" {
37+
# define ARROW_FLAG_DICTIONARY_ORDERED 1
38+
# define ARROW_FLAG_NULLABLE 2
39+
# define ARROW_FLAG_MAP_KEYS_SORTED 4
40+
3641
struct ArrowSchema {
3742
// Array type description
3843
const char* format;
@@ -66,6 +71,6 @@ struct ArrowArray {
6671
void* private_data;
6772
};
6873

69-
} // extern "C"
70-
7174
#endif // ARROW_C_DATA_INTERFACE
75+
76+
} // extern "C"

src/iceberg/error.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ enum class ErrorKind {
3232
kAlreadyExists,
3333
kNoSuchTable,
3434
kCommitStateUnknown,
35+
kInvalidSchema,
36+
kInvalidArgument,
3537
};
3638

3739
/// \brief Error with a kind and a message.

src/iceberg/schema_field.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable {
6464
/// \brief Get whether the field is optional.
6565
[[nodiscard]] bool optional() const;
6666

67-
[[nodiscard]] std::string ToString() const;
67+
[[nodiscard]] std::string ToString() const override;
6868

6969
friend bool operator==(const SchemaField& lhs, const SchemaField& rhs) {
7070
return lhs.Equals(rhs);

src/iceberg/schema_internal.cc

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/schema_internal.h"
21+
22+
#include <format>
23+
#include <optional>
24+
#include <string>
25+
26+
#include "iceberg/expected.h"
27+
#include "iceberg/schema.h"
28+
29+
namespace iceberg {
30+
31+
namespace {
32+
33+
constexpr const char* kArrowExtensionName = "ARROW:extension:name";
34+
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
35+
36+
// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
37+
ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view name,
38+
std::optional<int32_t> field_id, ArrowSchema* schema) {
39+
ArrowBuffer metadata_buffer;
40+
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&metadata_buffer, nullptr));
41+
if (field_id.has_value()) {
42+
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend(
43+
&metadata_buffer, ArrowCharView(std::string(kFieldIdKey).c_str()),
44+
ArrowCharView(std::to_string(field_id.value()).c_str())));
45+
}
46+
47+
switch (type.type_id()) {
48+
case TypeId::kStruct: {
49+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_STRUCT));
50+
51+
const auto& struct_type = static_cast<const StructType&>(type);
52+
const auto& fields = struct_type.fields();
53+
NANOARROW_RETURN_NOT_OK(ArrowSchemaAllocateChildren(schema, fields.size()));
54+
55+
for (size_t i = 0; i < fields.size(); i++) {
56+
const auto& field = fields[i];
57+
NANOARROW_RETURN_NOT_OK(ToArrowSchema(*field.type(), field.optional(),
58+
field.name(), field.field_id(),
59+
schema->children[i]));
60+
}
61+
} break;
62+
case TypeId::kList: {
63+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_LIST));
64+
65+
const auto& list_type = static_cast<const ListType&>(type);
66+
const auto& elem_field = list_type.fields()[0];
67+
NANOARROW_RETURN_NOT_OK(ToArrowSchema(*elem_field.type(), elem_field.optional(),
68+
elem_field.name(), elem_field.field_id(),
69+
schema->children[0]));
70+
} break;
71+
case TypeId::kMap: {
72+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_MAP));
73+
74+
const auto& map_type = static_cast<const MapType&>(type);
75+
const auto& key_field = map_type.key();
76+
const auto& value_field = map_type.value();
77+
NANOARROW_RETURN_NOT_OK(ToArrowSchema(*key_field.type(), key_field.optional(),
78+
key_field.name(), key_field.field_id(),
79+
schema->children[0]->children[0]));
80+
NANOARROW_RETURN_NOT_OK(ToArrowSchema(*value_field.type(), value_field.optional(),
81+
value_field.name(), value_field.field_id(),
82+
schema->children[0]->children[1]));
83+
} break;
84+
case TypeId::kBoolean:
85+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_BOOL));
86+
break;
87+
case TypeId::kInt:
88+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_INT32));
89+
break;
90+
case TypeId::kLong:
91+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_INT64));
92+
break;
93+
case TypeId::kFloat:
94+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_FLOAT));
95+
break;
96+
case TypeId::kDouble:
97+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_DOUBLE));
98+
break;
99+
case TypeId::kDecimal: {
100+
ArrowSchemaInit(schema);
101+
const auto& decimal_type = static_cast<const DecimalType&>(type);
102+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDecimal(schema, NANOARROW_TYPE_DECIMAL128,
103+
decimal_type.precision(),
104+
decimal_type.scale()));
105+
} break;
106+
case TypeId::kDate:
107+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_DATE32));
108+
break;
109+
case TypeId::kTime: {
110+
ArrowSchemaInit(schema);
111+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_TIME64,
112+
NANOARROW_TIME_UNIT_MICRO,
113+
/*timezone=*/nullptr));
114+
} break;
115+
case TypeId::kTimestamp: {
116+
ArrowSchemaInit(schema);
117+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_TIMESTAMP,
118+
NANOARROW_TIME_UNIT_MICRO,
119+
/*timezone=*/nullptr));
120+
} break;
121+
case TypeId::kTimestampTz: {
122+
ArrowSchemaInit(schema);
123+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(
124+
schema, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MICRO, "UTC"));
125+
} break;
126+
case TypeId::kString:
127+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_STRING));
128+
break;
129+
case TypeId::kBinary:
130+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_BINARY));
131+
break;
132+
case TypeId::kFixed: {
133+
ArrowSchemaInit(schema);
134+
const auto& fixed_type = static_cast<const FixedType&>(type);
135+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeFixedSize(
136+
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, fixed_type.length()));
137+
} break;
138+
case TypeId::kUuid: {
139+
ArrowSchemaInit(schema);
140+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeFixedSize(
141+
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, /*fixed_size=*/16));
142+
NANOARROW_RETURN_NOT_OK(
143+
ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName),
144+
ArrowCharView("arrow.uuid")));
145+
} break;
146+
}
147+
148+
if (!name.empty()) {
149+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(schema, std::string(name).c_str()));
150+
}
151+
152+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetMetadata(
153+
schema, reinterpret_cast<const char*>(metadata_buffer.data)));
154+
ArrowBufferReset(&metadata_buffer);
155+
156+
if (optional) {
157+
schema->flags |= ARROW_FLAG_NULLABLE;
158+
} else {
159+
schema->flags &= ~ARROW_FLAG_NULLABLE;
160+
}
161+
162+
return NANOARROW_OK;
163+
}
164+
165+
} // namespace
166+
167+
// Public entry point: converts a whole Iceberg schema into `out`.
//
// A null `out` is rejected with kInvalidArgument. Otherwise the schema is
// converted as a non-optional, unnamed root without a field id; any non-OK
// Nanoarrow code from that conversion is reported as kInvalidSchema.
expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out) {
  if (out == nullptr) [[unlikely]] {
    return unexpected<Error>{{.kind = ErrorKind::kInvalidArgument,
                              .message = "Output Arrow schema cannot be null"}};
  }

  const ArrowErrorCode status =
      ToArrowSchema(schema, /*optional=*/false, /*name=*/"",
                    /*field_id=*/std::nullopt, out);
  if (status == NANOARROW_OK) {
    return {};
  }

  return unexpected<Error>{
      {.kind = ErrorKind::kInvalidSchema,
       .message = std::format(
           "Failed to convert Iceberg schema to Arrow schema, error code: {}",
           status)}};
}
185+
186+
// Placeholder for the Arrow -> Iceberg direction. Both arguments are currently
// ignored and a kInvalidSchema error is always returned.
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
                                                         int32_t schema_id) {
  // TODO(wgtmac): Implement this
  return unexpected<Error>{
      Error{.kind = ErrorKind::kInvalidSchema, .message = "Not implemented yet"}};
}
192+
193+
} // namespace iceberg

src/iceberg/schema_internal.h

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
24+
#include <nanoarrow/nanoarrow.h>
25+
26+
#include "iceberg/error.h"
27+
#include "iceberg/expected.h"
28+
#include "iceberg/type_fwd.h"
29+
30+
namespace iceberg {
31+
32+
// Apache Arrow C++ uses "PARQUET:field_id" to store field IDs for Parquet.
33+
// Here we follow a similar convention for Iceberg but we might also add
34+
// "PARQUET:field_id" in the future once we implement a Parquet writer.
35+
constexpr std::string_view kFieldIdKey = "ICEBERG:field_id";
36+
37+
/// \brief Convert an Iceberg schema to an Arrow schema.
38+
///
39+
/// \param[in] schema The Iceberg schema to convert.
40+
/// \param[out] out The Arrow schema to convert to.
41+
/// \return An error if the conversion fails.
42+
expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out);
43+
44+
/// \brief Convert an Arrow schema to an Iceberg schema.
45+
///
46+
/// \param[in] schema The Arrow schema to convert.
47+
/// \param[in] schema_id The schema ID of the Iceberg schema.
48+
/// \return The Iceberg schema or an error if the conversion fails.
49+
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
50+
int32_t schema_id);
51+
52+
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,19 @@ add_test(NAME schema_test COMMAND schema_test)
3030

3131
add_executable(expected_test)
3232
target_sources(expected_test PRIVATE expected_test.cc)
33-
target_link_libraries(expected_test PRIVATE iceberg_static GTest::gtest_main)
33+
target_link_libraries(expected_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
3434
add_test(NAME expected_test COMMAND expected_test)
3535

3636
if(ICEBERG_BUILD_BUNDLE)
3737
add_executable(avro_test)
3838
target_sources(avro_test PRIVATE avro_test.cc)
39-
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main)
39+
target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main
40+
GTest::gmock)
4041
add_test(NAME avro_test COMMAND avro_test)
4142

4243
add_executable(arrow_test)
4344
target_sources(arrow_test PRIVATE arrow_test.cc)
4445
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static Arrow::arrow_static
45-
GTest::gtest_main)
46+
GTest::gtest_main GTest::gmock)
4647
add_test(NAME arrow_test COMMAND arrow_test)
4748
endif()

0 commit comments

Comments
 (0)