Skip to content

Commit bb02a15

Browse files
authored
refactor: change PartitionSpec to be independent of Schema (#299)
- Remove `schema_` member from `PartitionSpec`, Making `PartitionType()` method require a Schema parameter instead of using the stored schema. - Add `table_schema_` in `ManifestEntryAdapter`. - Fix bug in ManifestEntryAdapter::init(v1/v2/v3) to use `table_schema` in `metadata_["schema"]`. - Fix error in related test.
1 parent abd6b3f commit bb02a15

19 files changed

+169
-114
lines changed

src/iceberg/json_internal.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,9 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
529529
ICEBERG_ASSIGN_OR_RAISE(auto partition_field, PartitionFieldFromJson(field_json));
530530
partition_fields.push_back(std::move(*partition_field));
531531
}
532-
return std::make_unique<PartitionSpec>(schema, spec_id, std::move(partition_fields));
532+
// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
533+
// partition field from schema and then verify it
534+
return std::make_unique<PartitionSpec>(spec_id, std::move(partition_fields));
533535
}
534536

535537
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
@@ -902,8 +904,10 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
902904
std::move(field->transform()));
903905
}
904906

905-
auto spec = std::make_unique<PartitionSpec>(
906-
current_schema, PartitionSpec::kInitialSpecId, std::move(fields));
907+
// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
908+
// partition field from schema and then verify it
909+
auto spec =
910+
std::make_unique<PartitionSpec>(PartitionSpec::kInitialSpecId, std::move(fields));
907911
default_spec_id = spec->spec_id();
908912
partition_specs.push_back(std::move(spec));
909913
}

src/iceberg/json_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec& partition_s
173173
/// objects. Each `PartitionField` will be parsed using the `PartitionFieldFromJson`
174174
/// function.
175175
///
176+
/// \param schema The current schema.
176177
/// \param json The JSON object representing a `PartitionSpec`.
177178
/// \return An `expected` value containing either a `PartitionSpec` object or an error. If
178179
/// the JSON is malformed or missing expected fields, an error will be returned.

src/iceberg/manifest_adapter.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#include "iceberg/manifest_adapter.h"
2121

22+
#include <utility>
23+
2224
#include <nanoarrow/nanoarrow.h>
2325

2426
#include "iceberg/arrow/nanoarrow_status_internal.h"
@@ -140,8 +142,11 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
140142
}
141143

142144
ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
145+
std::shared_ptr<Schema> current_schema,
143146
ManifestContent content)
144-
: partition_spec_(std::move(partition_spec)), content_(content) {
147+
: partition_spec_(std::move(partition_spec)),
148+
current_schema_(std::move(current_schema)),
149+
content_(content) {
145150
if (!partition_spec_) {
146151
partition_spec_ = PartitionSpec::Unpartitioned();
147152
}

src/iceberg/manifest_adapter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class ICEBERG_EXPORT ManifestAdapter {
6262
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
6363
public:
6464
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
65-
ManifestContent content);
65+
std::shared_ptr<Schema> current_schema, ManifestContent content);
6666

6767
~ManifestEntryAdapter() override;
6868

@@ -92,6 +92,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
9292

9393
protected:
9494
std::shared_ptr<PartitionSpec> partition_spec_;
95+
std::shared_ptr<Schema> current_schema_;
9596
std::shared_ptr<Schema> manifest_schema_;
9697
const ManifestContent content_;
9798
};

src/iceberg/manifest_writer.cc

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,23 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(
7070

7171
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
7272
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
73-
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec) {
74-
auto adapter =
75-
std::make_unique<ManifestEntryAdapterV1>(snapshot_id, std::move(partition_spec));
73+
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
74+
std::shared_ptr<Schema> current_schema) {
75+
if (manifest_location.empty()) {
76+
return InvalidArgument("Manifest location cannot be empty");
77+
}
78+
if (!file_io) {
79+
return InvalidArgument("FileIO cannot be null");
80+
}
81+
if (!partition_spec) {
82+
return InvalidArgument("PartitionSpec cannot be null");
83+
}
84+
if (!current_schema) {
85+
return InvalidArgument("Current schema cannot be null");
86+
}
87+
88+
auto adapter = std::make_unique<ManifestEntryAdapterV1>(
89+
snapshot_id, std::move(partition_spec), std::move(current_schema));
7690
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
7791
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
7892

@@ -86,9 +100,21 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
86100
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
87101
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
88102
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
89-
ManifestContent content) {
103+
std::shared_ptr<Schema> current_schema, ManifestContent content) {
104+
if (manifest_location.empty()) {
105+
return InvalidArgument("Manifest location cannot be empty");
106+
}
107+
if (!file_io) {
108+
return InvalidArgument("FileIO cannot be null");
109+
}
110+
if (!partition_spec) {
111+
return InvalidArgument("PartitionSpec cannot be null");
112+
}
113+
if (!current_schema) {
114+
return InvalidArgument("Current schema cannot be null");
115+
}
90116
auto adapter = std::make_unique<ManifestEntryAdapterV2>(
91-
snapshot_id, std::move(partition_spec), content);
117+
snapshot_id, std::move(partition_spec), std::move(current_schema), content);
92118
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
93119
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
94120

@@ -102,9 +128,23 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
102128
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
103129
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
104130
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
105-
std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content) {
131+
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
132+
ManifestContent content) {
133+
if (manifest_location.empty()) {
134+
return InvalidArgument("Manifest location cannot be empty");
135+
}
136+
if (!file_io) {
137+
return InvalidArgument("FileIO cannot be null");
138+
}
139+
if (!partition_spec) {
140+
return InvalidArgument("PartitionSpec cannot be null");
141+
}
142+
if (!current_schema) {
143+
return InvalidArgument("Current schema cannot be null");
144+
}
106145
auto adapter = std::make_unique<ManifestEntryAdapterV3>(
107-
snapshot_id, first_row_id, std::move(partition_spec), content);
146+
snapshot_id, first_row_id, std::move(partition_spec), std::move(current_schema),
147+
content);
108148
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
109149
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
110150

src/iceberg/manifest_writer.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,35 +62,42 @@ class ICEBERG_EXPORT ManifestWriter {
6262
/// \param manifest_location Path to the manifest file.
6363
/// \param file_io File IO implementation to use.
6464
/// \param partition_spec Partition spec for the manifest.
65+
/// \param current_schema Current table schema.
6566
/// \return A Result containing the writer or an error.
6667
static Result<std::unique_ptr<ManifestWriter>> MakeV1Writer(
6768
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
68-
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec);
69+
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
70+
std::shared_ptr<Schema> current_schema);
6971

7072
/// \brief Creates a writer for a manifest file.
7173
/// \param snapshot_id ID of the snapshot.
7274
/// \param manifest_location Path to the manifest file.
7375
/// \param file_io File IO implementation to use.
7476
/// \param partition_spec Partition spec for the manifest.
77+
/// \param current_schema Schema containing the source fields referenced by partition
78+
/// spec.
7579
/// \param content Content of the manifest.
7680
/// \return A Result containing the writer or an error.
7781
static Result<std::unique_ptr<ManifestWriter>> MakeV2Writer(
7882
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
7983
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
80-
ManifestContent content);
84+
std::shared_ptr<Schema> current_schema, ManifestContent content);
8185

8286
/// \brief Creates a writer for a manifest file.
8387
/// \param snapshot_id ID of the snapshot.
8488
/// \param first_row_id First row ID of the snapshot.
8589
/// \param manifest_location Path to the manifest file.
8690
/// \param file_io File IO implementation to use.
8791
/// \param partition_spec Partition spec for the manifest.
92+
/// \param current_schema Schema containing the source fields referenced by partition
93+
/// spec.
8894
/// \param content Content of the manifest.
8995
/// \return A Result containing the writer or an error.
9096
static Result<std::unique_ptr<ManifestWriter>> MakeV3Writer(
9197
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
9298
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
93-
std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content);
99+
std::shared_ptr<PartitionSpec> partition_spec,
100+
std::shared_ptr<Schema> current_schema, ManifestContent content);
94101

95102
private:
96103
static constexpr int64_t kBatchSize = 1024;

src/iceberg/partition_spec.cc

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <algorithm>
2323
#include <format>
24+
#include <memory>
2425
#include <ranges>
2526

2627
#include "iceberg/schema.h"
@@ -31,10 +32,9 @@
3132

3233
namespace iceberg {
3334

34-
PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
35-
std::vector<PartitionField> fields,
35+
PartitionSpec::PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
3636
std::optional<int32_t> last_assigned_field_id)
37-
: schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {
37+
: spec_id_(spec_id), fields_(std::move(fields)) {
3838
if (last_assigned_field_id) {
3939
last_assigned_field_id_ = last_assigned_field_id.value();
4040
} else if (fields_.empty()) {
@@ -48,34 +48,25 @@ PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
4848

4949
const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
5050
static const std::shared_ptr<PartitionSpec> unpartitioned =
51-
std::make_shared<PartitionSpec>(
52-
/*schema=*/std::make_shared<Schema>(std::vector<SchemaField>{}), kInitialSpecId,
53-
std::vector<PartitionField>{}, kLegacyPartitionDataIdStart - 1);
51+
std::make_shared<PartitionSpec>(kInitialSpecId, std::vector<PartitionField>{},
52+
kLegacyPartitionDataIdStart - 1);
5453
return unpartitioned;
5554
}
5655

57-
const std::shared_ptr<Schema>& PartitionSpec::schema() const { return schema_; }
58-
5956
int32_t PartitionSpec::spec_id() const { return spec_id_; }
6057

6158
std::span<const PartitionField> PartitionSpec::fields() const { return fields_; }
6259

63-
Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType() {
60+
Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(const Schema& schema) {
6461
if (fields_.empty()) {
65-
return nullptr;
66-
}
67-
{
68-
std::scoped_lock<std::mutex> lock(mutex_);
69-
if (partition_type_ != nullptr) {
70-
return partition_type_;
71-
}
62+
return std::make_unique<StructType>(std::vector<SchemaField>{});
7263
}
7364

7465
std::vector<SchemaField> partition_fields;
7566
for (const auto& partition_field : fields_) {
7667
// Get the source field from the original schema by source_id
7768
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
78-
schema_->FindFieldById(partition_field.source_id()));
69+
schema.FindFieldById(partition_field.source_id()));
7970
if (!source_field.has_value()) {
8071
// TODO(xiao.dong) when source field is missing,
8172
// should return an error or just use UNKNOWN type
@@ -97,11 +88,7 @@ Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType() {
9788
/*optional=*/true);
9889
}
9990

100-
std::scoped_lock<std::mutex> lock(mutex_);
101-
if (partition_type_ == nullptr) {
102-
partition_type_ = std::make_shared<StructType>(std::move(partition_fields));
103-
}
104-
return partition_type_;
91+
return std::make_unique<StructType>(std::move(partition_fields));
10592
}
10693

10794
std::string PartitionSpec::ToString() const {

src/iceberg/partition_spec.h

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
/// Partition specs for Iceberg tables.
2424

2525
#include <cstdint>
26-
#include <mutex>
26+
#include <memory>
2727
#include <optional>
2828
#include <span>
2929
#include <string>
@@ -32,6 +32,7 @@
3232
#include "iceberg/iceberg_export.h"
3333
#include "iceberg/partition_field.h"
3434
#include "iceberg/result.h"
35+
#include "iceberg/type_fwd.h"
3536
#include "iceberg/util/formattable.h"
3637

3738
namespace iceberg {
@@ -56,24 +57,20 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
5657
/// \param fields The partition fields.
5758
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
5859
/// be calculated from the fields.
59-
PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
60-
std::vector<PartitionField> fields,
60+
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
6161
std::optional<int32_t> last_assigned_field_id = std::nullopt);
6262

6363
/// \brief Get an unsorted partition spec singleton.
6464
static const std::shared_ptr<PartitionSpec>& Unpartitioned();
6565

66-
/// \brief Get the table schema
67-
const std::shared_ptr<Schema>& schema() const;
68-
6966
/// \brief Get the spec ID.
7067
int32_t spec_id() const;
7168

7269
/// \brief Get a list view of the partition fields.
7370
std::span<const PartitionField> fields() const;
7471

75-
/// \brief Get the partition type.
76-
Result<std::shared_ptr<StructType>> PartitionType();
72+
/// \brief Get the partition type binding to the input schema.
73+
Result<std::unique_ptr<StructType>> PartitionType(const Schema&);
7774

7875
std::string ToString() const override;
7976

@@ -87,14 +84,9 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
8784
/// \brief Compare two partition specs for equality.
8885
bool Equals(const PartitionSpec& other) const;
8986

90-
std::shared_ptr<Schema> schema_;
9187
const int32_t spec_id_;
9288
std::vector<PartitionField> fields_;
9389
int32_t last_assigned_field_id_;
94-
95-
// FIXME: use similar lazy initialization pattern as in StructType
96-
std::mutex mutex_;
97-
std::shared_ptr<StructType> partition_type_;
9890
};
9991

10092
} // namespace iceberg

src/iceberg/table_scan.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "iceberg/schema_field.h"
3232
#include "iceberg/snapshot.h"
3333
#include "iceberg/table_metadata.h"
34+
#include "iceberg/type.h"
3435
#include "iceberg/util/macros.h"
3536

3637
namespace iceberg {
@@ -269,7 +270,11 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
269270

270271
std::vector<std::shared_ptr<FileScanTask>> tasks;
271272
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, context_.table_metadata->PartitionSpec());
272-
auto partition_schema = partition_spec->schema();
273+
274+
// Get the table schema and partition type
275+
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, context_.table_metadata->Schema());
276+
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<StructType> partition_schema,
277+
partition_spec->PartitionType(*current_schema));
273278

274279
for (const auto& manifest_file : manifest_files) {
275280
ICEBERG_ASSIGN_OR_RAISE(

src/iceberg/test/json_internal_test.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,9 @@ TEST(JsonPartitionTest, PartitionSpec) {
144144
std::vector<SchemaField>{SchemaField(3, "region", iceberg::string(), false),
145145
SchemaField(5, "ts", iceberg::int64(), false)},
146146
/*schema_id=*/100);
147-
148147
auto identity_transform = Transform::Identity();
149-
PartitionSpec spec(schema, 1,
150-
{PartitionField(3, 101, "region", identity_transform),
151-
PartitionField(5, 102, "ts", identity_transform)});
148+
PartitionSpec spec(1, {PartitionField(3, 101, "region", identity_transform),
149+
PartitionField(5, 102, "ts", identity_transform)});
152150
auto json = ToJson(spec);
153151
nlohmann::json expected_json = R"({"spec-id": 1,
154152
"fields": [

0 commit comments

Comments
 (0)