Skip to content

Commit c6e9b4f

Browse files
Gazizonokigithub-actions[bot]
authored andcommitted
[C++ SDK] Added run command for key_value SLO workload (#25350)
1 parent ff38128 commit c6e9b4f

File tree

14 files changed

+311
-97
lines changed

14 files changed

+311
-97
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
5530b6f935933ca64b83d1498b6fffe3e1143390
1+
8c2829069cd347ea279af65062f90c71a2cd4e8e

include/ydb-cpp-sdk/client/types/ydb.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ class TBalancingPolicy {
6060
//! Use pile with preferable state
6161
static TBalancingPolicy UsePreferablePileState(EPileState pileState = EPileState::PRIMARY);
6262

63+
TBalancingPolicy(const TBalancingPolicy&) = delete;
64+
TBalancingPolicy(TBalancingPolicy&&) = default;
65+
TBalancingPolicy& operator=(const TBalancingPolicy&) = delete;
66+
TBalancingPolicy& operator=(TBalancingPolicy&&) = default;
67+
68+
~TBalancingPolicy();
69+
6370
class TImpl;
6471
private:
6572
TBalancingPolicy(std::unique_ptr<TImpl>&& impl);

src/client/driver/driver.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
6969
TCP_KEEPALIVE_INTERVAL
7070
};
7171
bool DrainOnDtors = true;
72-
TBalancingPolicy::TImpl BalancingSettings = TBalancingPolicy::TImpl("");
72+
TBalancingPolicy::TImpl BalancingSettings = TBalancingPolicy::TImpl::UsePreferableLocation("");
7373
TDuration GRpcKeepAliveTimeout = TDuration::Seconds(10);
7474
bool GRpcKeepAlivePermitWithoutCalls = true;
7575
TDuration SocketIdleTimeout = TDuration::Minutes(6);

src/client/impl/internal/common/balancing_policies.cpp

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,24 @@
33

44
namespace NYdb::inline V3 {
55

6-
std::unique_ptr<TBalancingPolicy::TImpl> TBalancingPolicy::TImpl::UseAllNodes() {
7-
return std::make_unique<TImpl>();
6+
TBalancingPolicy::TImpl TBalancingPolicy::TImpl::UseAllNodes() {
7+
TBalancingPolicy::TImpl impl;
8+
impl.PolicyType = EPolicyType::UseAllNodes;
9+
return impl;
810
}
911

10-
std::unique_ptr<TBalancingPolicy::TImpl> TBalancingPolicy::TImpl::UsePreferableLocation(const std::string& location) {
11-
return std::make_unique<TImpl>(location);
12+
TBalancingPolicy::TImpl TBalancingPolicy::TImpl::UsePreferableLocation(const std::string& location) {
13+
TBalancingPolicy::TImpl impl;
14+
impl.PolicyType = EPolicyType::UsePreferableLocation;
15+
impl.Location = location;
16+
return impl;
1217
}
1318

14-
std::unique_ptr<TBalancingPolicy::TImpl> TBalancingPolicy::TImpl::UsePreferablePileState(EPileState pileState) {
15-
return std::make_unique<TImpl>(pileState);
19+
TBalancingPolicy::TImpl TBalancingPolicy::TImpl::UsePreferablePileState(EPileState pileState) {
20+
TBalancingPolicy::TImpl impl;
21+
impl.PolicyType = EPolicyType::UsePreferablePileState;
22+
impl.PileState = pileState;
23+
return impl;
1624
}
1725

1826
}

src/client/impl/internal/common/balancing_policies.h

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,11 @@ class TBalancingPolicy::TImpl {
1717
UsePreferablePileState
1818
};
1919

20-
static std::unique_ptr<TImpl> UseAllNodes();
20+
static TImpl UseAllNodes();
2121

22-
static std::unique_ptr<TImpl> UsePreferableLocation(const std::string& location);
22+
static TImpl UsePreferableLocation(const std::string& location);
2323

24-
static std::unique_ptr<TImpl> UsePreferablePileState(EPileState pileState);
25-
26-
TImpl()
27-
: PolicyType(EPolicyType::UseAllNodes)
28-
{}
29-
30-
TImpl(const std::string& location)
31-
: PolicyType(EPolicyType::UsePreferableLocation)
32-
, Location(location)
33-
{}
34-
35-
TImpl(EPileState pileState)
36-
: PolicyType(EPolicyType::UsePreferablePileState)
37-
, PileState(pileState)
38-
{}
24+
static TImpl UsePreferablePileState(EPileState pileState);
3925

4026
EPolicyType PolicyType;
4127

src/client/types/ydb.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,30 @@ namespace NYdb::inline V3 {
1010
TBalancingPolicy::TBalancingPolicy(EBalancingPolicy policy, const std::string& params) {
1111
switch (policy) {
1212
case EBalancingPolicy::UsePreferableLocation:
13-
Impl_ = TImpl::UsePreferableLocation(params);
13+
Impl_ = std::make_unique<TImpl>(TImpl::UsePreferableLocation(params));
1414
break;
1515
case EBalancingPolicy::UseAllNodes:
16-
Impl_ = TImpl::UseAllNodes();
16+
Impl_ = std::make_unique<TImpl>(TImpl::UseAllNodes());
1717
break;
1818
}
1919
}
2020

2121
TBalancingPolicy TBalancingPolicy::UsePreferableLocation(const std::string& location) {
22-
return TBalancingPolicy(TImpl::UsePreferableLocation(location));
22+
return TBalancingPolicy(std::make_unique<TImpl>(TImpl::UsePreferableLocation(location)));
2323
}
2424

2525
TBalancingPolicy TBalancingPolicy::UseAllNodes() {
26-
return TBalancingPolicy(TImpl::UseAllNodes());
26+
return TBalancingPolicy(std::make_unique<TImpl>(TImpl::UseAllNodes()));
2727
}
2828

2929
TBalancingPolicy TBalancingPolicy::UsePreferablePileState(EPileState pileState) {
30-
return TBalancingPolicy(TImpl::UsePreferablePileState(pileState));
30+
return TBalancingPolicy(std::make_unique<TImpl>(TImpl::UsePreferablePileState(pileState)));
3131
}
3232

3333
TBalancingPolicy::TBalancingPolicy(std::unique_ptr<TImpl>&& impl)
3434
: Impl_(std::move(impl))
3535
{}
3636

37+
TBalancingPolicy::~TBalancingPolicy() = default;
38+
3739
}

tests/slo_workloads/key_value/drop.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ using namespace NYdb;
55
using namespace NYdb::NTable;
66

77
static void DropTable(TTableClient& client, const std::string& path) {
8-
ThrowOnError(client.RetryOperationSync([path](TSession session) {
8+
NYdb::NStatusHelpers::ThrowOnError(client.RetryOperationSync([path](TSession session) {
99
return session.DropTable(path).ExtractValueSync();
1010
}));
1111
}

tests/slo_workloads/key_value/key_value.cpp

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,47 @@ int DoCreate(TDatabaseOptions& dbOptions, int argc, char** argv) {
5555
}
5656

5757
int DoRun(TDatabaseOptions& dbOptions, int argc, char** argv) {
58-
Y_UNUSED(dbOptions);
59-
Y_UNUSED(argc);
60-
Y_UNUSED(argv);
61-
Cerr << "The run command has not been implemented for this scenario yet" << Endl;
58+
TRunOptions runOptions{ {dbOptions} };
59+
if (!ParseOptionsRun(argc, argv, runOptions)) {
60+
return EXIT_FAILURE;
61+
}
62+
63+
Cout << TInstant::Now().ToRfc822StringLocal() << " Creating and initializing jobs..." << Endl;
64+
65+
std::uint32_t maxId = GetTableStats(dbOptions, TableName).MaxId;
66+
67+
std::shared_ptr<TJobContainer>& jobs = *Singleton<std::shared_ptr<TJobContainer>>();
68+
TJobGC gc(jobs);
69+
jobs = std::make_shared<TJobContainer>();
70+
71+
if (!runOptions.DontRunA) {
72+
runOptions.CommonOptions.Rps = runOptions.Read_rps;
73+
runOptions.CommonOptions.ReactionTime = TDuration::MilliSeconds(runOptions.CommonOptions.A_ReactionTime);
74+
jobs->Add(new TReadJob(runOptions.CommonOptions, maxId));
75+
}
76+
if (!runOptions.DontRunB) {
77+
runOptions.CommonOptions.Rps = runOptions.Write_rps;
78+
runOptions.CommonOptions.ReactionTime = DefaultReactionTime;
79+
jobs->Add(new TWriteJob(runOptions.CommonOptions, maxId));
80+
}
81+
82+
TInstant start = TInstant::Now();
83+
TInstant deadline = start + TDuration::Seconds(runOptions.CommonOptions.SecondsToRun);
84+
85+
jobs->Start(deadline);
86+
87+
SetUpInteraction();
88+
Cout << "Jobs launched. Do 'kill -USR1 " << GetPID()
89+
<< "' for progress details or 'kill -INT " << GetPID() << "' (Ctrl/Cmd + C) to interrupt" << Endl
90+
<< " Start time: " << start.ToRfc822StringLocal() << Endl
91+
<< "Estimated finish time: " << deadline.ToRfc822StringLocal() << Endl;
92+
93+
jobs->Wait();
94+
95+
Cout << "All jobs finished: " << TInstant::Now().ToRfc822StringLocal() << Endl;
96+
97+
jobs->ShowProgress();
98+
6299
return EXIT_SUCCESS;
63100
}
64101

tests/slo_workloads/key_value/key_value.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,34 @@ class TGenerateInitialContentJob : public TThreadJob {
2323
std::uint64_t Total;
2424
};
2525

26+
// Write workload job
27+
class TWriteJob : public TThreadJob {
28+
public:
29+
TWriteJob(const TCommonOptions& opts, std::uint32_t maxId);
30+
void ShowProgress(TStringBuilder& report) override;
31+
void DoJob() override;
32+
void OnFinish() override;
33+
34+
private:
35+
TExecutor Executor;
36+
TKeyValueGenerator Generator;
37+
std::atomic<std::uint64_t> ValuesGenerated = 0;
38+
};
39+
40+
// Read workload job
41+
class TReadJob : public TThreadJob {
42+
public:
43+
TReadJob(const TCommonOptions& opts, std::uint32_t maxId);
44+
void ShowProgress(TStringBuilder& report) override;
45+
void DoJob() override;
46+
void OnFinish() override;
47+
48+
private:
49+
std::unique_ptr<TExecutor> Executor;
50+
std::uint32_t ObjectIdRange;
51+
bool SaveResult;
52+
};
53+
2654
int CreateTable(TDatabaseOptions& dbOptions);
2755
int DropTable(TDatabaseOptions& dbOptions);
2856

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
#include "key_value.h"
2+
3+
#include <ydb-cpp-sdk/client/table/table.h>
4+
#include <util/string/printf.h>
5+
6+
using namespace NYdb;
7+
using namespace NYdb::NTable;
8+
9+
TWriteJob::TWriteJob(const TCommonOptions& opts, std::uint32_t maxId)
10+
: TThreadJob(opts)
11+
, Executor(opts, Stats, TExecutor::ModeNonBlocking)
12+
, Generator(opts, maxId)
13+
{}
14+
15+
void TWriteJob::ShowProgress(TStringBuilder& report) {
16+
report << Endl << "=====- WriteJob report (Thread B) -=====" << Endl;
17+
Executor.Report(report);
18+
report << "Generated " << ValuesGenerated.load() << " new elements." << Endl
19+
<< "Generator compute time: " << Generator.GetComputeTime() << Endl;
20+
Stats.PrintStatistics(report);
21+
report << "==================================================" << Endl;
22+
}
23+
24+
void TWriteJob::DoJob() {
25+
while (!ShouldStop.load() && TInstant::Now() < Deadline) {
26+
TKeyValueRecordData recordData = Generator.Get();
27+
auto value = BuildValueFromRecord(recordData);
28+
29+
ValuesGenerated.fetch_add(1);
30+
31+
auto upload = [value{ std::move(value) }, this](TSession session)->TAsyncStatus {
32+
static const TString query = Sprintf(R"(
33+
--!syntax_v1
34+
PRAGMA TablePathPrefix("%s");
35+
36+
DECLARE $items AS
37+
List<Struct<
38+
`object_id_key`: Uint32,
39+
`object_id`: Uint32,
40+
`timestamp`: Uint64,
41+
`payload`: Utf8>>;
42+
43+
UPSERT INTO `%s` SELECT * FROM AS_TABLE($items);
44+
45+
)", Prefix.c_str(), TableName.c_str());
46+
47+
auto promise = NThreading::NewPromise<TStatus>();
48+
auto params = PackValuesToParamsAsList({value});
49+
50+
auto resultFuture = session.ExecuteDataQuery(
51+
query,
52+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
53+
std::move(params),
54+
TExecDataQuerySettings()
55+
.KeepInQueryCache(true)
56+
.OperationTimeout(MaxDelay + ReactionTimeDelay)
57+
.ClientTimeout(MaxDelay + ReactionTimeDelay)
58+
);
59+
60+
resultFuture.Subscribe([promise](TAsyncDataQueryResult queryFuture) mutable {
61+
Y_ABORT_UNLESS(queryFuture.HasValue());
62+
TDataQueryResult queryResult = queryFuture.GetValue();
63+
promise.SetValue(std::move(queryResult));
64+
});
65+
66+
return promise.GetFuture();
67+
};
68+
69+
RpsProvider.Use();
70+
71+
if (!Executor.Execute(upload)) {
72+
break;
73+
}
74+
}
75+
}
76+
77+
void TWriteJob::OnFinish() {
78+
Executor.Finish();
79+
Executor.Wait();
80+
Stats.Flush();
81+
}
82+
83+
// Implementation of TReadJob
84+
TReadJob::TReadJob(const TCommonOptions& opts, std::uint32_t maxId)
85+
: TThreadJob(opts)
86+
, Executor(opts.RetryMode ? new TExecutorWithRetry(opts, Stats) : new TExecutor(opts, Stats))
87+
, ObjectIdRange(static_cast<std::uint32_t>(maxId * 1.25)) // 20% of requests with no result
88+
, SaveResult(opts.SaveResult)
89+
{}
90+
91+
void TReadJob::ShowProgress(TStringBuilder& report) {
92+
report << Endl << "======- ReadJob report (Thread A) -======" << Endl;
93+
Executor->Report(report);
94+
Stats.PrintStatistics(report);
95+
report << "========================================" << Endl;
96+
}
97+
98+
void TReadJob::DoJob() {
99+
while (!ShouldStop.load() && TInstant::Now() < Deadline) {
100+
std::uint32_t idToSelect = RandomNumber<std::uint32_t>() % ObjectIdRange;
101+
102+
auto read = [idToSelect, this](TSession session) -> TAsyncStatus {
103+
static const TString query = Sprintf(R"(
104+
--!syntax_v1
105+
PRAGMA TablePathPrefix("%s");
106+
107+
DECLARE $object_id_key AS Uint32;
108+
DECLARE $object_id AS Uint32;
109+
110+
SELECT * FROM `%s` WHERE `object_id_key` = $object_id_key AND `object_id` = $object_id;
111+
112+
)", Prefix.c_str(), TableName.c_str());
113+
114+
auto promise = NThreading::NewPromise<TStatus>();
115+
auto params = TParamsBuilder()
116+
.AddParam("$object_id_key")
117+
.Uint32(GetHash(idToSelect))
118+
.Build()
119+
.AddParam("$object_id")
120+
.Uint32(idToSelect)
121+
.Build()
122+
.Build();
123+
124+
auto resultFuture = session.ExecuteDataQuery(
125+
query,
126+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
127+
std::move(params),
128+
TExecDataQuerySettings()
129+
.KeepInQueryCache(true)
130+
.OperationTimeout(MaxDelay + ReactionTimeDelay)
131+
.ClientTimeout(MaxDelay + ReactionTimeDelay)
132+
);
133+
134+
resultFuture.Subscribe([promise](TAsyncDataQueryResult queryFuture) mutable {
135+
Y_ABORT_UNLESS(queryFuture.HasValue());
136+
TDataQueryResult queryResult = queryFuture.GetValue();
137+
promise.SetValue(std::move(queryResult));
138+
});
139+
140+
return promise.GetFuture();
141+
};
142+
143+
RpsProvider.Use();
144+
145+
if (!Executor->Execute(read)) {
146+
break;
147+
}
148+
}
149+
}
150+
151+
void TReadJob::OnFinish() {
152+
Executor->Finish();
153+
std::uint32_t infly = Executor->Wait(WaitTimeout);
154+
if (infly) {
155+
Cerr << "Warning: thread A finished while having " << infly << " infly requests." << Endl;
156+
}
157+
Stats.Flush();
158+
if (SaveResult) {
159+
Stats.SaveResult();
160+
}
161+
}

0 commit comments

Comments
 (0)