Skip to content

Commit 100e44d

Browse files
committed
test/allocsim: worker threads, op hashed by client
Signed-off-by: Pere Diaz Bou <[email protected]>
1 parent 6bd4b29 commit 100e44d

File tree

1 file changed

+74
-57
lines changed

1 file changed

+74
-57
lines changed

src/test/objectstore/allocsim/ops_replayer.cc

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <algorithm>
22
#include <cassert>
33
#include <fcntl.h>
4+
#include <ranges>
45
#include <string_view>
56
#include <sys/mman.h>
67
#include <sys/stat.h>
@@ -197,12 +198,77 @@ void parse_entry_point(shared_ptr<ParserContext> context) {
197198
}
198199
}
199200

201+
void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector<Op> &ops, uint64_t max_buffer_size, uint64_t io_depth, librados::IoCtx* io) {
202+
203+
bufferlist bl;
204+
gen_buffer(bl, max_buffer_size);
205+
hash<string> hasher;
206+
207+
cout << "starting thread " << io_depth << endl;
208+
for (auto &op : ops) {
209+
{
210+
std::unique_lock<std::mutex> lock(in_flight_mutex);
211+
cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
212+
213+
}
214+
size_t key = hasher(*op.who) % nworker_threads;
215+
if (key != id) {
216+
continue;
217+
}
218+
// cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
219+
op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
220+
switch (op.type) {
221+
case Write: {
222+
int ret = io->aio_write(*op.object, op.completion, bl, op.length, op.offset);
223+
if (ret != 0) {
224+
cout << fmt::format("Error writing ecode={}", ret) << endl;;
225+
}
226+
break;
227+
}
228+
case WriteFull: {
229+
int ret = io->aio_write_full(*op.object, op.completion, bl);
230+
if (ret != 0) {
231+
cout << fmt::format("Error writing full ecode={}", ret) << endl;;
232+
}
233+
break;
234+
}
235+
case Read: {
236+
bufferlist read;
237+
int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
238+
if (ret != 0) {
239+
cout << fmt::format("Error reading ecode={}", ret) << endl;;
240+
}
241+
break;
242+
}
243+
case Truncate: {
244+
librados::ObjectWriteOperation write_operation;
245+
write_operation.truncate(op.offset);
246+
int ret = io->aio_operate(*op.object, op.completion, &write_operation);
247+
if (ret != 0) {
248+
cout << fmt::format("Error truncating ecode={}", ret) << endl;;
249+
}
250+
break;
251+
}
252+
case Zero: {
253+
librados::ObjectWriteOperation write_operation;
254+
write_operation.zero(op.offset, op.length);
255+
int ret = io->aio_operate(*op.object, op.completion, &write_operation);
256+
if (ret != 0) {
257+
cout << fmt::format("Error zeroing ecode={}", ret) << endl;;
258+
}
259+
break;
260+
}
261+
}
262+
in_flight_ops++;
263+
}
264+
}
265+
200266
int main(int argc, char** argv) {
201267
vector<Op> ops;
202268
librados::Rados cluster;
203269
librados::IoCtx io;
204270
uint64_t max_buffer_size = 0;
205-
uint64_t io_depth = 64;
271+
uint64_t io_depth = 8;
206272
string file;
207273
std::filesystem::path ceph_conf_path;
208274

@@ -255,8 +321,6 @@ int main(int argc, char** argv) {
255321
context->ops.clear();
256322
}
257323

258-
259-
260324
int ret = cluster.init2("client.admin", "ceph", 0);
261325
if (ret < 0) {
262326
std::cerr << "Couldn't init ceph! error " << ret << std::endl;
@@ -286,60 +350,13 @@ int main(int argc, char** argv) {
286350

287351
// process ops
288352
// Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation
289-
bufferlist bl;
290-
gen_buffer(bl, max_buffer_size);
291-
292-
for (auto &op : ops) {
293-
{
294-
std::unique_lock<std::mutex> lock(in_flight_mutex);
295-
cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
296-
297-
}
298-
// cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
299-
op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
300-
switch (op.type) {
301-
case Write: {
302-
int ret = io.aio_write(*op.object, op.completion, bl, op.length, op.offset);
303-
if (ret != 0) {
304-
cout << fmt::format("Error writing ecode={}", ret) << endl;;
305-
}
306-
break;
307-
}
308-
case WriteFull: {
309-
int ret = io.aio_write_full(*op.object, op.completion, bl);
310-
if (ret != 0) {
311-
cout << fmt::format("Error writing full ecode={}", ret) << endl;;
312-
}
313-
break;
314-
}
315-
case Read: {
316-
bufferlist read;
317-
int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
318-
if (ret != 0) {
319-
cout << fmt::format("Error reading ecode={}", ret) << endl;;
320-
}
321-
break;
322-
}
323-
case Truncate: {
324-
librados::ObjectWriteOperation write_operation;
325-
write_operation.truncate(op.offset);
326-
int ret = io.aio_operate(*op.object, op.completion, &write_operation);
327-
if (ret != 0) {
328-
cout << fmt::format("Error truncating ecode={}", ret) << endl;;
329-
}
330-
break;
331-
}
332-
case Zero: {
333-
librados::ObjectWriteOperation write_operation;
334-
write_operation.zero(op.offset, op.length);
335-
int ret = io.aio_operate(*op.object, op.completion, &write_operation);
336-
if (ret != 0) {
337-
cout << fmt::format("Error zeroing ecode={}", ret) << endl;;
338-
}
339-
break;
340-
}
341-
}
342-
in_flight_ops++;
353+
vector<thread> worker_threads;
354+
uint64_t nworker_threads = 16;
355+
for (int i = 0; i < nworker_threads; i++) {
356+
worker_threads.push_back(thread(worker_thread_entry, i, nworker_threads, std::ref(ops), max_buffer_size, io_depth, &io));
357+
}
358+
for (auto& worker : worker_threads) {
359+
worker.join();
343360
}
344361
while (in_flight_ops > 0) {
345362
std::this_thread::sleep_for(std::chrono::milliseconds(100));

0 commit comments

Comments
 (0)