Skip to content

Commit f0d8e61

Browse files
committed
feat: convert iceberg schema to arrow schema
1 parent 829dc65 commit f0d8e61

File tree

9 files changed

+522
-6
lines changed

9 files changed

+522
-6
lines changed

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ set(ICEBERG_ARROW_INSTALL_INTERFACE_LIBS)
2323
# ----------------------------------------------------------------------
2424
# Versions and URLs for toolchain builds
2525

26-
set(ICEBERG_ARROW_BUILD_VERSION "18.1.0")
26+
set(ICEBERG_ARROW_BUILD_VERSION "19.0.1")
2727
set(ICEBERG_ARROW_BUILD_SHA256_CHECKSUM
28-
"2dc8da5f8796afe213ecc5e5aba85bb82d91520eff3cf315784a52d0fa61d7fc")
28+
"acb76266e8b0c2fbb7eb15d542fbb462a73b3fd1e32b80fad6c2fafd95a51160")
2929

3030
if(DEFINED ENV{ICEBERG_ARROW_URL})
3131
set(ARROW_SOURCE_URL "$ENV{ICEBERG_ARROW_URL}")

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ set(ICEBERG_SOURCES
2222
demo.cc
2323
schema.cc
2424
schema_field.cc
25+
schema_internal.cc
2526
type.cc)
2627

2728
set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)

src/iceberg/arrow_c_data.h

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@
2929

3030
#include <cstdint>
3131

32+
extern "C" {
33+
3234
#ifndef ARROW_C_DATA_INTERFACE
3335
# define ARROW_C_DATA_INTERFACE
3436

35-
extern "C" {
37+
# define ARROW_FLAG_DICTIONARY_ORDERED 1
38+
# define ARROW_FLAG_NULLABLE 2
39+
# define ARROW_FLAG_MAP_KEYS_SORTED 4
40+
3641
struct ArrowSchema {
3742
// Array type description
3843
const char* format;
@@ -66,6 +71,47 @@ struct ArrowArray {
6671
void* private_data;
6772
};
6873

69-
} // extern "C"
70-
7174
#endif // ARROW_C_DATA_INTERFACE
75+
76+
#ifndef ARROW_C_STREAM_INTERFACE
77+
# define ARROW_C_STREAM_INTERFACE
78+
79+
struct ArrowArrayStream {
  // Retrieves the stream's schema, which is shared by every array it yields.
  //
  // Returns 0 on success and an `errno`-compatible error code on failure.
  //
  // On success, the ArrowSchema written to `out` has to be released
  // independently from the stream.
  int (*get_schema)(struct ArrowArrayStream* stream, struct ArrowSchema* out);

  // Retrieves the next array of the stream. When the call succeeds and the
  // array written to `out` is released, the stream has ended.
  //
  // Returns 0 on success and an `errno`-compatible error code on failure.
  //
  // On success, the ArrowArray written to `out` has to be released
  // independently from the stream.
  int (*get_next)(struct ArrowArrayStream* stream, struct ArrowArray* out);

  // Retrieves optional detailed information about the last error.
  // Only call this when the last stream operation returned a non-0 code.
  //
  // Returns a pointer to a null-terminated character array describing the
  // last error, or NULL when no description is available.
  //
  // The pointer stays valid only until the next operation on this stream
  // (release included).
  const char* (*get_last_error)(struct ArrowArrayStream* stream);

  // Releases the resources owned by the stream itself.
  // Arrays handed out by `get_next` still have to be released one by one.
  void (*release)(struct ArrowArrayStream* stream);

  // Opaque data that belongs to the producer.
  void* private_data;
};
114+
115+
#endif // ARROW_C_STREAM_INTERFACE
116+
117+
} // extern "C"

src/iceberg/error.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ enum class ErrorKind {
3232
kAlreadyExists,
3333
kNoSuchTable,
3434
kCommitStateUnknown,
35+
kInvalidSchema,
36+
kInvalidArgument,
3537
};
3638

3739
/// \brief Error with a kind and a message.

src/iceberg/schema_field.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable {
6464
/// \brief Get whether the field is optional.
6565
[[nodiscard]] bool optional() const;
6666

67-
[[nodiscard]] std::string ToString() const;
67+
[[nodiscard]] std::string ToString() const override;
6868

6969
friend bool operator==(const SchemaField& lhs, const SchemaField& rhs) {
7070
return lhs.Equals(rhs);

src/iceberg/schema_internal.cc

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/schema_internal.h"
21+
22+
#include <format>
23+
#include <string>
24+
25+
#include <nanoarrow/nanoarrow.h>
26+
27+
#include "iceberg/expected.h"
28+
#include "iceberg/schema.h"
29+
#include "nanoarrow/common/inline_types.h"
30+
31+
namespace iceberg {
32+
33+
namespace {
34+
35+
constexpr const char* kArrowExtensionName = "ARROW:extension:name";
36+
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";
37+
38+
// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
39+
ArrowErrorCode ConvertToArrowSchema(const Type& type, ArrowSchema* schema, bool optional,
40+
std::string_view name = "", int32_t field_id = -1) {
41+
ArrowBuffer metadata_buffer;
42+
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&metadata_buffer, nullptr));
43+
if (field_id > 0) {
44+
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend(
45+
&metadata_buffer, ArrowCharView(std::string(kFieldIdKey).c_str()),
46+
ArrowCharView(std::to_string(field_id).c_str())));
47+
}
48+
49+
switch (type.type_id()) {
50+
case TypeId::kStruct: {
51+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_STRUCT));
52+
53+
const auto& struct_type = static_cast<const StructType&>(type);
54+
const auto& fields = struct_type.fields();
55+
NANOARROW_RETURN_NOT_OK(ArrowSchemaAllocateChildren(schema, fields.size()));
56+
57+
for (size_t i = 0; i < fields.size(); i++) {
58+
const auto& field = fields[i];
59+
NANOARROW_RETURN_NOT_OK(ConvertToArrowSchema(*field.type(), schema->children[i],
60+
field.optional(), field.name(),
61+
field.field_id()));
62+
}
63+
} break;
64+
case TypeId::kList: {
65+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_LIST));
66+
67+
const auto& list_type = static_cast<const ListType&>(type);
68+
const auto& elem_field = list_type.fields()[0];
69+
NANOARROW_RETURN_NOT_OK(ConvertToArrowSchema(
70+
*elem_field.type(), schema->children[0], elem_field.optional(),
71+
elem_field.name(), elem_field.field_id()));
72+
} break;
73+
case TypeId::kMap: {
74+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_MAP));
75+
76+
const auto& map_type = static_cast<const MapType&>(type);
77+
const auto& key_field = map_type.key();
78+
const auto& value_field = map_type.value();
79+
NANOARROW_RETURN_NOT_OK(ConvertToArrowSchema(
80+
*key_field.type(), schema->children[0]->children[0], key_field.optional(),
81+
key_field.name(), key_field.field_id()));
82+
NANOARROW_RETURN_NOT_OK(ConvertToArrowSchema(
83+
*value_field.type(), schema->children[0]->children[1], value_field.optional(),
84+
value_field.name(), value_field.field_id()));
85+
} break;
86+
case TypeId::kBoolean:
87+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_BOOL));
88+
break;
89+
case TypeId::kInt:
90+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_INT32));
91+
break;
92+
case TypeId::kLong:
93+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_INT64));
94+
break;
95+
case TypeId::kFloat:
96+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_FLOAT));
97+
break;
98+
case TypeId::kDouble:
99+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_DOUBLE));
100+
break;
101+
case TypeId::kDecimal: {
102+
ArrowSchemaInit(schema);
103+
const auto& decimal_type = static_cast<const DecimalType&>(type);
104+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDecimal(schema, NANOARROW_TYPE_DECIMAL128,
105+
decimal_type.precision(),
106+
decimal_type.scale()));
107+
} break;
108+
case TypeId::kDate:
109+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_DATE32));
110+
break;
111+
case TypeId::kTime: {
112+
ArrowSchemaInit(schema);
113+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_TIME64,
114+
NANOARROW_TIME_UNIT_MICRO,
115+
/*timezone=*/nullptr));
116+
} break;
117+
case TypeId::kTimestamp: {
118+
ArrowSchemaInit(schema);
119+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_TIMESTAMP,
120+
NANOARROW_TIME_UNIT_MICRO,
121+
/*timezone=*/nullptr));
122+
} break;
123+
case TypeId::kTimestampTz: {
124+
ArrowSchemaInit(schema);
125+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(
126+
schema, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MICRO, "UTC"));
127+
} break;
128+
case TypeId::kString:
129+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_STRING));
130+
break;
131+
case TypeId::kBinary:
132+
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_BINARY));
133+
break;
134+
case TypeId::kFixed: {
135+
ArrowSchemaInit(schema);
136+
const auto& fixed_type = static_cast<const FixedType&>(type);
137+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeFixedSize(
138+
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, fixed_type.length()));
139+
} break;
140+
case TypeId::kUuid: {
141+
ArrowSchemaInit(schema);
142+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeFixedSize(
143+
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, /*fixed_size=*/16));
144+
NANOARROW_RETURN_NOT_OK(
145+
ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName),
146+
ArrowCharView("arrow.uuid")));
147+
} break;
148+
}
149+
150+
if (!name.empty()) {
151+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(schema, std::string(name).c_str()));
152+
}
153+
154+
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetMetadata(
155+
schema, reinterpret_cast<const char*>(metadata_buffer.data)));
156+
ArrowBufferReset(&metadata_buffer);
157+
158+
if (optional) {
159+
schema->flags |= ARROW_FLAG_NULLABLE;
160+
} else {
161+
schema->flags &= ~ARROW_FLAG_NULLABLE;
162+
}
163+
164+
return NANOARROW_OK;
165+
}
166+
167+
} // namespace
168+
169+
// Convert an Iceberg schema to an Arrow schema written into `out`.
//
// Returns an error of kind kInvalidArgument when `out` is null, and an error of
// kind kInvalidSchema carrying the nanoarrow error code when the conversion fails.
expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out) {
  if (out == nullptr) [[unlikely]] {
    return unexpected<Error>{{.kind = ErrorKind::kInvalidArgument,
                              .message = "Output Arrow schema cannot be null"}};
  }

  // The third parameter of ConvertToArrowSchema is `bool optional`. This call used
  // to pass the string literal "" there, which silently converted to `true`; spell
  // the value out instead so the produced schema stays the same. The name and field
  // id keep their defaults ("" and -1).
  // NOTE(review): confirm whether the top-level schema is really meant to be
  // nullable.
  const ArrowErrorCode error_code =
      ConvertToArrowSchema(schema, out, /*optional=*/true);
  if (error_code != NANOARROW_OK) {
    return unexpected<Error>{
        {.kind = ErrorKind::kInvalidSchema,
         .message = std::format(
             "Failed to convert Iceberg schema to Arrow schema, error code: {}",
             error_code)}};
  }

  return {};
}
186+
187+
// Arrow -> Iceberg schema conversion. Not available yet: every call reports an
// error of kind kInvalidSchema, regardless of the arguments.
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
                                                         int32_t schema_id) {
  // TODO(wgtmac): Implement this
  return unexpected<Error>{
      Error{.kind = ErrorKind::kInvalidSchema, .message = "Not implemented yet"}};
}
193+
194+
} // namespace iceberg

src/iceberg/schema_internal.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <cstdint>
#include <memory>
#include <string_view>

#include "iceberg/arrow_c_data.h"
#include "iceberg/error.h"
#include "iceberg/expected.h"
#include "iceberg/type_fwd.h"
28+
29+
namespace iceberg {
30+
31+
// Apache Arrow C++ uses "PARQUET:field_id" to store field IDs for Parquet.
32+
// Here we follow a similar convention for Iceberg but we might also add
33+
// "PARQUET:field_id" in the future once we implement a Parquet writer.
34+
constexpr std::string_view kFieldIdKey = "ICEBERG:field_id";
35+
36+
/// \brief Convert an Iceberg schema to an Arrow schema.
37+
///
38+
/// \param[in] schema The Iceberg schema to convert.
39+
/// \param[out] out The Arrow schema to convert to.
40+
/// \return An error if the conversion fails.
41+
expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out);
42+
43+
/// \brief Convert an Arrow schema to an Iceberg schema.
44+
///
45+
/// \param[in] schema The Arrow schema to convert.
46+
/// \param[in] schema_id The schema ID of the Iceberg schema.
47+
/// \return The Iceberg schema or an error if the conversion fails.
48+
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
49+
int32_t schema_id);
50+
51+
} // namespace iceberg

0 commit comments

Comments
 (0)