Skip to content

Commit 86e8e24

Browse files
authored
feat: introduce database level ops for cpp bindings (#286)
1 parent 57b2e4c commit 86e8e24

File tree

8 files changed

+592
-0
lines changed

8 files changed

+592
-0
lines changed

bindings/cpp/BUILD.bazel

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,3 +340,36 @@ cc_binary(
340340
visibility = ["//visibility:public"],
341341
)
342342

343+
cc_binary(
344+
name = "fluss_cpp_admin_example",
345+
srcs = [
346+
"examples/admin_example.cpp",
347+
],
348+
deps = [":fluss_cpp"],
349+
copts = [
350+
"-std=c++17",
351+
] + select({
352+
":debug_mode": [
353+
"-g3",
354+
"-O0",
355+
"-ggdb",
356+
"-fno-omit-frame-pointer",
357+
"-DDEBUG",
358+
],
359+
":fastbuild_mode": [
360+
"-g",
361+
"-O0",
362+
],
363+
":release_mode": [
364+
"-O2",
365+
"-DNDEBUG",
366+
],
367+
}),
368+
linkopts = select({
369+
":debug_mode": ["-g"],
370+
":fastbuild_mode": ["-g"],
371+
":release_mode": [],
372+
}),
373+
visibility = ["//visibility:public"],
374+
)
375+

bindings/cpp/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,12 @@ target_link_libraries(fluss_cpp_example PRIVATE Arrow::arrow_shared)
102102
target_compile_definitions(fluss_cpp_example PRIVATE ARROW_FOUND)
103103
target_include_directories(fluss_cpp_example PUBLIC ${CPP_INCLUDE_DIR})
104104

105+
add_executable(fluss_cpp_admin_example examples/admin_example.cpp)
106+
target_link_libraries(fluss_cpp_admin_example PRIVATE fluss_cpp)
107+
target_link_libraries(fluss_cpp_admin_example PRIVATE Arrow::arrow_shared)
108+
target_compile_definitions(fluss_cpp_admin_example PRIVATE ARROW_FOUND)
109+
target_include_directories(fluss_cpp_admin_example PUBLIC ${CPP_INCLUDE_DIR})
110+
105111
set_target_properties(fluss_cpp
106112
PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
107113
)
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <iostream>
19+
#include <string>
20+
#include <unordered_map>
21+
#include <vector>
22+
23+
#include "fluss.hpp"
24+
25+
static void check(const char* step, const fluss::Result& r) {
26+
if (!r.Ok()) {
27+
std::cerr << step << " failed: code=" << r.error_code << " msg=" << r.error_message
28+
<< std::endl;
29+
std::exit(1);
30+
}
31+
}
32+
33+
int main() {
34+
const std::string bootstrap = "127.0.0.1:9123";
35+
const std::string db_name = "admin_example_db";
36+
const std::string table_name = "admin_example_table";
37+
38+
// 1) Connect and get Admin
39+
fluss::Connection conn;
40+
check("connect", fluss::Connection::Connect(bootstrap, conn));
41+
42+
fluss::Admin admin;
43+
check("get_admin", conn.GetAdmin(admin));
44+
45+
// 2) Database operations
46+
std::cout << "--- Database operations ---" << std::endl;
47+
48+
bool exists = false;
49+
check("database_exists (before create)", admin.DatabaseExists(db_name, exists));
50+
std::cout << "Database " << db_name << " exists before create: " << (exists ? "yes" : "no")
51+
<< std::endl;
52+
53+
fluss::DatabaseDescriptor db_desc;
54+
db_desc.comment = "Example database for Admin API";
55+
db_desc.properties["owner"] = "admin_example";
56+
check("create_database", admin.CreateDatabase(db_name, db_desc, true));
57+
58+
check("database_exists (after create)", admin.DatabaseExists(db_name, exists));
59+
std::cout << "Database " << db_name << " exists after create: " << (exists ? "yes" : "no")
60+
<< std::endl;
61+
62+
fluss::DatabaseInfo db_info;
63+
check("get_database_info", admin.GetDatabaseInfo(db_name, db_info));
64+
std::cout << "Database info: name=" << db_info.database_name
65+
<< " comment=" << db_info.comment << " created_time=" << db_info.created_time
66+
<< std::endl;
67+
68+
std::vector<std::string> databases;
69+
check("list_databases", admin.ListDatabases(databases));
70+
std::cout << "List databases (" << databases.size() << "): ";
71+
for (size_t i = 0; i < databases.size(); ++i) {
72+
if (i > 0) std::cout << ", ";
73+
std::cout << databases[i];
74+
}
75+
std::cout << std::endl;
76+
77+
// 3) Table operations in the new database
78+
std::cout << "--- Table operations ---" << std::endl;
79+
80+
fluss::TablePath table_path(db_name, table_name);
81+
82+
bool table_exists_flag = false;
83+
check("table_exists (before create)", admin.TableExists(table_path, table_exists_flag));
84+
std::cout << "Table " << db_name << "." << table_name
85+
<< " exists before create: " << (table_exists_flag ? "yes" : "no") << std::endl;
86+
87+
auto schema = fluss::Schema::NewBuilder()
88+
.AddColumn("id", fluss::DataType::Int())
89+
.AddColumn("name", fluss::DataType::String())
90+
.Build();
91+
auto descriptor = fluss::TableDescriptor::NewBuilder()
92+
.SetSchema(schema)
93+
.SetBucketCount(1)
94+
.SetComment("admin example table")
95+
.Build();
96+
97+
check("create_table", admin.CreateTable(table_path, descriptor, true));
98+
99+
check("table_exists (after create)", admin.TableExists(table_path, table_exists_flag));
100+
std::cout << "Table exists after create: " << (table_exists_flag ? "yes" : "no") << std::endl;
101+
102+
std::vector<std::string> tables;
103+
check("list_tables", admin.ListTables(db_name, tables));
104+
std::cout << "List tables in " << db_name << " (" << tables.size() << "): ";
105+
for (size_t i = 0; i < tables.size(); ++i) {
106+
if (i > 0) std::cout << ", ";
107+
std::cout << tables[i];
108+
}
109+
std::cout << std::endl;
110+
111+
// 4) Cleanup: drop table, then drop database
112+
std::cout << "--- Cleanup ---" << std::endl;
113+
check("drop_table", admin.DropTable(table_path, true));
114+
check("drop_database", admin.DropDatabase(db_name, true, true));
115+
116+
check("database_exists (after drop)", admin.DatabaseExists(db_name, exists));
117+
std::cout << "Database exists after drop: " << (exists ? "yes" : "no") << std::endl;
118+
119+
std::cout << "Admin example completed successfully." << std::endl;
120+
return 0;
121+
}

bindings/cpp/include/fluss.hpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,21 @@ struct PartitionInfo {
709709
std::string partition_name;
710710
};
711711

712+
/// Descriptor for create_database (optional). Leave comment and properties empty for default.
713+
struct DatabaseDescriptor {
714+
std::string comment;
715+
std::unordered_map<std::string, std::string> properties;
716+
};
717+
718+
/// Metadata returned by GetDatabaseInfo.
719+
struct DatabaseInfo {
720+
std::string database_name;
721+
std::string comment;
722+
std::unordered_map<std::string, std::string> properties;
723+
int64_t created_time{0};
724+
int64_t modified_time{0};
725+
};
726+
712727
class AppendWriter;
713728
class WriteResult;
714729
class LogScanner;
@@ -773,6 +788,27 @@ class Admin {
773788
const std::unordered_map<std::string, std::string>& partition_spec,
774789
bool ignore_if_exists = false);
775790

791+
Result DropPartition(const TablePath& table_path,
792+
const std::unordered_map<std::string, std::string>& partition_spec,
793+
bool ignore_if_not_exists = false);
794+
795+
Result CreateDatabase(const std::string& database_name,
796+
const DatabaseDescriptor& descriptor,
797+
bool ignore_if_exists = false);
798+
799+
Result DropDatabase(const std::string& database_name, bool ignore_if_not_exists = false,
800+
bool cascade = true);
801+
802+
Result ListDatabases(std::vector<std::string>& out);
803+
804+
Result DatabaseExists(const std::string& database_name, bool& out);
805+
806+
Result GetDatabaseInfo(const std::string& database_name, DatabaseInfo& out);
807+
808+
Result ListTables(const std::string& database_name, std::vector<std::string>& out);
809+
810+
Result TableExists(const TablePath& table_path, bool& out);
811+
776812
private:
777813
Result DoListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids,
778814
const OffsetQuery& offset_query, std::unordered_map<int32_t, int64_t>& out,

bindings/cpp/src/admin.cpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,4 +204,124 @@ Result Admin::CreatePartition(const TablePath& table_path,
204204
return utils::from_ffi_result(ffi_result);
205205
}
206206

207+
Result Admin::DropPartition(const TablePath& table_path,
208+
const std::unordered_map<std::string, std::string>& partition_spec,
209+
bool ignore_if_not_exists) {
210+
if (!Available()) {
211+
return utils::make_error(1, "Admin not available");
212+
}
213+
214+
auto ffi_path = utils::to_ffi_table_path(table_path);
215+
216+
rust::Vec<ffi::FfiPartitionKeyValue> rust_spec;
217+
for (const auto& [key, value] : partition_spec) {
218+
ffi::FfiPartitionKeyValue kv;
219+
kv.key = rust::String(key);
220+
kv.value = rust::String(value);
221+
rust_spec.push_back(std::move(kv));
222+
}
223+
224+
auto ffi_result =
225+
admin_->drop_partition(ffi_path, std::move(rust_spec), ignore_if_not_exists);
226+
return utils::from_ffi_result(ffi_result);
227+
}
228+
229+
Result Admin::CreateDatabase(const std::string& database_name,
230+
const DatabaseDescriptor& descriptor,
231+
bool ignore_if_exists) {
232+
if (!Available()) {
233+
return utils::make_error(1, "Admin not available");
234+
}
235+
236+
auto ffi_desc = utils::to_ffi_database_descriptor(descriptor);
237+
auto ffi_result =
238+
admin_->create_database(rust::Str(database_name), ffi_desc, ignore_if_exists);
239+
return utils::from_ffi_result(ffi_result);
240+
}
241+
242+
Result Admin::DropDatabase(const std::string& database_name, bool ignore_if_not_exists,
243+
bool cascade) {
244+
if (!Available()) {
245+
return utils::make_error(1, "Admin not available");
246+
}
247+
248+
auto ffi_result =
249+
admin_->drop_database(rust::Str(database_name), ignore_if_not_exists, cascade);
250+
return utils::from_ffi_result(ffi_result);
251+
}
252+
253+
Result Admin::ListDatabases(std::vector<std::string>& out) {
254+
if (!Available()) {
255+
return utils::make_error(1, "Admin not available");
256+
}
257+
258+
auto ffi_result = admin_->list_databases();
259+
auto result = utils::from_ffi_result(ffi_result.result);
260+
if (result.Ok()) {
261+
out.clear();
262+
out.reserve(ffi_result.database_names.size());
263+
for (const auto& name : ffi_result.database_names) {
264+
out.push_back(std::string(name));
265+
}
266+
}
267+
return result;
268+
}
269+
270+
Result Admin::DatabaseExists(const std::string& database_name, bool& out) {
271+
if (!Available()) {
272+
return utils::make_error(1, "Admin not available");
273+
}
274+
275+
auto ffi_result = admin_->database_exists(rust::Str(database_name));
276+
auto result = utils::from_ffi_result(ffi_result.result);
277+
if (result.Ok()) {
278+
out = ffi_result.value;
279+
}
280+
return result;
281+
}
282+
283+
Result Admin::GetDatabaseInfo(const std::string& database_name, DatabaseInfo& out) {
284+
if (!Available()) {
285+
return utils::make_error(1, "Admin not available");
286+
}
287+
288+
auto ffi_result = admin_->get_database_info(rust::Str(database_name));
289+
auto result = utils::from_ffi_result(ffi_result.result);
290+
if (result.Ok()) {
291+
out = utils::from_ffi_database_info(ffi_result.database_info);
292+
}
293+
return result;
294+
}
295+
296+
Result Admin::ListTables(const std::string& database_name, std::vector<std::string>& out) {
297+
if (!Available()) {
298+
return utils::make_error(1, "Admin not available");
299+
}
300+
301+
auto ffi_result = admin_->list_tables(rust::Str(database_name));
302+
auto result = utils::from_ffi_result(ffi_result.result);
303+
if (result.Ok()) {
304+
out.clear();
305+
out.reserve(ffi_result.table_names.size());
306+
for (const auto& name : ffi_result.table_names) {
307+
out.push_back(std::string(name));
308+
}
309+
}
310+
return result;
311+
}
312+
313+
Result Admin::TableExists(const TablePath& table_path, bool& out) {
314+
if (!Available()) {
315+
return utils::make_error(1, "Admin not available");
316+
}
317+
318+
auto ffi_path = utils::to_ffi_table_path(table_path);
319+
auto ffi_result = admin_->table_exists(ffi_path);
320+
auto result = utils::from_ffi_result(ffi_result.result);
321+
if (result.Ok()) {
322+
out = ffi_result.value;
323+
}
324+
return result;
325+
}
326+
207327
} // namespace fluss

bindings/cpp/src/ffi_converter.hpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,5 +290,30 @@ inline LakeSnapshot from_ffi_lake_snapshot(const ffi::FfiLakeSnapshot& ffi_snaps
290290
return snapshot;
291291
}
292292

293+
inline ffi::FfiDatabaseDescriptor to_ffi_database_descriptor(
294+
const DatabaseDescriptor& desc) {
295+
ffi::FfiDatabaseDescriptor ffi_desc;
296+
ffi_desc.comment = rust::String(desc.comment);
297+
for (const auto& [k, v] : desc.properties) {
298+
ffi::HashMapValue kv;
299+
kv.key = rust::String(k);
300+
kv.value = rust::String(v);
301+
ffi_desc.properties.push_back(std::move(kv));
302+
}
303+
return ffi_desc;
304+
}
305+
306+
inline DatabaseInfo from_ffi_database_info(const ffi::FfiDatabaseInfo& ffi_info) {
307+
DatabaseInfo info;
308+
info.database_name = std::string(ffi_info.database_name);
309+
info.comment = std::string(ffi_info.comment);
310+
info.created_time = ffi_info.created_time;
311+
info.modified_time = ffi_info.modified_time;
312+
for (const auto& prop : ffi_info.properties) {
313+
info.properties[std::string(prop.key)] = std::string(prop.value);
314+
}
315+
return info;
316+
}
317+
293318
} // namespace utils
294319
} // namespace fluss

0 commit comments

Comments
 (0)