Skip to content

Commit a859eac

Browse files
committed
fix review
1 parent bb499ad commit a859eac

13 files changed

+41
-45
lines changed

src/iceberg/json_internal.cc

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,8 @@ Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
519519
std::move(transform));
520520
}
521521

522-
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(const nlohmann::json& json) {
522+
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
523+
const std::shared_ptr<Schema>& schema, const nlohmann::json& json) {
523524
ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, kSpecId));
524525
ICEBERG_ASSIGN_OR_RAISE(auto fields, GetJsonValue<nlohmann::json>(json, kFields));
525526

@@ -528,6 +529,8 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(const nlohmann::jso
528529
ICEBERG_ASSIGN_OR_RAISE(auto partition_field, PartitionFieldFromJson(field_json));
529530
partition_fields.push_back(std::move(*partition_field));
530531
}
532+
// TODO(Li Feiyang):use a new PartitionSpec::Make to find the source field of each
533+
// partition field from schema and then verify it
531534
return std::make_unique<PartitionSpec>(spec_id, std::move(partition_fields));
532535
}
533536

@@ -855,6 +858,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
855858
/// \param[out] default_spec_id The default partition spec ID.
856859
/// \param[out] partition_specs The list of partition specs.
857860
Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
861+
const std::shared_ptr<Schema>& current_schema,
858862
int32_t& default_spec_id,
859863
std::vector<std::shared_ptr<PartitionSpec>>& partition_specs) {
860864
if (json.contains(kPartitionSpecs)) {
@@ -867,7 +871,8 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
867871
ICEBERG_ASSIGN_OR_RAISE(default_spec_id, GetJsonValue<int32_t>(json, kDefaultSpecId));
868872

869873
for (const auto& spec_json : spec_array) {
870-
ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecFromJson(spec_json));
874+
ICEBERG_ASSIGN_OR_RAISE(auto spec,
875+
PartitionSpecFromJson(current_schema, spec_json));
871876
partition_specs.push_back(std::move(spec));
872877
}
873878
} else {
@@ -973,9 +978,9 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
973978
ParseSchemas(json, table_metadata->format_version,
974979
table_metadata->current_schema_id, table_metadata->schemas));
975980

976-
ICEBERG_RETURN_UNEXPECTED(ParsePartitionSpecs(json, table_metadata->format_version,
977-
table_metadata->default_spec_id,
978-
table_metadata->partition_specs));
981+
ICEBERG_RETURN_UNEXPECTED(ParsePartitionSpecs(
982+
json, table_metadata->format_version, current_schema,
983+
table_metadata->default_spec_id, table_metadata->partition_specs));
979984

980985
if (json.contains(kLastPartitionId)) {
981986
ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_partition_id,

src/iceberg/json_internal.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,12 @@ ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec& partition_s
173173
/// objects. Each `PartitionField` will be parsed using the `PartitionFieldFromJson`
174174
/// function.
175175
///
176+
/// \param schema The table schema.
176177
/// \param json The JSON object representing a `PartitionSpec`.
177178
/// \return An `expected` value containing either a `PartitionSpec` object or an error. If
178179
/// the JSON is malformed or missing expected fields, an error will be returned.
179180
ICEBERG_EXPORT Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
180-
const nlohmann::json& json);
181+
const std::shared_ptr<Schema>& schema, const nlohmann::json& json);
181182

182183
/// \brief Serializes a `SnapshotRef` object to JSON.
183184
///

src/iceberg/manifest_adapter.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,9 @@ Result<std::shared_ptr<StructType>> ManifestEntryAdapter::GetManifestEntryType()
153153
return ManifestEntry::TypeFromPartitionType(nullptr);
154154
}
155155
std::shared_ptr<StructType> partition_type = nullptr;
156-
if (table_schema_ != nullptr) {
156+
if (current_schema_ != nullptr) {
157157
ICEBERG_ASSIGN_OR_RAISE(partition_type,
158-
partition_spec_->PartitionType(table_schema_));
158+
partition_spec_->PartitionType(*current_schema_));
159159
}
160160
return ManifestEntry::TypeFromPartitionType(std::move(partition_type));
161161
}

src/iceberg/manifest_adapter.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ class ICEBERG_EXPORT ManifestAdapter {
6262
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
6363
public:
6464
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
65-
std::shared_ptr<Schema> table_schema)
65+
std::shared_ptr<Schema> current_schema_)
6666
: partition_spec_(std::move(partition_spec)),
67-
table_schema_(std::move(table_schema)) {}
67+
current_schema_(std::move(current_schema_)) {}
6868
~ManifestEntryAdapter() override;
6969

7070
virtual Status Append(const ManifestEntry& entry) = 0;
@@ -98,7 +98,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
9898

9999
protected:
100100
std::shared_ptr<PartitionSpec> partition_spec_;
101-
std::shared_ptr<Schema> table_schema_;
101+
std::shared_ptr<Schema> current_schema_;
102102
std::shared_ptr<Schema> manifest_schema_;
103103
};
104104

src/iceberg/manifest_writer.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(
6969
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
7070
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
7171
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
72-
std::shared_ptr<Schema> table_schema) {
72+
std::shared_ptr<Schema> current_schema_) {
7373
auto adapter = std::make_unique<ManifestEntryAdapterV1>(
74-
snapshot_id, std::move(partition_spec), std::move(table_schema));
74+
snapshot_id, std::move(partition_spec), std::move(current_schema_));
7575
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
7676
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
7777

src/iceberg/partition_spec.cc

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,23 +57,16 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; }
5757

5858
std::span<const PartitionField> PartitionSpec::fields() const { return fields_; }
5959

60-
Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType(
61-
std::shared_ptr<Schema> schema) {
60+
Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(const Schema& schema) {
6261
if (fields_.empty()) {
6362
return nullptr;
6463
}
65-
{
66-
std::scoped_lock<std::mutex> lock(mutex_);
67-
if (partition_type_ != nullptr) {
68-
return partition_type_;
69-
}
70-
}
7164

7265
std::vector<SchemaField> partition_fields;
7366
for (const auto& partition_field : fields_) {
7467
// Get the source field from the original schema by source_id
7568
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
76-
schema->FindFieldById(partition_field.source_id()));
69+
schema.FindFieldById(partition_field.source_id()));
7770
if (!source_field.has_value()) {
7871
// TODO(xiao.dong) when source field is missing,
7972
// should return an error or just use UNKNOWN type
@@ -95,11 +88,7 @@ Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType(
9588
/*optional=*/true);
9689
}
9790

98-
std::scoped_lock<std::mutex> lock(mutex_);
99-
if (partition_type_ == nullptr) {
100-
partition_type_ = std::make_shared<StructType>(std::move(partition_fields));
101-
}
102-
return partition_type_;
91+
return std::make_unique<StructType>(std::move(partition_fields));
10392
}
10493

10594
std::string PartitionSpec::ToString() const {

src/iceberg/partition_spec.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
7171
std::span<const PartitionField> fields() const;
7272

7373
/// \brief Get the partition type.
74-
Result<std::shared_ptr<StructType>> PartitionType(std::shared_ptr<Schema> schema);
74+
Result<std::unique_ptr<StructType>> PartitionType(const Schema&);
7575

7676
std::string ToString() const override;
7777

@@ -88,10 +88,6 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
8888
const int32_t spec_id_;
8989
std::vector<PartitionField> fields_;
9090
int32_t last_assigned_field_id_;
91-
92-
// FIXME: use similar lazy initialization pattern as in StructType
93-
std::mutex mutex_;
94-
std::shared_ptr<StructType> partition_type_;
9591
};
9692

9793
} // namespace iceberg

src/iceberg/table_scan.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "iceberg/schema_field.h"
3232
#include "iceberg/snapshot.h"
3333
#include "iceberg/table_metadata.h"
34+
#include "iceberg/type.h"
3435
#include "iceberg/util/macros.h"
3536

3637
namespace iceberg {
@@ -272,14 +273,15 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
272273
// auto partition_schema = context_.table_metadata->Schema().value();
273274

274275
// Get the table schema and partition type
275-
ICEBERG_ASSIGN_OR_RAISE(auto table_schema, context_.table_metadata->Schema());
276+
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, context_.table_metadata->Schema());
276277
ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
277-
partition_spec->PartitionType(table_schema));
278+
partition_spec->PartitionType(*current_schema));
279+
auto partition_schema = std::shared_ptr<StructType>(std::move(partition_type));
278280

279281
for (const auto& manifest_file : manifest_files) {
280282
ICEBERG_ASSIGN_OR_RAISE(
281283
auto manifest_reader,
282-
ManifestReader::Make(manifest_file, file_io_, partition_type));
284+
ManifestReader::Make(manifest_file, file_io_, partition_schema));
283285
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
284286

285287
// TODO(gty404): filter manifests using partition spec and filter expression

src/iceberg/test/json_internal_test.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ TEST(JsonPartitionTest, PartitionFieldFromJsonMissingField) {
140140
}
141141

142142
TEST(JsonPartitionTest, PartitionSpec) {
143+
auto schema = std::make_shared<Schema>(
144+
std::vector<SchemaField>{SchemaField(3, "region", iceberg::string(), false),
145+
SchemaField(5, "ts", iceberg::int64(), false)},
146+
/*schema_id=*/100);
143147
auto identity_transform = Transform::Identity();
144148
PartitionSpec spec(1, {PartitionField(3, 101, "region", identity_transform),
145149
PartitionField(5, 102, "ts", identity_transform)});
@@ -157,7 +161,7 @@ TEST(JsonPartitionTest, PartitionSpec) {
157161

158162
EXPECT_EQ(json, expected_json);
159163

160-
auto parsed_spec_result = PartitionSpecFromJson(json);
164+
auto parsed_spec_result = PartitionSpecFromJson(schema, json);
161165
ASSERT_TRUE(parsed_spec_result.has_value()) << parsed_spec_result.error().message;
162166
EXPECT_EQ(spec, *parsed_spec_result.value());
163167
}

src/iceberg/test/partition_spec_test.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,7 @@ TEST(PartitionSpecTest, Equality) {
8888
TEST(PartitionSpecTest, PartitionSchemaTest) {
8989
SchemaField field1(5, "ts", iceberg::timestamp(), true);
9090
SchemaField field2(7, "bar", iceberg::string(), true);
91-
auto const schema =
92-
std::make_shared<Schema>(std::vector<SchemaField>{field1, field2}, 100);
91+
Schema schema({field1, field2}, 100);
9392
auto identity_transform = Transform::Identity();
9493
PartitionField pt_field1(5, 1000, "day", identity_transform);
9594
PartitionField pt_field2(7, 1001, "hour", identity_transform);
@@ -136,10 +135,10 @@ TEST(PartitionSpecTest, PartitionTypeTest) {
136135
std::vector<SchemaField>{field1, field2, field3, field4, field5, field6},
137136
Schema::kInitialSchemaId);
138137

139-
auto parsed_spec_result = PartitionSpecFromJson(json);
138+
auto parsed_spec_result = PartitionSpecFromJson(schema, json);
140139
ASSERT_TRUE(parsed_spec_result.has_value()) << parsed_spec_result.error().message;
141140

142-
auto partition_schema = parsed_spec_result.value()->PartitionType(schema);
141+
auto partition_schema = parsed_spec_result.value()->PartitionType(*schema);
143142

144143
SchemaField pt_field1(1000, "ts_day", date(), true);
145144
SchemaField pt_field2(1001, "id_bucket", int32(), true);

0 commit comments

Comments
 (0)