Skip to content

Commit 8ad5a1a

Browse files
author
Alexander Indenbaum
committed
bdev/rbd: integrate SpdkContextWQ for reactor thread execution
Add a SpdkContextWQ implementation to schedule RBD I/O operations on SPDK reactor threads instead of the ASIO thread pool. This includes:
- New SpdkContextWQ class implementing the ContextWQ interface
- Reactor thread selection via bdev_rbd_find_reactor_thread()
- Integration with the rbd_open_with_context_wq() API

Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
1 parent f2333b8 commit 8ad5a1a

File tree

5 files changed

+346
-2
lines changed

5 files changed

+346
-2
lines changed

mk/spdk.modules.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ endif
7474

7575
ifeq ($(CONFIG_RBD),y)
7676
BLOCKDEV_MODULES_LIST += bdev_rbd
77-
BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd
77+
BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd -lstdc++
7878
endif
7979

8080
ifeq ($(CONFIG_DAOS),y)

module/bdev/rbd/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ SO_VER := 8
1010
SO_MINOR := 0
1111

1212
C_SRCS = bdev_rbd.c bdev_rbd_rpc.c
13+
CXX_SRCS = bdev_rbd_spdk_context_wq.cpp
1314
LIBNAME = bdev_rbd
1415

1516
SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map

module/bdev/rbd/bdev_rbd.c

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "spdk/stdinc.h"
77

88
#include "bdev_rbd.h"
9+
#include "bdev_rbd_spdk_context_wq.h"
910

1011
#include <rbd/librbd.h>
1112
#include <rados/librados.h>
@@ -20,6 +21,10 @@
2021
#include "spdk/bdev_module.h"
2122
#include "spdk/log.h"
2223

24+
/* to access reactor framework */
25+
#include "spdk_internal/event.h"
26+
#include "spdk_internal/thread.h"
27+
2328
SPDK_LOG_REGISTER_COMPONENT(reservation)
2429

2530
static int bdev_rbd_count = 0;
@@ -75,6 +80,9 @@ struct bdev_rbd {
7580
int (*reservation_fn_cbk)(void *ns);
7681
char cluster_fsid[37];
7782

83+
/* SPDK ContextWQ for this bdev */
84+
struct bdev_rbd_spdk_context_wq *spdk_context_wq;
85+
7886
};
7987

8088
struct bdev_rbd_io_channel {
@@ -224,6 +232,16 @@ bdev_rbd_free(struct bdev_rbd *rbd)
224232
rbd_close(rbd->image);
225233
}
226234

235+
/* Clean up SPDK ContextWQ after RBD image is closed.
236+
* This ensures no new I/O completions can occur after ContextWQ is destroyed.
237+
* The drain() function in the destructor will wait for any pending messages
238+
* to complete before the ContextWQ is actually destroyed.
239+
*/
240+
if (rbd->spdk_context_wq != NULL) {
241+
bdev_rbd_spdk_context_wq_destroy(rbd->spdk_context_wq);
242+
rbd->spdk_context_wq = NULL;
243+
}
244+
227245
free(rbd->disk.name);
228246
free(rbd->rbd_name);
229247
free(rbd->user_id);
@@ -459,6 +477,9 @@ bdev_rbd_get_pool_ctx(rados_t *cluster_p, const char *name, const char *namespac
459477
return -1;
460478
}
461479

480+
/* Forward declaration for reactor thread selection function */
481+
static struct spdk_thread *bdev_rbd_find_reactor_thread(void);
482+
462483
static void *
463484
bdev_rbd_init_context(void *arg)
464485
{
@@ -492,7 +513,11 @@ bdev_rbd_init_context(void *arg)
492513
SPDK_DEBUGLOG(bdev_rbd, "Will open RBD image %s/%s as read-only\n", rbd->pool_name, rbd->rbd_name);
493514
rc = rbd_open_read_only(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
494515
} else {
495-
rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
516+
/* Find reactor thread, create SpdkContextWQ if available, then open with context_wq (NULL uses default AsioContextWQ) */
517+
struct spdk_thread *reactor_thread = bdev_rbd_find_reactor_thread();
518+
rbd->spdk_context_wq = bdev_rbd_spdk_context_wq_create_from_ioctx(
519+
*io_ctx, reactor_thread);
520+
rc = rbd_open_with_context_wq(*io_ctx, rbd->rbd_name, &rbd->image, NULL, rbd->spdk_context_wq);
496521
}
497522
if (rc < 0) {
498523
SPDK_ERRLOG("Failed to open specified rbd device\n");
@@ -729,6 +754,46 @@ static struct spdk_bdev_module rbd_if = {
729754
};
730755
SPDK_BDEV_MODULE_REGISTER(rbd, &rbd_if)
731756

757+
/**
758+
* Find a reactor thread for SpdkContextWQ.
759+
* Iterates through reactors to find a reactor thread (not app_thread).
760+
* Returns NULL if no suitable thread is found.
761+
*/
762+
static struct spdk_thread *
763+
bdev_rbd_find_reactor_thread(void)
764+
{
765+
uint32_t lcore;
766+
767+
SPDK_ENV_FOREACH_CORE(lcore) {
768+
struct spdk_reactor *reactor = spdk_reactor_get(lcore);
769+
if (reactor == NULL || !reactor->flags.is_valid) {
770+
continue;
771+
}
772+
773+
if (reactor->thread_count == 0) {
774+
continue;
775+
}
776+
777+
struct spdk_lw_thread *lw_thread = TAILQ_FIRST(&reactor->threads);
778+
if (lw_thread == NULL) {
779+
continue;
780+
}
781+
782+
struct spdk_thread *thread = spdk_thread_get_from_ctx(lw_thread);
783+
if (thread == NULL || spdk_thread_is_app_thread(thread)) {
784+
continue;
785+
}
786+
787+
uint64_t thread_id = spdk_thread_get_id(thread);
788+
const char *thread_name = spdk_thread_get_name(thread);
789+
SPDK_NOTICELOG("bdev_rbd_find_reactor_thread: reactor thread lcore=%u: thread=%p, id=%lu, name=%s\n",
790+
lcore, thread, thread_id, thread_name ? thread_name : "NULL");
791+
return thread;
792+
}
793+
794+
return NULL;
795+
}
796+
732797
static int bdev_rbd_reset_timer(void *arg);
733798

734799
static void
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/* SPDX-License-Identifier: BSD-3-Clause
2+
* Copyright (C) 2025,2026 IBM, Inc.
3+
* All rights reserved.
4+
*/
5+
6+
#include <rbd/asio/ContextWQ.hpp>
7+
#include <rbd/librbd.h>
8+
#include <rados/librados.hpp>
9+
#include <atomic>
10+
11+
#include "bdev_rbd_spdk_context_wq.h"
12+
13+
extern "C" {
14+
#include "spdk/stdinc.h"
15+
#include "spdk/thread.h"
16+
#include "spdk/log.h"
17+
#include "spdk/env.h"
18+
}
19+
20+
namespace librbd {
21+
namespace asio {
22+
23+
// Construct a work queue bound to `reactor_thread`, the SPDK thread that
// queue() sends its messages to. `cct` is forwarded unchanged to the
// ContextWQ base class. A null thread is a programming error (asserted).
SpdkContextWQ::SpdkContextWQ(void* cct, struct spdk_thread* reactor_thread)
  : ContextWQ(cct),
    m_reactor_thread(reactor_thread)
{
  assert(m_reactor_thread != nullptr);
}
27+
28+
// Tear down the work queue: flag shutdown, give in-flight messages a bounded
// amount of time to finish via drain(), and report anything still pending.
SpdkContextWQ::~SpdkContextWQ() {
  // From here on queue() rejects operations that observe the flag.
  m_shutdown.store(true, std::memory_order_release);

  // Bounded wait; drain() returns after its timeout even if work remains.
  drain();

  const uint64_t queued = m_queued_ops.load(std::memory_order_acquire);
  if (queued > 0) {
    // NOTE(review): any message still in flight carries a pointer to this
    // object, which is about to be destroyed; its handler would then touch
    // freed memory. Confirm callers guarantee the reactor keeps running
    // (and that destruction is not performed on the reactor thread itself).
    //
    // The counter is a uint64_t, so it is printed with PRIu64 rather than
    // "%lu", which is only correct where uint64_t is unsigned long.
    SPDK_ERRLOG("SpdkContextWQ::~SpdkContextWQ: Warning: %" PRIu64
                " operations still pending during destruction\n", queued);
  }
}
41+
42+
// Schedule completion of `ctx` with result `r` on the reactor thread.
//
// On success a heap-allocated SpdkContextMsg is handed to
// spdk_thread_send_msg(); spdk_msg_handler() then completes the context,
// decrements m_queued_ops and frees the message.
// If the queue is shutting down the context is completed right away with
// -ESHUTDOWN. If the send fails it is completed right away with the error
// code returned by spdk_thread_send_msg().
void SpdkContextWQ::queue(Context *ctx, int r) {
  // Count the operation BEFORE testing the shutdown flag. With the opposite
  // order a concurrent drain() could read a zero counter in the gap between
  // our flag check and our increment, and let destruction proceed while this
  // operation is still on its way to the reactor.
  // NOTE(review): for a formal guarantee the writer side (flag store followed
  // by counter load) must be sequentially consistent as well.
  m_queued_ops.fetch_add(1, std::memory_order_seq_cst);

  if (m_shutdown.load(std::memory_order_seq_cst)) {
    // Shutting down: undo the accounting and fail the operation.
    m_queued_ops.fetch_sub(1, std::memory_order_acq_rel);
    SPDK_ERRLOG("SpdkContextWQ::queue: ContextWQ is shutting down, rejecting new operation\n");
    rbd_context_complete(ctx, -ESHUTDOWN);
    return;
  }

  // Message carrying the context, its result and the owning queue.
  SpdkContextMsg *msg = nullptr;
  try {
    msg = new SpdkContextMsg{ctx, r, this};
  } catch (...) {
    // Do not leave the counter elevated if allocation throws, otherwise
    // drain() would wait for an operation that was never sent.
    m_queued_ops.fetch_sub(1, std::memory_order_acq_rel);
    throw;
  }

  // Hand the message to the SPDK reactor thread.
  const int rc = spdk_thread_send_msg(m_reactor_thread, spdk_msg_handler, msg);
  if (rc != 0) {
    // The handler will never run for this message: roll back and complete
    // the context here with the send error.
    m_queued_ops.fetch_sub(1, std::memory_order_acq_rel);
    delete msg;
    SPDK_ERRLOG("SpdkContextWQ::queue: calling rbd_context_complete(ctx=%p, r=%d) on error path\n", ctx, rc);
    rbd_context_complete(ctx, rc);
  }
}
68+
69+
void SpdkContextWQ::spdk_msg_handler(void *arg) {
70+
auto msg = static_cast<SpdkContextMsg*>(arg);
71+
72+
if (msg == nullptr) {
73+
SPDK_ERRLOG("SpdkContextWQ::spdk_msg_handler: FATAL: msg is nullptr\n");
74+
return;
75+
}
76+
77+
if (msg->wq == nullptr) {
78+
SPDK_ERRLOG("SpdkContextWQ::spdk_msg_handler: FATAL: msg->wq is nullptr\n");
79+
delete msg;
80+
return;
81+
}
82+
83+
if (msg->ctx == nullptr) {
84+
SPDK_ERRLOG("SpdkContextWQ::spdk_msg_handler: FATAL: msg->ctx is nullptr\n");
85+
delete msg;
86+
return;
87+
}
88+
89+
// Execute the context callback on the reactor thread
90+
rbd_context_complete(msg->ctx, msg->r);
91+
92+
uint64_t queued_ops_before = msg->wq->m_queued_ops.load(std::memory_order_acquire);
93+
if (queued_ops_before == 0) {
94+
SPDK_ERRLOG("SpdkContextWQ::spdk_msg_handler: WARNING: m_queued_ops is 0, expected > 0\n");
95+
}
96+
97+
// Update queued ops counter
98+
msg->wq->m_queued_ops.fetch_sub(1, std::memory_order_acq_rel);
99+
100+
// Free the message structure
101+
delete msg;
102+
}
103+
104+
void SpdkContextWQ::drain() {
105+
// Wait for all pending messages to be processed.
106+
// Note: This relies on the SPDK reactor thread to be actively polling.
107+
// TODO: conf parameter, non busy wait implementation
108+
const int max_iterations = 100000; // 10 seconds at 100us per iteration
109+
int iterations = 0;
110+
111+
// Wait for all queued operations to complete
112+
while (m_queued_ops.load(std::memory_order_acquire) > 0 &&
113+
iterations < max_iterations) {
114+
// Yield to allow SPDK reactor thread to process messages
115+
spdk_delay_us(100);
116+
++iterations;
117+
}
118+
119+
uint64_t queued = m_queued_ops.load(std::memory_order_acquire);
120+
if (queued > 0) {
121+
SPDK_ERRLOG("SpdkContextWQ::drain: Incomplete drain - queued_ops=%lu after %d iterations\n",
122+
queued, iterations);
123+
}
124+
}
125+
126+
} // namespace asio
127+
} // namespace librbd
128+
129+
// C API implementation
130+
extern "C" {
131+
132+
// Create a SpdkContextWQ bound to `reactor_thread`, using the CephContext
// obtained from `io_ctx`, and return it as an opaque C handle.
//
// Returns NULL when either argument is NULL, when no CephContext can be
// obtained from `io_ctx`, or when construction throws. A non-NULL result is
// released with bdev_rbd_spdk_context_wq_destroy().
struct bdev_rbd_spdk_context_wq* bdev_rbd_spdk_context_wq_create_from_ioctx(rados_ioctx_t io_ctx, struct spdk_thread* reactor_thread)
{
  if (io_ctx == NULL || reactor_thread == NULL) {
    return NULL;
  }

  // Wrap the C handle in a librados::IoCtx to reach its CephContext.
  librados::IoCtx ioctx;
  librados::IoCtx::from_rados_ioctx_t(io_ctx, ioctx);
  void* cct_ptr = ioctx.cct();

  if (cct_ptr == NULL) {
    SPDK_ERRLOG("Failed to get CephContext from rados_ioctx_t\n");
    return NULL;
  }

  // Thread identity is gathered up front so both log paths can report it.
  const uint64_t thread_id = spdk_thread_get_id(reactor_thread);
  const char *thread_name = spdk_thread_get_name(reactor_thread);

  try {
    auto wq = new librbd::asio::SpdkContextWQ(cct_ptr, reactor_thread);
    // Expose the C++ object to C callers as an opaque struct pointer.
    struct bdev_rbd_spdk_context_wq* result = reinterpret_cast<struct bdev_rbd_spdk_context_wq*>(wq);
    // thread_id is a uint64_t: print it with PRIu64, since "%lu" is only
    // correct where uint64_t is unsigned long.
    SPDK_NOTICELOG("bdev_rbd_spdk_context_wq_create_from_ioctx: Successfully created SpdkContextWQ=%p with reactor thread=%p (id=%" PRIu64 ", name=%s)\n",
                   result, reactor_thread, thread_id, thread_name ? thread_name : "NULL");
    return result;
  } catch (...) {
    // No C++ exception may escape through this extern "C" entry point.
    SPDK_ERRLOG("bdev_rbd_spdk_context_wq_create_from_ioctx: Failed to create SpdkContextWQ with reactor thread=%p (id=%" PRIu64 ", name=%s)\n",
                reactor_thread, thread_id, thread_name ? thread_name : "NULL");
    return NULL;
  }
}
164+
165+
void bdev_rbd_spdk_context_wq_destroy(struct bdev_rbd_spdk_context_wq* context_wq)
166+
{
167+
if (context_wq == NULL) {
168+
return;
169+
}
170+
171+
// Cast back to SpdkContextWQ and delete
172+
auto wq = reinterpret_cast<librbd::asio::SpdkContextWQ*>(context_wq);
173+
delete wq;
174+
}
175+
176+
} // extern "C"

0 commit comments

Comments
 (0)