Skip to content

Commit 6206797

Browse files
committed
first version of rest catalog
1 parent ad5dadc commit 6206797

File tree

12 files changed

+1052
-129
lines changed

12 files changed

+1052
-129
lines changed

src/iceberg/catalog/rest/CMakeLists.txt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
set(ICEBERG_REST_SOURCES rest_catalog.cc json_internal.cc validator.cc)
18+
set(ICEBERG_REST_SOURCES
19+
catalog.cc
20+
json_internal.cc
21+
validator.cc
22+
config.cc
23+
http_client_internal.cc)
1924

2025
set(ICEBERG_REST_STATIC_BUILD_INTERFACE_LIBS)
2126
set(ICEBERG_REST_SHARED_BUILD_INTERFACE_LIBS)
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/catalog/rest/catalog.h"
21+
22+
#include <memory>
23+
#include <utility>
24+
25+
#include <cpr/cpr.h>
26+
27+
#include "iceberg/catalog/rest/config.h"
28+
#include "iceberg/catalog/rest/http_client_interal.h"
29+
#include "iceberg/catalog/rest/json_internal.h"
30+
#include "iceberg/catalog/rest/types.h"
31+
#include "iceberg/catalog/rest/util.h"
32+
#include "iceberg/json_internal.h"
33+
#include "iceberg/result.h"
34+
#include "iceberg/table.h"
35+
#include "iceberg/util/macros.h"
36+
37+
namespace iceberg::rest {
38+
39+
Result<RestCatalog> RestCatalog::Make(RestCatalogConfig config) {
40+
// Validate that uri is not empty
41+
if (config.uri.empty()) {
42+
return InvalidArgument("Rest catalog configuration property 'uri' is required.");
43+
}
44+
ICEBERG_ASSIGN_OR_RAISE(auto tmp_client, HttpClient::Make(config));
45+
const std::string endpoint = config.GetConfigEndpoint();
46+
cpr::Parameters params;
47+
if (config.warehouse.has_value()) {
48+
params.Add({"warehouse", config.warehouse.value()});
49+
}
50+
ICEBERG_ASSIGN_OR_RAISE(const auto& response, tmp_client->Get(endpoint, params));
51+
switch (response.status_code) {
52+
case cpr::status::HTTP_OK: {
53+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
54+
ICEBERG_ASSIGN_OR_RAISE(auto server_config, CatalogConfigFromJson(json));
55+
// Merge server config into client config, server config overrides > client config
56+
// properties > server config defaults
57+
auto final_props = std::move(server_config.defaults);
58+
for (const auto& kv : config.properties_) {
59+
final_props.insert_or_assign(kv.first, kv.second);
60+
}
61+
62+
for (const auto& kv : server_config.overrides) {
63+
final_props.insert_or_assign(kv.first, kv.second);
64+
}
65+
RestCatalogConfig final_config = {
66+
.uri = config.uri,
67+
.name = config.name,
68+
.warehouse = config.warehouse,
69+
.properties_ = std::move(final_props),
70+
};
71+
ICEBERG_ASSIGN_OR_RAISE(auto client, HttpClient::Make(final_config));
72+
return RestCatalog(std::make_shared<RestCatalogConfig>(std::move(final_config)),
73+
std::move(client));
74+
};
75+
default: {
76+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
77+
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ErrorResponseFromJson(json));
78+
return UnknownError("Error listing namespaces: {}", list_response.error.message);
79+
}
80+
}
81+
}
82+
83+
RestCatalog::RestCatalog(std::shared_ptr<RestCatalogConfig> config,
84+
std::unique_ptr<HttpClient> client)
85+
: config_(std::move(config)), client_(std::move(client)) {}
86+
87+
std::string_view RestCatalog::name() const {
88+
return config_->name.has_value() ? std::string_view(*config_->name)
89+
: std::string_view("");
90+
}
91+
92+
Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns) const {
93+
const std::string endpoint = config_->GetNamespacesEndpoint();
94+
std::vector<Namespace> result;
95+
std::string next_token;
96+
while (true) {
97+
cpr::Parameters params;
98+
if (!ns.levels.empty()) {
99+
params.Add({"parent", EncodeNamespaceForUrl(ns)});
100+
}
101+
if (!next_token.empty()) {
102+
params.Add({"page_token", next_token});
103+
}
104+
ICEBERG_ASSIGN_OR_RAISE(const auto& response, client_->Get(endpoint, params));
105+
switch (response.status_code) {
106+
case cpr::status::HTTP_OK: {
107+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
108+
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListNamespacesResponseFromJson(json));
109+
result.insert(result.end(), list_response.namespaces.begin(),
110+
list_response.namespaces.end());
111+
if (list_response.next_page_token.empty()) {
112+
return result;
113+
}
114+
next_token = list_response.next_page_token;
115+
continue;
116+
}
117+
case cpr::status::HTTP_NOT_FOUND: {
118+
return NoSuchNamespace("Namespace not found");
119+
}
120+
default:
121+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
122+
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ErrorResponseFromJson(json));
123+
return UnknownError("Error listing namespaces: {}", list_response.error.message);
124+
}
125+
}
126+
}
127+
128+
Status RestCatalog::CreateNamespace(
129+
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
130+
return NotImplemented("Not implemented");
131+
}
132+
133+
Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespaceProperties(
134+
const Namespace& ns) const {
135+
return NotImplemented("Not implemented");
136+
}
137+
138+
Status RestCatalog::DropNamespace(const Namespace& ns) {
139+
return NotImplemented("Not implemented");
140+
}
141+
142+
Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
143+
return NotImplemented("Not implemented");
144+
}
145+
146+
Status RestCatalog::UpdateNamespaceProperties(
147+
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
148+
const std::unordered_set<std::string>& removals) {
149+
return NotImplemented("Not implemented");
150+
}
151+
152+
Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
153+
return NotImplemented("Not implemented");
154+
}
155+
156+
Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
157+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
158+
const std::string& location,
159+
const std::unordered_map<std::string, std::string>& properties) {
160+
return NotImplemented("Not implemented");
161+
}
162+
163+
Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
164+
const TableIdentifier& identifier,
165+
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
166+
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
167+
return NotImplemented("Not implemented");
168+
}
169+
170+
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
171+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
172+
const std::string& location,
173+
const std::unordered_map<std::string, std::string>& properties) {
174+
return NotImplemented("Not implemented");
175+
}
176+
177+
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
178+
return NotImplemented("Not implemented");
179+
}
180+
181+
Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
182+
return NotImplemented("Not implemented");
183+
}
184+
185+
Result<std::unique_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
186+
return NotImplemented("Not implemented");
187+
}
188+
189+
Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
190+
const TableIdentifier& identifier, const std::string& metadata_file_location) {
191+
return NotImplemented("Not implemented");
192+
}
193+
194+
std::unique_ptr<RestCatalog::TableBuilder> RestCatalog::BuildTable(
195+
const TableIdentifier& identifier, const Schema& schema) const {
196+
return nullptr;
197+
}
198+
199+
} // namespace iceberg::rest

src/iceberg/catalog/rest/catalog.h

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include <cpr/cpr.h>
26+
27+
#include "iceberg/catalog.h"
28+
#include "iceberg/catalog/rest/config.h"
29+
#include "iceberg/catalog/rest/http_client_interal.h"
30+
#include "iceberg/catalog/rest/iceberg_rest_export.h"
31+
#include "iceberg/catalog/rest/types.h"
32+
#include "iceberg/result.h"
33+
34+
/// \file iceberg/catalog/rest/catalog.h
35+
/// RestCatalog implementation for Iceberg REST API.
36+
37+
namespace iceberg::rest {
38+
39+
class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
40+
public:
41+
RestCatalog(const RestCatalog&) = delete;
42+
RestCatalog& operator=(const RestCatalog&) = delete;
43+
RestCatalog(RestCatalog&&) = default;
44+
RestCatalog& operator=(RestCatalog&&) = default;
45+
46+
/// \brief Create a RestCatalog instance
47+
///
48+
/// \param config the configuration for the RestCatalog
49+
/// \return a RestCatalog instance
50+
static Result<RestCatalog> Make(RestCatalogConfig config);
51+
52+
/// \brief Return the name for this catalog
53+
std::string_view name() const override;
54+
55+
/// \brief List child namespaces from the given namespace.
56+
Result<std::vector<Namespace>> ListNamespaces(const Namespace& ns) const override;
57+
58+
/// \brief Create a namespace with associated properties.
59+
Status CreateNamespace(
60+
const Namespace& ns,
61+
const std::unordered_map<std::string, std::string>& properties) override;
62+
63+
/// \brief Get metadata properties for a namespace.
64+
Result<std::unordered_map<std::string, std::string>> GetNamespaceProperties(
65+
const Namespace& ns) const override;
66+
67+
/// \brief Drop a namespace.
68+
Status DropNamespace(const Namespace& ns) override;
69+
70+
/// \brief Check whether the namespace exists.
71+
Result<bool> NamespaceExists(const Namespace& ns) const override;
72+
73+
/// \brief Update a namespace's properties by applying additions and removals.
74+
Status UpdateNamespaceProperties(
75+
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
76+
const std::unordered_set<std::string>& removals) override;
77+
78+
/// \brief Return all the identifiers under this namespace
79+
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
80+
81+
/// \brief Create a table
82+
Result<std::unique_ptr<Table>> CreateTable(
83+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
84+
const std::string& location,
85+
const std::unordered_map<std::string, std::string>& properties) override;
86+
87+
/// \brief Update a table
88+
///
89+
/// \param identifier a table identifier
90+
/// \param requirements a list of table requirements
91+
/// \param updates a list of table updates
92+
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
93+
Result<std::unique_ptr<Table>> UpdateTable(
94+
const TableIdentifier& identifier,
95+
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
96+
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
97+
98+
/// \brief Start a transaction to create a table
99+
///
100+
/// \param identifier a table identifier
101+
/// \param schema a schema
102+
/// \param spec a partition spec
103+
/// \param location a location for the table; leave empty if unspecified
104+
/// \param properties a string map of table properties
105+
/// \return a Transaction to create the table or ErrorKind::kAlreadyExists if the
106+
/// table already exists
107+
Result<std::shared_ptr<Transaction>> StageCreateTable(
108+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
109+
const std::string& location,
110+
const std::unordered_map<std::string, std::string>& properties) override;
111+
112+
/// \brief Check whether table exists
113+
///
114+
/// \param identifier a table identifier
115+
/// \return Result<bool> indicating table exists or not.
116+
/// - On success, the table existence was successfully checked (actual
117+
/// existence may be inferred elsewhere).
118+
/// - On failure, contains error information.
119+
Result<bool> TableExists(const TableIdentifier& identifier) const override;
120+
121+
/// \brief Drop a table; optionally delete data and metadata files
122+
///
123+
/// If purge is set to true the implementation should delete all data and metadata
124+
/// files.
125+
///
126+
/// \param identifier a table identifier
127+
/// \param purge if true, delete all data and metadata files in the table
128+
/// \return Status indicating the outcome of the operation.
129+
/// - On success, the table was dropped (or did not exist).
130+
/// - On failure, contains error information.
131+
Status DropTable(const TableIdentifier& identifier, bool purge) override;
132+
133+
/// \brief Load a table
134+
///
135+
/// \param identifier a table identifier
136+
/// \return instance of Table implementation referred to by identifier or
137+
/// ErrorKind::kNoSuchTable if the table does not exist
138+
Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
139+
140+
/// \brief Register a table with the catalog if it does not exist
141+
///
142+
/// \param identifier a table identifier
143+
/// \param metadata_file_location the location of a metadata file
144+
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
145+
Result<std::shared_ptr<Table>> RegisterTable(
146+
const TableIdentifier& identifier,
147+
const std::string& metadata_file_location) override;
148+
149+
/// \brief A builder used to create valid tables or start create/replace transactions
150+
///
151+
/// \param identifier a table identifier
152+
/// \param schema a schema
153+
/// \return the builder to create a table or start a create/replace transaction
154+
std::unique_ptr<RestCatalog::TableBuilder> BuildTable(
155+
const TableIdentifier& identifier, const Schema& schema) const override;
156+
157+
private:
158+
RestCatalog(std::shared_ptr<RestCatalogConfig> config,
159+
std::unique_ptr<HttpClient> client);
160+
161+
std::shared_ptr<RestCatalogConfig> config_;
162+
std::unique_ptr<HttpClient> client_;
163+
};
164+
165+
} // namespace iceberg::rest

0 commit comments

Comments
 (0)