Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/coro/Frame.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const Scheduler = @import("Scheduler.zig");
const Link = @import("minilib").Link;
const log = std.log.scoped(.coro);

pub const List = std.DoublyLinkedList(Link(@This(), "link", .double));
pub const List = std.DoublyLinkedList;
pub const stack_alignment = Fiber.stack_alignment;
pub const Stack = Fiber.Stack;

Expand All @@ -23,7 +23,7 @@ pub const Status = enum(u8) {
}
};

pub const WaitList = std.SinglyLinkedList(Link(@This(), "wait_link", .single));
pub const WaitList = std.SinglyLinkedList;

fiber: *Fiber,
stack: ?Fiber.Stack = null,
Expand All @@ -34,8 +34,8 @@ canceled: bool = false,
cancelable: bool = true,
status: Status = .active,
yield_state: u8 = 0,
link: List.Node = .{ .data = .{} },
wait_link: WaitList.Node = .{ .data = .{} },
link: List.Node = .{},
wait_link: WaitList.Node = .{},
completer: ?*@This() = null,

pub fn current() ?*@This() {
Expand Down
12 changes: 7 additions & 5 deletions src/coro/Scheduler.zig
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ pub fn deinit(self: *@This()) void {
var next = self.completed.first;
while (next) |node| {
next = node.next;
node.data.cast().deinit();
const frame: *Frame = @fieldParentPtr("link", node);
frame.deinit();
}
while (self.running.popFirst()) |node| {
var frame = node.data.cast();
const frame: *Frame = @fieldParentPtr("link", node);
frame.status = .completed;
self.completed.append(&frame.link);
frame.deinit();
Expand Down Expand Up @@ -83,7 +84,7 @@ pub fn tick(self: *@This(), mode: aio.CompletionMode) aio.Error!usize {
var next: ?*Frame.List.Node = first;
while (next) |node| {
next = node.next;
var frame = node.data.cast();
const frame: *Frame = @fieldParentPtr("link", node);
if (frame.detached) {
std.debug.assert(frame.completer == null);
frame.deinit();
Expand All @@ -93,7 +94,7 @@ pub fn tick(self: *@This(), mode: aio.CompletionMode) aio.Error!usize {
}
}
_ = try self.io.complete(mode, self);
return self.running.len;
return self.running.len();
}

pub const CompleteMode = Frame.CompleteMode;
Expand All @@ -104,7 +105,8 @@ pub fn run(self: *@This(), mode: CompleteMode) aio.Error!void {
// start canceling tasks starting from the most recent one
while (self.running.last) |node| {
if (self.state == .tear_down) return error.Unexpected;
node.data.cast().complete(.cancel, void);
const frame: *Frame = @fieldParentPtr("link", node);
frame.complete(.cancel, void);
}
} else {
while (self.state != .tear_down) {
Expand Down
44 changes: 26 additions & 18 deletions src/coro/sync.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ fn wakeupWaiters(list: *Frame.WaitList, status: anytype) void {
var next = list.first;
while (next) |node| {
next = node.next;
node.data.cast().wakeup(status);
const frame: *Frame = @fieldParentPtr("wait_link", node);
frame.wakeup(status);
}
}

Expand Down Expand Up @@ -465,14 +466,19 @@ test "RwLock.Cancel" {
/// Only one consumer receives each sent data, regardless of the number of consumers.
pub fn Queue(comptime T: type) type {
return struct {
const QueueList = std.DoublyLinkedList(T);
const MemoryPool = std.heap.MemoryPool(QueueList.Node);
const QueueList = std.DoublyLinkedList;
const MemoryPool = std.heap.MemoryPool(QueueNode);

pool: MemoryPool,
queue: QueueList = .{},
mutex: Mutex,
semaphore: aio.EventSource,

const QueueNode = struct {
data: T,
node: QueueList.Node = .{},
};

pub fn init(allocator: std.mem.Allocator, preheat_size: usize) !@This() {
return .{
.pool = try MemoryPool.initPreheated(allocator, preheat_size),
Expand All @@ -495,12 +501,11 @@ pub fn Queue(comptime T: type) type {
try self.mutex.lock();
defer self.mutex.unlock();

const node = try self.pool.create();
errdefer self.pool.destroy(node);

node.data = data;
const queue_node = try self.pool.create();
errdefer self.pool.destroy(queue_node);

self.queue.prepend(node);
queue_node.* = .{ .data = data };
self.queue.append(&queue_node.node);

self.semaphore.notify();
}
Expand All @@ -513,9 +518,10 @@ pub fn Queue(comptime T: type) type {
error.WouldBlock => return null,
};

if (self.queue.pop()) |node| {
const data = node.data;
self.pool.destroy(node);
if (self.queue.popFirst()) |node| {
const queue_node: *QueueNode = @fieldParentPtr("node", node);
const data = queue_node.data;
self.pool.destroy(queue_node);
return data;
}
}
Expand All @@ -528,9 +534,10 @@ pub fn Queue(comptime T: type) type {
try self.mutex.lock();
defer self.mutex.unlock();

if (self.queue.pop()) |node| {
const data = node.data;
self.pool.destroy(node);
if (self.queue.popFirst()) |node| {
const queue_node: *QueueNode = @fieldParentPtr("node", node);
const data = queue_node.data;
self.pool.destroy(queue_node);
return data;
}

Expand All @@ -543,8 +550,9 @@ pub fn Queue(comptime T: type) type {

while (true) self.semaphore.waitNonBlocking() catch break;

while (self.queue.pop()) |node| {
self.pool.destroy(node);
while (self.queue.popFirst()) |node| {
const queue_node: *QueueNode = @fieldParentPtr("node", node);
self.pool.destroy(queue_node);
}
}
};
Expand Down Expand Up @@ -603,7 +611,7 @@ test "Queue" {

// check if it has returned to its initial state
try std.testing.expectEqual(null, queue.tryRecv());
try std.testing.expectEqual(0, queue.queue.len);
try std.testing.expectEqual(0, queue.queue.len());

var threads: [2]std.Thread = undefined;

Expand All @@ -616,5 +624,5 @@ test "Queue" {

// check if it has returned to its initial state
try std.testing.expectEqual(null, queue.tryRecv());
try std.testing.expectEqual(0, queue.queue.len);
try std.testing.expectEqual(0, queue.queue.len());
}
19 changes: 10 additions & 9 deletions src/minilib/dynamic_thread_pool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const DefaultImpl = struct {
mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
threads: []DynamicThread = &.{},
run_queue: RunQueue = .{},
run_queue: std.SinglyLinkedList = .{},
idling_threads: u32 = 0,
active_threads: u32 = 0,
timeout: u64,
Expand All @@ -24,8 +24,10 @@ const DefaultImpl = struct {
name: ?[]const u8,
stack_size: usize,

const RunQueue = std.SinglyLinkedList(Runnable);
const Runnable = struct { runFn: RunProto };
const Runnable = struct {
runFn: RunProto,
node: std.SinglyLinkedList.Node = .{},
};
const RunProto = *const fn (*@This(), *Runnable) void;

pub const Options = struct {
Expand Down Expand Up @@ -97,11 +99,10 @@ const DefaultImpl = struct {
const ThreadPool = @This();
const Closure = struct {
arguments: Args,
run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },
runnable: Runnable = .{ .runFn = runFn },

fn runFn(pool: *ThreadPool, runnable: *Runnable) void {
const run_node: *RunQueue.Node = @fieldParentPtr("data", runnable);
const closure: *@This() = @alignCast(@fieldParentPtr("run_node", run_node));
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, closure.arguments);
// The thread pool's allocator is protected by the mutex.
pool.mutex.lock();
Expand Down Expand Up @@ -136,7 +137,7 @@ const DefaultImpl = struct {
// Closures are often same size, so they can be bucketed and reused
const closure = try self.arena.allocator().create(Closure);
closure.* = .{ .arguments = args };
self.run_queue.prepend(&closure.run_node);
self.run_queue.prepend(&closure.runnable.node);
}

// Notify waiting threads outside the lock to try and keep the critical section small.
Expand Down Expand Up @@ -192,8 +193,8 @@ const DefaultImpl = struct {
if (self.run_queue.popFirst()) |run_node| {
self.mutex.unlock();
defer self.mutex.lock();
const runFn = run_node.data.runFn;
runFn(self, &run_node.data);
const runnable: *Runnable = @fieldParentPtr("node", run_node);
runnable.runFn(self, runnable);
timer.reset();
} else break;
}
Expand Down