Skip to content

Commit a55bf5d

Browse files
authored
Mixed stress test (#14163)
1 parent 751c197 commit a55bf5d

File tree

9 files changed

+646
-2
lines changed

9 files changed

+646
-2
lines changed
Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
#include "mixed.h"
2+
#include <util/generic/serialized_enum.h>
3+
#include <util/random/normal.h>
4+
#include <util/random/random.h>
5+
#include <util/datetime/base.h>
6+
7+
namespace NYdbWorkload {
8+
9+
namespace NMixed {
10+
11+
using TRow = TLogGenerator::TRow;
12+
13+
14+
TLogGenerator::TLogGenerator(const TMixedWorkloadParams* params)
15+
: TBase(params)
16+
, TotalColumnsCnt(1 + Params.IntColumnsCnt + Params.StrColumnsCnt)
17+
{
18+
Y_ABORT_UNLESS(TotalColumnsCnt >= Params.KeyColumnsCnt);
19+
}
20+
21+
std::string TLogGenerator::GetDDLQueries() const {
22+
std::stringstream ss;
23+
24+
ss << "--!syntax_v1\n";
25+
ss << "CREATE TABLE `" << Params.DbPath << "/" << Params.TableName << "`(";
26+
27+
for (size_t i = 0; i < TotalColumnsCnt; ++i) {
28+
if (i == 0) {
29+
ss << "ts Timestamp";
30+
} else if (i < Params.IntColumnsCnt + 1) {
31+
ss << "c" << i << " Uint64";
32+
} else {
33+
ss << "c" << i << " String";
34+
}
35+
36+
if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TMixedWorkloadParams::EStoreType::Column) {
37+
ss << " NOT NULL";
38+
}
39+
ss << ", ";
40+
}
41+
42+
ss << "PRIMARY KEY(";
43+
ss << "ts";
44+
for (size_t i = 1; i < Params.KeyColumnsCnt; ++i) {
45+
ss << ", c" << i;
46+
}
47+
ss << ")) WITH (";
48+
49+
ss << "TTL = Interval(\"PT" << Params.TimestampTtlMinutes << "M\") ON ts, ";
50+
51+
switch (Params.GetStoreType()) {
52+
case TMixedWorkloadParams::EStoreType::Row:
53+
ss << "STORE = ROW, ";
54+
break;
55+
case TMixedWorkloadParams::EStoreType::Column:
56+
ss << "STORE = COLUMN, ";
57+
break;
58+
default:
59+
throw yexception() << "Unsupported store type: " << Params.GetStoreType();
60+
}
61+
if (Params.PartitionsByLoad) {
62+
ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, ";
63+
}
64+
ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", ";
65+
ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", ";
66+
ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ")";
67+
return ss.str();
68+
}
69+
70+
TQueryInfoList TLogGenerator::GetWorkload(int type) {
71+
switch (static_cast<EType>(type)) {
72+
case EType::Insert:
73+
return Insert(GenerateRandomRows());
74+
case EType::Upsert:
75+
return Upsert(GenerateRandomRows());
76+
case EType::BulkUpsert:
77+
return BulkUpsert(GenerateRandomRows());
78+
case EType::Select:
79+
return Select(GenerateRandomRows());
80+
default:
81+
return TQueryInfoList();
82+
}
83+
}
84+
85+
86+
TVector<IWorkloadQueryGenerator::TWorkloadType> TLogGenerator::GetSupportedWorkloadTypes() const {
87+
TVector<TWorkloadType> result;
88+
result.emplace_back(static_cast<int>(EType::Insert), "insert", "Insert random rows into table near current ts");
89+
result.emplace_back(static_cast<int>(EType::Upsert), "upsert", "Upsert random rows into table near current ts");
90+
result.emplace_back(static_cast<int>(EType::BulkUpsert), "bulk_upsert", "Bulk upsert random rows into table near current ts");
91+
result.emplace_back(static_cast<int>(EType::Select), "select", "Select random rows from table");
92+
return result;
93+
}
94+
95+
TQueryInfoList TLogGenerator::WriteRows(TString operation, TVector<TRow>&& rows) {
96+
std::stringstream ss;
97+
98+
NYdb::TParamsBuilder paramsBuilder;
99+
100+
ss << "--!syntax_v1\n";
101+
102+
for (size_t row = 0; row < Params.RowsCnt; ++row) {
103+
for (size_t col = 0; col < TotalColumnsCnt; ++col) {
104+
TString cname = "$c" + std::to_string(row) + "_" + std::to_string(col);
105+
if (col == 0) {
106+
ss << "DECLARE " << cname << " AS Timestamp;\n";
107+
paramsBuilder.AddParam(cname).Timestamp(rows[row].Ts).Build();
108+
} else if (col < Params.IntColumnsCnt + 1) {
109+
ss << "DECLARE " << cname << " AS Uint64;\n";
110+
paramsBuilder.AddParam(cname).Uint64(rows[row].Ints[col - 1]).Build();
111+
} else {
112+
ss << "DECLARE " << cname << " AS String;\n";
113+
paramsBuilder.AddParam(cname).String(rows[row].Strings[col - Params.IntColumnsCnt - 1]).Build();
114+
}
115+
}
116+
}
117+
118+
ss << operation << " INTO `" << Params.TableName << "` (";
119+
120+
for (size_t col = 0; col < TotalColumnsCnt; ++col) {
121+
if (col != 0) {
122+
ss << "c" << col;
123+
} else {
124+
ss << "ts";
125+
}
126+
127+
if (col + 1 < TotalColumnsCnt) {
128+
ss << ", ";
129+
}
130+
}
131+
132+
ss << ") VALUES ";
133+
134+
for (size_t row = 0; row < Params.RowsCnt; ++row) {
135+
ss << "(";
136+
137+
for (size_t col = 0; col < TotalColumnsCnt; ++col) {
138+
ss << "$c" << row << "_" << col;
139+
if (col + 1 < TotalColumnsCnt) {
140+
ss << ", ";
141+
}
142+
}
143+
144+
ss << ")";
145+
146+
if (row + 1 < Params.RowsCnt) {
147+
ss << ", ";
148+
}
149+
}
150+
auto params = paramsBuilder.Build();
151+
return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params)));
152+
}
153+
154+
TQueryInfoList TLogGenerator::Insert(TVector<TRow>&& rows) {
155+
return WriteRows("INSERT", std::move(rows));
156+
}
157+
158+
TQueryInfoList TLogGenerator::Upsert(TVector<TRow>&& rows) {
159+
return WriteRows("UPSERT", std::move(rows));
160+
}
161+
162+
TQueryInfoList TLogGenerator::Select(TVector<TRow>&& rows) {
163+
std::stringstream ss;
164+
165+
NYdb::TParamsBuilder paramsBuilder;
166+
167+
ss << "--!syntax_v1\n";
168+
169+
for (size_t row = 0; row < Params.RowsCnt; ++row) {
170+
for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) {
171+
TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col);
172+
if (col == 0) {
173+
ss << "DECLARE " << paramName << " AS Timestamp;\n";
174+
paramsBuilder.AddParam(paramName).Timestamp(rows[row].Ts).Build();
175+
} else if (col < Params.IntColumnsCnt) {
176+
ss << "DECLARE " << paramName << " AS Uint64;\n";
177+
paramsBuilder.AddParam(paramName).Uint64(rows[row].Ints[col]).Build();
178+
} else {
179+
ss << "DECLARE " << paramName << " AS String;\n";
180+
paramsBuilder.AddParam(paramName).String(rows[row].Strings[col - Params.IntColumnsCnt]).Build();
181+
}
182+
}
183+
}
184+
185+
ss << "SELECT ";
186+
for (size_t col = 1; col <= TotalColumnsCnt; ++col) {
187+
ss << "c" << col;
188+
if (col + 1 < TotalColumnsCnt) {
189+
ss << ",";
190+
}
191+
ss << " ";
192+
}
193+
194+
ss << "FROM `" << Params.TableName << "` WHERE ";
195+
for (size_t row = 0; row < Params.RowsCnt; ++row) {
196+
for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) {
197+
TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col);
198+
if (col == 0) {
199+
ss << "ts = " << paramName;
200+
} else {
201+
ss << "c" << col << " = " << paramName;
202+
}
203+
if (col + 1 < Params.KeyColumnsCnt) {
204+
ss << " AND ";
205+
}
206+
}
207+
if (row + 1 < Params.RowsCnt) {
208+
ss << " OR ";
209+
}
210+
}
211+
212+
auto params = paramsBuilder.Build();
213+
TQueryInfo info(ss.str(), std::move(params));
214+
return TQueryInfoList(1, std::move(info));
215+
}
216+
217+
TQueryInfoList TLogGenerator::BulkUpsert(TVector<TRow>&& rows) {
218+
NYdb::TValueBuilder valueBuilder;
219+
valueBuilder.BeginList();
220+
for (const TRow& row : rows) {
221+
auto &listItem = valueBuilder.AddListItem();
222+
listItem.BeginStruct();
223+
for (size_t col = 0; col < TotalColumnsCnt; ++col) {
224+
if (col == 0) {
225+
listItem.AddMember("ts").Timestamp(row.Ts);
226+
} else if (col < Params.IntColumnsCnt + 1) {
227+
listItem.AddMember(std::format("c{}", col)).Uint64(row.Ints[col-1]);
228+
} else {
229+
listItem.AddMember(std::format("c{}", col)).String(row.Strings[col - Params.IntColumnsCnt - 1]);
230+
}
231+
}
232+
listItem.EndStruct();
233+
}
234+
valueBuilder.EndList();
235+
TString table_path = Params.DbPath + "/" + Params.TableName;
236+
NYdb::TValue rowsValue = valueBuilder.Build();
237+
auto bulkUpsertOperation = [table_path, rowsValue](NYdb::NTable::TTableClient& tableClient) {
238+
auto r = rowsValue;
239+
auto status = tableClient.BulkUpsert(table_path, std::move(r));
240+
return status.GetValueSync();
241+
};
242+
TQueryInfo queryInfo;
243+
queryInfo.TableOperation = bulkUpsertOperation;
244+
return TQueryInfoList(1, std::move(queryInfo));
245+
}
246+
247+
248+
TQueryInfoList TLogGenerator::GetInitialData() {
249+
TQueryInfoList res;
250+
return res;
251+
}
252+
253+
TVector<std::string> TLogGenerator::GetCleanPaths() const {
254+
return { Params.TableName };
255+
}
256+
257+
TVector<TRow> TLogGenerator::GenerateRandomRows() {
258+
TVector<TRow> result(Params.RowsCnt);
259+
260+
for (size_t row = 0; row < Params.RowsCnt; ++row) {
261+
result[row].Ts = TInstant::Now();
262+
i64 millisecondsDiff = 60 * 1000 * NormalRandom(0., static_cast<double>(Params.TimestampStandardDeviationMinutes));
263+
if (millisecondsDiff >= 0) { // TDuration::MilliSeconds can't be negative for some reason...
264+
result[row].Ts = result[row].Ts + TDuration::MilliSeconds(millisecondsDiff);
265+
} else {
266+
result[row].Ts = result[row].Ts - TDuration::MilliSeconds(-millisecondsDiff);
267+
}
268+
269+
result[row].Ints.resize(Params.IntColumnsCnt);
270+
result[row].Strings.resize(Params.StrColumnsCnt);
271+
272+
for (size_t col = 0; col < Params.IntColumnsCnt; ++col) {
273+
ui64 val = RandomNumber<ui64>();
274+
result[row].Ints[col] = val;
275+
}
276+
277+
for (size_t col = 0; col < Params.StrColumnsCnt; ++col) {
278+
TString val;
279+
val = TString(Params.StringLen, '_');
280+
for (size_t i = 0; i < Params.StringLen; i++) {
281+
val[i] = (char)('a' + RandomNumber<u_char>(26));
282+
}
283+
result[row].Strings[col] = val;
284+
}
285+
}
286+
287+
return result;
288+
}
289+
290+
void TMixedWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) {
291+
opts.AddLongOption('p', "path", "Path where benchmark tables are located")
292+
.Optional()
293+
.DefaultValue(TableName)
294+
.Handler1T<TStringBuf>([this](TStringBuf arg) {
295+
while(arg.SkipPrefix("/"));
296+
while(arg.ChopSuffix("/"));
297+
TableName = arg;
298+
});
299+
switch (commandType) {
300+
case TWorkloadParams::ECommandType::Init:
301+
opts.AddLongOption("min-partitions", "Minimum partitions for tables.")
302+
.DefaultValue((ui64)LogWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions);
303+
opts.AddLongOption("max-partitions", "Maximum partitions for tables.")
304+
.DefaultValue((ui64)LogWorkloadConstants::MAX_PARTITIONS).StoreResult(&MaxPartitions);
305+
opts.AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).")
306+
.DefaultValue((ui64)LogWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSizeMb);
307+
opts.AddLongOption("auto-partition", "Enable auto partitioning by load.")
308+
.DefaultValue((ui64)LogWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad);
309+
opts.AddLongOption("len", "String len")
310+
.DefaultValue((ui64)LogWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
311+
opts.AddLongOption("int-cols", "Number of int columns")
312+
.DefaultValue((ui64)LogWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt);
313+
opts.AddLongOption("str-cols", "Number of string columns")
314+
.DefaultValue((ui64)LogWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt);
315+
opts.AddLongOption("key-cols", "Number of key columns")
316+
.DefaultValue((ui64)LogWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
317+
opts.AddLongOption("ttl", "TTL for timestamp column in minutes")
318+
.DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_TTL_MIN).StoreResult(&TimestampTtlMinutes);
319+
opts.AddLongOption("store", "Storage type."
320+
" Options: row, column\n"
321+
" row - use row-based storage engine;\n"
322+
" column - use column-based storage engine.")
323+
.DefaultValue(StoreType)
324+
.Handler1T<TStringBuf>([this](TStringBuf arg) {
325+
const auto l = to_lower(TString(arg));
326+
if (!TryFromString(arg, StoreType)) {
327+
throw yexception() << "Ivalid store type: " << arg;
328+
}
329+
});
330+
break;
331+
case TWorkloadParams::ECommandType::Run:
332+
opts.AddLongOption("int-cols", "Number of int columns")
333+
.DefaultValue((ui64)LogWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt);
334+
opts.AddLongOption("str-cols", "Number of string columns")
335+
.DefaultValue((ui64)LogWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt);
336+
opts.AddLongOption("key-cols", "Number of key columns")
337+
.DefaultValue((ui64)LogWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
338+
switch (static_cast<TLogGenerator::EType>(workloadType)) {
339+
case TLogGenerator::EType::Select:
340+
case TLogGenerator::EType::Insert:
341+
case TLogGenerator::EType::Upsert:
342+
case TLogGenerator::EType::BulkUpsert:
343+
opts.AddLongOption("len", "String len")
344+
.DefaultValue((ui64)LogWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
345+
opts.AddLongOption("rows", "Number of rows to upsert")
346+
.DefaultValue((ui64)LogWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
347+
opts.AddLongOption("timestamp_deviation", "Standard deviation. For each timestamp, a random variable with a specified standard deviation in minutes is added.")
348+
.DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES).StoreResult(&TimestampStandardDeviationMinutes);
349+
break;
350+
}
351+
break;
352+
default:
353+
break;
354+
}
355+
}
356+
357+
THolder<IWorkloadQueryGenerator> TMixedWorkloadParams::CreateGenerator() const {
358+
return MakeHolder<TLogGenerator>(this);
359+
}
360+
361+
TString TMixedWorkloadParams::GetWorkloadName() const {
362+
return "Log";
363+
}
364+
365+
} // namespace NMixed
366+
367+
} // namespace NYdbWorkload

0 commit comments

Comments
 (0)