Skip to content

Commit 3ad6e64

Browse files
author
Alexander Indenbaum
committed
bdev/rbd: round-robin reactor selection for SpdkContextWQ
Reactor threads are discovered and cached, and each new RBD bdev gets the next reactor in round-robin order so SpdkContextWQ work is spread across cores instead of a single reactor. Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
1 parent a492b4b commit 3ad6e64

File tree

3 files changed

+87
-47
lines changed

3 files changed

+87
-47
lines changed

module/bdev/rbd/bdev_rbd.c

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@
2121
#include "spdk/bdev_module.h"
2222
#include "spdk/log.h"
2323

24-
/* to access reactor framework */
25-
#include "spdk_internal/event.h"
26-
#include "spdk_internal/thread.h"
27-
2824
SPDK_LOG_REGISTER_COMPONENT(reservation)
2925

3026
static int bdev_rbd_count = 0;
@@ -478,9 +474,6 @@ bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name, const char *namespac
478474
return -1;
479475
}
480476

481-
/* Forward declaration for reactor thread selection function */
482-
static struct spdk_thread *bdev_rbd_find_reactor_thread(void);
483-
484477
static void *
485478
bdev_rbd_init_context(void *arg)
486479
{
@@ -761,46 +754,6 @@ static struct spdk_bdev_module rbd_if = {
761754
};
762755
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
763756

764-
/**
765-
* Find a reactor thread for SpdkContextWQ.
766-
* Iterates through reactors to find a reactor thread (not app_thread).
767-
* Returns NULL if no suitable thread is found.
768-
*/
769-
static struct spdk_thread *
770-
bdev_rbd_find_reactor_thread(void)
771-
{
772-
uint32_t lcore;
773-
774-
SPDK_ENV_FOREACH_CORE(lcore) {
775-
struct spdk_reactor *reactor = spdk_reactor_get(lcore);
776-
if (reactor == NULL || !reactor->flags.is_valid) {
777-
continue;
778-
}
779-
780-
if (reactor->thread_count == 0) {
781-
continue;
782-
}
783-
784-
struct spdk_lw_thread *lw_thread = TAILQ_FIRST(&reactor->threads);
785-
if (lw_thread == NULL) {
786-
continue;
787-
}
788-
789-
struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
790-
if (thread == NULL || spdk_thread_is_app_thread(thread)) {
791-
continue;
792-
}
793-
794-
uint64_t thread_id = spdk_thread_get_id(thread);
795-
const char *thread_name = spdk_thread_get_name(thread);
796-
SPDK_NOTICELOG("bdev_rbd_find_reactor_thread: reactor thread lcore=%u: thread=%p, id=%lu, name=%s\n",
797-
lcore, thread, thread_id, thread_name ? thread_name : "NULL");
798-
return thread;
799-
}
800-
801-
return NULL;
802-
}
803-
804757
static int bdev_rbd_reset_timer(void *arg);
805758

806759
static void

module/bdev/rbd/bdev_rbd_spdk_context_wq.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <rbd/librbd.h>
88
#include <rados/librados.hpp>
99
#include <atomic>
10+
#include <vector>
1011

1112
#include "bdev_rbd_spdk_context_wq.h"
1213

@@ -15,8 +16,80 @@ extern "C" {
1516
#include "spdk/thread.h"
1617
#include "spdk/log.h"
1718
#include "spdk/env.h"
19+
#include "spdk_internal/event.h"
1820
}
1921

22+
/**
23+
* Encapsulates the set of reactor threads for round-robin assignment.
24+
*/
25+
class ReactorThreadPool {
26+
public:
27+
/** Ensures reactor list exists (lock-free one-time init via CAS). */
28+
static void ensure_discovered() {
29+
if (g_reactor_list.load(std::memory_order_acquire) != nullptr) {
30+
return;
31+
}
32+
auto *p = new std::vector<struct spdk_thread *>();
33+
discover_into(p);
34+
std::vector<struct spdk_thread *> *expected = nullptr;
35+
if (!g_reactor_list.compare_exchange_strong(expected, p, std::memory_order_release)) {
36+
delete p;
37+
}
38+
}
39+
40+
static struct spdk_thread *get_next() {
41+
std::vector<struct spdk_thread *> *list =
42+
g_reactor_list.load(std::memory_order_acquire);
43+
if (list == nullptr || list->empty()) {
44+
SPDK_ERRLOG("bdev_rbd: reactor thread pool is empty, no reactor threads available for SpdkContextWQ\n");
45+
return NULL;
46+
}
47+
size_t n = list->size();
48+
size_t idx = g_reactor_next.fetch_add(1, std::memory_order_relaxed) % n;
49+
struct spdk_thread *t = (*list)[idx];
50+
const char *name = spdk_thread_get_name(t);
51+
SPDK_NOTICELOG("bdev_rbd: next reactor thread=%p (id=%lu, name=%s, index=%zu/%zu)\n",
52+
t, spdk_thread_get_id(t), name ? name : "NULL", idx, n);
53+
return t;
54+
}
55+
56+
private:
57+
static void discover_into(std::vector<struct spdk_thread *> *vec) {
58+
uint32_t lcore;
59+
SPDK_ENV_FOREACH_CORE(lcore) {
60+
struct spdk_reactor *reactor = spdk_reactor_get(lcore);
61+
if (reactor == NULL || !reactor->flags.is_valid) {
62+
continue;
63+
}
64+
if (reactor->thread_count == 0) {
65+
continue;
66+
}
67+
struct spdk_lw_thread *lw_thread = TAILQ_FIRST(&reactor->threads);
68+
if (lw_thread == NULL) {
69+
continue;
70+
}
71+
struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
72+
if (thread == NULL || spdk_thread_is_app_thread(thread)) {
73+
continue;
74+
}
75+
const char *name = spdk_thread_get_name(thread);
76+
SPDK_NOTICELOG("bdev_rbd: discovered reactor thread=%p (id=%lu, name=%s, index=%zu)\n",
77+
thread, spdk_thread_get_id(thread), name ? name : "NULL", vec->size());
78+
vec->push_back(thread);
79+
}
80+
if (!vec->empty()) {
81+
SPDK_NOTICELOG("bdev_rbd: reactor thread pool: discovered %zu reactor(s) for round-robin\n",
82+
vec->size());
83+
}
84+
}
85+
86+
static std::atomic<std::vector<struct spdk_thread *> *> g_reactor_list;
87+
static std::atomic<uint32_t> g_reactor_next;
88+
};
89+
90+
std::atomic<std::vector<struct spdk_thread *> *> ReactorThreadPool::g_reactor_list{nullptr};
91+
std::atomic<uint32_t> ReactorThreadPool::g_reactor_next{0};
92+
2093
namespace librbd {
2194
namespace asio {
2295

@@ -173,4 +246,10 @@ void bdev_rbd_spdk_context_wq_destroy(struct bdev_rbd_spdk_context_wq* context_w
173246
delete wq;
174247
}
175248

249+
struct spdk_thread* bdev_rbd_find_reactor_thread(void)
250+
{
251+
ReactorThreadPool::ensure_discovered();
252+
return ReactorThreadPool::get_next();
253+
}
254+
176255
} // extern "C"

module/bdev/rbd/bdev_rbd_spdk_context_wq.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ struct bdev_rbd_spdk_context_wq* bdev_rbd_spdk_context_wq_create_from_ioctx(rado
4747
*/
4848
void bdev_rbd_spdk_context_wq_destroy(struct bdev_rbd_spdk_context_wq* context_wq);
4949

50+
/**
51+
* Return the next reactor thread for SpdkContextWQ (round-robin).
52+
* Lazy-initializes the reactor list on first call (lock-free, atomic CAS).
53+
* Each call returns a different reactor so RBD images are balanced across reactors.
54+
* Returns NULL if no reactor threads were discovered (error is logged).
55+
*/
56+
struct spdk_thread* bdev_rbd_find_reactor_thread(void);
57+
5058
#ifdef __cplusplus
5159
}
5260

0 commit comments

Comments
 (0)