Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion include/silk/fibers/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class Fiber;
*/
static constexpr uint64_t FIBER_PARAMETERS_SIZE = 64;

/**
* Sentinel processor number meaning "no processor recorded".
*
* The value is the hard cap on CPU index (largest known socket: 384 cores).
* It is used as the default for processorNumber fields and is compared
* against to detect that no processor has been assigned.
*/
static constexpr uint16_t INVALID_PROCESSOR_NUMBER = 1024; // == 1 << 10

/**
* Fiber entry point signature. Returns an integer result code.
*/
Expand Down Expand Up @@ -320,6 +325,9 @@ class FiberScheduler
uint64_t * result = nullptr;
uint64_t submitTimestamp = 0;
uint8_t category = 0;
// Processor whose io_uring ring holds this SQE; cancelIo must submit
// the cancel to the same ring to avoid a cross-ring -ENOENT failure.
uint32_t processorNumber = INVALID_PROCESSOR_NUMBER;
Comment thread
qkrorlqr marked this conversation as resolved.
};

/**
Expand Down Expand Up @@ -491,7 +499,7 @@ class FiberScheduler
StackEntry stackEntry;
TreeEntry treeEntry;
uint64_t deadlineCycles = 0;
uint32_t processorNumber = UINT32_MAX;
uint32_t processorNumber = INVALID_PROCESSOR_NUMBER;
std::atomic<uint32_t> state{};
};

Expand Down
45 changes: 35 additions & 10 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ static constexpr uint64_t CQE_TAG_CANCEL = 0;
static constexpr uint64_t CQE_TAG_TIMEOUT = 1;
static constexpr uint64_t CQE_TAG_DOORBELL = 2;

// Hard cap on CPU index (largest known socket: 384 cores).
static constexpr uint16_t INVALID_PROCESSOR_NUMBER = (1 << 10);

// clang-format off
#define FIBER_SIMPLE_COUNTERS(x) \
x(FIBER_STARTED, "FiberStarted") \
Expand Down Expand Up @@ -839,6 +836,10 @@ bool FiberScheduler::ProcessorState::enqueueIo(IoFuture * future, Setup && setup
{
::io_uring_sqe_set_data(sqe, future);

// Record which processor holds this SQE so cancelIo can submit the
// cancel to the correct ring (cross-ring cancels fail with -ENOENT).
future->processorNumber = this->number;

Comment thread
qkrorlqr marked this conversation as resolved.
if (profiler)
{
future->submitTimestamp = Tsc::getCycles();
Expand Down Expand Up @@ -1450,13 +1451,37 @@ void FiberScheduler::accept(int fd, sockaddr * addr, socklen_t * addrlen, int fl

// Requests cancellation of the in-flight io_uring operation tracked by
// `future`.
//
// The cancel SQE must go to the SAME io_uring ring that holds the original
// SQE. If we submit the cancel to a different ring (e.g. because the fiber
// was work-stolen to another CPU between registering the poll and cancelling
// it), io_uring returns -ENOENT and the original operation is never removed,
// leaving the caller's IoFuture::wait() blocked forever.
//
// The cancel is therefore enqueued exactly once, on the ring of the processor
// recorded in future->processorNumber. It is never also enqueued
// unconditionally on the current processor's ring.
void FiberScheduler::cancelIo(IoFuture * future) noexcept
{
    // Use the processor recorded on the future; fall back to the current
    // processor only when none was recorded.
    uint32_t processorNumber = future->processorNumber;
    if (processorNumber == INVALID_PROCESSOR_NUMBER)
    {
        processorNumber = getCurrentProcessor();
    }

    auto * target = &scheduler->processorState[processorNumber];

    // Capture only what the SQE setup needs (the future pointer).
    auto setup = [future](io_uring_sqe * sqe) noexcept
    {
        ::io_uring_prep_cancel(sqe, future, 0);
        ::io_uring_sqe_set_data64(sqe, CQE_TAG_CANCEL);
    };

    // Retry if the SQ ring is temporarily full. `target` stays fixed across
    // yields, so the cancel still lands on the ring chosen above.
    while (!target->enqueueIo(nullptr, setup))
    {
        Perf::getSimpleCounter(simpleCounters[SQ_RING_OVERFLOW], target->number).increment();
        yield();
    }

    // If we enqueued to a remote processor's ring (or we are running on a
    // proxy fiber), force-submit.
    if (processorNumber != getCurrentProcessor() || getCurrentFiber()->isProxyFiber)
    {
        target->submitIo(true);
    }
}

void FiberScheduler::sleep(uint64_t nanoseconds, SleepFuture * future) noexcept
Expand Down
75 changes: 75 additions & 0 deletions src/fibers/tests/fiber-test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,81 @@ TEST(Fiber, cancelSleepAfterInsert)
ASSERT_EQ(r, ECANCELED);
}

// Cross-fiber cancel: one fiber submits a poll and blocks; a second fiber
// (which the scheduler may run on a different CPU via work-stealing) cancels
// it. Without the logic in cancelIo() that routes the cancel SQE to the same
// ring as the original POLL_ADD, io_uring returns -ENOENT on the cancel and
// the poller's wait() hangs forever.
//
// We repeat N times to increase the probability that work-stealing migrates
// the canceller to a different CPU than the poller on at least some iterations.
//
// The pipe is owned by a scope guard so both descriptors are closed on every
// exit path, including the early return a failed ASSERT_* performs.
TEST(Fiber, cancelPollFromAnotherFiber)
{
    static constexpr int N = 200;

    // State shared between the poller and the canceller for one iteration.
    struct Ctx
    {
        int readFd;
        FiberFuture pollRegistered; // poller -> canceller: poll is in the ring
        FiberScheduler::IoFuture pollFuture;
    };

    struct Params
    {
        Ctx * ctx;

        static int pollerMain(Params * p) noexcept
        {
            auto * ctx = p->ctx;

            // Register an async poll (does not block yet).
            FiberScheduler::poll(ctx->readFd, POLLIN, nullptr, &ctx->pollFuture);
            // Signal the canceller that the SQE is now committed to a ring.
            ctx->pollRegistered.set(0);
            // Block until the cancel (or a spurious write) resolves us.
            return ctx->pollFuture.wait();
        }

        static int cancellerMain(Params * p) noexcept
        {
            auto * ctx = p->ctx;

            // Don't cancel until the poll SQE is definitely in a ring;
            // otherwise the cancel might race and fail with -EALREADY.
            ctx->pollRegistered.wait();
            ctx->pollFuture.cancel();
            return 0;
        }
    };

    // Closes whichever pipe ends were successfully opened when the test
    // function exits, however it exits.
    struct PipeGuard
    {
        int fds[2] = {-1, -1};

        ~PipeGuard()
        {
            for (int fd : fds)
            {
                if (fd >= 0)
                {
                    ::close(fd);
                }
            }
        }
    };

    PipeGuard pipeGuard;
    ASSERT_EQ(::pipe(pipeGuard.fds), 0);

    for (int i = 0; i < N; ++i)
    {
        // NOTE(review): ctx lives on this stack frame; if an assertion below
        // fails while a fiber is still running, that fiber may outlive ctx.
        // Confirm whether the failure path needs to join the fibers first.
        Ctx ctx;
        ctx.readFd = pipeGuard.fds[0];

        FiberFuture f1, f2;
        int r = FiberScheduler::run(Params::pollerMain, {&ctx}, &f1);
        ASSERT_FALSE(r);
        r = FiberScheduler::run(Params::cancellerMain, {&ctx}, &f2);
        ASSERT_FALSE(r);

        // A non-zero poller result must be the cancellation; a timeout here
        // means the cancel never reached the poll's ring.
        r = FiberFuture::waitWithTimeout(&f1, 1'000'000'000);
        if (r)
        {
            ASSERT_EQ(r, ECANCELED);
        }
        r = f2.wait();
        ASSERT_FALSE(r);
    }
}


// SleepFuture reuse: reset() between calls allows the same future to be used
// for successive sleeps.
TEST(Fiber, sleepReuse)
Expand Down
Loading