Skip to content

Commit 599949f

Browse files
committed
[TASK-250] KV Tables in CPP
1 parent 86e8e24 commit 599949f

File tree

7 files changed

+1234
-106
lines changed

7 files changed

+1234
-106
lines changed

bindings/cpp/BUILD.bazel

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,3 +373,36 @@ cc_binary(
373373
visibility = ["//visibility:public"],
374374
)
375375

376+
cc_binary(
377+
name = "fluss_cpp_kv_example",
378+
srcs = [
379+
"examples/kv_example.cpp",
380+
],
381+
deps = [":fluss_cpp"],
382+
copts = [
383+
"-std=c++17",
384+
] + select({
385+
":debug_mode": [
386+
"-g3",
387+
"-O0",
388+
"-ggdb",
389+
"-fno-omit-frame-pointer",
390+
"-DDEBUG",
391+
],
392+
":fastbuild_mode": [
393+
"-g",
394+
"-O0",
395+
],
396+
":release_mode": [
397+
"-O2",
398+
"-DNDEBUG",
399+
],
400+
}),
401+
linkopts = select({
402+
":debug_mode": ["-g"],
403+
":fastbuild_mode": ["-g"],
404+
":release_mode": [],
405+
}),
406+
visibility = ["//visibility:public"],
407+
)
408+

bindings/cpp/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ target_link_libraries(fluss_cpp_admin_example PRIVATE Arrow::arrow_shared)
108108
target_compile_definitions(fluss_cpp_admin_example PRIVATE ARROW_FOUND)
109109
target_include_directories(fluss_cpp_admin_example PUBLIC ${CPP_INCLUDE_DIR})
110110

111+
add_executable(fluss_cpp_kv_example examples/kv_example.cpp)
112+
target_link_libraries(fluss_cpp_kv_example PRIVATE fluss_cpp)
113+
target_link_libraries(fluss_cpp_kv_example PRIVATE Arrow::arrow_shared)
114+
target_compile_definitions(fluss_cpp_kv_example PRIVATE ARROW_FOUND)
115+
target_include_directories(fluss_cpp_kv_example PUBLIC ${CPP_INCLUDE_DIR})
116+
111117
set_target_properties(fluss_cpp
112118
PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
113119
)
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <iostream>
19+
#include <string>
20+
#include <vector>
21+
22+
#include "fluss.hpp"
23+
24+
static void check(const char* step, const fluss::Result& r) {
25+
if (!r.Ok()) {
26+
std::cerr << step << " failed: code=" << r.error_code << " msg=" << r.error_message
27+
<< std::endl;
28+
std::exit(1);
29+
}
30+
}
31+
32+
int main() {
33+
const std::string bootstrap = "127.0.0.1:9123";
34+
35+
// 1) Connect and get Admin
36+
fluss::Connection conn;
37+
check("connect", fluss::Connection::Connect(bootstrap, conn));
38+
39+
fluss::Admin admin;
40+
check("get_admin", conn.GetAdmin(admin));
41+
42+
fluss::TablePath kv_table_path("fluss", "kv_table_cpp_v1");
43+
44+
// Drop if exists
45+
admin.DropTable(kv_table_path, true);
46+
47+
// 2) Create a KV table with primary key, including decimal and temporal types
48+
auto kv_schema = fluss::Schema::NewBuilder()
49+
.AddColumn("user_id", fluss::DataType::Int())
50+
.AddColumn("name", fluss::DataType::String())
51+
.AddColumn("email", fluss::DataType::String())
52+
.AddColumn("score", fluss::DataType::Float())
53+
.AddColumn("balance", fluss::DataType::Decimal(10, 2))
54+
.AddColumn("birth_date", fluss::DataType::Date())
55+
.AddColumn("login_time", fluss::DataType::Time())
56+
.AddColumn("created_at", fluss::DataType::Timestamp())
57+
.AddColumn("last_seen", fluss::DataType::TimestampLtz())
58+
.SetPrimaryKeys({"user_id"})
59+
.Build();
60+
61+
auto kv_descriptor = fluss::TableDescriptor::NewBuilder()
62+
.SetSchema(kv_schema)
63+
.SetBucketCount(3)
64+
.SetComment("cpp kv table example")
65+
.Build();
66+
67+
check("create_kv_table", admin.CreateTable(kv_table_path, kv_descriptor, false));
68+
std::cout << "Created KV table with primary key" << std::endl;
69+
70+
fluss::Table kv_table;
71+
check("get_kv_table", conn.GetTable(kv_table_path, kv_table));
72+
73+
// 3) Upsert rows using name-based Set()
74+
// - Set("balance", "1234.56") auto-routes to SetDecimal (schema-aware)
75+
// - Set("created_at", ts) auto-routes to SetTimestampNtz (schema-aware)
76+
// - Set("last_seen", ts) auto-routes to SetTimestampLtz (schema-aware)
77+
std::cout << "\n--- Upsert Rows ---" << std::endl;
78+
fluss::UpsertWriter upsert_writer;
79+
check("new_upsert_writer", kv_table.NewUpsertWriter(upsert_writer));
80+
81+
// Fire-and-forget upserts
82+
{
83+
auto row = kv_table.NewRow();
84+
row.Set("user_id", 1);
85+
row.Set("name", "Alice");
86+
row.Set("email", "alice@example.com");
87+
row.Set("score", 95.5f);
88+
row.Set("balance", "1234.56");
89+
row.Set("birth_date", fluss::Date::FromYMD(1990, 3, 15));
90+
row.Set("login_time", fluss::Time::FromHMS(9, 30, 0));
91+
row.Set("created_at", fluss::Timestamp::FromMillis(1700000000000));
92+
row.Set("last_seen", fluss::Timestamp::FromMillis(1700000060000));
93+
check("upsert_1", upsert_writer.Upsert(row));
94+
}
95+
{
96+
auto row = kv_table.NewRow();
97+
row.Set("user_id", 2);
98+
row.Set("name", "Bob");
99+
row.Set("email", "bob@example.com");
100+
row.Set("score", 87.3f);
101+
row.Set("balance", "567.89");
102+
row.Set("birth_date", fluss::Date::FromYMD(1985, 7, 22));
103+
row.Set("login_time", fluss::Time::FromHMS(14, 15, 30));
104+
row.Set("created_at", fluss::Timestamp::FromMillis(1700000100000));
105+
row.Set("last_seen", fluss::Timestamp::FromMillis(1700000200000));
106+
check("upsert_2", upsert_writer.Upsert(row));
107+
}
108+
109+
// Per-record acknowledgment
110+
{
111+
auto row = kv_table.NewRow();
112+
row.Set("user_id", 3);
113+
row.Set("name", "Charlie");
114+
row.Set("email", "charlie@example.com");
115+
row.Set("score", 92.0f);
116+
row.Set("balance", "99999.99");
117+
row.Set("birth_date", fluss::Date::FromYMD(2000, 1, 1));
118+
row.Set("login_time", fluss::Time::FromHMS(23, 59, 59));
119+
row.Set("created_at", fluss::Timestamp::FromMillis(1700000300000));
120+
row.Set("last_seen", fluss::Timestamp::FromMillis(1700000400000));
121+
fluss::WriteResult wr;
122+
check("upsert_3", upsert_writer.Upsert(row, wr));
123+
check("upsert_3_wait", wr.Wait());
124+
std::cout << "Upsert acknowledged by server" << std::endl;
125+
}
126+
127+
check("upsert_flush", upsert_writer.Flush());
128+
std::cout << "Upserted 3 rows" << std::endl;
129+
130+
// 4) Lookup by primary key — verify all types round-trip
131+
std::cout << "\n--- Lookup by Primary Key ---" << std::endl;
132+
fluss::Lookuper lookuper;
133+
check("new_lookuper", kv_table.NewLookuper(lookuper));
134+
135+
// Lookup existing key
136+
{
137+
auto pk_row = kv_table.NewRow();
138+
pk_row.Set("user_id", 1);
139+
140+
bool found = false;
141+
fluss::GenericRow result_row;
142+
check("lookup_1", lookuper.Lookup(pk_row, found, result_row));
143+
if (found) {
144+
auto date = result_row.GetDate(5);
145+
auto time = result_row.GetTime(6);
146+
auto created = result_row.GetTimestamp(7);
147+
auto seen = result_row.GetTimestamp(8);
148+
std::cout << "Found user_id=1:"
149+
<< "\n name=" << result_row.GetString(1)
150+
<< "\n email=" << result_row.GetString(2)
151+
<< "\n score=" << result_row.GetFloat32(3)
152+
<< "\n balance=" << result_row.DecimalToString(4)
153+
<< "\n birth_date=" << date.Year() << "-" << date.Month() << "-"
154+
<< date.Day() << "\n login_time=" << time.Hour() << ":" << time.Minute()
155+
<< ":" << time.Second()
156+
<< "\n created_at(ms)=" << created.epoch_millis
157+
<< "\n last_seen(ms)=" << seen.epoch_millis << std::endl;
158+
} else {
159+
std::cerr << "ERROR: Expected to find user_id=1" << std::endl;
160+
std::exit(1);
161+
}
162+
}
163+
164+
// Lookup non-existing key
165+
{
166+
auto pk_row = kv_table.NewRow();
167+
pk_row.Set("user_id", 999);
168+
169+
bool found = false;
170+
fluss::GenericRow result_row;
171+
check("lookup_999", lookuper.Lookup(pk_row, found, result_row));
172+
if (!found) {
173+
std::cout << "user_id=999 not found (expected)" << std::endl;
174+
} else {
175+
std::cerr << "ERROR: Expected user_id=999 to not be found" << std::endl;
176+
std::exit(1);
177+
}
178+
}
179+
180+
// 5) Update via upsert (overwrite existing key)
181+
std::cout << "\n--- Update via Upsert ---" << std::endl;
182+
{
183+
auto row = kv_table.NewRow();
184+
row.Set("user_id", 1);
185+
row.Set("name", "Alice Updated");
186+
row.Set("email", "alice.new@example.com");
187+
row.Set("score", 99.0f);
188+
row.Set("balance", "9999.00");
189+
row.Set("birth_date", fluss::Date::FromYMD(1990, 3, 15));
190+
row.Set("login_time", fluss::Time::FromHMS(10, 0, 0));
191+
row.Set("created_at", fluss::Timestamp::FromMillis(1700000000000));
192+
row.Set("last_seen", fluss::Timestamp::FromMillis(1700000500000));
193+
fluss::WriteResult wr;
194+
check("upsert_update", upsert_writer.Upsert(row, wr));
195+
check("upsert_update_wait", wr.Wait());
196+
}
197+
198+
// Verify update
199+
{
200+
auto pk_row = kv_table.NewRow();
201+
pk_row.Set("user_id", 1);
202+
203+
bool found = false;
204+
fluss::GenericRow result_row;
205+
check("lookup_updated", lookuper.Lookup(pk_row, found, result_row));
206+
if (found && result_row.GetString(1) == "Alice Updated") {
207+
std::cout << "Update verified: name=" << result_row.GetString(1)
208+
<< " balance=" << result_row.DecimalToString(4)
209+
<< " last_seen(ms)=" << result_row.GetTimestamp(8).epoch_millis << std::endl;
210+
} else {
211+
std::cerr << "ERROR: Update verification failed" << std::endl;
212+
std::exit(1);
213+
}
214+
}
215+
216+
// 6) Delete by primary key
217+
std::cout << "\n--- Delete by Primary Key ---" << std::endl;
218+
{
219+
auto pk_row = kv_table.NewRow();
220+
pk_row.Set("user_id", 2);
221+
fluss::WriteResult wr;
222+
check("delete_2", upsert_writer.Delete(pk_row, wr));
223+
check("delete_2_wait", wr.Wait());
224+
std::cout << "Deleted user_id=2" << std::endl;
225+
}
226+
227+
// Verify deletion
228+
{
229+
auto pk_row = kv_table.NewRow();
230+
pk_row.Set("user_id", 2);
231+
232+
bool found = false;
233+
fluss::GenericRow result_row;
234+
check("lookup_deleted", lookuper.Lookup(pk_row, found, result_row));
235+
if (!found) {
236+
std::cout << "Delete verified: user_id=2 not found" << std::endl;
237+
} else {
238+
std::cerr << "ERROR: Expected user_id=2 to be deleted" << std::endl;
239+
std::exit(1);
240+
}
241+
}
242+
243+
// 7) Partial update by column names
244+
std::cout << "\n--- Partial Update by Column Names ---" << std::endl;
245+
fluss::UpsertWriter partial_writer;
246+
check("new_partial_upsert_writer",
247+
kv_table.NewUpsertWriter(partial_writer,
248+
std::vector<std::string>{"user_id", "balance", "last_seen"}));
249+
250+
{
251+
auto row = kv_table.NewRow();
252+
row.Set("user_id", 3);
253+
row.Set("balance", "50000.00");
254+
row.Set("last_seen", fluss::Timestamp::FromMillis(1700000999000));
255+
fluss::WriteResult wr;
256+
check("partial_upsert", partial_writer.Upsert(row, wr));
257+
check("partial_upsert_wait", wr.Wait());
258+
std::cout << "Partial update: set balance=50000.00, last_seen for user_id=3" << std::endl;
259+
}
260+
261+
// Verify partial update (other fields unchanged)
262+
{
263+
auto pk_row = kv_table.NewRow();
264+
pk_row.Set("user_id", 3);
265+
266+
bool found = false;
267+
fluss::GenericRow result_row;
268+
check("lookup_partial", lookuper.Lookup(pk_row, found, result_row));
269+
if (found) {
270+
std::cout << "Partial update verified:"
271+
<< "\n name=" << result_row.GetString(1) << " (unchanged)"
272+
<< "\n balance=" << result_row.DecimalToString(4) << " (updated)"
273+
<< "\n last_seen(ms)=" << result_row.GetTimestamp(8).epoch_millis
274+
<< " (updated)" << std::endl;
275+
} else {
276+
std::cerr << "ERROR: Expected to find user_id=3" << std::endl;
277+
std::exit(1);
278+
}
279+
}
280+
281+
// 8) Partial update by column indices (using index-based setters for lower overhead)
282+
std::cout << "\n--- Partial Update by Column Indices ---" << std::endl;
283+
fluss::UpsertWriter partial_writer_idx;
284+
// Columns: 0=user_id (PK), 1=name — update name only
285+
check("new_partial_upsert_writer_idx",
286+
kv_table.NewUpsertWriter(partial_writer_idx, std::vector<size_t>{0, 1}));
287+
288+
{
289+
// Index-based setters: lighter than name-based, useful for hot paths
290+
fluss::GenericRow row;
291+
row.SetInt32(0, 3); // user_id (PK)
292+
row.SetString(1, "Charlie Updated"); // name
293+
fluss::WriteResult wr;
294+
check("partial_upsert_idx", partial_writer_idx.Upsert(row, wr));
295+
check("partial_upsert_idx_wait", wr.Wait());
296+
std::cout << "Partial update by indices: set name='Charlie Updated' for user_id=3"
297+
<< std::endl;
298+
}
299+
300+
// Verify: name changed, balance/last_seen unchanged from previous partial update
301+
{
302+
auto pk_row = kv_table.NewRow();
303+
pk_row.Set("user_id", 3);
304+
305+
bool found = false;
306+
fluss::GenericRow result_row;
307+
check("lookup_partial_idx", lookuper.Lookup(pk_row, found, result_row));
308+
if (found) {
309+
std::cout << "Partial update by indices verified:"
310+
<< "\n name=" << result_row.GetString(1) << " (updated)"
311+
<< "\n balance=" << result_row.DecimalToString(4) << " (unchanged)"
312+
<< "\n last_seen(ms)=" << result_row.GetTimestamp(8).epoch_millis
313+
<< " (unchanged)" << std::endl;
314+
} else {
315+
std::cerr << "ERROR: Expected to find user_id=3" << std::endl;
316+
std::exit(1);
317+
}
318+
}
319+
320+
// Cleanup
321+
check("drop_kv_table", admin.DropTable(kv_table_path, true));
322+
std::cout << "\nKV table example completed successfully!" << std::endl;
323+
324+
return 0;
325+
}

0 commit comments

Comments
 (0)