Skip to content

Commit 642de87

Browse files
committed
feat(catalog): add LoadTableSchema interface
1 parent f19f854 commit 642de87

File tree

9 files changed

+289
-0
lines changed

9 files changed

+289
-0
lines changed

include/paimon/catalog/catalog.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include "paimon/catalog/identifier.h"
2525
#include "paimon/result.h"
26+
#include "paimon/schema/schema.h"
2627
#include "paimon/status.h"
2728
#include "paimon/type_fwd.h"
2829
#include "paimon/visibility.h"
@@ -93,6 +94,16 @@ class PAIMON_EXPORT Catalog {
9394
/// @return A result containing a vector of table names in the specified database, or an error
9495
/// status.
9596
virtual Result<std::vector<std::string>> ListTables(const std::string& db_name) const = 0;
97+
98+
/// Loads the latest schema of a specified table.
99+
///
100+
/// @note System tables will not be supported.
101+
///
102+
/// @param identifier The identifier (database and table name) of the table to load.
103+
/// @return A result holding the table's latest schema if the table exists, std::nullopt if
104+
/// it does not, or an error status on failure.
105+
virtual Result<std::optional<std::shared_ptr<Schema>>> LoadTableSchema(
106+
const Identifier& identifier) const = 0;
96107
};
97108

98109
} // namespace paimon

include/paimon/schema/schema.h

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2025-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <map>
20+
#include <memory>
21+
#include <string>
22+
#include <vector>
23+
24+
#include "arrow/api.h"
25+
#include "arrow/c/bridge.h"
26+
#include "paimon/result.h"
27+
#include "paimon/visibility.h"
28+
struct ArrowSchema;
29+
30+
namespace paimon {
31+
class Catalog;
32+
class Identifier;
33+
/// This interface provides access to TableSchema-related information.
34+
class PAIMON_EXPORT Schema {
35+
public:
36+
Schema();
37+
~Schema();
38+
39+
public:
40+
/// Get the Arrow C schema representation of this table schema.
41+
/// @return A result containing an ArrowSchema, or an error status if conversion fails.
42+
Result<std::unique_ptr<::ArrowSchema>> GetArrowSchema() const;
43+
44+
/// Get the names of all fields in the table schema.
45+
/// @return A vector of field names.
46+
std::vector<std::string> FieldNames() const;
47+
48+
/// Get the unique identifier of this table schema.
49+
/// @return The schema ID
50+
int64_t Id() const;
51+
52+
/// Get the list of primary key field names.
53+
/// @return A reference to the vector of primary key names; empty if no primary keys are
54+
/// defined.
55+
const std::vector<std::string>& PrimaryKeys() const;
56+
57+
/// Get the list of partition key field names.
58+
/// @return A reference to the vector of partition key names; empty if the table is not
59+
/// partitioned.
60+
const std::vector<std::string>& PartitionKeys() const;
61+
62+
/// Get the list of bucket key field names used for bucketing.
63+
/// @return A reference to the vector of bucket key names.
64+
const std::vector<std::string>& BucketKeys() const;
65+
66+
/// Get the number of buckets configured for this table.
67+
/// @return The number of buckets;
68+
int32_t NumBuckets() const;
69+
70+
/// Get the highest field ID assigned in this schema.
71+
/// @return The maximum field ID.
72+
int32_t HighestFieldId() const;
73+
74+
/// Get the table-level options associated with this schema.
75+
/// @return A reference to the map of option key-value pairs (e.g., file format, filesystem).
76+
const std::map<std::string, std::string>& Options();
77+
78+
/// Get an optional comment describing the table.
79+
/// @return The table comment if set, or std::nullopt otherwise.
80+
std::optional<std::string> Comment();
81+
82+
public:
83+
/// Initializes the internal implementation of the Schema
84+
/// @param catalog Pointer to the Catalog instance.
85+
/// @param identifier The identifier of the table whose schema is to be loaded.
86+
/// @return A result indicating success (true) or failure (false).
87+
88+
Result<bool> MakeImpl(const Catalog* catalog, const Identifier& identifier);
89+
90+
class Impl;
91+
92+
private:
93+
std::unique_ptr<Impl> impl_;
94+
};
95+
96+
} // namespace paimon

src/paimon/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ set(PAIMON_CORE_SRCS
218218
core/schema/schema_manager.cpp
219219
core/schema/schema_validation.cpp
220220
core/schema/table_schema.cpp
221+
core/schema/schema.cpp
221222
core/snapshot.cpp
222223
core/stats/simple_stats_collector.cpp
223224
core/stats/simple_stats_converter.cpp

src/paimon/core/catalog/file_system_catalog.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,17 @@ Result<bool> FileSystemCatalog::TableExistsInFileSystem(const std::string& table
211211
}
212212
}
213213

214+
/// Loads the latest schema of the table named by `identifier`.
///
/// System tables are rejected with NotImplemented. Errors raised by Schema::MakeImpl() are
/// propagated; when MakeImpl() reports that no schema was loaded, an empty optional is returned.
Result<std::optional<std::shared_ptr<Schema>>> FileSystemCatalog::LoadTableSchema(
    const Identifier& identifier) const {
    if (IsSystemTable(identifier)) {
        return Status::NotImplemented("do not support loading schema for system table.");
    }

    auto candidate = std::make_shared<Schema>();
    PAIMON_ASSIGN_OR_RAISE(bool schema_loaded, candidate->MakeImpl(this, identifier));

    // Hand the Schema out only when it is backed by a loaded table schema.
    std::optional<std::shared_ptr<Schema>> outcome;
    if (schema_loaded) {
        outcome = candidate;
    }
    return outcome;
}
226+
214227
} // namespace paimon

src/paimon/core/catalog/file_system_catalog.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ class FileSystemCatalog : public Catalog {
4949

5050
Result<std::vector<std::string>> ListDatabases() const override;
5151
Result<std::vector<std::string>> ListTables(const std::string& database_names) const override;
52+
Result<std::optional<std::shared_ptr<Schema>>> LoadTableSchema(
53+
const Identifier& identifier) const override;
54+
std::shared_ptr<FileSystem> GetFileSystem() const {
55+
return fs_;
56+
}
57+
const std::string& Warehouse() const {
58+
return warehouse_;
59+
}
5260

5361
private:
5462
static std::string NewDatabasePath(const std::string& warehouse, const std::string& db_name);

src/paimon/core/catalog/file_system_catalog_test.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,12 @@ TEST(FileSystemCatalogTest, TestCreateTableWithBlob) {
180180
ASSERT_OK_AND_ASSIGN(std::vector<std::string> table_names, catalog.ListTables("db1"));
181181
ASSERT_EQ(1, table_names.size());
182182
ASSERT_EQ(table_names[0], "tbl1");
183+
ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<Schema>> table_schema,
184+
catalog.LoadTableSchema(Identifier("db1", "tbl1")));
185+
ASSERT_TRUE(table_schema.has_value());
186+
ASSERT_OK_AND_ASSIGN(auto arrow_schema, (*table_schema)->GetArrowSchema());
187+
auto loaded_schema = arrow::ImportSchema(arrow_schema.get()).ValueOrDie();
188+
ASSERT_TRUE(typed_schema.Equals(loaded_schema));
183189
ArrowSchemaRelease(&schema);
184190
}
185191

@@ -309,4 +315,56 @@ TEST(FileSystemCatalogTest, TestInvalidList) {
309315
"do not support listing tables for system database.");
310316
}
311317

318+
// Exercises LoadTableSchema end to end: an unknown table yields std::nullopt, an existing table
// reports the metadata it was created with, removing the schema file makes the table look
// absent, and a system-table identifier is rejected.
TEST(FileSystemCatalogTest, TestValidateTableSchema) {
    std::map<std::string, std::string> options = {{Options::FILE_SYSTEM, "local"},
                                                  {Options::FILE_FORMAT, "orc"}};
    ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options));
    auto dir = UniqueTestDirectory::Create();
    ASSERT_TRUE(dir);
    FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str());

    // Create db1.tbl1 with four columns, partitioned by f1 and without primary keys.
    ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true));
    arrow::FieldVector fields = {
        arrow::field("f0", arrow::utf8()),
        arrow::field("f1", arrow::int32()),
        arrow::field("f2", arrow::int32()),
        arrow::field("f3", arrow::float64()),
    };
    arrow::Schema typed_schema(fields);
    ::ArrowSchema schema;
    ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok());
    const Identifier table_id("db1", "tbl1");
    ASSERT_OK(catalog.CreateTable(table_id, &schema, {"f1"}, {}, options,
                                  /*ignore_if_exists=*/false));

    // A table that was never created has no schema.
    ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<Schema>> table_schema,
                         catalog.LoadTableSchema(Identifier("db0", "tbl0")));
    ASSERT_FALSE(table_schema.has_value());

    // The freshly created table exposes the metadata it was created with.
    ASSERT_OK_AND_ASSIGN(table_schema, catalog.LoadTableSchema(table_id));
    ASSERT_TRUE(table_schema.has_value());
    ASSERT_EQ(0, (*table_schema)->Id());
    ASSERT_EQ(3, (*table_schema)->HighestFieldId());
    ASSERT_EQ(1, (*table_schema)->PartitionKeys().size());
    ASSERT_EQ(0, (*table_schema)->PrimaryKeys().size());
    ASSERT_EQ(-1, (*table_schema)->NumBuckets());
    ASSERT_FALSE((*table_schema)->Comment().has_value());
    std::vector<std::string> field_names = (*table_schema)->FieldNames();
    std::vector<std::string> expected_field_names = {"f0", "f1", "f2", "f3"};
    ASSERT_EQ(field_names, expected_field_names);

    // The Arrow schema round-trips to the one used at creation time.
    ASSERT_OK_AND_ASSIGN(auto arrow_schema, (*table_schema)->GetArrowSchema());
    auto loaded_schema = arrow::ImportSchema(arrow_schema.get()).ValueOrDie();
    ASSERT_TRUE(typed_schema.Equals(loaded_schema));

    // Once the only schema file is deleted, the table no longer has a loadable schema.
    ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", dir->Str(), {}));
    ASSERT_OK(fs->Delete(PathUtil::JoinPath(dir->Str(), "db1.db/tbl1/schema/schema-0")));
    ASSERT_OK_AND_ASSIGN(table_schema, catalog.LoadTableSchema(table_id));
    ASSERT_FALSE(table_schema.has_value());

    // System-table identifiers are refused outright.
    ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl$11")),
                        "do not support loading schema for system table.");
    ArrowSchemaRelease(&schema);
}
369+
312370
} // namespace paimon::test

src/paimon/core/schema/schema.cpp

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2025-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "paimon/schema/schema.h"
18+
19+
#include "paimon/catalog/identifier.h"
20+
#include "paimon/core/catalog/file_system_catalog.h"
21+
#include "paimon/core/schema/schema_manager.h"
22+
#include "paimon/core/schema/table_schema.h"
23+
24+
namespace paimon {
25+
26+
/// Private state of Schema: the core TableSchema that every public accessor forwards to.
class Schema::Impl {
 public:
    /// Filled in by Schema::MakeImpl(); default-constructed (null) until then.
    std::shared_ptr<TableSchema> table_schema_;
};
30+
31+
// Both special members are defined in this translation unit, where Schema::Impl is a complete
// type, as required for creating and destroying the std::unique_ptr<Impl> member.
Schema::Schema() = default;

Schema::~Schema() = default;
34+
35+
/// Binds this Schema to the latest schema of the table named by `identifier`.
///
/// Only FileSystemCatalog is supported: for any other catalog (including nullptr) the
/// dynamic_cast yields nullptr and the call returns false.
///
/// @param catalog Catalog used to locate the table; must be a FileSystemCatalog to succeed.
/// @param identifier Identifier of the table whose schema is loaded.
/// @return true if a schema was found and bound; false if the catalog is unsupported or
///         SchemaManager::Latest() found no schema; an error status if Latest() failed.
Result<bool> Schema::MakeImpl(const Catalog* catalog, const Identifier& identifier) {
    const auto* fs_catalog = dynamic_cast<const FileSystemCatalog*>(catalog);
    if (fs_catalog == nullptr) {
        return false;
    }
    SchemaManager schema_manager(fs_catalog->GetFileSystem(),
                                 fs_catalog->NewDataTablePath(fs_catalog->Warehouse(), identifier));
    PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
                           schema_manager.Latest());
    if (!latest_schema.has_value()) {
        return false;
    }
    // Create impl_ only once a schema is in hand, so an error or a miss neither leaves behind an
    // Impl whose table_schema_ is null nor discards a schema bound by an earlier call.
    impl_ = std::make_unique<Impl>();
    impl_->table_schema_ = latest_schema.value();
    return true;
}
51+
52+
/// Forwards to TableSchema::GetArrowSchema() on the bound table schema.
/// impl_ and table_schema_ are dereferenced unchecked, so MakeImpl() must have returned true.
Result<std::unique_ptr<::ArrowSchema>> Schema::GetArrowSchema() const {
    auto& bound_schema = *impl_->table_schema_;
    return bound_schema.GetArrowSchema();
}
55+
56+
std::vector<std::string> Schema::FieldNames() const {
57+
return impl_->table_schema_->FieldNames();
58+
}
59+
60+
int64_t Schema::Id() const {
61+
return impl_->table_schema_->Id();
62+
}
63+
64+
const std::vector<std::string>& Schema::PrimaryKeys() const {
65+
return impl_->table_schema_->PrimaryKeys();
66+
}
67+
68+
const std::vector<std::string>& Schema::PartitionKeys() const {
69+
return impl_->table_schema_->PartitionKeys();
70+
}
71+
72+
const std::vector<std::string>& Schema::BucketKeys() const {
73+
return impl_->table_schema_->BucketKeys();
74+
}
75+
76+
int32_t Schema::NumBuckets() const {
77+
return impl_->table_schema_->NumBuckets();
78+
}
79+
80+
int32_t Schema::HighestFieldId() const {
81+
return impl_->table_schema_->HighestFieldId();
82+
}
83+
84+
const std::map<std::string, std::string>& Schema::Options() {
85+
return impl_->table_schema_->Options();
86+
}
87+
88+
std::optional<std::string> Schema::Comment() {
89+
return impl_->table_schema_->Commit();
90+
}
91+
92+
} // namespace paimon

src/paimon/core/schema/table_schema.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <utility>
2323

2424
#include "arrow/api.h"
25+
#include "arrow/c/bridge.h"
2526
#include "arrow/util/checked_cast.h"
2627
#include "fmt/format.h"
2728
#include "paimon/common/utils/arrow/status_utils.h"
@@ -333,4 +334,11 @@ bool TableSchema::CrossPartitionUpdate() const {
333334
return !ObjectUtils::ContainsAll(primary_keys_, partition_keys_);
334335
}
335336

337+
/// Converts this schema's fields to an arrow::Schema and exports it into a heap-allocated
/// ::ArrowSchema struct.
///
/// @return The exported struct, or the status produced when arrow::ExportSchema fails.
/// @note NOTE(review): the std::unique_ptr only frees the struct storage; presumably the caller
///       must import the schema or invoke its release callback. Confirm against callers.
Result<std::unique_ptr<::ArrowSchema>> TableSchema::GetArrowSchema() const {
    auto exported = std::make_unique<::ArrowSchema>();
    const std::shared_ptr<arrow::Schema> typed_schema =
        DataField::ConvertDataFieldsToArrowSchema(fields_);
    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*typed_schema, exported.get()));
    return exported;
}
343+
336344
} // namespace paimon

src/paimon/core/schema/table_schema.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ class TableSchema : public Jsonizable<TableSchema> {
9898

9999
bool CrossPartitionUpdate() const;
100100

101+
Result<std::unique_ptr<::ArrowSchema>> GetArrowSchema() const;
102+
101103
private:
102104
JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(TableSchema);
103105

0 commit comments

Comments
 (0)