Skip to content

Commit 147fb83

Browse files
Gazizonokigithub-actions[bot]
authored andcommitted
[C++ SDK] Added SLO key_value workload (#25082)
1 parent 0e7a197 commit 147fb83

File tree

19 files changed

+2659
-4
lines changed

19 files changed

+2659
-4
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
b39a359f04c57875ea162524646ae879f06c7a33
1+
aaa5e68e24e0ddb288032dfac9709ac53d1f1e29

examples/auth/ssa_delegation/main.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ int main(int argc, char** argv) {
3434

3535
opts.AddLongOption("microservice-id", "Microservice id")
3636
.RequiredArgument("ID")
37-
.DefaultValue("control-plane")
37+
.DefaultValue("data-plane")
3838
.StoreResult(&microserviceId);
39-
39+
4040
opts.AddLongOption("resource-id", "Resource id")
4141
.Required()
4242
.RequiredArgument("ID")

src/client/iam_private/iam.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ TCredentialsProviderFactoryPtr CreateIamServiceCredentialsProviderFactory(const
3232
CreateIamTokenForServiceRequest,
3333
CreateIamTokenResponse,
3434
IamTokenService
35-
>>(std::move(params));
35+
>>(params);
3636
}
3737

3838
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#include "key_value.h"
2+
3+
#include <ydb-cpp-sdk/client/table/table.h>
4+
5+
using namespace NYdb;
6+
using namespace NYdb::NTable;
7+
8+
9+
namespace {
10+
void CreateTable(TTableClient& client, const std::string& prefix) {
11+
RetryBackoff(client, 5, [prefix](TSession session) {
12+
auto desc = TTableBuilder()
13+
.AddNullableColumn("object_id_key", EPrimitiveType::Uint32)
14+
.AddNullableColumn("object_id", EPrimitiveType::Uint32)
15+
.AddNullableColumn("timestamp", EPrimitiveType::Uint64)
16+
.AddNullableColumn("payload", EPrimitiveType::Utf8)
17+
.SetPrimaryKeyColumns({ "object_id_key", "object_id" })
18+
.Build();
19+
20+
auto tableSettings = TCreateTableSettings()
21+
.PartitioningPolicy(TPartitioningPolicy().UniformPartitions(PartitionsCount))
22+
.CancelAfter(DefaultReactionTime)
23+
.ClientTimeout(DefaultReactionTime + TDuration::Seconds(5));
24+
25+
return session.CreateTable(
26+
JoinPath(prefix, TableName)
27+
, std::move(desc)
28+
, std::move(tableSettings)
29+
).ExtractValueSync();
30+
});
31+
}
32+
} //namespace
33+
34+
int CreateTable(TDatabaseOptions& dbOptions) {
35+
TTableClient client(dbOptions.Driver);
36+
CreateTable(client, dbOptions.Prefix);
37+
Cout << "Table created." << Endl;
38+
return EXIT_SUCCESS;
39+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#include "key_value.h"
2+
3+
using namespace NLastGetopt;
4+
using namespace NYdb;
5+
using namespace NYdb::NTable;
6+
7+
static void DropTable(TTableClient& client, const std::string& path) {
8+
ThrowOnError(client.RetryOperationSync([path](TSession session) {
9+
return session.DropTable(path).ExtractValueSync();
10+
}));
11+
}
12+
13+
int DropTable(TDatabaseOptions& dbOptions) {
14+
TTableClient client(dbOptions.Driver);
15+
DropTable(client, JoinPath(dbOptions.Prefix, TableName));
16+
Cout << "Table dropped." << Endl;
17+
return EXIT_SUCCESS;
18+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
#include "key_value.h"
2+
3+
#include <util/string/printf.h>
4+
5+
using namespace NLastGetopt;
6+
using namespace NYdb;
7+
using namespace NYdb::NTable;
8+
9+
10+
TGenerateInitialContentJob::TGenerateInitialContentJob(const TCreateOptions& createOpts, std::uint32_t maxId)
11+
: TThreadJob(createOpts.CommonOptions)
12+
, Executor(createOpts.CommonOptions, Stats, TExecutor::ModeBlocking)
13+
, PackGenerator(
14+
createOpts.CommonOptions
15+
, createOpts.PackSize
16+
, [](const TKeyValueRecordData& recordData) { return BuildValueFromRecord(recordData); }
17+
, createOpts.Count
18+
, maxId
19+
)
20+
, Total(createOpts.Count)
21+
{}
22+
23+
void TGenerateInitialContentJob::ShowProgress(TStringBuilder& report) {
24+
report << Endl << "======- GenerateInitialContentJob report -======" << Endl;
25+
Executor.Report(report);
26+
TDuration timePassed = TInstant::Now() - Stats.GetStartTime();
27+
std::uint64_t rps = (Total - PackGenerator.GetRemain()) * 1000000 / timePassed.MicroSeconds();
28+
report << "Generated " << Total - PackGenerator.GetRemain() << " new elements." << Endl
29+
<< "With pack_size=" << PackGenerator.GetPackSize() << " its " << rps << " rows/sec" << Endl
30+
<< "Generator compute time: " << PackGenerator.GetComputeTime() << Endl;
31+
Stats.PrintStatistics(report);
32+
report << "========================================" << Endl;
33+
}
34+
35+
void TGenerateInitialContentJob::DoJob() {
36+
std::vector<TValue> pack;
37+
while (!ShouldStop.load() && PackGenerator.GetNextPack(pack)) {
38+
auto upload = [pack{ std::move(pack) }, this](TSession session)->TAsyncStatus {
39+
static const TString query = Sprintf(R"(
40+
--!syntax_v1
41+
PRAGMA TablePathPrefix("%s");
42+
43+
DECLARE $items AS
44+
List<Struct<
45+
`object_id_key`: Uint32,
46+
`object_id`: Uint32,
47+
`timestamp`: Uint64,
48+
`payload`: Utf8>>;
49+
50+
UPSERT INTO `%s` SELECT * FROM AS_TABLE($items);
51+
52+
)", Prefix.c_str(), TableName.c_str());
53+
auto promise = NThreading::NewPromise<TStatus>();
54+
auto params = PackValuesToParamsAsList(pack);
55+
56+
auto resultFuture = session.ExecuteDataQuery(
57+
query
58+
, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()
59+
, std::move(params)
60+
, TExecDataQuerySettings()
61+
.KeepInQueryCache(true)
62+
.OperationTimeout(MaxDelay + ReactionTimeDelay)
63+
.ClientTimeout(MaxDelay + ReactionTimeDelay)
64+
);
65+
66+
resultFuture.Subscribe([promise](TAsyncDataQueryResult queryFuture) mutable {
67+
Y_ABORT_UNLESS(queryFuture.HasValue());
68+
TDataQueryResult queryResult = queryFuture.GetValue();
69+
promise.SetValue(std::move(queryResult));
70+
});
71+
72+
return promise.GetFuture();
73+
};
74+
75+
if (!Executor.Execute(upload)) {
76+
break;
77+
}
78+
}
79+
}
80+
81+
void TGenerateInitialContentJob::OnFinish() {
82+
Executor.Finish();
83+
Executor.Wait();
84+
Stats.Flush();
85+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#include "key_value.h"
2+
3+
#include <util/stream/file.h>
4+
#include <util/system/getpid.h>
5+
#include <util/thread/pool.h>
6+
7+
using namespace NYdb;
8+
9+
const std::string TableName = "key_value";
10+
11+
NYdb::TValue BuildValueFromRecord(const TKeyValueRecordData& recordData) {
12+
NYdb::TValueBuilder value;
13+
value.BeginStruct();
14+
value.AddMember("object_id_key").Uint32(GetHash(recordData.ObjectId));
15+
value.AddMember("object_id").Uint32(recordData.ObjectId);
16+
value.AddMember("timestamp").Uint64(recordData.Timestamp);
17+
value.AddMember("payload").Utf8(recordData.Payload);
18+
value.EndStruct();
19+
return value.Build();
20+
}
21+
22+
int DoCreate(TDatabaseOptions& dbOptions, int argc, char** argv) {
23+
TCreateOptions createOptions{ {dbOptions} };
24+
if (!ParseOptionsCreate(argc, argv, createOptions)) {
25+
return EXIT_FAILURE;
26+
}
27+
28+
createOptions.CommonOptions.MaxInfly = createOptions.CommonOptions.MaxInputThreads;
29+
30+
int result = CreateTable(dbOptions);
31+
if (result) {
32+
return result;
33+
}
34+
35+
std::uint32_t maxId = GetTableStats(dbOptions, TableName).MaxId;
36+
37+
createOptions.CommonOptions.ReactionTime = TDuration::Seconds(20);
38+
39+
Cout << TInstant::Now().ToRfc822StringLocal() << " Uploading initial content... do 'kill -USR1 " << GetPID()
40+
<< "' for progress details or Ctrl/Cmd+C to interrupt" << Endl;
41+
42+
std::shared_ptr<TJobContainer>& jobs = *Singleton<std::shared_ptr<TJobContainer>>();
43+
TJobGC gc(jobs);
44+
jobs = std::make_shared<TJobContainer>();
45+
46+
jobs->Add(new TGenerateInitialContentJob(createOptions, maxId));
47+
48+
SetUpInteraction();
49+
50+
jobs->Start();
51+
jobs->Wait();
52+
jobs->ShowProgress();
53+
54+
return EXIT_SUCCESS;
55+
}
56+
57+
int DoRun(TDatabaseOptions& dbOptions, int argc, char** argv) {
58+
Y_UNUSED(dbOptions);
59+
Y_UNUSED(argc);
60+
Y_UNUSED(argv);
61+
Cerr << "The run command has not been implemented for this scenario yet" << Endl;
62+
return EXIT_SUCCESS;
63+
}
64+
65+
int DoCleanup(TDatabaseOptions& dbOptions, int argc) {
66+
if (argc > 1) {
67+
Cerr << "Unexpected arguments after cleanup" << Endl;
68+
return EXIT_FAILURE;
69+
}
70+
return DropTable(dbOptions);
71+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#pragma once
2+
3+
#include <tests/slo_workloads/utils/utils.h>
4+
#include <tests/slo_workloads/utils/executor.h>
5+
#include <tests/slo_workloads/utils/generator.h>
6+
#include <tests/slo_workloads/utils/job.h>
7+
8+
extern const std::string TableName;
9+
10+
NYdb::TValue BuildValueFromRecord(const TKeyValueRecordData& recordData);
11+
12+
// Initial content generation
13+
class TGenerateInitialContentJob : public TThreadJob {
14+
public:
15+
TGenerateInitialContentJob(const TCreateOptions& createOpts, std::uint32_t maxId);
16+
void ShowProgress(TStringBuilder& report) override;
17+
void DoJob() override;
18+
void OnFinish() override;
19+
20+
private:
21+
TExecutor Executor;
22+
TPackGenerator<TKeyValueGenerator, TKeyValueRecordData> PackGenerator;
23+
std::uint64_t Total;
24+
};
25+
26+
int CreateTable(TDatabaseOptions& dbOptions);
27+
int DropTable(TDatabaseOptions& dbOptions);
28+
29+
// Creates a table and fills it with initial content
30+
int DoCreate(TDatabaseOptions& dbOptions, int argc, char** argv);
31+
// Not implemented
32+
int DoRun(TDatabaseOptions& dbOptions, int argc, char** argv);
33+
// Drops the table
34+
int DoCleanup(TDatabaseOptions& dbOptions, int argc);
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#include "key_value.h"
2+
3+
4+
int main(int argc, char** argv) {
5+
return DoMain(argc, argv, DoCreate, DoRun, DoCleanup);
6+
}

0 commit comments

Comments
 (0)