Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 37 additions & 29 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ pub fn build(b: *std.Build) void {
const os = target.query.os_tag orelse builtin.os.tag;
const exe = b.addExecutable(.{
.name = @tagName(example),
.root_source_file = b.path("examples/" ++ @tagName(example) ++ ".zig"),
.target = target,
.optimize = optimize,
.sanitize_thread = sanitize,
.single_threaded = if (example == .coro_wttr and os != .wasi) false else single_threaded,
.strip = false,
.root_module = b.createModule(.{
.root_source_file = b.path("examples/" ++ @tagName(example) ++ ".zig"),
.target = target,
.optimize = optimize,
.sanitize_thread = sanitize,
.single_threaded = if (example == .coro_wttr and os != .wasi) false else single_threaded,
.strip = false,
}),
});
exe.root_module.addImport("aio", aio);
exe.root_module.addImport("coro", coro);
Expand All @@ -91,14 +93,16 @@ pub fn build(b: *std.Build) void {
const test_step = b.step("test", "Run unit tests");
inline for (.{ .minilib, .aio, .coro }) |mod| {
const tst = b.addTest(.{
.root_source_file = b.path("src/" ++ @tagName(mod) ++ ".zig"),
.target = target,
.optimize = optimize,
.filters = &.{test_filter},
.link_libc = aio.link_libc,
.sanitize_thread = sanitize,
.single_threaded = single_threaded,
.strip = false,
.root_module = b.createModule(.{
.root_source_file = b.path("src/" ++ @tagName(mod) ++ ".zig"),
.target = target,
.optimize = optimize,
.link_libc = aio.link_libc,
.sanitize_thread = sanitize,
.single_threaded = single_threaded,
.strip = false,
}),
});
switch (mod) {
.minilib => addImportsFrom(tst.root_module, minilib),
Expand All @@ -121,16 +125,18 @@ pub fn build(b: *std.Build) void {
}) |bug| {
const exe = b.addExecutable(.{
.name = @tagName(bug),
.root_source_file = b.path("bugs/" ++ @tagName(bug) ++ ".zig"),
.target = target,
.optimize = switch (bug) {
// fails on io_uring if sanitize == true and optimize == debug, not sure why
.ticker => if (sanitize) .ReleaseFast else optimize,
else => optimize,
},
.sanitize_thread = sanitize,
.single_threaded = single_threaded,
.strip = false,
.root_module = b.createModule(.{
.root_source_file = b.path("bugs/" ++ @tagName(bug) ++ ".zig"),
.target = target,
.optimize = switch (bug) {
// fails on io_uring if sanitize == true and optimize == debug, not sure why
.ticker => if (sanitize) .ReleaseFast else optimize,
else => optimize,
},
.sanitize_thread = sanitize,
.single_threaded = single_threaded,
.strip = false,
}),
});
exe.root_module.addImport("aio", aio);
exe.root_module.addImport("coro", coro);
Expand All @@ -152,12 +158,14 @@ pub fn build(b: *std.Build) void {
}) |bench| {
const exe = b.addExecutable(.{
.name = @tagName(bench),
.root_source_file = b.path("bench/" ++ @tagName(bench) ++ ".zig"),
.target = target,
.optimize = .ReleaseFast,
.sanitize_thread = sanitize,
.single_threaded = single_threaded,
.strip = false,
.root_module = b.createModule(.{
.root_source_file = b.path("bench/" ++ @tagName(bench) ++ ".zig"),
.target = target,
.optimize = .ReleaseFast,
.sanitize_thread = sanitize,
.single_threaded = single_threaded,
.strip = false,
}),
});
exe.root_module.addImport("aio", aio);
exe.root_module.addImport("coro", coro);
Expand Down
2 changes: 1 addition & 1 deletion docs/pages/coro-blocking-code.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ finishes on a worker thread.

```zig
fn blockingCode() u32 {
std.time.sleep(1 * std.time.ns_per_s);
std.posix.nanosleep(1, 0);
return 69;
}

Expand Down
27 changes: 16 additions & 11 deletions examples/coro_wttr.zig
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@ pub const std_options: std.Options = .{

fn getWeather(completed: *std.atomic.Value(u32), allocator: std.mem.Allocator, city: []const u8, lang: []const u8) anyerror![]const u8 {
defer _ = completed.fetchAdd(1, .monotonic);
var url: std.BoundedArray(u8, 256) = .{};
var buf: [256]u8 = undefined;
var url: std.ArrayListUnmanaged(u8) = .initBuffer(&buf);
if (builtin.target.os.tag == .windows) {
try url.writer().print("https://wttr.in/{s}?AFT&lang={s}", .{ city, lang });
try url.fixedWriter().print("https://wttr.in/{s}?AFT&lang={s}", .{ city, lang });
} else {
try url.writer().print("https://wttr.in/{s}?AF&lang={s}", .{ city, lang });
try url.fixedWriter().print("https://wttr.in/{s}?AF&lang={s}", .{ city, lang });
}
var body = std.ArrayList(u8).init(allocator);
errdefer body.deinit();
var client: std.http.Client = .{ .allocator = allocator };
defer client.deinit();
var writer = body.writer().adaptToNewApi();
_ = try client.fetch(.{
.location = .{ .url = url.constSlice() },
.response_storage = .{ .dynamic = &body },
.location = .{ .url = &buf },
.response_writer = &writer.new_interface,
});
return body.toOwnedSlice();
}
Expand All @@ -33,9 +35,10 @@ fn getLatestZig(completed: *std.atomic.Value(u32), allocator: std.mem.Allocator)
defer body.deinit();
var client: std.http.Client = .{ .allocator = allocator };
defer client.deinit();
var writer = body.writer().adaptToNewApi();
_ = try client.fetch(.{
.location = .{ .url = "https://ziglang.org/download/index.json" },
.response_storage = .{ .dynamic = &body },
.response_writer = &writer.new_interface,
});
const Index = struct {
master: struct { version: []const u8 },
Expand Down Expand Up @@ -102,18 +105,20 @@ pub fn main() !void {
// don't really have to call this, but I want the defer that cleans the progress bar to run
ltask.cancel();

var buf: [std.heap.pageSize()]u8 = undefined;
var writer = std.fs.File.stdout().writer(&buf);
for (tasks.items, 0..) |task, idx| {
if (task.complete(.wait)) |body| {
defer allocator.free(body);
if (idx == 3) {
try std.io.getStdOut().writer().print("\nAaand the current master zig version is... ", .{});
try writer.interface.print("\nAaand the current master zig version is... ", .{});
}
try std.io.getStdOut().writeAll(body);
try std.io.getStdOut().writeAll("\n");
try writer.interface.writeAll(body);
try writer.interface.writeAll("\n");
} else |err| {
try std.io.getStdOut().writer().print("request {} failed with: {}\n", .{ idx, err });
try writer.interface.print("request {} failed with: {}\n", .{ idx, err });
}
}

try std.io.getStdOut().writer().print("\nThat's all folks\n", .{});
try writer.interface.print("\nThat's all folks\n", .{});
}
2 changes: 1 addition & 1 deletion src/aio.zig
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ test "ChildExit" {
.linux, .freebsd, .openbsd, .dragonfly, .netbsd, .macos, .ios, .watchos, .visionos, .tvos => blk: {
const pid = try std.posix.fork();
if (pid == 0) {
std.time.sleep(1 * std.time.ns_per_s);
std.posix.nanosleep(1, 0);
std.posix.exit(69);
}
break :blk pid;
Expand Down
9 changes: 5 additions & 4 deletions src/aio/IoUring.zig
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,15 @@ pub fn queue(self: *@This(), pairs: anytype, handler: anytype) aio.Error!void {
const saved_sq = self.io.sq;
errdefer self.io.sq = saved_sq;
if (comptime pairs.len > 1) {
var ids: std.BoundedArray(aio.Id, pairs.len) = .{};
errdefer inline for (ids.constSlice(), pairs) |id, pair| {
var buf: [pairs.len]aio.Id = undefined;
var ids: std.ArrayListUnmanaged(aio.Id) = .initBuffer(&buf);
errdefer inline for (buf, pairs) |id, pair| {
debug("dequeue: {}: {}, {s}", .{ id, pair.tag, @tagName(pair.link) });
self.ops.release(id) catch unreachable;
};
inline for (pairs) |pair| ids.append(try self.queueOperation(pair.tag, pair.op, pair.link)) catch unreachable;
inline for (pairs) |pair| ids.appendAssumeCapacity(try self.queueOperation(pair.tag, pair.op, pair.link));
if (@TypeOf(handler) != void) {
inline for (ids.constSlice(), pairs) |id, pair| {
inline for (buf, pairs) |id, pair| {
handler.aio_queue(id, pair.op.userdata);
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/aio/posix/wasi.zig
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,12 @@ fn clock(userdata: usize, timeout: i32) std.os.wasi.subscription_t {
pub fn poll(fds: []std.posix.pollfd, timeout: i32) std.posix.PollError!usize {
// TODO: maybe use thread local arena instead?
const MAX_POLL_FDS = 4096;
var subs: std.BoundedArray(std.os.wasi.subscription_t, MAX_POLL_FDS) = .{};
var buf: [MAX_POLL_FDS]std.os.wasi.subscription_t = undefined;
var subs: std.ArrayListUnmanaged(std.os.wasi.subscription_t) = .initBuffer(&buf);
for (fds) |*pfd| {
pfd.revents = 0;
if (pfd.events & std.posix.POLL.IN != 0) {
subs.append(.{
subs.appendBounded(.{
.userdata = @intFromPtr(pfd),
.u = .{
.tag = .FD_READ,
Expand All @@ -220,7 +221,7 @@ pub fn poll(fds: []std.posix.pollfd, timeout: i32) std.posix.PollError!usize {
}) catch return error.SystemResources;
}
if (pfd.events & std.posix.POLL.OUT != 0) {
subs.append(.{
subs.appendBounded(.{
.userdata = @intFromPtr(pfd),
.u = .{
.tag = .FD_WRITE,
Expand All @@ -235,12 +236,12 @@ pub fn poll(fds: []std.posix.pollfd, timeout: i32) std.posix.PollError!usize {
}

if (timeout >= 0) {
subs.append(clock(0, timeout)) catch return error.SystemResources;
subs.appendBounded(clock(0, timeout)) catch return error.SystemResources;
}

var n: usize = 0;
var events: [MAX_POLL_FDS]std.os.wasi.event_t = undefined;
switch (std.os.wasi.poll_oneoff(@ptrCast(subs.constSlice().ptr), @ptrCast(events[0..subs.len].ptr), subs.len, &n)) {
switch (std.os.wasi.poll_oneoff(@ptrCast(&buf), @ptrCast(events[0..subs.len].ptr), subs.len, &n)) {
.SUCCESS => {},
else => |e| {
log.err("poll: {}", .{e});
Expand Down
11 changes: 6 additions & 5 deletions src/aio/uringlator.zig
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,17 @@ pub fn Uringlator(BackendOperation: type) type {

pub fn queue(self: *@This(), pairs: anytype, backend: anytype, handler: anytype) aio.Error!void {
if (comptime pairs.len > 1) {
var ids: std.BoundedArray(aio.Id, pairs.len) = .{};
errdefer inline for (ids.constSlice(), pairs) |id, pair| {
var buf: [pairs.len]aio.Id = undefined;
var ids: std.ArrayListUnmanaged(aio.Id) = .initBuffer(&buf);
errdefer inline for (buf, pairs) |id, pair| {
debug("dequeue: {}: {}, {s} ({?})", .{ id, pair.tag, @tagName(pair.link), self.prev_id });
backend.uringlator_dequeue(id, pair.tag, pair.op);
self.ops.release(id) catch unreachable;
};
inline for (pairs) |pair| ids.append(try self.queueOperation(pair.tag, pair.op, pair.link, backend)) catch unreachable;
inline for (ids.constSlice()[0..pairs.len]) |id| self.queued.add(id) catch unreachable;
inline for (pairs) |pair| ids.appendAssumeCapacity(try self.queueOperation(pair.tag, pair.op, pair.link, backend));
inline for (buf) |id| self.queued.add(id) catch unreachable;
if (@TypeOf(handler) != void) {
inline for (ids.constSlice(), pairs) |id, pair| handler.aio_queue(id, pair.op.userdata);
inline for (buf, pairs) |id, pair| handler.aio_queue(id, pair.op.userdata);
}
} else {
inline for (pairs) |pair| {
Expand Down
4 changes: 2 additions & 2 deletions src/coro/ThreadPool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ test "ThreadPool" {

const Test = struct {
fn blocking() u32 {
std.time.sleep(1 * std.time.ns_per_s);
std.posix.nanosleep(1, 0);
return 69;
}

Expand All @@ -118,7 +118,7 @@ test "ThreadPool" {

fn blockingCanceled(token: *const CancellationToken) u32 {
while (!token.canceled.load(.acquire)) {
std.time.sleep(1 * std.time.ns_per_s);
std.posix.nanosleep(1, 0);
}
return if (token.canceled.load(.acquire)) 666 else 69;
}
Expand Down
4 changes: 2 additions & 2 deletions src/coro/zefi.zig
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub fn init(stack: Stack, user_data: usize, comptime func: anytype, args: anytyp

const Args = @TypeOf(args);
const state = try State.init(stack, user_data, @sizeOf(Args), struct {
fn entry() callconv(.C) noreturn {
fn entry() callconv(.c) noreturn {
const state = tls_state orelse unreachable;

// Call the functions with the args.
Expand Down Expand Up @@ -109,7 +109,7 @@ const State = extern struct {
stack_context: *anyopaque,
user_data: usize,

fn init(stack: Stack, user_data: usize, args_size: usize, entry_point: *const fn () callconv(.C) noreturn) Error!*State {
fn init(stack: Stack, user_data: usize, args_size: usize, entry_point: *const fn () callconv(.c) noreturn) Error!*State {
const stack_base = @intFromPtr(stack.ptr);
const stack_end = @intFromPtr(stack.ptr + stack.len);

Expand Down
Loading