Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 102 additions & 4 deletions pxd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1733,13 +1733,111 @@ static void _pxd_setup(struct pxd_device *pxd_dev, bool enable)
static void pxdctx_set_connected(struct pxd_context *ctx, bool enable)
{
struct list_head *cur;
spin_lock(&ctx->lock);
list_for_each(cur, &ctx->list) {
struct pxd_device *pxd_dev = container_of(cur, struct pxd_device, node);
size_t ndevs;
struct pxd_device **snap_list;
struct pxd_device *pxd_dev;
size_t i = 0;
size_t j;

_pxd_setup(pxd_dev, enable);
if (enable) {
spin_lock(&ctx->lock);
list_for_each(cur, &ctx->list) {
struct pxd_device *pxd_dev = container_of(cur, struct pxd_device, node);

_pxd_setup(pxd_dev, enable);
}
spin_unlock(&ctx->lock);
return;
}

// _pxd_setup with enable=false would call pxd_fastpath_reset_device which would
// call blk_mq_quiesce_queue as part of pxd_suspend_io. but blk_mq_quiesce_queue could
// sleep => avoid holding ctx->lock spinlock.
// to avoid holding spinlock at the time of _pxd_setup with enable=false, do the following
// 1. with ctx->lock held, get the number of entries in ctx->list
// 2. without ctx->lock held, allocate memory for the snapshot list
// 3. with ctx->lock held, copy the entries from ctx->list to the snapshot list and increment their refcount
// 4. without ctx->lock held, call _pxd_setup with enable=false for each entry in the snapshot list
// and then decrement their refcount
// 5. without ctx->lock held, free the memory allocated in step 2.
//
// incrementing the refcount in step 3 is required because there could be a parallel pxd_finish_remove which could
// remove the device from ctx->list => the snapshot list could have more entries than ctx->list
// because of the parallel pxd_finish_remove.
// but snapshot list can never have less entries than ctx->list.
// to understand the difference in behavior, let's first consider why snapshot list can never have less entries
// than ctx->list. Note that adding an entry to ctx->list is via PXD_ADD which requires an open
// fd to /dev/pxd/pxd-control. but pxd_control_open (which is called when /dev/pxd/pxd-control is opened) calls
// cancel_delayed_work_sync(&ctx->abort_work) which either cancels the pending abort work or waits for it to complete.
// => can't have a parallel PXD_ADD when there is a pending pxd_abort_work
// the same doesn't apply for pxd_finish_remove which is called as part of PXD_IOCTL_DETACH_DEVICE which also requires an open
// fd, but the pxd (pxd.cc) uses /dev/pxd/pxd-control-10 which doesn't wait for the pending abort work of /dev/pxd/pxd-control

// step 1
spin_lock(&ctx->lock);
ndevs = ctx->num_devices;
spin_unlock(&ctx->lock);

// step 2
snap_list = kcalloc(ndevs, sizeof(*snap_list), GFP_KERNEL);

if (snap_list) {
// step 3
spin_lock(&ctx->lock);
list_for_each_entry(pxd_dev, &ctx->list, node) {
if (i >= ndevs) {
pr_warn("%s: ctx->list has more entries than snap_list, ignoring extra entries, devID : %llu minor %d\n", __func__,
pxd_dev->dev_id, pxd_dev->minor);
break;
}
// increment the refcount because of the possibility of parallel pxd_finish_remove
get_device(&pxd_dev->dev);
snap_list[i++] = pxd_dev;
}
spin_unlock(&ctx->lock);

// step 4
for (j = 0; j < i; j++) {
_pxd_setup(snap_list[j], false);
put_device(&snap_list[j]->dev);
}

// step 5
kfree(snap_list);
return;
}

// unlikely scenario where kcalloc fails
// handle one-by-one, zero allocation but O(n^2)
for (;;) {
struct pxd_device *picked = NULL;

spin_lock(&ctx->lock);
list_for_each_entry(pxd_dev, &ctx->list, node) {
spin_lock(&pxd_dev->lock);
// connected = false => already processed
// connected = true => not processed yet
if (pxd_dev->connected) {
pxd_dev->connected = false;
spin_unlock(&pxd_dev->lock);
// increment refcount
get_device(&pxd_dev->dev);
picked = pxd_dev;
break;
}
spin_unlock(&pxd_dev->lock);
}
spin_unlock(&ctx->lock);

if (!picked) {
// processed all entries
break;
}

_pxd_setup(picked, false);
// decrement refcount
put_device(&picked->dev);
}
}

static struct pxd_device *dev_to_pxd_dev(struct device *dev)
Expand Down
90 changes: 89 additions & 1 deletion test/pxd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cstdlib>
#include <memory>
#include <stdexcept>
#include <chrono>

#include "fuse.h"
#include "pxd.h"
Expand Down Expand Up @@ -122,13 +123,14 @@ class PxdTest : public ::testing::Test
int finish_io(struct rdwr_in *, bool read_data = false);
void write_thread(const char *name);
void read_thread(const char *name);
void write_thread_timeout(const char *name);
void cleaner();
};

int PxdTest::write_pxd_timeout(int minor, int timeout_value)
{
char sysfs_path[256];
snprintf(sysfs_path, sizeof(sysfs_path), "/sys/devices/pxd!pxd/%d/timeout", minor);
snprintf(sysfs_path, sizeof(sysfs_path), "/sys/devices/pxd/%d/timeout", minor);

FILE *fp = fopen(sysfs_path, "w");
if (!fp) {
Expand Down Expand Up @@ -852,6 +854,92 @@ TEST_F(PxdTest, blkdiscard_ioctl)
}
}

void PxdTest::write_thread_timeout(const char *name)
{
auto buf = aligned_buffer(write_len);
init_pattern(buf.get(), write_len);

int fd = open(name, O_RDWR | O_DIRECT);
ASSERT_TRUE(fd >= 0);

fprintf(stderr, "write thread: waiting for attach IO to complete for 10 seconds\n");
std::this_thread::sleep_for(std::chrono::seconds(10)); // wait for attach IO to complete

auto off = test_off * 2;
ssize_t write_bytes = pwrite(fd, buf.get(), write_len, off);

ASSERT_EQ(-1, write_bytes);
close(fd);
}

TEST_F(PxdTest, AbortContextWithInFlightIO)
{
struct pxd_add_out add;
std::string name;
int minor = 0;

add.dev_id = 1;
add.size = 1024 * 1024;
add.queue_depth = 128;
add.discard_size = PXD_LBS;
// create a /dev/pxd/pxd1 device
dev_add(add, minor, name);

ASSERT_TRUE(added_ids.find(1) != added_ids.end());

const int test_timeout_secs = 30; // min valid value
// set timeout to 30s
ASSERT_EQ(0, write_pxd_timeout(minor, test_timeout_secs));

// start a thread to do a write to /dev/pxd/pxd1 at test_off
std::thread wt(&PxdTest::write_thread_timeout, this, name.c_str());

struct rdwr_in rdwr;
struct pxd_rdwr_in *wr {nullptr};
auto off = test_off * 2;
while (1) {
// wait for upto 1 second for px-fuse to notify the ctrl fd
int ret = wait_msg(1);
if (ret == -ETIMEDOUT) {
continue;
}
EXPECT_EQ(ret, 0);

ssize_t read_bytes = read(ctl_fd, &rdwr, sizeof(rdwr_in));
EXPECT_EQ(read_bytes, sizeof(rdwr_in));
wr = reinterpret_cast<pxd_rdwr_in*>(&rdwr.rdwr);
// find the test write
if (rdwr.in.opcode == PXD_WRITE && rdwr.rdwr.offset >= off && rdwr.rdwr.offset < off + write_len) {
fprintf(stderr, "found the test write: offset: %ld, len = %d\n", rdwr.rdwr.offset, rdwr.rdwr.size);
break;
} else {
fprintf(stderr, "opcode : %d, offset: %ld, len = %d\n", rdwr.in.opcode, rdwr.rdwr.offset, rdwr.rdwr.size);
// finish unrelated IO
finish_io(&rdwr);
}
}

// validate the write request
ASSERT_EQ(wr->dev_minor, minor);
ASSERT_EQ(wr->offset, off);
ASSERT_EQ(wr->size, write_len);

// close the ctrl fd to trigger pxd_abort_context
close(ctl_fd);
ctl_fd = -1;

std::this_thread::sleep_for(std::chrono::seconds(test_timeout_secs + 5));
fprintf(stderr, "Test: Wait complete.\n");

wt.join();

// re-open the control fd to remove the device
ctl_fd = open(control_device(0).c_str(), O_RDWR);
ASSERT_GT(ctl_fd, 0);
// detach the device
dev_remove(add.dev_id);
}

int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);
Expand Down