Skip to content

Commit 63047b9

Browse files
committed
feat: first version of rest catalog
1 parent 6fe80fe commit 63047b9

File tree

15 files changed

+1167
-157
lines changed

15 files changed

+1167
-157
lines changed

src/iceberg/catalog/rest/CMakeLists.txt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
set(ICEBERG_REST_SOURCES rest_catalog.cc json_internal.cc)
18+
set(ICEBERG_REST_SOURCES
19+
catalog.cc
20+
json_internal.cc
21+
config.cc
22+
http_client_internal.cc
23+
resource_paths.cc)
1924

2025
set(ICEBERG_REST_STATIC_BUILD_INTERFACE_LIBS)
2126
set(ICEBERG_REST_SHARED_BUILD_INTERFACE_LIBS)
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/catalog/rest/catalog.h"
21+
22+
#include <memory>
23+
#include <utility>
24+
25+
#include <cpr/cpr.h>
26+
27+
#include "iceberg/catalog/rest/config.h"
28+
#include "iceberg/catalog/rest/constant.h"
29+
#include "iceberg/catalog/rest/endpoint_util.h"
30+
#include "iceberg/catalog/rest/http_client_interal.h"
31+
#include "iceberg/catalog/rest/json_internal.h"
32+
#include "iceberg/catalog/rest/types.h"
33+
#include "iceberg/json_internal.h"
34+
#include "iceberg/result.h"
35+
#include "iceberg/table.h"
36+
#include "iceberg/util/macros.h"
37+
38+
namespace iceberg::rest {
39+
40+
Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(const RestCatalogConfig& config) {
41+
// Create ResourcePaths and validate URI
42+
ICEBERG_ASSIGN_OR_RAISE(auto paths, ResourcePaths::Make(config));
43+
44+
ICEBERG_ASSIGN_OR_RAISE(auto tmp_client, HttpClient::Make(config));
45+
46+
const std::string endpoint = paths->V1Config();
47+
cpr::Parameters params;
48+
ICEBERG_ASSIGN_OR_RAISE(const auto& response, tmp_client->Get(endpoint, params));
49+
switch (response.status_code) {
50+
case cpr::status::HTTP_OK: {
51+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
52+
ICEBERG_ASSIGN_OR_RAISE(auto server_config, CatalogConfigFromJson(json));
53+
// Merge server config into client config, server config overrides > client config
54+
// properties > server config defaults
55+
auto final_props = std::move(server_config.defaults);
56+
for (const auto& kv : config.configs()) {
57+
final_props.insert_or_assign(kv.first, kv.second);
58+
}
59+
60+
for (const auto& kv : server_config.overrides) {
61+
final_props.insert_or_assign(kv.first, kv.second);
62+
}
63+
auto final_config = RestCatalogConfig::FromMap(final_props);
64+
ICEBERG_ASSIGN_OR_RAISE(auto client, HttpClient::Make(*final_config));
65+
ICEBERG_ASSIGN_OR_RAISE(auto final_paths, ResourcePaths::Make(*final_config));
66+
return std::unique_ptr<RestCatalog>(new RestCatalog(
67+
std::move(final_config), std::move(client), std::move(*final_paths)));
68+
};
69+
default: {
70+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
71+
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ErrorResponseFromJson(json));
72+
return UnknownError("Error listing namespaces: {}", list_response.error.message);
73+
}
74+
}
75+
}
76+
77+
RestCatalog::RestCatalog(std::unique_ptr<RestCatalogConfig> config,
78+
std::unique_ptr<HttpClient> client, ResourcePaths paths)
79+
: config_(std::move(config)), client_(std::move(client)), paths_(std::move(paths)) {}
80+
81+
std::string_view RestCatalog::name() const {
82+
auto it = config_->configs().find(std::string(RestCatalogConfig::kName));
83+
if (it == config_->configs().end() || it->second.empty()) {
84+
return {""};
85+
}
86+
return std::string_view(it->second);
87+
}
88+
89+
Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns) const {
90+
const std::string endpoint = paths_.V1Namespaces();
91+
std::vector<Namespace> result;
92+
std::string next_token;
93+
while (true) {
94+
cpr::Parameters params;
95+
if (!ns.levels.empty()) {
96+
params.Add({std::string(kQueryParamParent), EncodeNamespaceForUrl(ns)});
97+
}
98+
if (!next_token.empty()) {
99+
params.Add({std::string(kQueryParamPageToken), next_token});
100+
}
101+
ICEBERG_ASSIGN_OR_RAISE(const auto& response, client_->Get(endpoint, params));
102+
switch (response.status_code) {
103+
case cpr::status::HTTP_OK: {
104+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
105+
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListNamespacesResponseFromJson(json));
106+
result.insert(result.end(), list_response.namespaces.begin(),
107+
list_response.namespaces.end());
108+
if (list_response.next_page_token.empty()) {
109+
return result;
110+
}
111+
next_token = list_response.next_page_token;
112+
continue;
113+
}
114+
case cpr::status::HTTP_NOT_FOUND: {
115+
return NoSuchNamespace("Namespace not found");
116+
}
117+
default:
118+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.text));
119+
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ErrorResponseFromJson(json));
120+
return UnknownError("Error listing namespaces: {}", list_response.error.message);
121+
}
122+
}
123+
}
124+
125+
Status RestCatalog::CreateNamespace(
126+
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
127+
return NotImplemented("Not implemented");
128+
}
129+
130+
Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespaceProperties(
131+
const Namespace& ns) const {
132+
return NotImplemented("Not implemented");
133+
}
134+
135+
Status RestCatalog::DropNamespace(const Namespace& ns) {
136+
return NotImplemented("Not implemented");
137+
}
138+
139+
Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
140+
return NotImplemented("Not implemented");
141+
}
142+
143+
Status RestCatalog::UpdateNamespaceProperties(
144+
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
145+
const std::unordered_set<std::string>& removals) {
146+
return NotImplemented("Not implemented");
147+
}
148+
149+
Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
150+
return NotImplemented("Not implemented");
151+
}
152+
153+
Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
154+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
155+
const std::string& location,
156+
const std::unordered_map<std::string, std::string>& properties) {
157+
return NotImplemented("Not implemented");
158+
}
159+
160+
Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
161+
const TableIdentifier& identifier,
162+
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
163+
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
164+
return NotImplemented("Not implemented");
165+
}
166+
167+
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
168+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
169+
const std::string& location,
170+
const std::unordered_map<std::string, std::string>& properties) {
171+
return NotImplemented("Not implemented");
172+
}
173+
174+
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
175+
return NotImplemented("Not implemented");
176+
}
177+
178+
Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
179+
return NotImplemented("Not implemented");
180+
}
181+
182+
Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifier& to) {
183+
return NotImplemented("Not implemented");
184+
}
185+
186+
Result<std::unique_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
187+
return NotImplemented("Not implemented");
188+
}
189+
190+
Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
191+
const TableIdentifier& identifier, const std::string& metadata_file_location) {
192+
return NotImplemented("Not implemented");
193+
}
194+
195+
std::unique_ptr<RestCatalog::TableBuilder> RestCatalog::BuildTable(
196+
const TableIdentifier& identifier, const Schema& schema) const {
197+
return nullptr;
198+
}
199+
200+
} // namespace iceberg::rest

src/iceberg/catalog/rest/catalog.h

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include "iceberg/catalog.h"
26+
#include "iceberg/catalog/rest/config.h"
27+
#include "iceberg/catalog/rest/http_client_interal.h"
28+
#include "iceberg/catalog/rest/iceberg_rest_export.h"
29+
#include "iceberg/catalog/rest/resource_paths.h"
30+
#include "iceberg/result.h"
31+
32+
/// \file iceberg/catalog/rest/catalog.h
33+
/// RestCatalog implementation for Iceberg REST API.
34+
35+
namespace iceberg::rest {
36+
37+
class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
38+
public:
39+
RestCatalog(const RestCatalog&) = delete;
40+
RestCatalog& operator=(const RestCatalog&) = delete;
41+
RestCatalog(RestCatalog&&) = delete;
42+
RestCatalog& operator=(RestCatalog&&) = delete;
43+
44+
/// \brief Create a RestCatalog instance
45+
///
46+
/// \param config the configuration for the RestCatalog
47+
/// \return a unique_ptr to RestCatalog instance
48+
static Result<std::unique_ptr<RestCatalog>> Make(const RestCatalogConfig& config);
49+
50+
std::string_view name() const override;
51+
52+
Result<std::vector<Namespace>> ListNamespaces(const Namespace& ns) const override;
53+
54+
Status CreateNamespace(
55+
const Namespace& ns,
56+
const std::unordered_map<std::string, std::string>& properties) override;
57+
58+
Result<std::unordered_map<std::string, std::string>> GetNamespaceProperties(
59+
const Namespace& ns) const override;
60+
61+
Status DropNamespace(const Namespace& ns) override;
62+
63+
Result<bool> NamespaceExists(const Namespace& ns) const override;
64+
65+
Status UpdateNamespaceProperties(
66+
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
67+
const std::unordered_set<std::string>& removals) override;
68+
69+
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
70+
71+
Result<std::unique_ptr<Table>> CreateTable(
72+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
73+
const std::string& location,
74+
const std::unordered_map<std::string, std::string>& properties) override;
75+
76+
Result<std::unique_ptr<Table>> UpdateTable(
77+
const TableIdentifier& identifier,
78+
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
79+
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
80+
81+
Result<std::shared_ptr<Transaction>> StageCreateTable(
82+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
83+
const std::string& location,
84+
const std::unordered_map<std::string, std::string>& properties) override;
85+
86+
Result<bool> TableExists(const TableIdentifier& identifier) const override;
87+
88+
Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override;
89+
90+
Status DropTable(const TableIdentifier& identifier, bool purge) override;
91+
92+
Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
93+
94+
Result<std::shared_ptr<Table>> RegisterTable(
95+
const TableIdentifier& identifier,
96+
const std::string& metadata_file_location) override;
97+
98+
std::unique_ptr<RestCatalog::TableBuilder> BuildTable(
99+
const TableIdentifier& identifier, const Schema& schema) const override;
100+
101+
private:
102+
RestCatalog(std::unique_ptr<RestCatalogConfig> config,
103+
std::unique_ptr<HttpClient> client, ResourcePaths paths);
104+
105+
std::unique_ptr<RestCatalogConfig> config_;
106+
std::unique_ptr<HttpClient> client_;
107+
ResourcePaths paths_;
108+
};
109+
110+
} // namespace iceberg::rest

src/iceberg/catalog/rest/config.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/catalog/rest/config.h"
21+
22+
#include "iceberg/catalog/rest/constant.h"
23+
24+
namespace iceberg::rest {
25+
26+
std::unique_ptr<RestCatalogConfig> RestCatalogConfig::default_properties() {
27+
return std::make_unique<RestCatalogConfig>();
28+
}
29+
30+
std::unique_ptr<RestCatalogConfig> RestCatalogConfig::FromMap(
31+
const std::unordered_map<std::string, std::string>& properties) {
32+
auto rest_catalog_config = std::make_unique<RestCatalogConfig>();
33+
rest_catalog_config->configs_ = properties;
34+
return rest_catalog_config;
35+
}
36+
37+
Result<cpr::Header> RestCatalogConfig::GetExtraHeaders() const {
38+
cpr::Header headers;
39+
40+
headers[std::string(kHeaderContentType)] = std::string(kMimeTypeApplicationJson);
41+
headers[std::string(kHeaderUserAgent)] = std::string(kUserAgent);
42+
headers[std::string(kHeaderXClientVersion)] = std::string(kClientVersion);
43+
44+
constexpr std::string_view prefix = "header.";
45+
for (const auto& [key, value] : configs_) {
46+
if (key.starts_with(prefix)) {
47+
std::string_view header_name = std::string_view(key).substr(prefix.length());
48+
49+
if (header_name.empty()) {
50+
return InvalidArgument("Header name cannot be empty after '{}' prefix", prefix);
51+
}
52+
53+
if (value.empty()) {
54+
return InvalidArgument("Header value for '{}' cannot be empty", header_name);
55+
}
56+
headers[std::string(header_name)] = value;
57+
}
58+
}
59+
return headers;
60+
}
61+
62+
} // namespace iceberg::rest

0 commit comments

Comments
 (0)