Skip to content

Commit d5dfcf4

Browse files
committed
implement Queue
1 parent 9a0f1d6 commit d5dfcf4

File tree

2 files changed

+210
-52
lines changed

2 files changed

+210
-52
lines changed

src/coro/sync.zig

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const std = @import("std");
33
const aio = @import("aio");
44
const coro = @import("../coro.zig");
55
const Frame = @import("Frame.zig");
6+
const Allocator = std.mem.Allocator;
67

78
fn wakeupWaiters(list: *Frame.WaitList, status: anytype) void {
89
var next = list.first;
@@ -458,3 +459,160 @@ test "RwLock.Cancel" {
458459

459460
try std.testing.expectEqual(value, check_value);
460461
}
462+
463+
/// A multi-producer, multi-consumer queue for sending data between tasks, schedulers, and threads.
464+
/// It supports various configurations: single-producer & multi-consumer, multi-producer & single-consumer,
465+
/// and single-producer & single-consumer.
466+
/// Only one consumer receives each sent data, regardless of the number of consumers.
467+
pub fn Queue(comptime T: type) type {
468+
return struct {
469+
const QueueList = std.DoublyLinkedList(T);
470+
const MemoryPool = std.heap.MemoryPool(QueueList.Node);
471+
472+
pool: MemoryPool,
473+
queue: QueueList = .{},
474+
mutex: Mutex,
475+
semaphore: aio.EventSource,
476+
477+
pub fn init(allocator: Allocator) !@This() {
478+
return .{
479+
.pool = MemoryPool.init(allocator),
480+
.mutex = try Mutex.init(),
481+
.semaphore = try aio.EventSource.init(),
482+
};
483+
}
484+
485+
pub fn deinit(self: *@This()) void {
486+
self.clear() catch {};
487+
self.pool.deinit();
488+
self.mutex.deinit();
489+
self.semaphore.deinit();
490+
}
491+
492+
pub fn send(self: *@This(), data: T) !void {
493+
const node = try self.pool.create();
494+
errdefer self.pool.destroy(node);
495+
496+
node.data = data;
497+
498+
try self.mutex.lock();
499+
defer self.mutex.unlock();
500+
501+
self.queue.prepend(node);
502+
503+
self.semaphore.notify();
504+
}
505+
506+
pub fn tryRecv(self: *@This()) ?T {
507+
if (self.mutex.tryLock()) {
508+
defer self.mutex.unlock();
509+
510+
self.semaphore.waitNonBlocking() catch |err| switch (err) {
511+
error.WouldBlock => return null,
512+
};
513+
514+
if (self.queue.pop()) |node| {
515+
const data = node.data;
516+
self.pool.destroy(node);
517+
return data;
518+
}
519+
}
520+
return null;
521+
}
522+
523+
pub fn recv(self: *@This()) !T {
524+
try coro.io.single(.wait_event_source, .{ .source = &self.semaphore });
525+
526+
try self.mutex.lock();
527+
defer self.mutex.unlock();
528+
529+
if (self.queue.pop()) |node| {
530+
const data = node.data;
531+
self.pool.destroy(node);
532+
return data;
533+
}
534+
535+
unreachable;
536+
}
537+
538+
pub fn clear(self: *@This()) !void {
539+
try self.mutex.lock();
540+
defer self.mutex.unlock();
541+
542+
while (true) self.semaphore.waitNonBlocking() catch break;
543+
544+
while (self.queue.pop()) |node| {
545+
self.pool.destroy(node);
546+
}
547+
}
548+
};
549+
}
550+
551+
test "Queue" {
552+
if (builtin.single_threaded) {
553+
return error.SkipZigTest;
554+
}
555+
556+
const Test = struct {
557+
fn provider_task(queue: *Queue(u32), data: u32) !void {
558+
try queue.send(data);
559+
}
560+
561+
fn provider(queue: *Queue(u32)) !void {
562+
var scheduler = try coro.Scheduler.init(std.testing.allocator, .{});
563+
defer scheduler.deinit();
564+
565+
for (0..128) |i| {
566+
_ = try scheduler.spawn(provider_task, .{ queue, @as(u32, @intCast(i)) }, .{ .detached = true });
567+
}
568+
569+
try scheduler.run(.wait);
570+
}
571+
572+
fn consumer_task(queue: *Queue(u32)) !void {
573+
_ = try queue.recv();
574+
}
575+
576+
fn consumer(queue: *Queue(u32)) !void {
577+
var scheduler = try coro.Scheduler.init(std.testing.allocator, .{});
578+
defer scheduler.deinit();
579+
580+
for (0..128) |_| {
581+
_ = try scheduler.spawn(consumer_task, .{queue}, .{ .detached = true });
582+
}
583+
584+
try scheduler.run(.wait);
585+
}
586+
};
587+
588+
var queue = try Queue(u32).init(std.testing.allocator);
589+
defer queue.deinit();
590+
591+
// test queue order without threads/schedulers
592+
try queue.send(780);
593+
try queue.send(632);
594+
try queue.send(1230);
595+
try queue.send(6);
596+
597+
try std.testing.expectEqual(780, try queue.recv());
598+
try std.testing.expectEqual(632, try queue.recv());
599+
try std.testing.expectEqual(1230, try queue.recv());
600+
try std.testing.expectEqual(6, try queue.recv());
601+
602+
// check if it has returned to its initial state
603+
try std.testing.expectEqual(null, queue.tryRecv());
604+
try std.testing.expectEqual(0, queue.queue.len);
605+
606+
var threads: [2]std.Thread = undefined;
607+
608+
threads[0] = try std.Thread.spawn(.{}, Test.consumer, .{&queue});
609+
threads[1] = try std.Thread.spawn(.{}, Test.provider, .{&queue});
610+
611+
for (threads) |thread| {
612+
thread.join();
613+
}
614+
615+
// check if it has returned to its initial state
616+
try std.testing.expectEqual(null, queue.tryRecv());
617+
try std.testing.expectEqual(0, queue.queue.len);
618+
}

src/coro/zefi.zig

Lines changed: 52 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -261,27 +261,27 @@ const Intel_SysV = struct {
261261

262262
const assembly =
263263
std.fmt.comptimePrint(
264-
\\.global {[symbol]s}
265-
\\{[symbol]s}:
266-
\\ pushq %rbx
267-
\\ pushq %rbp
268-
\\ pushq %r12
269-
\\ pushq %r13
270-
\\ pushq %r14
271-
\\ pushq %r15
272-
\\
273-
\\ movq %rsp, (%rdi)
274-
\\ movq (%rsi), %rsp
275-
\\
276-
\\ popq %r15
277-
\\ popq %r14
278-
\\ popq %r13
279-
\\ popq %r12
280-
\\ popq %rbp
281-
\\ popq %rbx
282-
\\
283-
\\ retq
284-
, .{ .symbol = symbol });
264+
\\.global {[symbol]s}
265+
\\{[symbol]s}:
266+
\\ pushq %rbx
267+
\\ pushq %rbp
268+
\\ pushq %r12
269+
\\ pushq %r13
270+
\\ pushq %r14
271+
\\ pushq %r15
272+
\\
273+
\\ movq %rsp, (%rdi)
274+
\\ movq (%rsi), %rsp
275+
\\
276+
\\ popq %r15
277+
\\ popq %r14
278+
\\ popq %r13
279+
\\ popq %r12
280+
\\ popq %rbp
281+
\\ popq %rbx
282+
\\
283+
\\ retq
284+
, .{ .symbol = symbol });
285285
};
286286

287287
const Arm_64 = struct {
@@ -300,35 +300,35 @@ const Arm_64 = struct {
300300

301301
const assembly =
302302
std.fmt.comptimePrint(
303-
\\.global {[symbol]s}
304-
\\{[symbol]s}:
305-
\\ stp lr, fp, [sp, #-20*8]!
306-
\\ stp d8, d9, [sp, #2*8]
307-
\\ stp d10, d11, [sp, #4*8]
308-
\\ stp d12, d13, [sp, #6*8]
309-
\\ stp d14, d15, [sp, #8*8]
310-
\\ stp x19, x20, [sp, #10*8]
311-
\\ stp x21, x22, [sp, #12*8]
312-
\\ stp x23, x24, [sp, #14*8]
313-
\\ stp x25, x26, [sp, #16*8]
314-
\\ stp x27, x28, [sp, #18*8]
315-
\\
316-
\\ mov x9, sp
317-
\\ str x9, [x0]
318-
\\ ldr x9, [x1]
319-
\\ mov sp, x9
320-
\\
321-
\\ ldp x27, x28, [sp, #18*8]
322-
\\ ldp x25, x26, [sp, #16*8]
323-
\\ ldp x23, x24, [sp, #14*8]
324-
\\ ldp x21, x22, [sp, #12*8]
325-
\\ ldp x19, x20, [sp, #10*8]
326-
\\ ldp d14, d15, [sp, #8*8]
327-
\\ ldp d12, d13, [sp, #6*8]
328-
\\ ldp d10, d11, [sp, #4*8]
329-
\\ ldp d8, d9, [sp, #2*8]
330-
\\ ldp lr, fp, [sp], #20*8
331-
\\
332-
\\ ret
333-
, .{ .symbol = symbol });
303+
\\.global {[symbol]s}
304+
\\{[symbol]s}:
305+
\\ stp lr, fp, [sp, #-20*8]!
306+
\\ stp d8, d9, [sp, #2*8]
307+
\\ stp d10, d11, [sp, #4*8]
308+
\\ stp d12, d13, [sp, #6*8]
309+
\\ stp d14, d15, [sp, #8*8]
310+
\\ stp x19, x20, [sp, #10*8]
311+
\\ stp x21, x22, [sp, #12*8]
312+
\\ stp x23, x24, [sp, #14*8]
313+
\\ stp x25, x26, [sp, #16*8]
314+
\\ stp x27, x28, [sp, #18*8]
315+
\\
316+
\\ mov x9, sp
317+
\\ str x9, [x0]
318+
\\ ldr x9, [x1]
319+
\\ mov sp, x9
320+
\\
321+
\\ ldp x27, x28, [sp, #18*8]
322+
\\ ldp x25, x26, [sp, #16*8]
323+
\\ ldp x23, x24, [sp, #14*8]
324+
\\ ldp x21, x22, [sp, #12*8]
325+
\\ ldp x19, x20, [sp, #10*8]
326+
\\ ldp d14, d15, [sp, #8*8]
327+
\\ ldp d12, d13, [sp, #6*8]
328+
\\ ldp d10, d11, [sp, #4*8]
329+
\\ ldp d8, d9, [sp, #2*8]
330+
\\ ldp lr, fp, [sp], #20*8
331+
\\
332+
\\ ret
333+
, .{ .symbol = symbol });
334334
};

0 commit comments

Comments
 (0)