Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmake_modules/IcebergThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ set(ICEBERG_ARROW_INSTALL_INTERFACE_LIBS)
# ----------------------------------------------------------------------
# Versions and URLs for toolchain builds

set(ICEBERG_ARROW_BUILD_VERSION "18.1.0")
set(ICEBERG_ARROW_BUILD_VERSION "19.0.1")
set(ICEBERG_ARROW_BUILD_SHA256_CHECKSUM
"2dc8da5f8796afe213ecc5e5aba85bb82d91520eff3cf315784a52d0fa61d7fc")
"acb76266e8b0c2fbb7eb15d542fbb462a73b3fd1e32b80fad6c2fafd95a51160")

if(DEFINED ENV{ICEBERG_ARROW_URL})
set(ARROW_SOURCE_URL "$ENV{ICEBERG_ARROW_URL}")
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ set(ICEBERG_SOURCES
demo.cc
schema.cc
schema_field.cc
schema_internal.cc
type.cc)

set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)
Expand Down
52 changes: 49 additions & 3 deletions src/iceberg/arrow_c_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@

#include <cstdint>

extern "C" {

#ifndef ARROW_C_DATA_INTERFACE
# define ARROW_C_DATA_INTERFACE

extern "C" {
# define ARROW_FLAG_DICTIONARY_ORDERED 1
# define ARROW_FLAG_NULLABLE 2
# define ARROW_FLAG_MAP_KEYS_SORTED 4

struct ArrowSchema {
// Array type description
const char* format;
Expand Down Expand Up @@ -66,6 +71,47 @@ struct ArrowArray {
void* private_data;
};

} // extern "C"

#endif // ARROW_C_DATA_INTERFACE

#ifndef ARROW_C_STREAM_INTERFACE
# define ARROW_C_STREAM_INTERFACE

struct ArrowArrayStream {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mention [0] in the \file description?

I have a feeling that this is not related to PR $titile, so maybe another PR for this?

[0] https://arrow.apache.org/docs/format/CStreamInterface.html

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to add this, otherwise compiling files with headers from nanoarrow will complain missing definition of ArrowArrayStream.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, then the patch LGTM.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed ArrowArrayStream. I think internally we can just include nanoarrow.h so we don't have to add ArrowArrayStream and confuse the downstream.

// Callback to get the stream type
// (will be the same for all arrays in the stream).
//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
//
// If successful, the ArrowSchema must be released independently from the stream.
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);

// Callback to get the next array
// (if no error and the array is released, the stream has ended)
//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
//
// If successful, the ArrowArray must be released independently from the stream.
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);

// Callback to get optional detailed error information.
// This must only be called if the last stream operation failed
// with a non-0 return code.
//
// Return value: pointer to a null-terminated character array describing
// the last error, or NULL if no description is available.
//
// The returned pointer is only valid until the next operation on this stream
// (including release).
const char* (*get_last_error)(struct ArrowArrayStream*);

// Release callback: release the stream's own resources.
// Note that arrays returned by `get_next` must be individually released.
void (*release)(struct ArrowArrayStream*);

// Opaque producer-specific data
void* private_data;
};

#endif // ARROW_C_STREAM_INTERFACE

} // extern "C"
2 changes: 2 additions & 0 deletions src/iceberg/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ enum class ErrorKind {
kAlreadyExists,
kNoSuchTable,
kCommitStateUnknown,
kInvalidSchema,
kInvalidArgument,
};

/// \brief Error with a kind and a message.
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/schema_field.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable {
/// \brief Get whether the field is optional.
[[nodiscard]] bool optional() const;

[[nodiscard]] std::string ToString() const;
[[nodiscard]] std::string ToString() const override;

friend bool operator==(const SchemaField& lhs, const SchemaField& rhs) {
return lhs.Equals(rhs);
Expand Down
193 changes: 193 additions & 0 deletions src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/schema_internal.h"

#include <format>
#include <string>

#include <nanoarrow/nanoarrow.h>

#include "iceberg/expected.h"
#include "iceberg/schema.h"

namespace iceberg {

namespace {

constexpr const char* kArrowExtensionName = "ARROW:extension:name";
constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata";

// Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code.
ArrowErrorCode ConvertToArrowSchema(const Type& type, ArrowSchema* schema, bool optional,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConvertToArrowSchema and ToArrowSchema are a bit confusing to me. Do we need to make the API naming more clear or split them in different namespaces?


By the way, do we have a policy for our API args' order?

input_field_a, out, input_field_b, input_field_c seems not good to me and hard to follow.

Is it widely adopted in cpp community?

Copy link
Member Author

@wgtmac wgtmac Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Actually they are in different namespaces (the former one is in the anonymous namespace). I have renamed them to ToArrowSchema to be consistent.

W.r.t. the order of arguments, those with default values must be placed at the end. Since they are all internal functions, I have just removed any default argument and put the out param to the end.

std::string_view name = "", int32_t field_id = -1) {
ArrowBuffer metadata_buffer;
NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderInit(&metadata_buffer, nullptr));
if (field_id > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, where in the Iceberg spec does it say that field IDs must be nonzero? (I don't even see a bit width defined for field ID in the spec...)

optional might be a more idiomatic way to write the type but this works (if we can find a reference for the field ID type)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! In practice, parquet-cpp rejects negative field_id: https://github.com/apache/arrow/blob/618ef501a21375abfaeee19e393eb64dee83ef0d/cpp/src/parquet/arrow/schema.cc#L248-L278

@Fokko @rdblue Has this been discussed before? Should we restrict field_id to be non-negative from the spec?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, should field ID 0 be valid then though?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've switched to use std::optional<int32_t> for now.

NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend(
&metadata_buffer, ArrowCharView(std::string(kFieldIdKey).c_str()),
ArrowCharView(std::to_string(field_id).c_str())));
}

switch (type.type_id()) {
case TypeId::kStruct: {
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_STRUCT));

const auto& struct_type = static_cast<const StructType&>(type);
const auto& fields = struct_type.fields();
NANOARROW_RETURN_NOT_OK(ArrowSchemaAllocateChildren(schema, fields.size()));

for (size_t i = 0; i < fields.size(); i++) {
const auto& field = fields[i];
NANOARROW_RETURN_NOT_OK(ConvertToArrowSchema(*field.type(), schema->children[i],
field.optional(), field.name(),
field.field_id()));
}
} break;
case TypeId::kList: {
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_LIST));

const auto& list_type = static_cast<const ListType&>(type);
const auto& elem_field = list_type.fields()[0];
NANOARROW_RETURN_NOT_OK(ConvertToArrowSchema(
*elem_field.type(), schema->children[0], elem_field.optional(),
elem_field.name(), elem_field.field_id()));
} break;
case TypeId::kMap: {
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_MAP));

const auto& map_type = static_cast<const MapType&>(type);
const auto& key_field = map_type.key();
const auto& value_field = map_type.value();
NANOARROW_RETURN_NOT_OK(ConvertToArrowSchema(
*key_field.type(), schema->children[0]->children[0], key_field.optional(),
key_field.name(), key_field.field_id()));
NANOARROW_RETURN_NOT_OK(ConvertToArrowSchema(
*value_field.type(), schema->children[0]->children[1], value_field.optional(),
value_field.name(), value_field.field_id()));
} break;
case TypeId::kBoolean:
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_BOOL));
break;
case TypeId::kInt:
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_INT32));
break;
case TypeId::kLong:
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_INT64));
break;
case TypeId::kFloat:
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_FLOAT));
break;
case TypeId::kDouble:
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_DOUBLE));
break;
case TypeId::kDecimal: {
ArrowSchemaInit(schema);
const auto& decimal_type = static_cast<const DecimalType&>(type);
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDecimal(schema, NANOARROW_TYPE_DECIMAL128,
decimal_type.precision(),
decimal_type.scale()));
} break;
case TypeId::kDate:
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_DATE32));
break;
case TypeId::kTime: {
ArrowSchemaInit(schema);
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_TIME64,
NANOARROW_TIME_UNIT_MICRO,
/*timezone=*/nullptr));
} break;
case TypeId::kTimestamp: {
ArrowSchemaInit(schema);
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_TIMESTAMP,
NANOARROW_TIME_UNIT_MICRO,
/*timezone=*/nullptr));
} break;
case TypeId::kTimestampTz: {
ArrowSchemaInit(schema);
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(
schema, NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MICRO, "UTC"));
} break;
case TypeId::kString:
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_STRING));
break;
case TypeId::kBinary:
NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(schema, NANOARROW_TYPE_BINARY));
break;
case TypeId::kFixed: {
ArrowSchemaInit(schema);
const auto& fixed_type = static_cast<const FixedType&>(type);
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeFixedSize(
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, fixed_type.length()));
} break;
case TypeId::kUuid: {
ArrowSchemaInit(schema);
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeFixedSize(
schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, /*fixed_size=*/16));
NANOARROW_RETURN_NOT_OK(
ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName),
ArrowCharView("arrow.uuid")));
} break;
}

if (!name.empty()) {
NANOARROW_RETURN_NOT_OK(ArrowSchemaSetName(schema, std::string(name).c_str()));
}

NANOARROW_RETURN_NOT_OK(ArrowSchemaSetMetadata(
schema, reinterpret_cast<const char*>(metadata_buffer.data)));
ArrowBufferReset(&metadata_buffer);

if (optional) {
schema->flags |= ARROW_FLAG_NULLABLE;
} else {
schema->flags &= ~ARROW_FLAG_NULLABLE;
}

return NANOARROW_OK;
}

} // namespace

expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out) {
if (out == nullptr) [[unlikely]] {
return unexpected<Error>{{.kind = ErrorKind::kInvalidArgument,
.message = "Output Arrow schema cannot be null"}};
}

if (ArrowErrorCode errorCode = ConvertToArrowSchema(schema, out, "");
errorCode != NANOARROW_OK) {
return unexpected<Error>{
{.kind = ErrorKind::kInvalidSchema,
.message = std::format(
"Failed to convert Iceberg schema to Arrow schema, error code: {}",
errorCode)}};
}

return {};
}

expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
int32_t schema_id) {
// TODO(wgtmac): Implement this
return unexpected<Error>{
{.kind = ErrorKind::kInvalidSchema, .message = "Not implemented yet"}};
}

} // namespace iceberg
51 changes: 51 additions & 0 deletions src/iceberg/schema_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>

#include "iceberg/arrow_c_data.h"
#include "iceberg/error.h"
#include "iceberg/expected.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

// Apache Arrow C++ uses "PARQUET:field_id" to store field IDs for Parquet.
// Here we follow a similar convention for Iceberg but we might also add
// "PARQUET:field_id" in the future once we implement a Parquet writer.
constexpr std::string_view kFieldIdKey = "ICEBERG:field_id";

/// \brief Convert an Iceberg schema to an Arrow schema.
///
/// \param[in] schema The Iceberg schema to convert.
/// \param[out] out The Arrow schema to convert to.
/// \return An error if the conversion fails.
expected<void, Error> ToArrowSchema(const Schema& schema, ArrowSchema* out);

/// \brief Convert an Arrow schema to an Iceberg schema.
///
/// \param[in] schema The Arrow schema to convert.
/// \param[in] schema_id The schema ID of the Iceberg schema.
/// \return The Iceberg schema or an error if the conversion fails.
expected<std::unique_ptr<Schema>, Error> FromArrowSchema(const ArrowSchema& schema,
int32_t schema_id);

} // namespace iceberg
Loading
Loading