Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
snapshot.cc
util/murmurhash3_internal.cc)
util/murmurhash3_internal.cc
util/timepoint.cc)

set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)
set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/expression/expression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ std::string And::ToString() const {

/// \brief Negation of an And expression; not supported yet.
///
/// \return An error of kind kInvalidExpression with the message
/// "And negation not yet implemented".
Result<std::shared_ptr<Expression>> And::Negate() const {
  // TODO(yingcai-cy): Implement Or expression
  return InvalidExpression("And negation not yet implemented");
}

bool And::Equals(const Expression& expr) const {
Expand Down
3 changes: 1 addition & 2 deletions src/iceberg/expression/expression.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ class ICEBERG_EXPORT Expression {

/// \brief Returns the negation of this expression, equivalent to not(this).
///
/// This base implementation does not support negation. The method is virtual,
/// so subclasses may override it.
/// \return An error of kind kInvalidExpression with the message
/// "Expression cannot be negated".
virtual Result<std::shared_ptr<Expression>> Negate() const {
  return InvalidExpression("Expression cannot be negated");
}

/// \brief Returns whether this expression will accept the same values as another.
Expand Down
9 changes: 3 additions & 6 deletions src/iceberg/file_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ class ICEBERG_EXPORT FileIO {
virtual Result<std::string> ReadFile(const std::string& file_location,
                                     std::optional<size_t> length) {
  // We provide a default implementation to avoid Windows linker error LNK2019.
  // It ignores its arguments and always reports a kNotImplemented error.
  return NotImplemented("ReadFile not implemented");
}

/// \brief Write the given content to the file at the given location.
Expand All @@ -64,17 +63,15 @@ class ICEBERG_EXPORT FileIO {
/// file exists.
/// \return void if the write succeeded, an error code if the write failed.
virtual Status WriteFile(const std::string& file_location, std::string_view content) {
  // Default implementation: ignores its arguments and always reports a
  // kNotImplemented error.
  return NotImplemented("WriteFile not implemented");
}

/// \brief Delete a file at the given location.
///
/// This default implementation ignores its argument and always returns a
/// kNotImplemented error.
///
/// \param file_location The location of the file to delete.
/// \return void if the delete succeeded, an error code if the delete failed.
virtual Status DeleteFile(const std::string& file_location) {
  return NotImplemented("DeleteFile not implemented");
}
};

Expand Down
25 changes: 9 additions & 16 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <type_traits>
#include <unordered_set>

#include <iceberg/table.h>
#include <nlohmann/json.hpp>

#include "iceberg/partition_field.h"
Expand All @@ -38,11 +37,13 @@
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/transform.h"
#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/macros.h"
#include "iceberg/util/timepoint.h"

namespace iceberg {

Expand Down Expand Up @@ -502,7 +503,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
if (snapshot.sequence_number > TableMetadata::kInitialSequenceNumber) {
json[kSequenceNumber] = snapshot.sequence_number;
}
json[kTimestampMs] = snapshot.timestamp_ms;
json[kTimestampMs] = UnixMsFromTimePointMs(snapshot.timestamp_ms);
json[kManifestList] = snapshot.manifest_list;
// If there is an operation, write the summary map
if (snapshot.operation().has_value()) {
Expand Down Expand Up @@ -722,7 +723,9 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
ICEBERG_ASSIGN_OR_RAISE(auto sequence_number,
GetJsonValueOptional<int64_t>(json, kSequenceNumber));
ICEBERG_ASSIGN_OR_RAISE(auto timestamp_ms, GetJsonValue<int64_t>(json, kTimestampMs));
ICEBERG_ASSIGN_OR_RAISE(
auto timestamp_ms,
GetJsonValue<int64_t>(json, kTimestampMs).and_then(TimePointMsFromUnixMs));
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list,
GetJsonValue<std::string>(json, kManifestList));

Expand All @@ -735,24 +738,14 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
if (summary_json.has_value()) {
for (const auto& [key, value] : summary_json->items()) {
if (!kValidSnapshotSummaryFields.contains(key)) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Invalid snapshot summary field: {}", key),
});
return JsonParseError("Invalid snapshot summary field: {}", key);
}
if (!value.is_string()) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message =
std::format("Invalid snapshot summary field value: {}", value.dump()),
});
return JsonParseError("Invalid snapshot summary field value: {}", value.dump());
}
if (key == SnapshotSummaryFields::kOperation &&
!kValidDataOperation.contains(value.get<std::string>())) {
return unexpected<Error>({
.kind = ErrorKind::kJsonParseError,
.message = std::format("Invalid snapshot operation: {}", value.dump()),
});
return JsonParseError("Invalid snapshot operation: {}", value.dump());
}
summary[key] = value.get<std::string>();
}
Expand Down
58 changes: 29 additions & 29 deletions src/iceberg/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,20 @@
namespace iceberg {

/// \brief Error types for iceberg.
///
/// Each kind is declared exactly once and the list is kept in alphabetical
/// (case-insensitive) order. No explicit values are assigned, so refer to the
/// enumerators by name rather than by their numeric value.
enum class ErrorKind {
  kAlreadyExists,
  kCommitStateUnknown,
  kInvalidArgument,
  kInvalidExpression,
  kInvalidSchema,
  kIOError,
  kJsonParseError,
  kNoSuchNamespace,
  kNoSuchTable,
  kNotFound,
  kNotImplemented,
  kNotSupported,
  kUnknownError,
};

/// \brief Error with a kind and a message.
Expand All @@ -63,28 +62,29 @@ using Result = expected<T, E>;

using Status = Result<void>;

/// \brief Macro to define error creation functions
///
/// DEFINE_ERROR_FUNCTION(Foo) expands to a function template
/// `Foo(fmt, args...)` that formats the message with std::format and returns
/// it as unexpected<Error> whose kind is ErrorKind::kFoo. The macro is
/// undefined again once all helpers have been generated.
#define DEFINE_ERROR_FUNCTION(name)                                           \
  template <typename... Args>                                                 \
  inline auto name(const std::format_string<Args...> fmt, Args&&... args)     \
      -> unexpected<Error> {                                                  \
    return unexpected<Error>(                                                 \
        {ErrorKind::k##name, std::format(fmt, std::forward<Args>(args)...)}); \
  }

// One helper per ErrorKind enumerator. JsonParseError is generated here only;
// a second hand-written template with the same signature would be a
// redefinition.
DEFINE_ERROR_FUNCTION(AlreadyExists)
DEFINE_ERROR_FUNCTION(CommitStateUnknown)
DEFINE_ERROR_FUNCTION(InvalidArgument)
DEFINE_ERROR_FUNCTION(InvalidExpression)
DEFINE_ERROR_FUNCTION(InvalidSchema)
DEFINE_ERROR_FUNCTION(IOError)
DEFINE_ERROR_FUNCTION(JsonParseError)
DEFINE_ERROR_FUNCTION(NoSuchNamespace)
DEFINE_ERROR_FUNCTION(NoSuchTable)
DEFINE_ERROR_FUNCTION(NotFound)
DEFINE_ERROR_FUNCTION(NotImplemented)
DEFINE_ERROR_FUNCTION(NotSupported)
DEFINE_ERROR_FUNCTION(UnknownError)

#undef DEFINE_ERROR_FUNCTION

/// \brief Create an unexpected error with kNotImplemented
///
/// Older spelling of NotImplemented(); forwards to it so existing callers
/// keep compiling and get the same error.
template <typename... Args>
auto NotImplementedError(const std::format_string<Args...> fmt, Args&&... args)
    -> unexpected<Error> {
  return NotImplemented(fmt, std::forward<Args>(args)...);
}

/// \brief Create an unexpected error with kInvalidExpression
///
/// Older spelling of InvalidExpression(); forwards to it so existing callers
/// keep compiling and get the same error.
template <typename... Args>
auto InvalidExpressionError(const std::format_string<Args...> fmt, Args&&... args)
    -> unexpected<Error> {
  return InvalidExpression(fmt, std::forward<Args>(args)...);
}

} // namespace iceberg
89 changes: 28 additions & 61 deletions src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
#include "iceberg/schema_internal.h"

#include <cstring>
#include <format>
#include <optional>
#include <string>

#include "iceberg/schema.h"
#include "iceberg/type.h"
#include "iceberg/util/macros.h"

namespace iceberg {

Expand Down Expand Up @@ -170,18 +170,14 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n

Status ToArrowSchema(const Schema& schema, ArrowSchema* out) {
if (out == nullptr) [[unlikely]] {
return unexpected<Error>{{.kind = ErrorKind::kInvalidArgument,
.message = "Output Arrow schema cannot be null"}};
return InvalidArgument("Output Arrow schema cannot be null");
}

if (ArrowErrorCode errorCode = ToArrowSchema(schema, /*optional=*/false, /*name=*/"",
/*field_id=*/std::nullopt, out);
errorCode != NANOARROW_OK) {
return unexpected<Error>{
{.kind = ErrorKind::kInvalidSchema,
.message = std::format(
"Failed to convert Iceberg schema to Arrow schema, error code: {}",
errorCode)}};
return InvalidSchema(
"Failed to convert Iceberg schema to Arrow schema, error code: {}", errorCode);
}

return {};
Expand All @@ -208,15 +204,12 @@ int32_t GetFieldId(const ArrowSchema& schema) {
Result<std::shared_ptr<Type>> FromArrowSchema(const ArrowSchema& schema) {
auto to_schema_field =
[](const ArrowSchema& schema) -> Result<std::unique_ptr<SchemaField>> {
auto field_type_result = FromArrowSchema(schema);
if (!field_type_result) {
return unexpected<Error>(field_type_result.error());
}
ICEBERG_ASSIGN_OR_RAISE(auto field_type, FromArrowSchema(schema));

auto field_id = GetFieldId(schema);
bool is_optional = (schema.flags & ARROW_FLAG_NULLABLE) != 0;
return std::make_unique<SchemaField>(
field_id, schema.name, std::move(field_type_result.value()), is_optional);
return std::make_unique<SchemaField>(field_id, schema.name, std::move(field_type),
is_optional);
};

ArrowError arrow_error;
Expand All @@ -225,10 +218,8 @@ Result<std::shared_ptr<Type>> FromArrowSchema(const ArrowSchema& schema) {
ArrowSchemaView schema_view;
if (auto error_code = ArrowSchemaViewInit(&schema_view, &schema, &arrow_error);
error_code != NANOARROW_OK) {
return unexpected<Error>{
{.kind = ErrorKind::kInvalidSchema,
.message = std::format("Failed to read Arrow schema, code: {}, message: {}",
error_code, arrow_error.message)}};
return InvalidSchema("Failed to read Arrow schema, code: {}, message: {}", error_code,
arrow_error.message);
}

switch (schema_view.type) {
Expand All @@ -237,35 +228,24 @@ Result<std::shared_ptr<Type>> FromArrowSchema(const ArrowSchema& schema) {
fields.reserve(schema.n_children);

for (int i = 0; i < schema.n_children; i++) {
auto field_result = to_schema_field(*schema.children[i]);
if (!field_result) {
return unexpected<Error>(field_result.error());
}
fields.emplace_back(std::move(*field_result.value()));
ICEBERG_ASSIGN_OR_RAISE(auto field, to_schema_field(*schema.children[i]));
fields.emplace_back(std::move(*field));
}

return std::make_shared<StructType>(std::move(fields));
}
case NANOARROW_TYPE_LIST: {
auto element_field_result = to_schema_field(*schema.children[0]);
if (!element_field_result) {
return unexpected<Error>(element_field_result.error());
}
return std::make_shared<ListType>(std::move(*element_field_result.value()));
ICEBERG_ASSIGN_OR_RAISE(auto element_field_result,
to_schema_field(*schema.children[0]));
return std::make_shared<ListType>(std::move(*element_field_result));
}
case NANOARROW_TYPE_MAP: {
auto key_field_result = to_schema_field(*schema.children[0]->children[0]);
if (!key_field_result) {
return unexpected<Error>(key_field_result.error());
}
ICEBERG_ASSIGN_OR_RAISE(auto key_field,
to_schema_field(*schema.children[0]->children[0]));
ICEBERG_ASSIGN_OR_RAISE(auto value_field,
to_schema_field(*schema.children[0]->children[1]));

auto value_field_result = to_schema_field(*schema.children[0]->children[1]);
if (!value_field_result) {
return unexpected<Error>(value_field_result.error());
}

return std::make_shared<MapType>(std::move(*key_field_result.value()),
std::move(*value_field_result.value()));
return std::make_shared<MapType>(std::move(*key_field), std::move(*value_field));
}
case NANOARROW_TYPE_BOOL:
return std::make_shared<BooleanType>();
Expand All @@ -284,20 +264,16 @@ Result<std::shared_ptr<Type>> FromArrowSchema(const ArrowSchema& schema) {
return std::make_shared<DateType>();
case NANOARROW_TYPE_TIME64:
if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
return unexpected<Error>{
{.kind = ErrorKind::kInvalidSchema,
.message = std::format("Unsupported time unit for Arrow time type: {}",
static_cast<int>(schema_view.time_unit))}};
return InvalidSchema("Unsupported time unit for Arrow time type: {}",
static_cast<int>(schema_view.time_unit));
}
return std::make_shared<TimeType>();
case NANOARROW_TYPE_TIMESTAMP: {
bool with_timezone =
schema_view.timezone != nullptr && std::strlen(schema_view.timezone) > 0;
if (schema_view.time_unit != NANOARROW_TIME_UNIT_MICRO) {
return unexpected<Error>{
{.kind = ErrorKind::kInvalidSchema,
.message = std::format("Unsupported time unit for Arrow timestamp type: {}",
static_cast<int>(schema_view.time_unit))}};
return InvalidSchema("Unsupported time unit for Arrow timestamp type: {}",
static_cast<int>(schema_view.time_unit));
}
if (with_timezone) {
return std::make_shared<TimestampTzType>();
Expand All @@ -314,18 +290,15 @@ Result<std::shared_ptr<Type>> FromArrowSchema(const ArrowSchema& schema) {
schema_view.extension_name.size_bytes);
extension_name == kArrowUuidExtensionName) {
if (schema_view.fixed_size != 16) {
return unexpected<Error>{{.kind = ErrorKind::kInvalidSchema,
.message = "UUID type must have a fixed size of 16"}};
return InvalidSchema("UUID type must have a fixed size of 16");
}
return std::make_shared<UuidType>();
}
return std::make_shared<FixedType>(schema_view.fixed_size);
}
default:
return unexpected<Error>{
{.kind = ErrorKind::kInvalidSchema,
.message = std::format("Unsupported Arrow type: {}",
ArrowTypeString(schema_view.type))}};
return InvalidSchema("Unsupported Arrow type: {}",
ArrowTypeString(schema_view.type));
}
}

Expand All @@ -343,16 +316,10 @@ std::unique_ptr<Schema> FromStructType(StructType&& struct_type,

Result<std::unique_ptr<Schema>> FromArrowSchema(const ArrowSchema& schema,
std::optional<int32_t> schema_id) {
auto type_result = FromArrowSchema(schema);
if (!type_result) {
return unexpected<Error>(type_result.error());
}
ICEBERG_ASSIGN_OR_RAISE(auto type, FromArrowSchema(schema));

auto& type = type_result.value();
if (type->type_id() != TypeId::kStruct) {
return unexpected<Error>{
{.kind = ErrorKind::kInvalidSchema,
.message = "Arrow schema must be a struct type for Iceberg schema"}};
return InvalidSchema("Arrow schema must be a struct type for Iceberg schema");
}

auto& struct_type = static_cast<StructType&>(*type);
Expand Down
Loading
Loading