Skip to content

Commit 8179587

Browse files
committed
implement Queue
1 parent 9a0f1d6 commit 8179587

File tree

2 files changed

+212
-52
lines changed

2 files changed

+212
-52
lines changed

src/coro/sync.zig

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,3 +458,163 @@ test "RwLock.Cancel" {
458458

459459
try std.testing.expectEqual(value, check_value);
460460
}
461+
462+
/// A multi-producer, multi-consumer queue for sending data between tasks, schedulers, and threads.
463+
/// It supports various configurations: single-producer & multi-consumer, multi-producer & single-consumer,
464+
/// and single-producer & single-consumer.
465+
/// Only one consumer receives each sent data, regardless of the number of consumers.
466+
pub fn Queue(comptime T: type) type {
467+
return struct {
468+
const QueueList = std.DoublyLinkedList(T);
469+
const MemoryPool = std.heap.MemoryPool(QueueList.Node);
470+
471+
pool: MemoryPool,
472+
queue: QueueList = .{},
473+
mutex: Mutex,
474+
semaphore: aio.EventSource,
475+
476+
pub fn init(allocator: std.mem.Allocator, preheat_size: usize) !@This() {
477+
return .{
478+
.pool = try MemoryPool.initPreheated(allocator, preheat_size),
479+
.mutex = try Mutex.init(),
480+
.semaphore = try aio.EventSource.init(),
481+
};
482+
}
483+
484+
pub fn deinit(self: *@This()) void {
485+
self.clear() catch |err| switch (err) {
486+
error.Canceled => {},
487+
else => unreachable,
488+
};
489+
self.pool.deinit();
490+
self.mutex.deinit();
491+
self.semaphore.deinit();
492+
}
493+
494+
pub fn send(self: *@This(), data: T) !void {
495+
try self.mutex.lock();
496+
defer self.mutex.unlock();
497+
498+
const node = try self.pool.create();
499+
errdefer self.pool.destroy(node);
500+
501+
node.data = data;
502+
503+
self.queue.prepend(node);
504+
505+
self.semaphore.notify();
506+
}
507+
508+
pub fn tryRecv(self: *@This()) ?T {
509+
if (self.mutex.tryLock()) {
510+
defer self.mutex.unlock();
511+
512+
self.semaphore.waitNonBlocking() catch |err| switch (err) {
513+
error.WouldBlock => return null,
514+
};
515+
516+
if (self.queue.pop()) |node| {
517+
const data = node.data;
518+
self.pool.destroy(node);
519+
return data;
520+
}
521+
}
522+
return null;
523+
}
524+
525+
pub fn recv(self: *@This()) !T {
526+
try coro.io.single(.wait_event_source, .{ .source = &self.semaphore });
527+
528+
try self.mutex.lock();
529+
defer self.mutex.unlock();
530+
531+
if (self.queue.pop()) |node| {
532+
const data = node.data;
533+
self.pool.destroy(node);
534+
return data;
535+
}
536+
537+
unreachable;
538+
}
539+
540+
pub fn clear(self: *@This()) !void {
541+
try self.mutex.lock();
542+
defer self.mutex.unlock();
543+
544+
while (true) self.semaphore.waitNonBlocking() catch break;
545+
546+
while (self.queue.pop()) |node| {
547+
self.pool.destroy(node);
548+
}
549+
}
550+
};
551+
}
552+
553+
test "Queue" {
554+
if (builtin.single_threaded) {
555+
return error.SkipZigTest;
556+
}
557+
558+
const Test = struct {
559+
fn provider_task(queue: *Queue(u32), data: u32) !void {
560+
try queue.send(data);
561+
}
562+
563+
fn provider(queue: *Queue(u32)) !void {
564+
var scheduler = try coro.Scheduler.init(std.testing.allocator, .{});
565+
defer scheduler.deinit();
566+
567+
for (0..128) |i| {
568+
_ = try scheduler.spawn(provider_task, .{ queue, @as(u32, @intCast(i)) }, .{ .detached = true });
569+
}
570+
571+
try scheduler.run(.wait);
572+
}
573+
574+
fn consumer_task(queue: *Queue(u32)) !void {
575+
_ = try queue.recv();
576+
}
577+
578+
fn consumer(queue: *Queue(u32)) !void {
579+
var scheduler = try coro.Scheduler.init(std.testing.allocator, .{});
580+
defer scheduler.deinit();
581+
582+
for (0..128) |_| {
583+
_ = try scheduler.spawn(consumer_task, .{queue}, .{ .detached = true });
584+
}
585+
586+
try scheduler.run(.wait);
587+
}
588+
};
589+
590+
var queue = try Queue(u32).init(std.testing.allocator, 0);
591+
defer queue.deinit();
592+
593+
// test queue order without threads/schedulers
594+
try queue.send(780);
595+
try queue.send(632);
596+
try queue.send(1230);
597+
try queue.send(6);
598+
599+
try std.testing.expectEqual(780, try queue.recv());
600+
try std.testing.expectEqual(632, try queue.recv());
601+
try std.testing.expectEqual(1230, try queue.recv());
602+
try std.testing.expectEqual(6, try queue.recv());
603+
604+
// check if it has returned to its initial state
605+
try std.testing.expectEqual(null, queue.tryRecv());
606+
try std.testing.expectEqual(0, queue.queue.len);
607+
608+
var threads: [2]std.Thread = undefined;
609+
610+
threads[0] = try std.Thread.spawn(.{}, Test.consumer, .{&queue});
611+
threads[1] = try std.Thread.spawn(.{}, Test.provider, .{&queue});
612+
613+
for (threads) |thread| {
614+
thread.join();
615+
}
616+
617+
// check if it has returned to its initial state
618+
try std.testing.expectEqual(null, queue.tryRecv());
619+
try std.testing.expectEqual(0, queue.queue.len);
620+
}

src/coro/zefi.zig

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -261,27 +261,27 @@ const Intel_SysV = struct {
261261

262262
const assembly =
263263
std.fmt.comptimePrint(
264-
\\.global {[symbol]s}
265-
\\{[symbol]s}:
266-
\\ pushq %rbx
267-
\\ pushq %rbp
268-
\\ pushq %r12
269-
\\ pushq %r13
270-
\\ pushq %r14
271-
\\ pushq %r15
272-
\\
273-
\\ movq %rsp, (%rdi)
274-
\\ movq (%rsi), %rsp
275-
\\
276-
\\ popq %r15
277-
\\ popq %r14
278-
\\ popq %r13
279-
\\ popq %r12
280-
\\ popq %rbp
281-
\\ popq %rbx
282-
\\
283-
\\ retq
284-
, .{ .symbol = symbol });
264+
\\.global {[symbol]s}
265+
\\{[symbol]s}:
266+
\\ pushq %rbx
267+
\\ pushq %rbp
268+
\\ pushq %r12
269+
\\ pushq %r13
270+
\\ pushq %r14
271+
\\ pushq %r15
272+
\\
273+
\\ movq %rsp, (%rdi)
274+
\\ movq (%rsi), %rsp
275+
\\
276+
\\ popq %r15
277+
\\ popq %r14
278+
\\ popq %r13
279+
\\ popq %r12
280+
\\ popq %rbp
281+
\\ popq %rbx
282+
\\
283+
\\ retq
284+
, .{ .symbol = symbol });
285285
};
286286

287287
const Arm_64 = struct {
@@ -300,35 +300,35 @@ const Arm_64 = struct {
300300

301301
const assembly =
302302
std.fmt.comptimePrint(
303-
\\.global {[symbol]s}
304-
\\{[symbol]s}:
305-
\\ stp lr, fp, [sp, #-20*8]!
306-
\\ stp d8, d9, [sp, #2*8]
307-
\\ stp d10, d11, [sp, #4*8]
308-
\\ stp d12, d13, [sp, #6*8]
309-
\\ stp d14, d15, [sp, #8*8]
310-
\\ stp x19, x20, [sp, #10*8]
311-
\\ stp x21, x22, [sp, #12*8]
312-
\\ stp x23, x24, [sp, #14*8]
313-
\\ stp x25, x26, [sp, #16*8]
314-
\\ stp x27, x28, [sp, #18*8]
315-
\\
316-
\\ mov x9, sp
317-
\\ str x9, [x0]
318-
\\ ldr x9, [x1]
319-
\\ mov sp, x9
320-
\\
321-
\\ ldp x27, x28, [sp, #18*8]
322-
\\ ldp x25, x26, [sp, #16*8]
323-
\\ ldp x23, x24, [sp, #14*8]
324-
\\ ldp x21, x22, [sp, #12*8]
325-
\\ ldp x19, x20, [sp, #10*8]
326-
\\ ldp d14, d15, [sp, #8*8]
327-
\\ ldp d12, d13, [sp, #6*8]
328-
\\ ldp d10, d11, [sp, #4*8]
329-
\\ ldp d8, d9, [sp, #2*8]
330-
\\ ldp lr, fp, [sp], #20*8
331-
\\
332-
\\ ret
333-
, .{ .symbol = symbol });
303+
\\.global {[symbol]s}
304+
\\{[symbol]s}:
305+
\\ stp lr, fp, [sp, #-20*8]!
306+
\\ stp d8, d9, [sp, #2*8]
307+
\\ stp d10, d11, [sp, #4*8]
308+
\\ stp d12, d13, [sp, #6*8]
309+
\\ stp d14, d15, [sp, #8*8]
310+
\\ stp x19, x20, [sp, #10*8]
311+
\\ stp x21, x22, [sp, #12*8]
312+
\\ stp x23, x24, [sp, #14*8]
313+
\\ stp x25, x26, [sp, #16*8]
314+
\\ stp x27, x28, [sp, #18*8]
315+
\\
316+
\\ mov x9, sp
317+
\\ str x9, [x0]
318+
\\ ldr x9, [x1]
319+
\\ mov sp, x9
320+
\\
321+
\\ ldp x27, x28, [sp, #18*8]
322+
\\ ldp x25, x26, [sp, #16*8]
323+
\\ ldp x23, x24, [sp, #14*8]
324+
\\ ldp x21, x22, [sp, #12*8]
325+
\\ ldp x19, x20, [sp, #10*8]
326+
\\ ldp d14, d15, [sp, #8*8]
327+
\\ ldp d12, d13, [sp, #6*8]
328+
\\ ldp d10, d11, [sp, #4*8]
329+
\\ ldp d8, d9, [sp, #2*8]
330+
\\ ldp lr, fp, [sp], #20*8
331+
\\
332+
\\ ret
333+
, .{ .symbol = symbol });
334334
};

0 commit comments

Comments
 (0)