Skip to content

Commit 3a4dd3b

Browse files
author
nullccxsy
committed
feat: implement thread-safe lazy initialization for Schema and StructType
- Added move and copy constructors and assignment operators for Schema and StructType to manage resource ownership and improve performance. - Refactored field lookup methods to utilize lazy initialization with thread safety, ensuring safe concurrent access. - Introduced unit tests for thread safety in Schema and StructType, validating concurrent operations and access patterns.
1 parent 78d06bf commit 3a4dd3b

File tree

6 files changed

+312
-11
lines changed

6 files changed

+312
-11
lines changed

src/iceberg/schema.cc

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,42 @@ class NameToIdVisitor {
7171
/// \brief Construct a schema from its top-level fields and an optional schema id.
///
/// The fields are forwarded to the StructType base; the lookup maps are left
/// empty here.
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id)
    : StructType(std::move(fields)),
      schema_id_(schema_id) {}
7373

74+
/// \brief Move constructor.
///
/// Takes over the StructType part of `other` together with whatever lookup maps
/// `other` had already built. The std::once_flag members are not part of the
/// initializer list (std::once_flag is neither copyable nor movable), so the new
/// object starts with fresh, unconsumed flags.
Schema::Schema(Schema&& other) noexcept
    // Only the StructType subobject is moved from by the base initializer, so the
    // Schema-level members of `other` read below are still intact.
    : StructType(std::move(static_cast<StructType&>(other))),
      schema_id_(other.schema_id_),
      id_to_field_(std::move(other.id_to_field_)),
      name_to_id_(std::move(other.name_to_id_)),
      lowercase_name_to_id_(std::move(other.lowercase_name_to_id_)) {}
80+
81+
/// \brief Copy assignment.
///
/// Copies the StructType part, the schema id and the three lookup maps from
/// `other`. Self-assignment is a no-op.
Schema& Schema::operator=(const Schema& other) {
  if (this == &other) {
    return *this;
  }

  StructType::operator=(other);
  schema_id_ = other.schema_id_;

  // NOTE(review): id_to_field_ stores std::reference_wrapper<const SchemaField>;
  // the copied entries presumably still refer to fields owned by `other` rather
  // than by this object -- confirm against InitIdToFieldMap(). The once_flag
  // members cannot be reset, so if this object's flags were already consumed the
  // maps will not be rebuilt lazily after this assignment.
  id_to_field_ = other.id_to_field_;
  name_to_id_ = other.name_to_id_;
  lowercase_name_to_id_ = other.lowercase_name_to_id_;
  return *this;
}
91+
92+
/// \brief Copy constructor.
///
/// Copies the StructType part and the schema id only. The lookup maps
/// (id_to_field_, name_to_id_, lowercase_name_to_id_) are deliberately left
/// empty and are rebuilt on first use through this object's own, freshly
/// constructed once_flags -- the same state the regular constructor produces.
///
/// Not copying the maps matters for two reasons:
///  - id_to_field_ holds std::reference_wrapper<const SchemaField>, i.e.
///    non-owning handles; copying them would give the new schema references to
///    storage it does not own.
///  - `other`'s maps are mutable and may be populated concurrently by a lookup
///    on another thread; reading them here without synchronization would be a
///    data race.
Schema::Schema(const Schema& other)
    : StructType(other),
      schema_id_(other.schema_id_) {}
98+
99+
/// \brief Move assignment.
///
/// Moves the StructType part and the three lookup maps out of `other` and copies
/// its schema id. Self-assignment is a no-op.
Schema& Schema::operator=(Schema&& other) noexcept {
  if (this == &other) {
    return *this;
  }

  // The base-class move only touches the StructType subobject, so the
  // Schema-level members of `other` used below are still intact.
  StructType::operator=(std::move(other));
  schema_id_ = other.schema_id_;

  // NOTE(review): the once_flag members are not (and cannot be) reassigned. If
  // this object's flags were already consumed by an earlier lookup and `other`
  // had not built its maps yet, the lookups will not rebuild them afterwards.
  id_to_field_ = std::move(other.id_to_field_);
  name_to_id_ = std::move(other.name_to_id_);
  lowercase_name_to_id_ = std::move(other.lowercase_name_to_id_);
  return *this;
}
109+
74110
/// \brief Return the schema id given at construction, or std::nullopt if none was set.
std::optional<int32_t> Schema::schema_id() const {
  return schema_id_;
}
75111

76112
std::string Schema::ToString() const {
@@ -89,12 +125,14 @@ bool Schema::Equals(const Schema& other) const {
89125
Result<std::optional<std::reference_wrapper<const SchemaField>>> Schema::FindFieldByName(
90126
std::string_view name, bool case_sensitive) const {
91127
if (case_sensitive) {
92-
ICEBERG_RETURN_UNEXPECTED(InitNameToIdMap());
128+
ICEBERG_RETURN_UNEXPECTED(
129+
LazyInitWithCallOnce(name_flag_, [this]() { return InitNameToIdMap(); }));
93130
auto it = name_to_id_.find(name);
94131
if (it == name_to_id_.end()) return std::nullopt;
95132
return FindFieldById(it->second);
96133
}
97-
ICEBERG_RETURN_UNEXPECTED(InitLowerCaseNameToIdMap());
134+
ICEBERG_RETURN_UNEXPECTED(LazyInitWithCallOnce(
135+
lowercase_name_flag_, [this]() { return InitLowerCaseNameToIdMap(); }));
98136
auto it = lowercase_name_to_id_.find(StringUtils::ToLower(name));
99137
if (it == lowercase_name_to_id_.end()) return std::nullopt;
100138
return FindFieldById(it->second);
@@ -133,7 +171,8 @@ Status Schema::InitLowerCaseNameToIdMap() const {
133171

134172
Result<std::optional<std::reference_wrapper<const SchemaField>>> Schema::FindFieldById(
135173
int32_t field_id) const {
136-
ICEBERG_RETURN_UNEXPECTED(InitIdToFieldMap());
174+
ICEBERG_RETURN_UNEXPECTED(
175+
LazyInitWithCallOnce(id_flag_, [this]() { return InitIdToFieldMap(); }));
137176
auto it = id_to_field_.find(field_id);
138177
if (it == id_to_field_.end()) {
139178
return std::nullopt;

src/iceberg/schema.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
/// and any utility functions. See iceberg/type.h and iceberg/field.h as well.
2525

2626
#include <cstdint>
27+
#include <mutex>
2728
#include <optional>
2829
#include <string>
2930
#include <vector>
@@ -48,6 +49,10 @@ class ICEBERG_EXPORT Schema : public StructType {
4849
explicit Schema(std::vector<SchemaField> fields,
4950
std::optional<int32_t> schema_id = std::nullopt);
5051

52+
Schema(const Schema& other);
53+
Schema(Schema&& other) noexcept;
54+
Schema& operator=(const Schema& other);
55+
Schema& operator=(Schema&& other) noexcept;
5156
/// \brief Get the schema ID.
5257
///
5358
/// A schema is identified by a unique ID for the purposes of schema
@@ -78,13 +83,11 @@ class ICEBERG_EXPORT Schema : public StructType {
7883
/// \brief Compare two schemas for equality.
7984
[[nodiscard]] bool Equals(const Schema& other) const;
8085

81-
// TODO(nullccxsy): Address potential concurrency issues in lazy initialization (e.g.,
82-
// use std::call_once)
8386
Status InitIdToFieldMap() const;
8487
Status InitNameToIdMap() const;
8588
Status InitLowerCaseNameToIdMap() const;
8689

87-
const std::optional<int32_t> schema_id_;
90+
std::optional<int32_t> schema_id_;
8891
/// Mapping from field id to field.
8992
mutable std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>
9093
id_to_field_;
@@ -94,6 +97,10 @@ class ICEBERG_EXPORT Schema : public StructType {
9497
/// Mapping from lowercased field name to field id
9598
mutable std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>
9699
lowercase_name_to_id_;
100+
101+
mutable std::once_flag id_flag_;
102+
mutable std::once_flag name_flag_;
103+
mutable std::once_flag lowercase_name_flag_;
97104
};
98105

99106
} // namespace iceberg

src/iceberg/type.cc

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,38 @@ std::string StructType::ToString() const {
4848
return repr;
4949
}
5050
/// \brief Return a read-only view over this struct's fields.
std::span<const SchemaField> StructType::fields() const {
  return fields_;
}
51+
/// \brief Copy constructor.
///
/// Copies only the field list. The lookup caches (field_by_id_, field_by_name_,
/// field_by_lowercase_name_) are left empty and are rebuilt on first use through
/// this object's own, freshly constructed once_flags -- the same state the
/// regular constructor produces.
///
/// The caches are not copied because they hold non-owning handles
/// (SchemaFieldConstRef values, std::string_view keys) that would not refer to
/// this object's own fields_, and because `other`'s caches are mutable and may be
/// populated concurrently by a lookup on another thread, so reading them here
/// without synchronization would be a data race.
StructType::StructType(const StructType& other) : fields_(other.fields_) {}
56+
StructType::StructType(StructType&& other) noexcept
57+
: fields_(std::move(other.fields_)),
58+
field_by_id_(std::move(other.field_by_id_)),
59+
field_by_name_(std::move(other.field_by_name_)),
60+
field_by_lowercase_name_(std::move(other.field_by_lowercase_name_)) {}
61+
/// \brief Copy assignment.
///
/// Copies the field list and the lookup caches from `other`. Self-assignment is
/// detected by address and is a no-op.
StructType& StructType::operator=(const StructType& other) {
  // Compare identities, not values: `*this != other` invokes the equality
  // operator on the two objects, which is not a self-assignment check and would
  // also skip the assignment for two distinct objects that compare equal.
  if (this == &other) {
    return *this;
  }

  fields_ = other.fields_;

  // NOTE(review): the caches hold non-owning handles (SchemaFieldConstRef
  // values, std::string_view keys); after this copy they presumably still refer
  // to `other`'s fields rather than to fields_ above -- confirm against the
  // Init* functions. The once_flag members cannot be reset, so clearing the
  // caches here instead would not get them rebuilt if the flags were already
  // consumed.
  field_by_id_ = other.field_by_id_;
  field_by_name_ = other.field_by_name_;
  field_by_lowercase_name_ = other.field_by_lowercase_name_;
  return *this;
}
70+
/// \brief Move assignment.
///
/// Moves the field list and the lookup caches out of `other`. Self-assignment is
/// detected by address and is a no-op.
StructType& StructType::operator=(StructType&& other) noexcept {
  // Compare identities, not values: `*this != other` invokes the equality
  // operator on the two objects, which is not a self-assignment check and has no
  // place in a noexcept move.
  if (this == &other) {
    return *this;
  }

  fields_ = std::move(other.fields_);

  // NOTE(review): the once_flag members are not (and cannot be) reassigned. If
  // this object's flags were already consumed by an earlier lookup and `other`
  // had not built its caches yet, the lookups will not rebuild them afterwards.
  field_by_id_ = std::move(other.field_by_id_);
  field_by_name_ = std::move(other.field_by_name_);
  field_by_lowercase_name_ = std::move(other.field_by_lowercase_name_);
  return *this;
}
5179
Result<std::optional<NestedType::SchemaFieldConstRef>> StructType::GetFieldById(
5280
int32_t field_id) const {
53-
ICEBERG_RETURN_UNEXPECTED(InitFieldById());
81+
ICEBERG_RETURN_UNEXPECTED(
82+
LazyInitWithCallOnce(field_by_id_flag_, [this]() { return InitFieldById(); }));
5483
auto it = field_by_id_.find(field_id);
5584
if (it == field_by_id_.end()) return std::nullopt;
5685
return it->second;
@@ -65,14 +94,16 @@ Result<std::optional<NestedType::SchemaFieldConstRef>> StructType::GetFieldByInd
6594
Result<std::optional<NestedType::SchemaFieldConstRef>> StructType::GetFieldByName(
6695
std::string_view name, bool case_sensitive) const {
6796
if (case_sensitive) {
68-
ICEBERG_RETURN_UNEXPECTED(InitFieldByName());
97+
ICEBERG_RETURN_UNEXPECTED(LazyInitWithCallOnce(
98+
field_by_name_flag_, [this]() { return InitFieldByName(); }));
6999
auto it = field_by_name_.find(name);
70100
if (it != field_by_name_.end()) {
71101
return it->second;
72102
}
73103
return std::nullopt;
74104
}
75-
ICEBERG_RETURN_UNEXPECTED(InitFieldByLowerCaseName());
105+
ICEBERG_RETURN_UNEXPECTED(LazyInitWithCallOnce(
106+
field_by_lowercase_name_flag_, [this]() { return InitFieldByLowerCaseName(); }));
76107
auto it = field_by_lowercase_name_.find(StringUtils::ToLower(name));
77108
if (it != field_by_lowercase_name_.end()) {
78109
return it->second;

src/iceberg/type.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <array>
2727
#include <cstdint>
2828
#include <memory>
29+
#include <mutex>
2930
#include <optional>
3031
#include <span>
3132
#include <string>
@@ -39,6 +40,13 @@
3940

4041
namespace iceberg {
4142

43+
template <typename Func>
44+
Status LazyInitWithCallOnce(std::once_flag& flag, Func&& func) {
45+
Status status;
46+
std::call_once(flag, [&status, &func]() { status = func(); });
47+
return status;
48+
}
49+
4250
/// \brief Interface for a data type for a field.
4351
class ICEBERG_EXPORT Type : public iceberg::util::Formattable {
4452
public:
@@ -109,6 +117,10 @@ class ICEBERG_EXPORT StructType : public NestedType {
109117
constexpr static TypeId kTypeId = TypeId::kStruct;
110118
explicit StructType(std::vector<SchemaField> fields);
111119
~StructType() override = default;
120+
StructType(const StructType&);
121+
StructType(StructType&&) noexcept;
122+
StructType& operator=(const StructType&);
123+
StructType& operator=(StructType&&) noexcept;
112124

113125
TypeId type_id() const override;
114126
std::string ToString() const override;
@@ -124,8 +136,7 @@ class ICEBERG_EXPORT StructType : public NestedType {
124136

125137
protected:
126138
bool Equals(const Type& other) const override;
127-
// TODO(nullccxsy): Lazy initialization has concurrency issues, need to add proper
128-
// synchronization mechanism
139+
129140
Status InitFieldById() const;
130141
Status InitFieldByName() const;
131142
Status InitFieldByLowerCaseName() const;
@@ -134,6 +145,10 @@ class ICEBERG_EXPORT StructType : public NestedType {
134145
mutable std::unordered_map<int32_t, SchemaFieldConstRef> field_by_id_;
135146
mutable std::unordered_map<std::string_view, SchemaFieldConstRef> field_by_name_;
136147
mutable std::unordered_map<std::string, SchemaFieldConstRef> field_by_lowercase_name_;
148+
149+
mutable std::once_flag field_by_id_flag_;
150+
mutable std::once_flag field_by_name_flag_;
151+
mutable std::once_flag field_by_lowercase_name_flag_;
137152
};
138153

139154
/// \brief A data type representing a list of values.

test/schema_test.cc

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <format>
2323
#include <memory>
24+
#include <thread>
2425

2526
#include <gmock/gmock.h>
2627
#include <gtest/gtest.h>
@@ -490,3 +491,94 @@ TEST(SchemaTest, NestedDuplicateFieldIdError) {
490491
EXPECT_THAT(result.error().message,
491492
::testing::HasSubstr("Duplicate field id found: 1"));
492493
}
494+
495+
// Thread safety tests for Lazy Init
496+
class SchemaThreadSafetyTest : public ::testing::Test {
497+
protected:
498+
void SetUp() override {
499+
field1_ = std::make_unique<iceberg::SchemaField>(1, "id", iceberg::int32(), true);
500+
field2_ = std::make_unique<iceberg::SchemaField>(2, "name", iceberg::string(), true);
501+
field3_ = std::make_unique<iceberg::SchemaField>(3, "age", iceberg::int32(), true);
502+
schema_ = std::make_unique<iceberg::Schema>(
503+
std::vector<iceberg::SchemaField>{*field1_, *field2_, *field3_}, 100);
504+
}
505+
506+
std::unique_ptr<iceberg::Schema> schema_;
507+
std::unique_ptr<iceberg::SchemaField> field1_;
508+
std::unique_ptr<iceberg::SchemaField> field2_;
509+
std::unique_ptr<iceberg::SchemaField> field3_;
510+
};
511+
512+
// Ten threads hit FindFieldById on the same, not-yet-initialized schema; every
// call must see field 1 and must report id 999 as absent.
TEST_F(SchemaThreadSafetyTest, ConcurrentFindFieldById) {
  const int num_threads = 10;
  const int iterations_per_thread = 100;

  // Work done by every thread: alternate between a hit and a miss.
  auto worker = [this, iterations_per_thread]() {
    for (int iter = 0; iter < iterations_per_thread; ++iter) {
      ASSERT_THAT(schema_->FindFieldById(1), ::testing::Optional(*field1_));
      ASSERT_THAT(schema_->FindFieldById(999), ::testing::Optional(std::nullopt));
    }
  };

  std::vector<std::thread> threads;
  threads.reserve(num_threads);
  for (int t = 0; t < num_threads; ++t) {
    threads.emplace_back(worker);
  }

  for (auto& worker_thread : threads) {
    worker_thread.join();
  }
}
530+
531+
// Eight threads exercise the by-id, by-name and case-insensitive by-name
// lookups at the same time; which lookup a thread performs depends on its
// index modulo 4.
TEST_F(SchemaThreadSafetyTest, MixedConcurrentOperations) {
  const int num_threads = 8;
  const int iterations_per_thread = 50;
  std::vector<std::thread> threads;
  threads.reserve(num_threads);

  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([this, iterations_per_thread, i]() {
      for (int iter = 0; iter < iterations_per_thread; ++iter) {
        switch (i % 4) {
          case 0:
            ASSERT_THAT(schema_->FindFieldById(1), ::testing::Optional(*field1_));
            break;
          case 1:
            ASSERT_THAT(schema_->FindFieldByName("name", true),
                        ::testing::Optional(*field2_));
            break;
          case 2:
            ASSERT_THAT(schema_->FindFieldByName("AGE", false),
                        ::testing::Optional(*field3_));
            break;
          default:
            // Remaining threads run all three kinds of lookup back to back.
            ASSERT_THAT(schema_->FindFieldById(2), ::testing::Optional(*field2_));
            ASSERT_THAT(schema_->FindFieldByName("id", true),
                        ::testing::Optional(*field1_));
            ASSERT_THAT(schema_->FindFieldByName("age", false),
                        ::testing::Optional(*field3_));
            break;
        }
      }
    });
  }

  for (auto& worker_thread : threads) {
    worker_thread.join();
  }
}
562+
563+
// Copies the schema before any lookup has run, then has even-indexed threads
// query the original and odd-indexed threads query the copy.
TEST_F(SchemaThreadSafetyTest, CopyAndConcurrentAccess) {
  const int num_threads = 5;
  const int iterations_per_thread = 20;
  auto schema_copy = *schema_;

  std::vector<std::thread> threads;
  threads.reserve(num_threads);
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([this, &schema_copy, iterations_per_thread, i]() {
      const bool use_original = (i % 2 == 0);
      for (int iter = 0; iter < iterations_per_thread; ++iter) {
        if (use_original) {
          ASSERT_THAT(schema_->FindFieldById(1), ::testing::Optional(*field1_));
        } else {
          ASSERT_THAT(schema_copy.FindFieldById(1), ::testing::Optional(*field1_));
        }
      }
    });
  }

  for (auto& worker_thread : threads) {
    worker_thread.join();
  }
}

0 commit comments

Comments
 (0)