Skip to content

Commit af746c7

Browse files
authored
refactor: unify lazy init for table (#344)
Introduced a `TableMetadataCache` to support lazy initialization of mappings for schemas, partition specs, sort orders and snapshots.
1 parent 3855012 commit af746c7

File tree

5 files changed

+125
-55
lines changed

5 files changed

+125
-55
lines changed

src/iceberg/table.cc

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ Table::Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata
4242
metadata_location_(std::move(metadata_location)),
4343
io_(std::move(io)),
4444
catalog_(std::move(catalog)),
45-
properties_(TableProperties::FromMap(metadata_->properties)) {}
45+
properties_(TableProperties::FromMap(metadata_->properties)),
46+
metadata_cache_(std::make_unique<TableMetadataCache>(metadata_.get())) {}
4647

4748
const std::string& Table::uuid() const { return metadata_->table_uuid; }
4849

@@ -57,60 +58,36 @@ Status Table::Refresh() {
5758
metadata_location_ = std::move(refreshed_table->metadata_location_);
5859
io_ = std::move(refreshed_table->io_);
5960
properties_ = std::move(refreshed_table->properties_);
60-
61-
schemas_map_.reset();
62-
partition_spec_map_.reset();
63-
sort_orders_map_.reset();
61+
metadata_cache_ = std::make_unique<TableMetadataCache>(metadata_.get());
6462
}
6563
return {};
6664
}
6765

6866
Result<std::shared_ptr<Schema>> Table::schema() const { return metadata_->Schema(); }
6967

70-
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>&
68+
Result<std::reference_wrapper<const std::unordered_map<int32_t, std::shared_ptr<Schema>>>>
7169
Table::schemas() const {
72-
if (!schemas_map_) {
73-
schemas_map_ =
74-
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<Schema>>>();
75-
for (const auto& schema : metadata_->schemas) {
76-
if (schema->schema_id()) {
77-
schemas_map_->emplace(schema->schema_id().value(), schema);
78-
}
79-
}
80-
}
81-
return schemas_map_;
70+
return metadata_cache_->GetSchemasById();
8271
}
8372

8473
Result<std::shared_ptr<PartitionSpec>> Table::spec() const {
8574
return metadata_->PartitionSpec();
8675
}
8776

88-
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
77+
Result<std::reference_wrapper<
78+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>>
8979
Table::specs() const {
90-
if (!partition_spec_map_) {
91-
partition_spec_map_ =
92-
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>();
93-
for (const auto& spec : metadata_->partition_specs) {
94-
partition_spec_map_->emplace(spec->spec_id(), spec);
95-
}
96-
}
97-
return partition_spec_map_;
80+
return metadata_cache_->GetPartitionSpecsById();
9881
}
9982

10083
Result<std::shared_ptr<SortOrder>> Table::sort_order() const {
10184
return metadata_->SortOrder();
10285
}
10386

104-
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
87+
Result<
88+
std::reference_wrapper<const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>>
10589
Table::sort_orders() const {
106-
if (!sort_orders_map_) {
107-
sort_orders_map_ =
108-
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>();
109-
for (const auto& order : metadata_->sort_orders) {
110-
sort_orders_map_->emplace(order->order_id(), order);
111-
}
112-
}
113-
return sort_orders_map_;
90+
return metadata_cache_->GetSortOrdersById();
11491
}
11592

11693
const TableProperties& Table::properties() const { return *properties_; }

src/iceberg/table.h

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#pragma once
2121

22+
#include <functional>
2223
#include <memory>
2324
#include <string>
2425
#include <unordered_map>
@@ -60,24 +61,24 @@ class ICEBERG_EXPORT Table {
6061
Result<std::shared_ptr<Schema>> schema() const;
6162

6263
/// \brief Return a map of schema for this table
63-
/// \note This method is **not** thread-safe in the current implementation.
64-
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>& schemas()
65-
const;
64+
Result<
65+
std::reference_wrapper<const std::unordered_map<int32_t, std::shared_ptr<Schema>>>>
66+
schemas() const;
6667

6768
/// \brief Return the partition spec for this table, return NotFoundError if not found
6869
Result<std::shared_ptr<PartitionSpec>> spec() const;
6970

7071
/// \brief Return a map of partition specs for this table
71-
/// \note This method is **not** thread-safe in the current implementation.
72-
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
72+
Result<std::reference_wrapper<
73+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>>
7374
specs() const;
7475

7576
/// \brief Return the sort order for this table, return NotFoundError if not found
7677
Result<std::shared_ptr<SortOrder>> sort_order() const;
7778

7879
/// \brief Return a map of sort order IDs to sort orders for this table
79-
/// \note This method is **not** thread-safe in the current implementation.
80-
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
80+
Result<std::reference_wrapper<
81+
const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>>
8182
sort_orders() const;
8283

8384
/// \brief Return a map of string properties for this table
@@ -124,14 +125,7 @@ class ICEBERG_EXPORT Table {
124125
std::shared_ptr<FileIO> io_;
125126
std::shared_ptr<Catalog> catalog_;
126127
std::unique_ptr<TableProperties> properties_;
127-
128-
// Cache lazy-initialized maps.
129-
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>
130-
schemas_map_;
131-
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>
132-
partition_spec_map_;
133-
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>
134-
sort_orders_map_;
128+
std::unique_ptr<class TableMetadataCache> metadata_cache_;
135129
};
136130

137131
} // namespace iceberg

src/iceberg/table_metadata.cc

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,70 @@ bool operator==(const TableMetadata& lhs, const TableMetadata& rhs) {
162162
lhs.next_row_id == rhs.next_row_id;
163163
}
164164

165+
// TableMetadataCache implementation
166+
167+
Result<TableMetadataCache::SchemasMapRef> TableMetadataCache::GetSchemasById() const {
168+
return schemas_map_.Get(metadata_);
169+
}
170+
171+
Result<TableMetadataCache::PartitionSpecsMapRef>
172+
TableMetadataCache::GetPartitionSpecsById() const {
173+
return partition_specs_map_.Get(metadata_);
174+
}
175+
176+
Result<TableMetadataCache::SortOrdersMapRef> TableMetadataCache::GetSortOrdersById()
177+
const {
178+
return sort_orders_map_.Get(metadata_);
179+
}
180+
181+
Result<TableMetadataCache::SnapshotsMapRef> TableMetadataCache::GetSnapshotsById() const {
182+
return snapshot_map_.Get(metadata_);
183+
}
184+
185+
Result<TableMetadataCache::SchemasMap> TableMetadataCache::InitSchemasMap(
186+
const TableMetadata* metadata) {
187+
SchemasMap schemas_map;
188+
schemas_map.reserve(metadata->schemas.size());
189+
for (const auto& schema : metadata->schemas) {
190+
if (schema->schema_id()) {
191+
schemas_map.emplace(schema->schema_id().value(), schema);
192+
}
193+
}
194+
return schemas_map;
195+
}
196+
197+
Result<TableMetadataCache::PartitionSpecsMap> TableMetadataCache::InitPartitionSpecsMap(
198+
const TableMetadata* metadata) {
199+
PartitionSpecsMap partition_specs_map;
200+
partition_specs_map.reserve(metadata->partition_specs.size());
201+
for (const auto& spec : metadata->partition_specs) {
202+
partition_specs_map.emplace(spec->spec_id(), spec);
203+
}
204+
return partition_specs_map;
205+
}
206+
207+
Result<TableMetadataCache::SortOrdersMap> TableMetadataCache::InitSortOrdersMap(
208+
const TableMetadata* metadata) {
209+
SortOrdersMap sort_orders_map;
210+
sort_orders_map.reserve(metadata->sort_orders.size());
211+
for (const auto& order : metadata->sort_orders) {
212+
sort_orders_map.emplace(order->order_id(), order);
213+
}
214+
return sort_orders_map;
215+
}
216+
217+
Result<TableMetadataCache::SnapshotsMap> TableMetadataCache::InitSnapshotMap(
218+
const TableMetadata* metadata) {
219+
SnapshotsMap snapshots_map;
220+
snapshots_map.reserve(metadata->snapshots.size());
221+
for (const auto& snapshot : metadata->snapshots) {
222+
snapshots_map.emplace(snapshot->snapshot_id, snapshot);
223+
}
224+
return snapshots_map;
225+
}
226+
227+
// TableMetadataUtil implementation
228+
165229
Result<MetadataFileCodecType> TableMetadataUtil::CodecFromFileName(
166230
std::string_view file_name) {
167231
if (file_name.find(".metadata.json") == std::string::npos) {

src/iceberg/table_metadata.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
#include "iceberg/iceberg_export.h"
3232
#include "iceberg/type_fwd.h"
33+
#include "iceberg/util/lazy.h"
3334
#include "iceberg/util/timepoint.h"
3435

3536
namespace iceberg {
@@ -139,6 +140,40 @@ struct ICEBERG_EXPORT TableMetadata {
139140
const TableMetadata& rhs);
140141
};
141142

143+
// Cache for table metadata mappings to facilitate fast lookups.
144+
class ICEBERG_EXPORT TableMetadataCache {
145+
public:
146+
explicit TableMetadataCache(const TableMetadata* metadata) : metadata_(metadata) {}
147+
148+
template <typename T>
149+
using ByIdMap = std::unordered_map<int32_t, std::shared_ptr<T>>;
150+
using SchemasMap = ByIdMap<Schema>;
151+
using PartitionSpecsMap = ByIdMap<PartitionSpec>;
152+
using SortOrdersMap = ByIdMap<SortOrder>;
153+
using SnapshotsMap = std::unordered_map<int64_t, std::shared_ptr<Snapshot>>;
154+
using SchemasMapRef = std::reference_wrapper<const SchemasMap>;
155+
using PartitionSpecsMapRef = std::reference_wrapper<const PartitionSpecsMap>;
156+
using SortOrdersMapRef = std::reference_wrapper<const SortOrdersMap>;
157+
using SnapshotsMapRef = std::reference_wrapper<const SnapshotsMap>;
158+
159+
Result<SchemasMapRef> GetSchemasById() const;
160+
Result<PartitionSpecsMapRef> GetPartitionSpecsById() const;
161+
Result<SortOrdersMapRef> GetSortOrdersById() const;
162+
Result<SnapshotsMapRef> GetSnapshotsById() const;
163+
164+
private:
165+
static Result<SchemasMap> InitSchemasMap(const TableMetadata* metadata);
166+
static Result<PartitionSpecsMap> InitPartitionSpecsMap(const TableMetadata* metadata);
167+
static Result<SortOrdersMap> InitSortOrdersMap(const TableMetadata* metadata);
168+
static Result<SnapshotsMap> InitSnapshotMap(const TableMetadata* metadata);
169+
170+
const TableMetadata* metadata_;
171+
Lazy<InitSchemasMap> schemas_map_;
172+
Lazy<InitPartitionSpecsMap> partition_specs_map_;
173+
Lazy<InitSortOrdersMap> sort_orders_map_;
174+
Lazy<InitSnapshotMap> snapshot_map_;
175+
};
176+
142177
/// \brief Returns a string representation of a SnapshotLogEntry
143178
ICEBERG_EXPORT std::string ToString(const SnapshotLogEntry& entry);
144179

src/iceberg/test/table_test.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,19 @@ TEST(Table, TableV1) {
4949
ASSERT_TRUE(schema.has_value());
5050
ASSERT_EQ(schema.value()->fields().size(), 3);
5151
auto schemas = table.schemas();
52-
ASSERT_TRUE(schemas->empty());
52+
ASSERT_TRUE(schemas->get().empty());
5353

5454
// Check table spec
5555
auto spec = table.spec();
5656
ASSERT_TRUE(spec.has_value());
5757
auto specs = table.specs();
58-
ASSERT_EQ(1UL, specs->size());
58+
ASSERT_EQ(1UL, specs->get().size());
5959

6060
// Check table sort_order
6161
auto sort_order = table.sort_order();
6262
ASSERT_TRUE(sort_order.has_value());
6363
auto sort_orders = table.sort_orders();
64-
ASSERT_EQ(1UL, sort_orders->size());
64+
ASSERT_EQ(1UL, sort_orders->get().size());
6565

6666
// Check table location
6767
auto location = table.location();
@@ -89,19 +89,19 @@ TEST(Table, TableV2) {
8989
ASSERT_TRUE(schema.has_value());
9090
ASSERT_EQ(schema.value()->fields().size(), 3);
9191
auto schemas = table.schemas();
92-
ASSERT_FALSE(schemas->empty());
92+
ASSERT_FALSE(schemas->get().empty());
9393

9494
// Check partition spec
9595
auto spec = table.spec();
9696
ASSERT_TRUE(spec.has_value());
9797
auto specs = table.specs();
98-
ASSERT_EQ(1UL, specs->size());
98+
ASSERT_EQ(1UL, specs->get().size());
9999

100100
// Check sort order
101101
auto sort_order = table.sort_order();
102102
ASSERT_TRUE(sort_order.has_value());
103103
auto sort_orders = table.sort_orders();
104-
ASSERT_EQ(1UL, sort_orders->size());
104+
ASSERT_EQ(1UL, sort_orders->get().size());
105105

106106
// Check table location
107107
auto location = table.location();

0 commit comments

Comments
 (0)