diff --git a/src/coro/Frame.zig b/src/coro/Frame.zig index a9512b7..96cfa04 100644 --- a/src/coro/Frame.zig +++ b/src/coro/Frame.zig @@ -4,7 +4,7 @@ const Scheduler = @import("Scheduler.zig"); const Link = @import("minilib").Link; const log = std.log.scoped(.coro); -pub const List = std.DoublyLinkedList(Link(@This(), "link", .double)); +pub const List = std.DoublyLinkedList; pub const stack_alignment = Fiber.stack_alignment; pub const Stack = Fiber.Stack; @@ -23,7 +23,7 @@ pub const Status = enum(u8) { } }; -pub const WaitList = std.SinglyLinkedList(Link(@This(), "wait_link", .single)); +pub const WaitList = std.SinglyLinkedList; fiber: *Fiber, stack: ?Fiber.Stack = null, @@ -34,8 +34,8 @@ canceled: bool = false, cancelable: bool = true, status: Status = .active, yield_state: u8 = 0, -link: List.Node = .{ .data = .{} }, -wait_link: WaitList.Node = .{ .data = .{} }, +link: List.Node = .{}, +wait_link: WaitList.Node = .{}, completer: ?*@This() = null, pub fn current() ?*@This() { diff --git a/src/coro/Scheduler.zig b/src/coro/Scheduler.zig index 6528aa6..3d9d7f6 100644 --- a/src/coro/Scheduler.zig +++ b/src/coro/Scheduler.zig @@ -27,10 +27,11 @@ pub fn deinit(self: *@This()) void { var next = self.completed.first; while (next) |node| { next = node.next; - node.data.cast().deinit(); + const frame: *Frame = @fieldParentPtr("link", node); + frame.deinit(); } while (self.running.popFirst()) |node| { - var frame = node.data.cast(); + const frame: *Frame = @fieldParentPtr("link", node); frame.status = .completed; self.completed.append(&frame.link); frame.deinit(); @@ -83,7 +84,7 @@ pub fn tick(self: *@This(), mode: aio.CompletionMode) aio.Error!usize { var next: ?*Frame.List.Node = first; while (next) |node| { next = node.next; - var frame = node.data.cast(); + const frame: *Frame = @fieldParentPtr("link", node); if (frame.detached) { std.debug.assert(frame.completer == null); frame.deinit(); @@ -93,7 +94,7 @@ pub fn tick(self: *@This(), mode: aio.CompletionMode) aio.Error!usize { } } _ = try self.io.complete(mode, self); - return self.running.len; + return self.running.len(); } pub const CompleteMode = Frame.CompleteMode; @@ -104,7 +105,8 @@ pub fn run(self: *@This(), mode: CompleteMode) aio.Error!void { // start canceling tasks starting from the most recent one while (self.running.last) |node| { if (self.state == .tear_down) return error.Unexpected; - node.data.cast().complete(.cancel, void); + const frame: *Frame = @fieldParentPtr("link", node); + frame.complete(.cancel, void); } } else { while (self.state != .tear_down) { diff --git a/src/coro/sync.zig b/src/coro/sync.zig index 708c9f1..530d3d1 100644 --- a/src/coro/sync.zig +++ b/src/coro/sync.zig @@ -8,7 +8,8 @@ fn wakeupWaiters(list: *Frame.WaitList, status: anytype) void { var next = list.first; while (next) |node| { next = node.next; - node.data.cast().wakeup(status); + const frame: *Frame = @fieldParentPtr("wait_link", node); + frame.wakeup(status); } } @@ -465,14 +466,19 @@ test "RwLock.Cancel" { /// Only one consumer receives each sent data, regardless of the number of consumers. pub fn Queue(comptime T: type) type { return struct { - const QueueList = std.DoublyLinkedList(T); - const MemoryPool = std.heap.MemoryPool(QueueList.Node); + const QueueList = std.DoublyLinkedList; + const MemoryPool = std.heap.MemoryPool(QueueNode); pool: MemoryPool, queue: QueueList = .{}, mutex: Mutex, semaphore: aio.EventSource, + const QueueNode = struct { + data: T, + node: QueueList.Node = .{}, + }; + pub fn init(allocator: std.mem.Allocator, preheat_size: usize) !@This() { return .{ .pool = try MemoryPool.initPreheated(allocator, preheat_size), @@ -495,12 +501,11 @@ pub fn Queue(comptime T: type) type { try self.mutex.lock(); defer self.mutex.unlock(); - const node = try self.pool.create(); - errdefer self.pool.destroy(node); - - node.data = data; + const queue_node = try self.pool.create(); + errdefer self.pool.destroy(queue_node); - self.queue.prepend(node); + queue_node.* = .{ .data = data }; + self.queue.append(&queue_node.node); self.semaphore.notify(); } @@ -513,9 +518,10 @@ pub fn Queue(comptime T: type) type { error.WouldBlock => return null, }; - if (self.queue.pop()) |node| { - const data = node.data; - self.pool.destroy(node); + if (self.queue.popFirst()) |node| { + const queue_node: *QueueNode = @fieldParentPtr("node", node); + const data = queue_node.data; + self.pool.destroy(queue_node); return data; } } @@ -528,9 +534,10 @@ pub fn Queue(comptime T: type) type { try self.mutex.lock(); defer self.mutex.unlock(); - if (self.queue.pop()) |node| { - const data = node.data; - self.pool.destroy(node); + if (self.queue.popFirst()) |node| { + const queue_node: *QueueNode = @fieldParentPtr("node", node); + const data = queue_node.data; + self.pool.destroy(queue_node); return data; } @@ -543,8 +550,9 @@ pub fn Queue(comptime T: type) type { while (true) self.semaphore.waitNonBlocking() catch break; - while (self.queue.pop()) |node| { - self.pool.destroy(node); + while (self.queue.popFirst()) |node| { + const queue_node: *QueueNode = @fieldParentPtr("node", node); + self.pool.destroy(queue_node); } } }; @@ -603,7 +611,7 @@ test "Queue" { // check if it has returned to its initial state try std.testing.expectEqual(null, queue.tryRecv()); - try std.testing.expectEqual(0, queue.queue.len); + try std.testing.expectEqual(0, queue.queue.len()); var threads: [2]std.Thread = undefined; @@ -616,5 +624,5 @@ test "Queue" { // check if it has returned to its initial state try std.testing.expectEqual(null, queue.tryRecv()); - try std.testing.expectEqual(0, queue.queue.len); + try std.testing.expectEqual(0, queue.queue.len()); } diff --git a/src/minilib/dynamic_thread_pool.zig b/src/minilib/dynamic_thread_pool.zig index 9a8e2f7..dceb807 100644 --- a/src/minilib/dynamic_thread_pool.zig +++ b/src/minilib/dynamic_thread_pool.zig @@ -15,7 +15,7 @@ const DefaultImpl = struct { mutex: std.Thread.Mutex = .{}, cond: std.Thread.Condition = .{}, threads: []DynamicThread = &.{}, - run_queue: RunQueue = .{}, + run_queue: std.SinglyLinkedList = .{}, idling_threads: u32 = 0, active_threads: u32 = 0, timeout: u64, @@ -24,8 +24,10 @@ const DefaultImpl = struct { name: ?[]const u8, stack_size: usize, - const RunQueue = std.SinglyLinkedList(Runnable); - const Runnable = struct { runFn: RunProto }; + const Runnable = struct { + runFn: RunProto, + node: std.SinglyLinkedList.Node = .{}, + }; const RunProto = *const fn (*@This(), *Runnable) void; pub const Options = struct { @@ -97,11 +99,10 @@ const DefaultImpl = struct { const ThreadPool = @This(); const Closure = struct { arguments: Args, - run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } }, + runnable: Runnable = .{ .runFn = runFn }, fn runFn(pool: *ThreadPool, runnable: *Runnable) void { - const run_node: *RunQueue.Node = @fieldParentPtr("data", runnable); - const closure: *@This() = @alignCast(@fieldParentPtr("run_node", run_node)); + const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable)); @call(.auto, func, closure.arguments); // The thread pool's allocator is protected by the mutex. pool.mutex.lock(); @@ -136,7 +137,7 @@ const DefaultImpl = struct { // Closures are often same size, so they can be bucketed and reused const closure = try self.arena.allocator().create(Closure); closure.* = .{ .arguments = args }; - self.run_queue.prepend(&closure.run_node); + self.run_queue.prepend(&closure.runnable.node); } // Notify waiting threads outside the lock to try and keep the critical section small. @@ -192,8 +193,8 @@ const DefaultImpl = struct { if (self.run_queue.popFirst()) |run_node| { self.mutex.unlock(); defer self.mutex.lock(); - const runFn = run_node.data.runFn; - runFn(self, &run_node.data); + const runnable: *Runnable = @fieldParentPtr("node", run_node); + runnable.runFn(self, runnable); timer.reset(); } else break; }