2222#include < algorithm>
2323#include < cstdint>
2424#include < format>
25+ #include < ranges>
2526#include < regex>
2627#include < type_traits>
2728#include < unordered_set>
2829
2930#include < nlohmann/json.hpp>
3031
32+ #include " iceberg/constants.h"
33+ #include " iceberg/partition_field.h"
3134#include " iceberg/partition_spec.h"
3235#include " iceberg/result.h"
3336#include " iceberg/schema.h"
@@ -248,7 +251,7 @@ Result<std::vector<T>> FromJsonList(
248251 list.emplace_back (std::move (entry));
249252 }
250253 }
251- return {} ;
254+ return list ;
252255}
253256
254257// / \brief Parse a list of items from a JSON object.
@@ -471,7 +474,7 @@ nlohmann::json ToJson(const Type& type) {
471474
472475nlohmann::json ToJson (const Schema& schema) {
473476 nlohmann::json json = ToJson (static_cast <const Type&>(schema));
474- json[ kSchemaId ] = schema.schema_id ();
477+ SetOptionalField ( json, kSchemaId , schema.schema_id () );
475478 // TODO(gangwu): add identifier-field-ids.
476479 return json;
477480}
@@ -496,7 +499,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
496499 nlohmann::json json;
497500 json[kSnapshotId ] = snapshot.snapshot_id ;
498501 SetOptionalField (json, kParentSnapshotId , snapshot.parent_snapshot_id );
499- if (snapshot.sequence_number > TableMetadata:: kInitialSequenceNumber ) {
502+ if (snapshot.sequence_number > kInitialSequenceNumber ) {
500503 json[kSequenceNumber ] = snapshot.sequence_number ;
501504 }
502505 json[kTimestampMs ] = snapshot.timestamp_ms ;
@@ -625,7 +628,7 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
625628}
626629
627630Result<std::unique_ptr<Schema>> SchemaFromJson (const nlohmann::json& json) {
628- ICEBERG_ASSIGN_OR_RAISE (auto schema_id, GetJsonValue <int32_t >(json, kSchemaId ));
631+ ICEBERG_ASSIGN_OR_RAISE (auto schema_id, GetJsonValueOptional <int32_t >(json, kSchemaId ));
629632 ICEBERG_ASSIGN_OR_RAISE (auto type, TypeFromJson (json));
630633
631634 if (type->type_id () != TypeId::kStruct ) [[unlikely]] {
@@ -658,9 +661,16 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) {
658661}
659662
660663Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson (
661- const nlohmann::json& json) {
664+ const nlohmann::json& json, bool allow_field_id_missing ) {
662665 ICEBERG_ASSIGN_OR_RAISE (auto source_id, GetJsonValue<int32_t >(json, kSourceId ));
663- ICEBERG_ASSIGN_OR_RAISE (auto field_id, GetJsonValue<int32_t >(json, kFieldId ));
666+ int32_t field_id;
667+ if (allow_field_id_missing) {
668+ // Partition field id in v1 is not tracked, so we use -1 to indicate that.
669+ ICEBERG_ASSIGN_OR_RAISE (
670+ field_id, GetJsonValueOrDefault<int32_t >(json, kFieldId , kInvalidFieldId ));
671+ } else {
672+ ICEBERG_ASSIGN_OR_RAISE (field_id, GetJsonValue<int32_t >(json, kFieldId ));
673+ }
664674 ICEBERG_ASSIGN_OR_RAISE (
665675 auto transform,
666676 GetJsonValue<std::string>(json, kTransform ).and_then (TransformFromString));
@@ -755,9 +765,8 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
755765 ICEBERG_ASSIGN_OR_RAISE (auto schema_id, GetJsonValueOptional<int32_t >(json, kSchemaId ));
756766
757767 return std::make_unique<Snapshot>(
758- snapshot_id, parent_snapshot_id,
759- sequence_number.value_or (TableMetadata::kInitialSequenceNumber ), timestamp_ms,
760- manifest_list, std::move (summary), schema_id);
768+ snapshot_id, parent_snapshot_id, sequence_number.value_or (kInitialSequenceNumber ),
769+ timestamp_ms, manifest_list, std::move (summary), schema_id);
761770}
762771
763772nlohmann::json ToJson (const BlobMetadata& blob_metadata) {
@@ -905,7 +914,7 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
905914 }
906915
907916 // write the current schema ID and schema list
908- json[ kCurrentSchemaId ] = table_metadata.current_schema_id ;
917+ SetOptionalField ( json, kCurrentSchemaId , table_metadata.current_schema_id ) ;
909918 json[kSchemas ] = ToJsonList (table_metadata.schemas );
910919
911920 // for older readers, continue writing the default spec as "partition-spec"
@@ -963,7 +972,8 @@ namespace {
963972// /
964973// / \return The current schema or parse error.
965974Result<std::shared_ptr<Schema>> ParseSchemas (
966- const nlohmann::json& json, int8_t format_version, int32_t & current_schema_id,
975+ const nlohmann::json& json, int8_t format_version,
976+ std::optional<int32_t >& current_schema_id,
967977 std::vector<std::shared_ptr<Schema>>& schemas) {
968978 std::shared_ptr<Schema> current_schema;
969979 if (json.contains (kSchemas )) {
@@ -986,7 +996,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
986996 }
987997 if (!current_schema) {
988998 return JsonParseError (" Cannot find schema with {}={} from {}" , kCurrentSchemaId ,
989- current_schema_id, schema_array.dump ());
999+ current_schema_id. value () , schema_array.dump ());
9901000 }
9911001 } else {
9921002 if (format_version != 1 ) {
@@ -1031,13 +1041,30 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
10311041 return JsonParseError (" {} must exist in format v{}" , kPartitionSpecs ,
10321042 format_version);
10331043 }
1034- default_spec_id = TableMetadata::kInitialSpecId ;
10351044
1036- ICEBERG_ASSIGN_OR_RAISE (auto spec, GetJsonValue<nlohmann::json>(json, kPartitionSpec )
1037- .and_then ([current_schema](const auto & json) {
1038- return PartitionSpecFromJson (current_schema,
1039- json);
1040- }));
1045+ ICEBERG_ASSIGN_OR_RAISE (auto partition_spec_json,
1046+ GetJsonValue<nlohmann::json>(json, kPartitionSpec ));
1047+ if (!partition_spec_json.is_array ()) {
1048+ return JsonParseError (" Cannot parse v1 partition spec from non-array: {}" ,
1049+ partition_spec_json.dump ());
1050+ }
1051+
1052+ int32_t next_partition_field_id = PartitionSpec::kLegacyPartitionDataIdStart ;
1053+ std::vector<PartitionField> fields;
1054+ for (const auto & entry_json : partition_spec_json) {
1055+ ICEBERG_ASSIGN_OR_RAISE (auto field, PartitionFieldFromJson (entry_json));
1056+ int32_t field_id = field->field_id ();
1057+ if (field_id == kInvalidFieldId ) {
1058+ // If the field ID is not set, we need to assign a new one
1059+ field_id = next_partition_field_id++;
1060+ }
1061+ fields.emplace_back (field->source_id (), field_id, std::string (field->name ()),
1062+ std::move (field->transform ()));
1063+ }
1064+
1065+ auto spec = std::make_unique<PartitionSpec>(current_schema, kInitialSpecId ,
1066+ std::move (fields));
1067+ default_spec_id = spec->spec_id ();
10411068 partition_specs.push_back (std::move (spec));
10421069 }
10431070
@@ -1066,7 +1093,9 @@ Status ParseSortOrders(const nlohmann::json& json, int8_t format_version,
10661093 if (format_version > 1 ) {
10671094 return JsonParseError (" {} must exist in format v{}" , kSortOrders , format_version);
10681095 }
1069- return NotImplementedError (" Assign a default sort order" );
1096+ auto sort_order = SortOrder::Unsorted ();
1097+ default_sort_order_id = sort_order->order_id ();
1098+ sort_orders.push_back (std::move (sort_order));
10701099 }
10711100 return {};
10721101}
@@ -1083,7 +1112,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
10831112 ICEBERG_ASSIGN_OR_RAISE (table_metadata->format_version ,
10841113 GetJsonValue<int8_t >(json, kFormatVersion ));
10851114 if (table_metadata->format_version < 1 ||
1086- table_metadata->format_version > TableMetadata:: kSupportedTableFormatVersion ) {
1115+ table_metadata->format_version > kSupportedTableFormatVersion ) {
10871116 return JsonParseError (" Cannot read unsupported version: {}" ,
10881117 table_metadata->format_version );
10891118 }
@@ -1097,7 +1126,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
10971126 ICEBERG_ASSIGN_OR_RAISE (table_metadata->last_sequence_number ,
10981127 GetJsonValue<int64_t >(json, kLastSequenceNumber ));
10991128 } else {
1100- table_metadata->last_sequence_number = TableMetadata:: kInitialSequenceNumber ;
1129+ table_metadata->last_sequence_number = kInitialSequenceNumber ;
11011130 }
11021131 ICEBERG_ASSIGN_OR_RAISE (table_metadata->last_column_id ,
11031132 GetJsonValue<int32_t >(json, kLastColumnId ));
@@ -1119,10 +1148,16 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11191148 return JsonParseError (" {} must exist in format v{}" , kLastPartitionId ,
11201149 table_metadata->format_version );
11211150 }
1122- // TODO(gangwu): iterate all partition specs to find the largest partition
1123- // field id or assign a default value for unpartitioned tables. However,
1124- // PartitionSpec::lastAssignedFieldId() is not implemented yet.
1125- return NotImplementedError (" Find the largest partition field id" );
1151+
1152+ if (table_metadata->partition_specs .empty ()) {
1153+ table_metadata->last_partition_id =
1154+ PartitionSpec::Unpartitioned ()->last_assigned_field_id ();
1155+ } else {
1156+ table_metadata->last_partition_id =
1157+ std::ranges::max (table_metadata->partition_specs , {}, [](const auto & spec) {
1158+ return spec->last_assigned_field_id ();
1159+ })->last_assigned_field_id ();
1160+ }
11261161 }
11271162
11281163 ICEBERG_RETURN_UNEXPECTED (ParseSortOrders (json, table_metadata->format_version ,
@@ -1136,14 +1171,13 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11361171 // This field is optional, but internally we set this to -1 when not set
11371172 ICEBERG_ASSIGN_OR_RAISE (
11381173 table_metadata->current_snapshot_id ,
1139- GetJsonValueOrDefault<int64_t >(json, kCurrentSnapshotId ,
1140- TableMetadata::kInvalidSnapshotId ));
1174+ GetJsonValueOrDefault<int64_t >(json, kCurrentSnapshotId , kInvalidSnapshotId ));
11411175
11421176 if (table_metadata->format_version >= 3 ) {
11431177 ICEBERG_ASSIGN_OR_RAISE (table_metadata->next_row_id ,
11441178 GetJsonValue<int64_t >(json, kNextRowId ));
11451179 } else {
1146- table_metadata->next_row_id = TableMetadata:: kInitialRowId ;
1180+ table_metadata->next_row_id = kInitialRowId ;
11471181 }
11481182
11491183 ICEBERG_ASSIGN_OR_RAISE (auto last_updated_ms,
@@ -1155,7 +1189,7 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11551189 ICEBERG_ASSIGN_OR_RAISE (
11561190 table_metadata->refs ,
11571191 FromJsonMap<std::shared_ptr<SnapshotRef>>(json, kRefs , SnapshotRefFromJson));
1158- } else if (table_metadata->current_snapshot_id != TableMetadata:: kInvalidSnapshotId ) {
1192+ } else if (table_metadata->current_snapshot_id != kInvalidSnapshotId ) {
11591193 table_metadata->refs [" main" ] = std::make_unique<SnapshotRef>(SnapshotRef{
11601194 .snapshot_id = table_metadata->current_snapshot_id ,
11611195 .retention = SnapshotRef::Branch{},
0 commit comments