diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index ec34be1b3..5fd76c60d 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -19,9 +19,11 @@ #include "iceberg/json_internal.h" +#include #include #include #include +#include #include #include @@ -32,6 +34,8 @@ #include "iceberg/schema_internal.h" #include "iceberg/snapshot.h" #include "iceberg/sort_order.h" +#include "iceberg/statistics_file.h" +#include "iceberg/table_metadata.h" #include "iceberg/transform.h" #include "iceberg/type.h" #include "iceberg/util/formatter.h" // IWYU pragma: keep @@ -41,17 +45,21 @@ namespace iceberg { namespace { +// Transform constants constexpr std::string_view kTransform = "transform"; constexpr std::string_view kSourceId = "source-id"; constexpr std::string_view kDirection = "direction"; constexpr std::string_view kNullOrder = "null-order"; +// Sort order constants constexpr std::string_view kOrderId = "order-id"; constexpr std::string_view kFields = "fields"; +// Schema constants constexpr std::string_view kSchemaId = "schema-id"; constexpr std::string_view kIdentifierFieldIds = "identifier-field-ids"; +// Type constants constexpr std::string_view kType = "type"; constexpr std::string_view kStruct = "struct"; constexpr std::string_view kList = "list"; @@ -71,6 +79,7 @@ constexpr std::string_view kRequired = "required"; constexpr std::string_view kElementRequired = "element-required"; constexpr std::string_view kValueRequired = "value-required"; +// Snapshot constants constexpr std::string_view kFieldId = "field-id"; constexpr std::string_view kSpecId = "spec-id"; constexpr std::string_view kSnapshotId = "snapshot-id"; @@ -83,8 +92,6 @@ constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep"; constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms"; constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms"; -constexpr int64_t kInitialSequenceNumber = 0; - const std::unordered_set kValidSnapshotSummaryFields = { SnapshotSummaryFields::kOperation, SnapshotSummaryFields::kAddedDataFiles, @@ -123,21 +130,51 @@ const std::unordered_set kValidDataOperation = { DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite, DataOperation::kDelete}; +// TableMetadata constants +constexpr std::string_view kFormatVersion = "format-version"; +constexpr std::string_view kTableUuid = "table-uuid"; +constexpr std::string_view kLocation = "location"; +constexpr std::string_view kLastSequenceNumber = "last-sequence-number"; +constexpr std::string_view kLastUpdatedMs = "last-updated-ms"; +constexpr std::string_view kLastColumnId = "last-column-id"; +constexpr std::string_view kSchema = "schema"; +constexpr std::string_view kSchemas = "schemas"; +constexpr std::string_view kCurrentSchemaId = "current-schema-id"; +constexpr std::string_view kPartitionSpec = "partition-spec"; +constexpr std::string_view kPartitionSpecs = "partition-specs"; +constexpr std::string_view kDefaultSpecId = "default-spec-id"; +constexpr std::string_view kLastPartitionId = "last-partition-id"; +constexpr std::string_view kProperties = "properties"; +constexpr std::string_view kCurrentSnapshotId = "current-snapshot-id"; +constexpr std::string_view kSnapshots = "snapshots"; +constexpr std::string_view kSnapshotLog = "snapshot-log"; +constexpr std::string_view kMetadataLog = "metadata-log"; +constexpr std::string_view kSortOrders = "sort-orders"; +constexpr std::string_view kDefaultSortOrderId = "default-sort-order-id"; +constexpr std::string_view kRefs = "refs"; +constexpr std::string_view kStatistics = "statistics"; +constexpr std::string_view kPartitionStatistics = "partition-statistics"; +constexpr std::string_view kNextRowId = "next-row-id"; +constexpr std::string_view kMetadataFile = "metadata-file"; +constexpr std::string_view kStatisticsPath = "statistics-path"; +constexpr std::string_view kFileSizeInBytes = "file-size-in-bytes"; +constexpr std::string_view kFileFooterSizeInBytes = "file-footer-size-in-bytes"; +constexpr std::string_view kBlobMetadata = "blob-metadata"; + template -Result GetJsonValue(const nlohmann::json& json, std::string_view key) { - if (!json.contains(key)) { - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Missing '{}' in {}", key, json.dump()), - }); +void SetOptionalField(nlohmann::json& json, std::string_view key, + const std::optional& value) { + if (value.has_value()) { + json[key] = *value; } +} + +template +Result GetJsonValueImpl(const nlohmann::json& json, std::string_view key) { try { return json.at(key).get(); } catch (const std::exception& ex) { - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Failed to parse key '{}' in {}", key, json.dump()), - }); + return JsonParseError("Failed to parse key '{}' in {}", key, json.dump()); } } @@ -147,22 +184,154 @@ Result> GetJsonValueOptional(const nlohmann::json& json, if (!json.contains(key)) { return std::nullopt; } - try { - return json.at(key).get(); - } catch (const std::exception& ex) { - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Failed to parse key '{}' in {}", key, json.dump()), - }); + return GetJsonValueImpl(json, key); +} + +template +Result GetJsonValue(const nlohmann::json& json, std::string_view key) { + if (!json.contains(key)) { + return JsonParseError("Missing '{}' in {}", key, json.dump()); } + return GetJsonValueImpl(json, key); } template -void SetOptionalField(nlohmann::json& json, std::string_view key, - const std::optional& value) { - if (value.has_value()) { - json[key] = *value; +Result GetJsonValueOrDefault(const nlohmann::json& json, std::string_view key, + T default_value = T{}) { + if (!json.contains(key)) { + return default_value; } + return GetJsonValueImpl(json, key); +} + +/// \brief Convert a list of items to a json array. +/// +/// Note that ToJson(const T&) is required for this function to work. +template +nlohmann::json::array_t ToJsonList(const std::vector& list) { + return std::accumulate(list.cbegin(), list.cend(), nlohmann::json::array(), + [](nlohmann::json::array_t arr, const T& item) { + arr.push_back(ToJson(item)); + return arr; + }); +} + +/// \brief Overload of the above function for a list of shared pointers. +template +nlohmann::json::array_t ToJsonList(const std::vector>& list) { + return std::accumulate(list.cbegin(), list.cend(), nlohmann::json::array(), + [](nlohmann::json::array_t arr, const std::shared_ptr& item) { + arr.push_back(ToJson(*item)); + return arr; + }); +} + +/// \brief Parse a list of items from a JSON object. +/// +/// \param[in] json The JSON object to parse. +/// \param[in] key The key to parse. +/// \param[in] from_json The function to parse an item from a JSON object. +/// \return The list of items. +template +Result> FromJsonList( + const nlohmann::json& json, std::string_view key, + const std::function(const nlohmann::json&)>& from_json) { + std::vector list{}; + if (json.contains(key)) { + ICEBERG_ASSIGN_OR_RAISE(auto list_json, GetJsonValue(json, key)); + if (!list_json.is_array()) { + return JsonParseError("Cannot parse '{}' from non-array: {}", key, + list_json.dump()); + } + for (const auto& entry_json : list_json) { + ICEBERG_ASSIGN_OR_RAISE(auto entry, from_json(entry_json)); + list.emplace_back(std::move(entry)); + } + } + return {}; +} + +/// \brief Parse a list of items from a JSON object. +/// +/// \param[in] json The JSON object to parse. +/// \param[in] key The key to parse. +/// \param[in] from_json The function to parse an item from a JSON object. +/// \return The list of items. +template +Result>> FromJsonList( + const nlohmann::json& json, std::string_view key, + const std::function>(const nlohmann::json&)>& from_json) { + std::vector> list{}; + if (json.contains(key)) { + ICEBERG_ASSIGN_OR_RAISE(auto list_json, GetJsonValue(json, key)); + if (!list_json.is_array()) { + return JsonParseError("Cannot parse '{}' from non-array: {}", key, + list_json.dump()); + } + for (const auto& entry_json : list_json) { + ICEBERG_ASSIGN_OR_RAISE(auto entry, from_json(entry_json)); + list.emplace_back(std::move(entry)); + } + } + return list; +} + +/// \brief Convert a map of type to a json object. +/// +/// Note that ToJson(const T&) is required for this function to work. +template +nlohmann::json::object_t ToJsonMap(const std::unordered_map& map) { + return std::accumulate(map.cbegin(), map.cend(), nlohmann::json::object(), + [](nlohmann::json::object_t obj, const auto& item) { + obj[item.first] = ToJson(item.second); + return obj; + }); +} + +/// \brief Overload of the above function for a map of type >. +template +nlohmann::json::object_t ToJsonMap( + const std::unordered_map>& map) { + return std::accumulate(map.cbegin(), map.cend(), nlohmann::json::object(), + [](nlohmann::json::object_t obj, const auto& item) { + obj[item.first] = ToJson(*item.second); + return obj; + }); +} + +/// \brief Parse a map of type from a JSON object. +/// +/// \param[in] json The JSON object to parse. +/// \param[in] key The key to parse. +/// \param[in] from_json The function to parse an item from a JSON object. +/// \return The map of items. +template +Result> FromJsonMap( + const nlohmann::json& json, std::string_view key, + const std::function(const nlohmann::json&)>& from_json = + [](const nlohmann::json& json) -> Result { + static_assert(std::is_same_v, "T must be std::string"); + try { + return json.get(); + } catch (const std::exception& ex) { + return JsonParseError("Cannot parse {} to a string value: {}", json.dump(), + ex.what()); + } + }) { + std::unordered_map map{}; + if (json.contains(key)) { + ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue(json, key)); + if (!map_json.is_object()) { + return JsonParseError("Cannot parse '{}' from non-object: {}", key, + map_json.dump()); + } + for (const auto& [key, value] : map_json.items()) { + ICEBERG_ASSIGN_OR_RAISE(auto entry, from_json(value)); + map[key] = std::move(entry); + } + } + return map; } } // namespace @@ -215,16 +384,16 @@ Result> SortOrderFromJson(const nlohmann::json& json) return std::make_unique(order_id, std::move(sort_fields)); } -nlohmann::json FieldToJson(const SchemaField& field) { +nlohmann::json ToJson(const SchemaField& field) { nlohmann::json json; json[kId] = field.field_id(); json[kName] = field.name(); json[kRequired] = !field.optional(); - json[kType] = TypeToJson(*field.type()); + json[kType] = ToJson(*field.type()); return json; } -nlohmann::json TypeToJson(const Type& type) { +nlohmann::json ToJson(const Type& type) { switch (type.type_id()) { case TypeId::kStruct: { const auto& struct_type = static_cast(type); @@ -232,7 +401,7 @@ nlohmann::json TypeToJson(const Type& type) { json[kType] = kStruct; nlohmann::json fields_json = nlohmann::json::array(); for (const auto& field : struct_type.fields()) { - fields_json.push_back(FieldToJson(field)); + fields_json.push_back(ToJson(field)); // TODO(gangwu): add default values } json[kFields] = fields_json; @@ -246,7 +415,7 @@ nlohmann::json TypeToJson(const Type& type) { const auto& element_field = list_type.fields().front(); json[kElementId] = element_field.field_id(); json[kElementRequired] = !element_field.optional(); - json[kElement] = TypeToJson(*element_field.type()); + json[kElement] = ToJson(*element_field.type()); return json; } case TypeId::kMap: { @@ -256,12 +425,12 @@ nlohmann::json TypeToJson(const Type& type) { const auto& key_field = map_type.key(); json[kKeyId] = key_field.field_id(); - json[kKey] = TypeToJson(*key_field.type()); + json[kKey] = ToJson(*key_field.type()); const auto& value_field = map_type.value(); json[kValueId] = value_field.field_id(); json[kValueRequired] = !value_field.optional(); - json[kValue] = TypeToJson(*value_field.type()); + json[kValue] = ToJson(*value_field.type()); return json; } case TypeId::kBoolean: @@ -300,8 +469,8 @@ nlohmann::json TypeToJson(const Type& type) { } } -nlohmann::json SchemaToJson(const Schema& schema) { - nlohmann::json json = TypeToJson(static_cast(schema)); +nlohmann::json ToJson(const Schema& schema) { + nlohmann::json json = ToJson(static_cast(schema)); json[kSchemaId] = schema.schema_id(); // TODO(gangwu): add identifier-field-ids. return json; @@ -327,7 +496,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) { nlohmann::json json; json[kSnapshotId] = snapshot.snapshot_id; SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id); - if (snapshot.sequence_number > kInitialSequenceNumber) { + if (snapshot.sequence_number > TableMetadata::kInitialSequenceNumber) { json[kSequenceNumber] = snapshot.sequence_number; } json[kTimestampMs] = snapshot.timestamp_ms; @@ -417,10 +586,7 @@ Result> TypeFromJson(const nlohmann::json& json) { if (std::regex_match(type_str, match, fixed_regex)) { return std::make_unique(std::stoi(match[1].str())); } - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Invalid fixed type: {}", type_str), - }); + return JsonParseError("Invalid fixed type: {}", type_str); } else if (type_str.starts_with("decimal")) { std::regex decimal_regex(R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))"); std::smatch match; @@ -428,15 +594,9 @@ Result> TypeFromJson(const nlohmann::json& json) { return std::make_unique(std::stoi(match[1].str()), std::stoi(match[2].str())); } - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Invalid decimal type: {}", type_str), - }); + return JsonParseError("Invalid decimal type: {}", type_str); } else { - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Unknown primitive type: {}", type_str), - }); + return JsonParseError("Unknown primitive type: {}", type_str); } } @@ -449,10 +609,7 @@ Result> TypeFromJson(const nlohmann::json& json) { } else if (type_str == kMap) { return MapTypeFromJson(json); } else { - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Unknown complex type: {}", type_str), - }); + return JsonParseError("Unknown complex type: {}", type_str); } } @@ -472,10 +629,7 @@ Result> SchemaFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json)); if (type->type_id() != TypeId::kStruct) [[unlikely]] { - return unexpected({ - .kind = ErrorKind::kJsonParseError, - .message = std::format("Schema must be a struct type, but got {}", json.dump()), - }); + return JsonParseError("Schema must be a struct type, but got {}", json.dump()); } auto& struct_type = static_cast(*type); @@ -602,8 +756,429 @@ Result> SnapshotFromJson(const nlohmann::json& json) { return std::make_unique( snapshot_id, parent_snapshot_id, - sequence_number.has_value() ? *sequence_number : kInitialSequenceNumber, - timestamp_ms, manifest_list, std::move(summary), schema_id); + sequence_number.value_or(TableMetadata::kInitialSequenceNumber), timestamp_ms, + manifest_list, std::move(summary), schema_id); +} + +nlohmann::json ToJson(const BlobMetadata& blob_metadata) { + nlohmann::json json; + json[kType] = blob_metadata.type; + json[kSnapshotId] = blob_metadata.source_snapshot_id; + json[kSequenceNumber] = blob_metadata.source_snapshot_sequence_number; + json[kFields] = blob_metadata.fields; + if (!blob_metadata.properties.empty()) { + json[kProperties] = blob_metadata.properties; + } + return json; +} + +Result BlobMetadataFromJson(const nlohmann::json& json) { + BlobMetadata blob_metadata; + ICEBERG_ASSIGN_OR_RAISE(blob_metadata.type, GetJsonValue(json, kType)); + ICEBERG_ASSIGN_OR_RAISE(blob_metadata.source_snapshot_id, + GetJsonValue(json, kSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE(blob_metadata.source_snapshot_sequence_number, + GetJsonValue(json, kSequenceNumber)); + ICEBERG_ASSIGN_OR_RAISE(blob_metadata.fields, + GetJsonValue>(json, kFields)); + ICEBERG_ASSIGN_OR_RAISE( + blob_metadata.properties, + (GetJsonValueOrDefault>(json, + kProperties))); + return blob_metadata; +} + +nlohmann::json ToJson(const StatisticsFile& statistics_file) { + nlohmann::json json; + json[kSnapshotId] = statistics_file.snapshot_id; + json[kStatisticsPath] = statistics_file.path; + json[kFileSizeInBytes] = statistics_file.file_size_in_bytes; + json[kFileFooterSizeInBytes] = statistics_file.file_footer_size_in_bytes; + + nlohmann::json blob_metadata_array = nlohmann::json::array(); + for (const auto& blob_metadata : statistics_file.blob_metadata) { + blob_metadata_array.push_back(ToJson(blob_metadata)); + } + json[kBlobMetadata] = blob_metadata_array; + + return json; +} + +Result> StatisticsFileFromJson( + const nlohmann::json& json) { + auto stats_file = std::make_unique(); + ICEBERG_ASSIGN_OR_RAISE(stats_file->snapshot_id, + GetJsonValue(json, kSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE(stats_file->path, + GetJsonValue(json, kStatisticsPath)); + ICEBERG_ASSIGN_OR_RAISE(stats_file->file_size_in_bytes, + GetJsonValue(json, kFileSizeInBytes)); + ICEBERG_ASSIGN_OR_RAISE(stats_file->file_footer_size_in_bytes, + GetJsonValue(json, kFileFooterSizeInBytes)); + + ICEBERG_ASSIGN_OR_RAISE(auto blob_metadata_array, + GetJsonValue(json, kBlobMetadata)); + for (const auto& blob_json : blob_metadata_array) { + ICEBERG_ASSIGN_OR_RAISE(auto blob, BlobMetadataFromJson(blob_json)); + stats_file->blob_metadata.push_back(std::move(blob)); + } + + return stats_file; +} + +nlohmann::json ToJson(const PartitionStatisticsFile& partition_statistics_file) { + nlohmann::json json; + json[kSnapshotId] = partition_statistics_file.snapshot_id; + json[kStatisticsPath] = partition_statistics_file.path; + json[kFileSizeInBytes] = partition_statistics_file.file_size_in_bytes; + return json; +} + +Result> PartitionStatisticsFileFromJson( + const nlohmann::json& json) { + auto stats_file = std::make_unique(); + ICEBERG_ASSIGN_OR_RAISE(stats_file->snapshot_id, + GetJsonValue(json, kSnapshotId)); + ICEBERG_ASSIGN_OR_RAISE(stats_file->path, + GetJsonValue(json, kStatisticsPath)); + ICEBERG_ASSIGN_OR_RAISE(stats_file->file_size_in_bytes, + GetJsonValue(json, kFileSizeInBytes)); + return stats_file; +} + +nlohmann::json ToJson(const SnapshotLogEntry& snapshot_log_entry) { + nlohmann::json json; + json[kTimestampMs] = UnixMsFromTimePointMs(snapshot_log_entry.timestamp_ms); + json[kSnapshotId] = snapshot_log_entry.snapshot_id; + return json; +} + +Result SnapshotLogEntryFromJson(const nlohmann::json& json) { + SnapshotLogEntry snapshot_log_entry; + ICEBERG_ASSIGN_OR_RAISE( + snapshot_log_entry.timestamp_ms, + GetJsonValue(json, kTimestampMs).and_then(TimePointMsFromUnixMs)); + ICEBERG_ASSIGN_OR_RAISE(snapshot_log_entry.snapshot_id, + GetJsonValue(json, kSnapshotId)); + return snapshot_log_entry; +} + +nlohmann::json ToJson(const MetadataLogEntry& metadata_log_entry) { + nlohmann::json json; + json[kTimestampMs] = UnixMsFromTimePointMs(metadata_log_entry.timestamp_ms); + json[kMetadataFile] = metadata_log_entry.metadata_file; + return json; +} + +Result MetadataLogEntryFromJson(const nlohmann::json& json) { + MetadataLogEntry metadata_log_entry; + ICEBERG_ASSIGN_OR_RAISE( + metadata_log_entry.timestamp_ms, + GetJsonValue(json, kTimestampMs).and_then(TimePointMsFromUnixMs)); + ICEBERG_ASSIGN_OR_RAISE(metadata_log_entry.metadata_file, + GetJsonValue(json, kMetadataFile)); + return metadata_log_entry; +} + +nlohmann::json ToJson(const TableMetadata& table_metadata) { + nlohmann::json json; + + json[kFormatVersion] = table_metadata.format_version; + json[kTableUuid] = table_metadata.table_uuid; + json[kLocation] = table_metadata.location; + if (table_metadata.format_version > 1) { + json[kLastSequenceNumber] = table_metadata.last_sequence_number; + } + json[kLastUpdatedMs] = UnixMsFromTimePointMs(table_metadata.last_updated_ms); + json[kLastColumnId] = table_metadata.last_column_id; + + // for older readers, continue writing the current schema as "schema". + // this is only needed for v1 because support for schemas and current-schema-id + // is required in v2 and later. + if (table_metadata.format_version == 1) { + for (const auto& schema : table_metadata.schemas) { + if (schema->schema_id() == table_metadata.current_schema_id) { + json[kSchema] = ToJson(*schema); + break; + } + } + } + + // write the current schema ID and schema list + json[kCurrentSchemaId] = table_metadata.current_schema_id; + json[kSchemas] = ToJsonList(table_metadata.schemas); + + // for older readers, continue writing the default spec as "partition-spec" + if (table_metadata.format_version == 1) { + for (const auto& partition_spec : table_metadata.partition_specs) { + if (partition_spec->spec_id() == table_metadata.default_spec_id) { + json[kPartitionSpec] = ToJson(*partition_spec); + break; + } + } + } + + // write the default spec ID and spec list + json[kDefaultSpecId] = table_metadata.default_spec_id; + json[kPartitionSpecs] = ToJsonList(table_metadata.partition_specs); + json[kLastPartitionId] = table_metadata.last_partition_id; + + // write the default order ID and sort order list + json[kDefaultSortOrderId] = table_metadata.default_sort_order_id; + json[kSortOrders] = ToJsonList(table_metadata.sort_orders); + + // write properties map + json[kProperties] = table_metadata.properties; + + if (std::ranges::find_if(table_metadata.snapshots, [&](const auto& snapshot) { + return snapshot->snapshot_id == table_metadata.current_snapshot_id; + }) != table_metadata.snapshots.cend()) { + json[kCurrentSnapshotId] = table_metadata.current_snapshot_id; + } else { + json[kCurrentSnapshotId] = nlohmann::json::value_t::null; + } + + if (table_metadata.format_version >= 3) { + json[kNextRowId] = table_metadata.next_row_id; + } + + json[kRefs] = ToJsonMap(table_metadata.refs); + json[kSnapshots] = ToJsonList(table_metadata.snapshots); + json[kStatistics] = ToJsonList(table_metadata.statistics); + json[kPartitionStatistics] = ToJsonList(table_metadata.partition_statistics); + json[kSnapshotLog] = ToJsonList(table_metadata.snapshot_log); + json[kMetadataLog] = ToJsonList(table_metadata.metadata_log); + + return json; +} + +namespace { + +/// \brief Parse the schemas from the JSON object. +/// +/// \param[in] json The JSON object to parse. +/// \param[in] format_version The format version of the table. +/// \param[out] current_schema_id The current schema ID. +/// \param[out] schemas The list of schemas. +/// +/// \return The current schema or parse error. +Result> ParseSchemas( + const nlohmann::json& json, int8_t format_version, int32_t& current_schema_id, + std::vector>& schemas) { + std::shared_ptr current_schema; + if (json.contains(kSchemas)) { + ICEBERG_ASSIGN_OR_RAISE(auto schema_array, + GetJsonValue(json, kSchemas)); + if (!schema_array.is_array()) { + return JsonParseError("Cannot parse schemas from non-array: {}", + schema_array.dump()); + } + + ICEBERG_ASSIGN_OR_RAISE(current_schema_id, + GetJsonValue(json, kCurrentSchemaId)); + for (const auto& schema_json : schema_array) { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr schema, + SchemaFromJson(schema_json)); + if (schema->schema_id() == current_schema_id) { + current_schema = schema; + } + schemas.push_back(std::move(schema)); + } + if (!current_schema) { + return JsonParseError("Cannot find schema with {}={} from {}", kCurrentSchemaId, + current_schema_id, schema_array.dump()); + } + } else { + if (format_version != 1) { + return JsonParseError("{} must exist in format v{}", kSchemas, format_version); + } + ICEBERG_ASSIGN_OR_RAISE(auto schema_json, + GetJsonValue(json, kSchema)); + ICEBERG_ASSIGN_OR_RAISE(current_schema, SchemaFromJson(schema_json)); + current_schema_id = current_schema->schema_id(); + schemas.push_back(current_schema); + } + return current_schema; +} + +/// \brief Parse the partition specs from the JSON object. +/// +/// \param[in] json The JSON object to parse. +/// \param[in] format_version The format version of the table. +/// \param[in] current_schema The current schema. +/// \param[out] default_spec_id The default partition spec ID. +/// \param[out] partition_specs The list of partition specs. +Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version, + const std::shared_ptr& current_schema, + int32_t& default_spec_id, + std::vector>& partition_specs) { + if (json.contains(kPartitionSpecs)) { + ICEBERG_ASSIGN_OR_RAISE(auto spec_array, + GetJsonValue(json, kPartitionSpecs)); + if (!spec_array.is_array()) { + return JsonParseError("Cannot parse partition specs from non-array: {}", + spec_array.dump()); + } + ICEBERG_ASSIGN_OR_RAISE(default_spec_id, GetJsonValue(json, kDefaultSpecId)); + + for (const auto& spec_json : spec_array) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, + PartitionSpecFromJson(current_schema, spec_json)); + partition_specs.push_back(std::move(spec)); + } + } else { + if (format_version != 1) { + return JsonParseError("{} must exist in format v{}", kPartitionSpecs, + format_version); + } + default_spec_id = TableMetadata::kInitialSpecId; + + ICEBERG_ASSIGN_OR_RAISE(auto spec, GetJsonValue(json, kPartitionSpec) + .and_then([current_schema](const auto& json) { + return PartitionSpecFromJson(current_schema, + json); + })); + partition_specs.push_back(std::move(spec)); + } + + return {}; +} + +/// \brief Parse the sort orders from the JSON object. +/// +/// \param[in] json The JSON object to parse. +/// \param[in] format_version The format version of the table. +/// \param[out] default_sort_order_id The default sort order ID. +/// \param[out] sort_orders The list of sort orders. +Status ParseSortOrders(const nlohmann::json& json, int8_t format_version, + int32_t& default_sort_order_id, + std::vector>& sort_orders) { + if (json.contains(kSortOrders)) { + ICEBERG_ASSIGN_OR_RAISE(default_sort_order_id, + GetJsonValue(json, kDefaultSortOrderId)); + ICEBERG_ASSIGN_OR_RAISE(auto sort_order_array, + GetJsonValue(json, kSortOrders)); + for (const auto& sort_order_json : sort_order_array) { + ICEBERG_ASSIGN_OR_RAISE(auto sort_order, SortOrderFromJson(sort_order_json)); + sort_orders.push_back(std::move(sort_order)); + } + } else { + if (format_version > 1) { + return JsonParseError("{} must exist in format v{}", kSortOrders, format_version); + } + return NotImplementedError("Assign a default sort order"); + } + return {}; +} + +} // namespace + +Result> TableMetadataFromJson(const nlohmann::json& json) { + if (!json.is_object()) { + return JsonParseError("Cannot parse metadata from a non-object: {}", json.dump()); + } + + auto table_metadata = std::make_unique(); + + ICEBERG_ASSIGN_OR_RAISE(table_metadata->format_version, + GetJsonValue(json, kFormatVersion)); + if (table_metadata->format_version < 1 || + table_metadata->format_version > TableMetadata::kSupportedTableFormatVersion) { + return JsonParseError("Cannot read unsupported version: {}", + table_metadata->format_version); + } + + ICEBERG_ASSIGN_OR_RAISE(table_metadata->table_uuid, + GetJsonValueOrDefault(json, kTableUuid)); + ICEBERG_ASSIGN_OR_RAISE(table_metadata->location, + GetJsonValue(json, kLocation)); + + if (table_metadata->format_version > 1) { + ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_sequence_number, + GetJsonValue(json, kLastSequenceNumber)); + } else { + table_metadata->last_sequence_number = TableMetadata::kInitialSequenceNumber; + } + ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_column_id, + GetJsonValue(json, kLastColumnId)); + + ICEBERG_ASSIGN_OR_RAISE( + auto current_schema, + ParseSchemas(json, table_metadata->format_version, + table_metadata->current_schema_id, table_metadata->schemas)); + + ICEBERG_RETURN_UNEXPECTED(ParsePartitionSpecs( + json, table_metadata->format_version, current_schema, + table_metadata->default_spec_id, table_metadata->partition_specs)); + + if (json.contains(kLastPartitionId)) { + ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_partition_id, + GetJsonValue(json, kLastPartitionId)); + } else { + if (table_metadata->format_version > 1) { + return JsonParseError("{} must exist in format v{}", kLastPartitionId, + table_metadata->format_version); + } + // TODO(gangwu): iterate all partition specs to find the largest partition + // field id or assign a default value for unpartitioned tables. However, + // PartitionSpec::lastAssignedFieldId() is not implemented yet. + return NotImplementedError("Find the largest partition field id"); + } + + ICEBERG_RETURN_UNEXPECTED(ParseSortOrders(json, table_metadata->format_version, + table_metadata->default_sort_order_id, + table_metadata->sort_orders)); + + if (json.contains(kProperties)) { + ICEBERG_ASSIGN_OR_RAISE(table_metadata->properties, FromJsonMap(json, kProperties)); + } + + // This field is optional, but internally we set this to -1 when not set + ICEBERG_ASSIGN_OR_RAISE( + table_metadata->current_snapshot_id, + GetJsonValueOrDefault(json, kCurrentSnapshotId, + TableMetadata::kInvalidSnapshotId)); + + if (table_metadata->format_version >= 3) { + ICEBERG_ASSIGN_OR_RAISE(table_metadata->next_row_id, + GetJsonValue(json, kNextRowId)); + } else { + table_metadata->next_row_id = TableMetadata::kInitialRowId; + } + + ICEBERG_ASSIGN_OR_RAISE(auto last_updated_ms, + GetJsonValue(json, kLastUpdatedMs)); + table_metadata->last_updated_ms = + TimePointMs{std::chrono::milliseconds(last_updated_ms)}; + + if (json.contains(kRefs)) { + ICEBERG_ASSIGN_OR_RAISE( + table_metadata->refs, + FromJsonMap>(json, kRefs, SnapshotRefFromJson)); + } else if (table_metadata->current_snapshot_id != TableMetadata::kInvalidSnapshotId) { + table_metadata->refs["main"] = std::make_unique(SnapshotRef{ + .snapshot_id = table_metadata->current_snapshot_id, + .retention = SnapshotRef::Branch{}, + }); + } + + ICEBERG_ASSIGN_OR_RAISE(table_metadata->snapshots, + FromJsonList(json, kSnapshots, SnapshotFromJson)); + ICEBERG_ASSIGN_OR_RAISE( + table_metadata->statistics, + FromJsonList(json, kStatistics, StatisticsFileFromJson)); + ICEBERG_ASSIGN_OR_RAISE( + table_metadata->partition_statistics, + FromJsonList(json, kPartitionStatistics, + PartitionStatisticsFileFromJson)); + ICEBERG_ASSIGN_OR_RAISE( + table_metadata->snapshot_log, + FromJsonList(json, kSnapshotLog, SnapshotLogEntryFromJson)); + ICEBERG_ASSIGN_OR_RAISE( + table_metadata->metadata_log, + FromJsonList(json, kMetadataLog, MetadataLogEntryFromJson)); + + return table_metadata; } } // namespace iceberg diff --git a/src/iceberg/json_internal.h b/src/iceberg/json_internal.h index 450578ff1..8881d5e0c 100644 --- a/src/iceberg/json_internal.h +++ b/src/iceberg/json_internal.h @@ -24,6 +24,8 @@ #include #include "iceberg/result.h" +#include "iceberg/statistics_file.h" +#include "iceberg/table_metadata.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -38,16 +40,6 @@ namespace iceberg { /// \return A JSON object representing the `SortField` in the form of key-value pairs. nlohmann::json ToJson(const SortField& sort_field); -/// \brief Serializes a `SortOrder` object to JSON. -/// -/// This function converts a `SortOrder` object into a JSON representation. -/// The resulting JSON includes the order ID and a list of `SortField` objects. -/// Each `SortField` is serialized as described in the `ToJson(SortField)` function. -/// -/// \param sort_order The `SortOrder` object to be serialized. -/// \return A JSON object representing the `SortOrder` with its order ID and fields array. -nlohmann::json ToJson(const SortOrder& sort_order); - /// \brief Deserializes a JSON object into a `SortField` object. /// /// This function parses the provided JSON and creates a `SortField` object. @@ -59,6 +51,16 @@ nlohmann::json ToJson(const SortOrder& sort_order); /// JSON is malformed or missing expected fields, an error will be returned. Result> SortFieldFromJson(const nlohmann::json& json); +/// \brief Serializes a `SortOrder` object to JSON. +/// +/// This function converts a `SortOrder` object into a JSON representation. +/// The resulting JSON includes the order ID and a list of `SortField` objects. +/// Each `SortField` is serialized as described in the `ToJson(SortField)` function. +/// +/// \param sort_order The `SortOrder` object to be serialized. +/// \return A JSON object representing the `SortOrder` with its order ID and fields array. +nlohmann::json ToJson(const SortOrder& sort_order); + /// \brief Deserializes a JSON object into a `SortOrder` object. /// /// This function parses the provided JSON and creates a `SortOrder` object. @@ -74,31 +76,7 @@ Result> SortOrderFromJson(const nlohmann::json& json) /// /// \param[in] schema The Iceberg schema to convert. /// \return The JSON representation of the schema. -nlohmann::json SchemaToJson(const Schema& schema); - -/// \brief Convert an Iceberg Type to JSON. -/// -/// \param[in] type The Iceberg type to convert. -/// \return The JSON representation of the type. -nlohmann::json TypeToJson(const Type& type); - -/// \brief Convert an Iceberg SchemaField to JSON. -/// -/// \param[in] field The Iceberg field to convert. -/// \return The JSON representation of the field. -nlohmann::json FieldToJson(const SchemaField& field); - -/// \brief Serializes a `SnapshotRef` object to JSON. -/// -/// \param[in] snapshot_ref The `SnapshotRef` object to be serialized. -/// \return A JSON object representing the `SnapshotRef`. -nlohmann::json ToJson(const SnapshotRef& snapshot_ref); - -/// \brief Serializes a `Snapshot` object to JSON. -/// -/// \param[in] snapshot The `Snapshot` object to be serialized. -/// \return A JSON object representing the `snapshot`. -nlohmann::json ToJson(const Snapshot& snapshot); +nlohmann::json ToJson(const Schema& schema); /// \brief Convert JSON to an Iceberg Schema. /// @@ -106,12 +84,24 @@ nlohmann::json ToJson(const Snapshot& snapshot); /// \return The Iceberg schema or an error if the conversion fails. Result> SchemaFromJson(const nlohmann::json& json); +/// \brief Convert an Iceberg Type to JSON. +/// +/// \param[in] type The Iceberg type to convert. +/// \return The JSON representation of the type. +nlohmann::json ToJson(const Type& type); + /// \brief Convert JSON to an Iceberg Type. /// /// \param[in] json The JSON representation of the type. /// \return The Iceberg type or an error if the conversion fails. Result> TypeFromJson(const nlohmann::json& json); +/// \brief Convert an Iceberg SchemaField to JSON. +/// +/// \param[in] field The Iceberg field to convert. +/// \return The JSON representation of the field. +nlohmann::json ToJson(const SchemaField& field); + /// \brief Convert JSON to an Iceberg SchemaField. /// /// \param[in] json The JSON representation of the field. @@ -129,18 +119,6 @@ Result> FieldFromJson(const nlohmann::json& json); /// pairs. nlohmann::json ToJson(const PartitionField& partition_field); -/// \brief Serializes a `PartitionSpec` object to JSON. -/// -/// This function converts a `PartitionSpec` object into a JSON representation. -/// The resulting JSON includes the spec ID and a list of `PartitionField` objects. -/// Each `PartitionField` is serialized as described in the `ToJson(PartitionField)` -/// function. -/// -/// \param partition_spec The `PartitionSpec` object to be serialized. -/// \return A JSON object representing the `PartitionSpec` with its order ID and fields -/// array. -nlohmann::json ToJson(const PartitionSpec& partition_spec); - /// \brief Deserializes a JSON object into a `PartitionField` object. /// /// This function parses the provided JSON and creates a `PartitionField` object. @@ -153,6 +131,18 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec); Result> PartitionFieldFromJson( const nlohmann::json& json); +/// \brief Serializes a `PartitionSpec` object to JSON. +/// +/// This function converts a `PartitionSpec` object into a JSON representation. +/// The resulting JSON includes the spec ID and a list of `PartitionField` objects. +/// Each `PartitionField` is serialized as described in the `ToJson(PartitionField)` +/// function. +/// +/// \param partition_spec The `PartitionSpec` object to be serialized. +/// \return A JSON object representing the `PartitionSpec` with its order ID and fields +/// array. +nlohmann::json ToJson(const PartitionSpec& partition_spec); + /// \brief Deserializes a JSON object into a `PartitionSpec` object. /// /// This function parses the provided JSON and creates a `PartitionSpec` object. @@ -166,16 +156,90 @@ Result> PartitionFieldFromJson( Result> PartitionSpecFromJson( const std::shared_ptr& schema, const nlohmann::json& json); +/// \brief Serializes a `SnapshotRef` object to JSON. +/// +/// \param[in] snapshot_ref The `SnapshotRef` object to be serialized. +/// \return A JSON object representing the `SnapshotRef`. +nlohmann::json ToJson(const SnapshotRef& snapshot_ref); + /// \brief Deserializes a JSON object into a `SnapshotRef` object. /// /// \param[in] json The JSON object representing a `SnapshotRef`. /// \return A `SnapshotRef` object or an error if the conversion fails. Result> SnapshotRefFromJson(const nlohmann::json& json); +/// \brief Serializes a `Snapshot` object to JSON. +/// +/// \param[in] snapshot The `Snapshot` object to be serialized. +/// \return A JSON object representing the `snapshot`. +nlohmann::json ToJson(const Snapshot& snapshot); + /// \brief Deserializes a JSON object into a `Snapshot` object. /// /// \param[in] json The JSON representation of the snapshot. /// \return A `Snapshot` object or an error if the conversion fails. Result> SnapshotFromJson(const nlohmann::json& json); +/// \brief Serializes a `StatisticsFile` object to JSON. +/// +/// \param statistics_file The `StatisticsFile` object to be serialized. +/// \return A JSON object representing the `StatisticsFile`. +nlohmann::json ToJson(const StatisticsFile& statistics_file); + +/// \brief Deserializes a JSON object into a `StatisticsFile` object. +/// +/// \param json The JSON object representing a `StatisticsFile`. +/// \return A `StatisticsFile` object or an error if the conversion fails. +Result> StatisticsFileFromJson( + const nlohmann::json& json); + +/// \brief Serializes a `PartitionStatisticsFile` object to JSON. +/// +/// \param partition_statistics_file The `PartitionStatisticsFile` object to be +/// serialized. \return A JSON object representing the `PartitionStatisticsFile`. +nlohmann::json ToJson(const PartitionStatisticsFile& partition_statistics_file); + +/// \brief Deserializes a JSON object into a `PartitionStatisticsFile` object. +/// +/// \param json The JSON object representing a `PartitionStatisticsFile`. +/// \return A `PartitionStatisticsFile` object or an error if the conversion fails. +Result> PartitionStatisticsFileFromJson( + const nlohmann::json& json); + +/// \brief Serializes a `SnapshotLogEntry` object to JSON. +/// +/// \param snapshot_log_entry The `SnapshotLogEntry` object to be serialized. +/// \return A JSON object representing the `SnapshotLogEntry`. +nlohmann::json ToJson(const SnapshotLogEntry& snapshot_log_entry); + +/// \brief Deserializes a JSON object into a `SnapshotLogEntry` object. +/// +/// \param json The JSON object representing a `SnapshotLogEntry`. +/// \return A `SnapshotLogEntry` object or an error if the conversion fails. +Result SnapshotLogEntryFromJson(const nlohmann::json& json); + +/// \brief Serializes a `MetadataLogEntry` object to JSON. +/// +/// \param metadata_log_entry The `MetadataLogEntry` object to be serialized. +/// \return A JSON object representing the `MetadataLogEntry`. +nlohmann::json ToJson(const MetadataLogEntry& metadata_log_entry); + +/// \brief Deserializes a JSON object into a `MetadataLogEntry` object. +/// +/// \param json The JSON object representing a `MetadataLogEntry`. +/// \return A `MetadataLogEntry` object or an error if the conversion fails. +Result MetadataLogEntryFromJson(const nlohmann::json& json); + +/// \brief Serializes a `TableMetadata` object to JSON. +/// +/// \param table_metadata The `TableMetadata` object to be serialized. +/// \return A JSON object representing the `TableMetadata`. +nlohmann::json ToJson(const TableMetadata& table_metadata); + +/// \brief Deserializes a JSON object into a `TableMetadata` object. +/// +/// \param json The JSON object representing a `TableMetadata`. +/// \return A `TableMetadata` object or an error if the conversion fails. +Result> TableMetadataFromJson(const nlohmann::json& json); + } // namespace iceberg diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 7d12f367d..4549ea724 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include "iceberg/expected.h" @@ -60,4 +61,20 @@ using Result = expected; using Status = Result; +/// \brief Create an unexpected error with kNotImplemented +template +auto NotImplementedError(const std::format_string fmt, Args&&... args) + -> unexpected { + return unexpected({.kind = ErrorKind::kNotImplemented, + .message = std::format(fmt, std::forward(args)...)}); +} + +/// \brief Create an unexpected error with kJsonParseError +template +auto JsonParseError(const std::format_string fmt, Args&&... args) + -> unexpected { + return unexpected({.kind = ErrorKind::kJsonParseError, + .message = std::format(fmt, std::forward(args)...)}); +} + } // namespace iceberg diff --git a/src/iceberg/statistics_file.cc b/src/iceberg/statistics_file.cc index 07f4f531d..5ae753a69 100644 --- a/src/iceberg/statistics_file.cc +++ b/src/iceberg/statistics_file.cc @@ -23,29 +23,26 @@ namespace iceberg { -bool BlobMetadata::Equals(const BlobMetadata& other) const { - return type == other.type && source_snapshot_id == other.source_snapshot_id && - source_snapshot_sequence_number == other.source_snapshot_sequence_number && - fields == other.fields && properties == other.properties; -} - -std::string BlobMetadata::ToString() const { +std::string ToString(const BlobMetadata& blob_metadata) { std::string repr = "BlobMetadata["; std::format_to(std::back_inserter(repr), - "type='{}',sourceSnapshotId={},sourceSnapshotSequenceNumber={},", type, - source_snapshot_id, source_snapshot_sequence_number); + "type='{}',sourceSnapshotId={},sourceSnapshotSequenceNumber={},", + blob_metadata.type, blob_metadata.source_snapshot_id, + blob_metadata.source_snapshot_sequence_number); std::format_to(std::back_inserter(repr), "fields=["); - for (auto iter = fields.cbegin(); iter != fields.cend(); ++iter) { - if (iter != fields.cbegin()) { + for (auto iter = blob_metadata.fields.cbegin(); iter != blob_metadata.fields.cend(); + ++iter) { + if (iter != blob_metadata.fields.cbegin()) { std::format_to(std::back_inserter(repr), ",{}", *iter); } else { std::format_to(std::back_inserter(repr), "{}", *iter); } } std::format_to(std::back_inserter(repr), "],properties=["); - for (auto iter = properties.cbegin(); iter != properties.cend(); ++iter) { + for (auto iter = blob_metadata.properties.cbegin(); + iter != blob_metadata.properties.cend(); ++iter) { const auto& [key, value] = *iter; - if (iter != properties.cbegin()) { + if (iter != blob_metadata.properties.cbegin()) { std::format_to(std::back_inserter(repr), ",{}:{}", key, value); } else { std::format_to(std::back_inserter(repr), "{}:{}", key, value); @@ -55,28 +52,32 @@ std::string BlobMetadata::ToString() const { return repr; } -bool StatisticsFile::Equals(const StatisticsFile& other) const { - return snapshot_id == other.snapshot_id && path == other.path && - file_size_in_bytes == other.file_size_in_bytes && - file_footer_size_in_bytes == other.file_footer_size_in_bytes && - blob_metadata == other.blob_metadata; -} - -std::string StatisticsFile::ToString() const { +std::string ToString(const StatisticsFile& statistics_file) { std::string repr = "StatisticsFile["; std::format_to(std::back_inserter(repr), "snapshotId={},path={},fileSizeInBytes={},fileFooterSizeInBytes={},", - snapshot_id, path, file_size_in_bytes, file_footer_size_in_bytes); + statistics_file.snapshot_id, statistics_file.path, + statistics_file.file_size_in_bytes, + statistics_file.file_footer_size_in_bytes); std::format_to(std::back_inserter(repr), "blobMetadata=["); - for (auto iter = blob_metadata.cbegin(); iter != blob_metadata.cend(); ++iter) { - if (iter != blob_metadata.cbegin()) { - std::format_to(std::back_inserter(repr), ",{}", iter->ToString()); + for (auto iter = statistics_file.blob_metadata.cbegin(); + iter != statistics_file.blob_metadata.cend(); ++iter) { + if (iter != statistics_file.blob_metadata.cbegin()) { + std::format_to(std::back_inserter(repr), ",{}", ToString(*iter)); } else { - std::format_to(std::back_inserter(repr), "{}", iter->ToString()); + std::format_to(std::back_inserter(repr), "{}", ToString(*iter)); } } repr += "]]"; return repr; } +std::string ToString(const PartitionStatisticsFile& partition_statistics_file) { + std::string repr = "PartitionStatisticsFile["; + std::format_to(std::back_inserter(repr), "snapshotId={},path={},fileSizeInBytes={},", + partition_statistics_file.snapshot_id, partition_statistics_file.path, + partition_statistics_file.file_size_in_bytes); + return repr; +} + } // namespace iceberg diff --git a/src/iceberg/statistics_file.h b/src/iceberg/statistics_file.h index 0de9587cd..5bdc1c14c 100644 --- a/src/iceberg/statistics_file.h +++ b/src/iceberg/statistics_file.h @@ -28,12 +28,11 @@ #include #include "iceberg/iceberg_export.h" -#include "iceberg/util/formattable.h" namespace iceberg { /// \brief A metadata about a statistics or indices blob -struct ICEBERG_EXPORT BlobMetadata : public util::Formattable { +struct ICEBERG_EXPORT BlobMetadata { /// Type of the blob std::string type; /// ID of the Iceberg table's snapshot the blob was computed from @@ -47,22 +46,19 @@ struct ICEBERG_EXPORT BlobMetadata : public util::Formattable { /// \brief Compare two BlobMetadatas for equality. friend bool operator==(const BlobMetadata& lhs, const BlobMetadata& rhs) { - return lhs.Equals(rhs); + return lhs.type == rhs.type && lhs.source_snapshot_id == rhs.source_snapshot_id && + lhs.source_snapshot_sequence_number == rhs.source_snapshot_sequence_number && + lhs.fields == rhs.fields && lhs.properties == rhs.properties; } /// \brief Compare two BlobMetadatas for inequality. friend bool operator!=(const BlobMetadata& lhs, const BlobMetadata& rhs) { return !(lhs == rhs); } - - std::string ToString() const override; - - private: - bool Equals(const BlobMetadata& other) const; }; /// \brief Represents a statistics file in the Puffin format -struct ICEBERG_EXPORT StatisticsFile : public util::Formattable { +struct ICEBERG_EXPORT StatisticsFile { /// ID of the Iceberg table's snapshot the statistics file is associated with int64_t snapshot_id; /// Fully qualified path to the file @@ -76,18 +72,16 @@ struct ICEBERG_EXPORT StatisticsFile : public util::Formattable { /// \brief Compare two StatisticsFiles for equality. friend bool operator==(const StatisticsFile& lhs, const StatisticsFile& rhs) { - return lhs.Equals(rhs); + return lhs.snapshot_id == rhs.snapshot_id && lhs.path == rhs.path && + lhs.file_size_in_bytes == rhs.file_size_in_bytes && + lhs.file_footer_size_in_bytes == rhs.file_footer_size_in_bytes && + lhs.blob_metadata == rhs.blob_metadata; } /// \brief Compare two StatisticsFiles for inequality. friend bool operator!=(const StatisticsFile& lhs, const StatisticsFile& rhs) { return !(lhs == rhs); } - - std::string ToString() const override; - - private: - bool Equals(const StatisticsFile& other) const; }; /// \brief Represents a partition statistics file @@ -99,6 +93,29 @@ struct ICEBERG_EXPORT PartitionStatisticsFile { std::string path; /// The size of the partition statistics file in bytes int64_t file_size_in_bytes; + + /// \brief Compare two PartitionStatisticsFiles for equality. + friend bool operator==(const PartitionStatisticsFile& lhs, + const PartitionStatisticsFile& rhs) { + return lhs.snapshot_id == rhs.snapshot_id && lhs.path == rhs.path && + lhs.file_size_in_bytes == rhs.file_size_in_bytes; + } + + /// \brief Compare two PartitionStatisticsFiles for inequality. + friend bool operator!=(const PartitionStatisticsFile& lhs, + const PartitionStatisticsFile& rhs) { + return !(lhs == rhs); + } }; +/// \brief Returns a string representation of a BlobMetadata +ICEBERG_EXPORT std::string ToString(const BlobMetadata& blob_metadata); + +/// \brief Returns a string representation of a StatisticsFile +ICEBERG_EXPORT std::string ToString(const StatisticsFile& statistics_file); + +/// \brief Returns a string representation of a PartitionStatisticsFile +ICEBERG_EXPORT std::string ToString( + const PartitionStatisticsFile& partition_statistics_file); + } // namespace iceberg diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index c30ddecd6..171021445 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -22,17 +22,26 @@ #include #include -#include "iceberg/statistics_file.h" - namespace iceberg { -std::string SnapshotLogEntry::ToString() const { - return std::format("SnapshotLogEntry[timestampMillis={},snapshotId={}]", timestamp_ms, - snapshot_id); +std::string ToString(const SnapshotLogEntry& entry) { + return std::format("SnapshotLogEntry[timestampMillis={},snapshotId={}]", + entry.timestamp_ms, entry.snapshot_id); +} + +std::string ToString(const MetadataLogEntry& entry) { + return std::format("MetadataLogEntry[timestampMillis={},file={}]", entry.timestamp_ms, + entry.metadata_file); +} + +Result TimePointMsFromUnixMs(int64_t unix_ms) { + return TimePointMs{std::chrono::milliseconds(unix_ms)}; } -std::string MetadataLogEntry::ToString() const { - return std::format("MetadataLogEntry[timestampMillis={},file={}]", timestamp_ms, file); +int64_t UnixMsFromTimePointMs(const TimePointMs& time_point_ms) { + return std::chrono::duration_cast( + time_point_ms.time_since_epoch()) + .count(); } } // namespace iceberg diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index e1665dce1..2a1afcbfc 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -29,32 +29,51 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/result.h" #include "iceberg/type_fwd.h" -#include "iceberg/util/formattable.h" namespace iceberg { +/// \brief A time point in milliseconds using TimePointMs = std::chrono::time_point; +/// \brief Returns a TimePointMs from a Unix timestamp in milliseconds +ICEBERG_EXPORT Result TimePointMsFromUnixMs(int64_t unix_ms); + +/// \brief Returns a Unix timestamp in milliseconds from a TimePointMs +ICEBERG_EXPORT int64_t UnixMsFromTimePointMs(const TimePointMs& time_point_ms); + /// \brief Represents a snapshot log entry -struct ICEBERG_EXPORT SnapshotLogEntry : public util::Formattable { +struct ICEBERG_EXPORT SnapshotLogEntry { /// The timestamp in milliseconds of the change TimePointMs timestamp_ms; /// ID of the snapshot int64_t snapshot_id; - std::string ToString() const override; + friend bool operator==(const SnapshotLogEntry& lhs, const SnapshotLogEntry& rhs) { + return lhs.timestamp_ms == rhs.timestamp_ms && lhs.snapshot_id == rhs.snapshot_id; + } + + friend bool operator!=(const SnapshotLogEntry& lhs, const SnapshotLogEntry& rhs) { + return !(lhs == rhs); + } }; /// \brief Represents a metadata log entry -struct ICEBERG_EXPORT MetadataLogEntry : public util::Formattable { +struct ICEBERG_EXPORT MetadataLogEntry { /// The timestamp in milliseconds of the change TimePointMs timestamp_ms; /// Metadata file location - std::string file; + std::string metadata_file; + + friend bool operator==(const MetadataLogEntry& lhs, const MetadataLogEntry& rhs) { + return lhs.timestamp_ms == rhs.timestamp_ms && lhs.metadata_file == rhs.metadata_file; + } - std::string ToString() const override; + friend bool operator!=(const MetadataLogEntry& lhs, const MetadataLogEntry& rhs) { + return !(lhs == rhs); + } }; /// \brief Represents the metadata for an Iceberg table @@ -63,10 +82,18 @@ struct ICEBERG_EXPORT MetadataLogEntry : public util::Formattable { /// implementation, missing pieces including: 1) Map 2) List 3) Map 4) /// Map -/// -/// TODO(wgtmac): Implement Equals and ToString once SortOrder and Snapshot are -/// implemented. struct ICEBERG_EXPORT TableMetadata { + static constexpr int8_t kDefaultTableFormatVersion = 2; + static constexpr int8_t kSupportedTableFormatVersion = 3; + static constexpr int8_t kMinFormatVersionRowLineage = 3; + static constexpr int32_t kInitialSpecId = 0; + static constexpr int32_t kInitialSortOrderId = 1; + static constexpr int32_t kInitialSchemaId = 0; + static constexpr int64_t kInitialRowId = 0; + static constexpr int64_t kInitialSequenceNumber = 0; + static constexpr int64_t kInvalidSequenceNumber = -1; + static constexpr int64_t kInvalidSnapshotId = -1; + /// An integer version number for the format int8_t format_version; /// A UUID that identifies the table @@ -76,7 +103,7 @@ struct ICEBERG_EXPORT TableMetadata { /// The table's highest assigned sequence number int64_t last_sequence_number; /// Timestamp in milliseconds from the unix epoch when the table was last updated. - int64_t last_updated_ms; + TimePointMs last_updated_ms; /// The highest assigned column ID for the table int32_t last_column_id; /// A list of schemas @@ -106,7 +133,7 @@ struct ICEBERG_EXPORT TableMetadata { /// Default sort order id of the table int32_t default_sort_order_id; /// A map of snapshot references - std::unordered_map refs; + std::unordered_map> refs; /// A list of table statistics std::vector> statistics; /// A list of partition statistics @@ -115,4 +142,10 @@ struct ICEBERG_EXPORT TableMetadata { int64_t next_row_id; }; +/// \brief Returns a string representation of a SnapshotLogEntry +ICEBERG_EXPORT std::string ToString(const SnapshotLogEntry& entry); + +/// \brief Returns a string representation of a MetadataLogEntry +ICEBERG_EXPORT std::string ToString(const MetadataLogEntry& entry); + } // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 44db1c096..ed8cc5f23 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -88,24 +88,28 @@ struct TableIdentifier; class Catalog; class LocationProvider; +class SortField; +class SortOrder; class Table; class Transaction; +class Transform; +class TransformFunction; + +struct PartitionStatisticsFile; +struct Snapshot; +struct SnapshotRef; +struct StatisticsFile; +struct TableMetadata; + +enum class SnapshotRefType; +enum class TransformType; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. /// ---------------------------------------------------------------------------- class HistoryEntry; -class Snapshot; -struct SnapshotRef; -enum class SnapshotRefType; -class SortField; -class SortOrder; class StructLike; -struct TableMetadata; -class Transform; -enum class TransformType; -class TransformFunction; class MetadataUpdate; class UpdateRequirement; diff --git a/test/schema_json_test.cc b/test/schema_json_test.cc index 32c50da74..c6549ab20 100644 --- a/test/schema_json_test.cc +++ b/test/schema_json_test.cc @@ -41,7 +41,7 @@ class TypeJsonTest : public ::testing::TestWithParam {}; TEST_P(TypeJsonTest, SingleTypeRoundTrip) { // To Json const auto& param = GetParam(); - auto json = TypeToJson(*param.type).dump(); + auto json = ToJson(*param.type).dump(); ASSERT_EQ(param.json, json); // From Json @@ -131,7 +131,7 @@ TEST(SchemaJsonTest, RoundTrip) { ASSERT_EQ(field2.type()->type_id(), TypeId::kString); ASSERT_TRUE(field2.optional()); - auto dumped_json = SchemaToJson(*schema).dump(); + auto dumped_json = ToJson(*schema).dump(); ASSERT_EQ(dumped_json, json); }