Skip to content

Commit 402580e

Browse files
charlesrocketnikneym
authored andcommitted
fix: adjust mach port
1 parent 8f97b0e commit 402580e

File tree

2 files changed

+96
-88
lines changed

2 files changed

+96
-88
lines changed

src/backend/kqueue.zig

Lines changed: 89 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ pub const Loop = struct {
4646
/// an empty message to this port can be used to wake up the loop
4747
/// at any time. Waking up the loop via this port won't trigger any
4848
/// particular completion, it just forces tick to cycle.
49-
//mach_port: xev.Async,
50-
//mach_port_buffer: [32]u8 = undefined,
49+
mach_port: xev.Async,
50+
mach_port_buffer: [32]u8 = undefined,
5151

5252
/// The number of active completions. This DOES NOT include completions that
5353
/// are queued in the submissions queue.
@@ -102,16 +102,17 @@ pub const Loop = struct {
102102
const fd = try posix.kqueue();
103103
errdefer posix.close(fd);
104104

105-
var mach_port = try xev.Async.init();
105+
const mach_port = try xev.Async.init();
106106
errdefer mach_port.deinit();
107107

108108
var res: Loop = .{
109109
.kqueue_fd = fd,
110-
// .mach_port = mach_port,
110+
.mach_port = mach_port,
111111
.thread_pool = options.thread_pool,
112112
.thread_pool_completions = undefined,
113113
.cached_now = undefined,
114114
};
115+
115116
res.update_now();
116117
return res;
117118
}
@@ -120,7 +121,7 @@ pub const Loop = struct {
120121
/// were unprocessed are lost -- their callbacks will never be called.
121122
pub fn deinit(self: *Loop) void {
122123
posix.close(self.kqueue_fd);
123-
//self.mach_port.deinit();
124+
if (comptime builtin.os.tag == .macos) self.mach_port.deinit();
124125
}
125126

126127
/// Stop the loop. This can only be called from the main thread.
@@ -316,32 +317,34 @@ pub const Loop = struct {
316317
self.thread_pool_completions.init();
317318
}
318319

319-
// Add our event so that we wake up when our mach port receives an
320-
// event. We have to add here because we need a stable self pointer.
321-
//const events = [_]Kevent{.{
322-
// .ident = @as(usize, @intCast(self.mach_port.port)),
323-
// .filter = std.c.EVFILT.MACHPORT,
324-
// .flags = std.c.EV.ADD | std.c.EV.ENABLE,
325-
// .fflags = darwin.MACH_RCV_MSG,
326-
// .data = 0,
327-
// .udata = 0,
328-
// .ext = .{
329-
// @intFromPtr(&self.mach_port_buffer),
330-
// self.mach_port_buffer.len,
331-
// },
332-
//}};
333-
//const n = kevent_syscall(
334-
// self.kqueue_fd,
335-
// &events,
336-
// events[0..0],
337-
// null,
338-
//) catch |err| {
339-
// // We reset initialization because we can't do anything
340-
// // safely unless we get this mach port registered!
341-
// self.flags.init = false;
342-
// return err;
343-
//};
344-
//assert(n == 0);
320+
if (comptime builtin.target.os.tag == .macos) {
321+
// Add our event so that we wake up when our mach port receives an
322+
// event. We have to add here because we need a stable self pointer.
323+
const events = [_]Kevent{.{
324+
.ident = @as(usize, @intCast(self.mach_port.port)),
325+
.filter = std.c.EVFILT.MACHPORT,
326+
.flags = std.c.EV.ADD | std.c.EV.ENABLE,
327+
.fflags = darwin.MACH_RCV_MSG,
328+
.data = 0,
329+
.udata = 0,
330+
.ext = .{
331+
@intFromPtr(&self.mach_port_buffer),
332+
self.mach_port_buffer.len,
333+
},
334+
}};
335+
const n = kevent_syscall(
336+
self.kqueue_fd,
337+
&events,
338+
events[0..0],
339+
null,
340+
) catch |err| {
341+
// We reset initialization because we can't do anything
342+
// safely unless we get this mach port registered!
343+
self.flags.init = false;
344+
return err;
345+
};
346+
assert(n == 0);
347+
}
345348
}
346349

347350
// The list of events, used as both a changelist and eventlist.
@@ -801,10 +804,10 @@ pub const Loop = struct {
801804
break :action .{ .kevent = {} };
802805
},
803806

804-
// .machport => action: {
805-
// ev.* = c.kevent().?;
806-
// break :action .{ .kevent = {} };
807-
// },
807+
.machport => action: {
808+
ev.* = c.kevent().?;
809+
break :action .{ .kevent = {} };
810+
},
808811

809812
.proc => action: {
810813
ev.* = c.kevent().?;
@@ -967,15 +970,17 @@ pub const Loop = struct {
967970
// Add to our completion queue
968971
c.task_loop.thread_pool_completions.push(c);
969972

970-
// Wake up our main loop
971-
//c.task_loop.wakeup() catch {};
973+
if (comptime builtin.target.os.tag == .macos) {
974+
// Wake up our main loop
975+
c.task_loop.wakeup() catch {};
976+
}
972977
}
973978

974-
///// Sends an empty message to this loop's mach port so that it wakes
975-
///// up if it is blocking on kevent().
976-
//fn wakeup(self: *Loop) !void {
977-
//try self.mach_port.notify();
978-
//}
979+
/// Sends an empty message to this loop's mach port so that it wakes
980+
/// up if it is blocking on kevent().
981+
fn wakeup(self: *Loop) !void {
982+
try self.mach_port.notify();
983+
}
979984
};
980985

981986
/// A completion is a request to perform some work with the loop.
@@ -1081,28 +1086,28 @@ pub const Completion = struct {
10811086
.udata = @intFromPtr(self),
10821087
}),
10831088

1084-
// .machport => kevent: {
1085-
// // We can't use |*v| above because it crahses the Zig
1086-
// // compiler (as of 0.11.0-dev.1413). We can retry another time.
1087-
// const v = &self.op.machport;
1088-
// const slice: []u8 = switch (v.buffer) {
1089-
// .slice => |slice| slice,
1090-
// .array => |*arr| arr,
1091-
// };
1092-
1093-
// The kevent below waits for a machport to have a message
1094-
// available AND automatically reads the message into the
1095-
// buffer since MACH_RCV_MSG is set.
1096-
//break :kevent .{
1097-
// .ident = @intCast(v.port),
1098-
// .filter = std.c.EVFILT.MACHPORT,
1099-
// .flags = std.c.EV.ADD | std.c.EV.ENABLE,
1100-
// .fflags = darwin.MACH_RCV_MSG,
1101-
// .data = 0,
1102-
// .udata = @intFromPtr(self),
1103-
// .ext = .{ @intFromPtr(slice.ptr), slice.len },
1104-
//};
1105-
//},
1089+
.machport => if (comptime builtin.os.tag != .macos) return null else kevent: {
1090+
// We can't use |*v| above because it crahses the Zig
1091+
// compiler (as of 0.11.0-dev.1413). We can retry another time.
1092+
const v = &self.op.machport;
1093+
const slice: []u8 = switch (v.buffer) {
1094+
.slice => |slice| slice,
1095+
.array => |*arr| arr,
1096+
};
1097+
1098+
// The kevent below waits for a machport to have a message
1099+
// available AND automatically reads the message into the
1100+
// buffer since MACH_RCV_MSG is set.
1101+
break :kevent .{
1102+
.ident = @as(c_uint, v.port),
1103+
.filter = std.c.EVFILT.MACHPORT,
1104+
.flags = std.c.EV.ADD | std.c.EV.ENABLE,
1105+
.fflags = darwin.MACH_RCV_MSG,
1106+
.data = 0,
1107+
.udata = @intFromPtr(self),
1108+
.ext = .{ @intFromPtr(slice.ptr), slice.len },
1109+
};
1110+
},
11061111

11071112
.proc => |v| kevent_init(.{
11081113
.ident = @intCast(v.pid),
@@ -1265,9 +1270,9 @@ pub const Completion = struct {
12651270
// Our machport operation ALWAYS has MACH_RCV set so there
12661271
// is no operation to perform. kqueue automatically reads in
12671272
// the mach message into the read buffer.
1268-
// .machport => .{
1269-
// .machport = {},
1270-
// },
1273+
.machport => .{
1274+
.machport = {},
1275+
},
12711276

12721277
// For proc watching, it is identical to the syscall result.
12731278
.proc => res: {
@@ -1381,13 +1386,13 @@ pub const Completion = struct {
13811386
},
13821387
},
13831388

1384-
// .machport => .{
1385-
// .machport = switch (errno) {
1386-
// .SUCCESS => {},
1387-
// .CANCELED => error.Canceled,
1388-
// else => |err| posix.unexpectedErrno(err),
1389-
// },
1390-
// },
1389+
.machport => .{
1390+
.machport = switch (errno) {
1391+
.SUCCESS => {},
1392+
.CANCELED => error.Canceled,
1393+
else => |err| posix.unexpectedErrno(err),
1394+
},
1395+
},
13911396

13921397
.proc => .{
13931398
.proc = switch (errno) {
@@ -1457,7 +1462,7 @@ pub const OperationType = enum {
14571462
shutdown,
14581463
timer,
14591464
cancel,
1460-
// machport,
1465+
machport,
14611466
proc,
14621467
};
14631468

@@ -1543,10 +1548,10 @@ pub const Operation = union(OperationType) {
15431548
c: *Completion,
15441549
},
15451550

1546-
// machport: struct {
1547-
// port: posix.system.mach_port_name_t,
1548-
// buffer: ReadBuffer,
1549-
// },
1551+
machport: if (!builtin.os.tag.isDarwin()) struct {} else struct {
1552+
port: posix.system.mach_port_name_t,
1553+
buffer: ReadBuffer,
1554+
},
15501555

15511556
proc: struct {
15521557
pid: posix.pid_t,
@@ -1570,7 +1575,7 @@ pub const Result = union(OperationType) {
15701575
shutdown: ShutdownError!void,
15711576
timer: TimerError!TimerTrigger,
15721577
cancel: CancelError!void,
1573-
// machport: MachPortError!void,
1578+
machport: MachPortError!void,
15741579
proc: ProcError!u32,
15751580
};
15761581

@@ -1617,10 +1622,10 @@ pub const WriteError = posix.KEventError ||
16171622
Unexpected,
16181623
};
16191624

1620-
// pub const MachPortError = posix.KEventError || error{
1621-
// Canceled,
1622-
// Unexpected,
1623-
// };
1625+
pub const MachPortError = posix.KEventError || error{
1626+
Canceled,
1627+
Unexpected,
1628+
};
16241629

16251630
pub const ProcError = posix.KEventError || error{
16261631
Canceled,

src/watcher/async.zig

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,22 @@ pub fn Async(comptime xev: type) type {
1515
// Supported, uses eventfd
1616
.io_uring,
1717
.epoll,
18-
.kqueue,
1918
=> AsyncEventFd(xev),
2019

2120
// Supported, uses the backend API
2221
.wasi_poll => AsyncLoopState(xev, xev.Loop.threaded),
2322

24-
// Supported, uses mach ports
25-
// .kqueue => AsyncMachPort(xev),
23+
// Supported, uses mach port on Mac and eventfd on BSD
24+
.kqueue => if (comptime builtin.target.os.tag.isDarwin())
25+
AsyncMachPort(xev)
26+
else
27+
AsyncEventFd(xev),
28+
2629
.iocp => AsyncIOCP(xev),
2730
};
2831
}
2932

30-
/// Async implementation using eventfd (Linux).
33+
/// Async implementation using eventfd (Unix/Linux).
3134
fn AsyncEventFd(comptime xev: type) type {
3235
return struct {
3336
const Self = @This();

0 commit comments

Comments
 (0)