77#include < rbd/librbd.h>
88#include < rados/librados.hpp>
99#include < atomic>
10+ #include < mutex>
11+ #include < vector>
1012
1113#include " bdev_rbd_spdk_context_wq.h"
1214
@@ -15,8 +17,80 @@ extern "C" {
1517#include " spdk/thread.h"
1618#include " spdk/log.h"
1719#include " spdk/env.h"
20+ #include " spdk_internal/event.h"
1821}
1922
23+ /* *
24+ * Encapsulates the set of reactor threads for round-robin assignment.
25+ * init() discovers reactor threads; it runs once lazily on first
26+ * bdev_rbd_find_reactor_thread() (std::call_once).
27+ */
28+ class ReactorThreadPool {
29+ public:
30+ /* * Ensures reactor list is discovered once (lazy init inside bdev_rbd_find_reactor_thread). */
31+ static void ensure_discovered () {
32+ std::call_once (g_discover_once, init);
33+ }
34+
35+ static void init () {
36+ if (!g_reactor_threads.empty ()) {
37+ SPDK_ERRLOG (" bdev_rbd: reactor thread pool init skipped, list already has %zu thread(s)\n " ,
38+ g_reactor_threads.size ());
39+ return ;
40+ }
41+ uint32_t lcore;
42+ SPDK_ENV_FOREACH_CORE (lcore) {
43+ struct spdk_reactor *reactor = spdk_reactor_get (lcore);
44+ if (reactor == NULL || !reactor->flags .is_valid ) {
45+ continue ;
46+ }
47+ if (reactor->thread_count == 0 ) {
48+ continue ;
49+ }
50+ struct spdk_lw_thread *lw_thread = TAILQ_FIRST (&reactor->threads );
51+ if (lw_thread == NULL ) {
52+ continue ;
53+ }
54+ struct spdk_thread *thread = spdk_thread_get_from_ctx (lw_thread);
55+ if (thread == NULL || spdk_thread_is_app_thread (thread)) {
56+ continue ;
57+ }
58+ const char *name = spdk_thread_get_name (thread);
59+ SPDK_NOTICELOG (" bdev_rbd: discovered reactor thread=%p (id=%lu, name=%s, index=%zu)\n " ,
60+ thread, spdk_thread_get_id (thread), name ? name : " NULL" , g_reactor_threads.size ());
61+ g_reactor_threads.push_back (thread);
62+ }
63+
64+ if (!g_reactor_threads.empty ()) {
65+ SPDK_NOTICELOG (" bdev_rbd: reactor thread pool: discovered %zu reactor(s) for round-robin\n " ,
66+ g_reactor_threads.size ());
67+ }
68+ }
69+
70+ static struct spdk_thread *get_next () {
71+ if (g_reactor_threads.empty ()) {
72+ SPDK_ERRLOG (" bdev_rbd: reactor thread pool is empty, no reactor threads available for SpdkContextWQ\n " );
73+ return NULL ;
74+ }
75+ size_t n = g_reactor_threads.size ();
76+ size_t idx = g_reactor_thread_next.fetch_add (1 , std::memory_order_relaxed) % n;
77+ struct spdk_thread *t = g_reactor_threads[idx];
78+ const char *name = spdk_thread_get_name (t);
79+ SPDK_NOTICELOG (" bdev_rbd: next reactor thread=%p (id=%lu, name=%s, index=%zu/%zu)\n " ,
80+ t, spdk_thread_get_id (t), name ? name : " NULL" , idx, n);
81+ return t;
82+ }
83+
84+ private:
85+ static std::vector<struct spdk_thread *> g_reactor_threads;
86+ static std::atomic<uint32_t > g_reactor_thread_next;
87+ static std::once_flag g_discover_once;
88+ };
89+
90+ std::vector<struct spdk_thread *> ReactorThreadPool::g_reactor_threads;
91+ std::atomic<uint32_t > ReactorThreadPool::g_reactor_thread_next{0 };
92+ std::once_flag ReactorThreadPool::g_discover_once;
93+
2094namespace librbd {
2195namespace asio {
2296
@@ -173,4 +247,10 @@ void bdev_rbd_spdk_context_wq_destroy(struct bdev_rbd_spdk_context_wq* context_w
173247 delete wq;
174248}
175249
250+ struct spdk_thread * bdev_rbd_find_reactor_thread (void )
251+ {
252+ ReactorThreadPool::ensure_discovered ();
253+ return ReactorThreadPool::get_next ();
254+ }
255+
176256} // extern "C"
0 commit comments