@@ -17,8 +17,13 @@ const log = std.log.scoped(.libxev_kqueue);
1717/// True if this backend is available on this platform.
1818pub fn available () bool {
1919 return switch (builtin .os .tag ) {
20+ // macOS uses kqueue
2021 .ios , .macos = > true ,
2122
23+ // BSDs use kqueue, but we only test on FreeBSD for now.
24+ // kqueue isn't exactly the same here as it is on Apple platforms.
25+ .freebsd = > true ,
26+
2227 // Technically other BSDs support kqueue but our implementation
2328 // below hard requires mach ports currently. That's not a fundamental
2429 // requirement but until someone makes this implementation work
@@ -27,19 +32,21 @@ pub fn available() bool {
2732 };
2833}
2934
35+ pub const NOTE_EXIT_FLAGS = switch (builtin .os .tag ) {
36+ .ios , .macos = > std .c .NOTE .EXIT | std .c .NOTE .EXITSTATUS ,
37+ .freebsd = > std .c .NOTE .EXIT ,
38+ else = > @compileError ("kqueue not supported yet for target OS" ),
39+ };
40+
3041pub const Loop = struct {
3142 const TimerHeap = heap .Intrusive (Timer , void , Timer .less );
3243 const TaskCompletionQueue = queue_mpsc .Intrusive (Completion );
3344
3445 /// The fd of the kqueue.
3546 kqueue_fd : posix.fd_t ,
3647
37- /// The mach port that this kqueue always has a filter for. Writing
38- /// an empty message to this port can be used to wake up the loop
39- /// at any time. Waking up the loop via this port won't trigger any
40- /// particular completion, it just forces tick to cycle.
41- mach_port : xev.Async ,
42- mach_port_buffer : [32 ]u8 = undefined ,
48+ /// The wakeup mechanism (mach ports on Apple, eventfd on BSD).
49+ wakeup_state : Wakeup ,
4350
4451 /// The number of active completions. This DOES NOT include completions that
4552 /// are queued in the submissions queue.
@@ -94,16 +101,17 @@ pub const Loop = struct {
94101 const fd = try posix .kqueue ();
95102 errdefer posix .close (fd );
96103
97- var mach_port = try xev . Async .init ();
98- errdefer mach_port .deinit ();
104+ const wakeup_state : Wakeup = try .init ();
105+ errdefer wakeup_state .deinit ();
99106
100107 var res : Loop = .{
101108 .kqueue_fd = fd ,
102- .mach_port = mach_port ,
109+ .wakeup_state = wakeup_state ,
103110 .thread_pool = options .thread_pool ,
104111 .thread_pool_completions = undefined ,
105112 .cached_now = undefined ,
106113 };
114+
107115 res .update_now ();
108116 return res ;
109117 }
@@ -112,7 +120,7 @@ pub const Loop = struct {
112120 /// were unprocessed are lost -- their callbacks will never be called.
113121 pub fn deinit (self : * Loop ) void {
114122 posix .close (self .kqueue_fd );
115- self .mach_port .deinit ();
123+ self .wakeup_state .deinit ();
116124 }
117125
118126 /// Stop the loop. This can only be called from the main thread.
@@ -308,32 +316,12 @@ pub const Loop = struct {
308316 self .thread_pool_completions .init ();
309317 }
310318
311- // Add our event so that we wake up when our mach port receives an
312- // event. We have to add here because we need a stable self pointer.
313- const events = [_ ]Kevent {.{
314- .ident = @as (usize , @intCast (self .mach_port .port )),
315- .filter = std .c .EVFILT .MACHPORT ,
316- .flags = std .c .EV .ADD | std .c .EV .ENABLE ,
317- .fflags = darwin .MACH_RCV_MSG ,
318- .data = 0 ,
319- .udata = 0 ,
320- .ext = .{
321- @intFromPtr (& self .mach_port_buffer ),
322- self .mach_port_buffer .len ,
323- },
324- }};
325- const n = kevent_syscall (
326- self .kqueue_fd ,
327- & events ,
328- events [0.. 0],
329- null ,
330- ) catch | err | {
319+ self .wakeup_state .setup (self .kqueue_fd ) catch | err | {
331320 // We reset initialization because we can't do anything
332321 // safely unless we get this mach port registered!
333322 self .flags .init = false ;
334323 return err ;
335324 };
336- assert (n == 0 );
337325 }
338326
339327 // The list of events, used as both a changelist and eventlist.
@@ -959,14 +947,16 @@ pub const Loop = struct {
959947 // Add to our completion queue
960948 c .task_loop .thread_pool_completions .push (c );
961949
962- // Wake up our main loop
963- c .task_loop .wakeup () catch {};
950+ if (comptime builtin .target .os .tag == .macos ) {
951+ // Wake up our main loop
952+ c .task_loop .wakeup () catch {};
953+ }
964954 }
965955
966956 /// Sends an empty message to this loop's mach port so that it wakes
967957 /// up if it is blocking on kevent().
968958 fn wakeup (self : * Loop ) ! void {
969- try self .mach_port . notify ();
959+ try self .wakeup_state . wakeup ();
970960 }
971961};
972962
@@ -1073,7 +1063,7 @@ pub const Completion = struct {
10731063 .udata = @intFromPtr (self ),
10741064 }),
10751065
1076- .machport = > kevent : {
1066+ .machport = > if ( comptime builtin . os . tag != .macos ) return null else kevent : {
10771067 // We can't use |*v| above because it crahses the Zig
10781068 // compiler (as of 0.11.0-dev.1413). We can retry another time.
10791069 const v = & self .op .machport ;
@@ -1086,7 +1076,7 @@ pub const Completion = struct {
10861076 // available AND automatically reads the message into the
10871077 // buffer since MACH_RCV_MSG is set.
10881078 break :kevent .{
1089- .ident = @intCast ( v .port ),
1079+ .ident = @as ( c_uint , v .port ),
10901080 .filter = std .c .EVFILT .MACHPORT ,
10911081 .flags = std .c .EV .ADD | std .c .EV .ENABLE ,
10921082 .fflags = darwin .MACH_RCV_MSG ,
@@ -1266,7 +1256,7 @@ pub const Completion = struct {
12661256 const ev = ev_ orelse break :res .{ .proc = ProcError .MissingKevent };
12671257
12681258 // If we have the exit status, we read it.
1269- if (ev .fflags & ( std . c . NOTE . EXIT | std . c . NOTE . EXITSTATUS ) > 0 ) {
1259+ if (ev .fflags & NOTE_EXIT_FLAGS > 0 ) {
12701260 const data : u32 = @intCast (ev .data );
12711261 if (posix .W .IFEXITED (data )) break :res .{
12721262 .proc = posix .W .EXITSTATUS (data ),
@@ -1433,6 +1423,76 @@ pub const Completion = struct {
14331423 }
14341424};
14351425
1426+ /// The struct used for loop wakeup. This is only internal state.
1427+ const Wakeup = if (builtin .os .tag .isDarwin ()) struct {
1428+ const Self = @This ();
1429+
1430+ /// The mach port that this kqueue always has a filter for. Writing
1431+ /// an empty message to this port can be used to wake up the loop
1432+ /// at any time. Waking up the loop via this port won't trigger any
1433+ /// particular completion, it just forces tick to cycle.
1434+ mach_port : xev.Async ,
1435+ mach_port_buffer : [32 ]u8 = undefined ,
1436+
1437+ fn init () ! Self {
1438+ const mach_port = try xev .Async .init ();
1439+ errdefer mach_port .deinit ();
1440+ return .{ .mach_port = mach_port };
1441+ }
1442+
1443+ fn deinit (self : * Self ) void {
1444+ self .mach_port .deinit ();
1445+ }
1446+
1447+ fn setup (self : * Self , kqueue_fd : posix.fd_t ) ! void {
1448+ const events = [_ ]Kevent {.{
1449+ .ident = @as (usize , @intCast (self .mach_port .port )),
1450+ .filter = std .c .EVFILT .MACHPORT ,
1451+ .flags = std .c .EV .ADD | std .c .EV .ENABLE ,
1452+ .fflags = darwin .MACH_RCV_MSG ,
1453+ .data = 0 ,
1454+ .udata = 0 ,
1455+ .ext = .{
1456+ @intFromPtr (& self .mach_port_buffer ),
1457+ self .mach_port_buffer .len ,
1458+ },
1459+ }};
1460+ const n = try kevent_syscall (
1461+ kqueue_fd ,
1462+ & events ,
1463+ events [0.. 0],
1464+ null ,
1465+ );
1466+ assert (n == 0 );
1467+ }
1468+
1469+ fn wakeup (self : * Self ) ! void {
1470+ try self .mach_port .notify ();
1471+ }
1472+ } else struct {
1473+ // TODO: We should use eventfd for FreeBSD. Until this is
1474+ // implemented, loop wakeup will crash on BSD.
1475+ const Self = @This ();
1476+
1477+ fn init () ! Self {
1478+ return .{};
1479+ }
1480+
1481+ fn deinit (self : * Self ) void {
1482+ _ = self ;
1483+ }
1484+
1485+ fn setup (self : * Self , kqueue_fd : posix.fd_t ) ! void {
1486+ _ = self ;
1487+ _ = kqueue_fd ;
1488+ }
1489+
1490+ fn wakeup (self : * Self ) ! void {
1491+ _ = self ;
1492+ @panic ("wakeup not implemented on this platform" );
1493+ }
1494+ };
1495+
14361496pub const OperationType = enum {
14371497 noop ,
14381498 accept ,
@@ -1535,14 +1595,14 @@ pub const Operation = union(OperationType) {
15351595 c : * Completion ,
15361596 },
15371597
1538- machport : struct {
1598+ machport : if ( ! builtin . os . tag . isDarwin ()) void else struct {
15391599 port : posix.system.mach_port_name_t ,
15401600 buffer : ReadBuffer ,
15411601 },
15421602
15431603 proc : struct {
15441604 pid : posix.pid_t ,
1545- flags : u32 = std . c . NOTE . EXIT | std . c . NOTE . EXITSTATUS ,
1605+ flags : u32 = NOTE_EXIT_FLAGS ,
15461606 },
15471607};
15481608
@@ -1590,12 +1650,12 @@ pub const ReadError = posix.KEventError ||
15901650 posix .PReadError ||
15911651 posix .RecvFromError ||
15921652 error {
1593- EOF ,
1594- Canceled ,
1595- MissingKevent ,
1596- PermissionDenied ,
1597- Unexpected ,
1598- };
1653+ EOF ,
1654+ Canceled ,
1655+ MissingKevent ,
1656+ PermissionDenied ,
1657+ Unexpected ,
1658+ };
15991659
16001660pub const WriteError = posix .KEventError ||
16011661 posix .WriteError ||
@@ -1604,10 +1664,10 @@ pub const WriteError = posix.KEventError ||
16041664 posix .SendMsgError ||
16051665 posix .SendToError ||
16061666 error {
1607- Canceled ,
1608- PermissionDenied ,
1609- Unexpected ,
1610- };
1667+ Canceled ,
1668+ PermissionDenied ,
1669+ Unexpected ,
1670+ };
16111671
16121672pub const MachPortError = posix .KEventError || error {
16131673 Canceled ,
@@ -1734,6 +1794,7 @@ const Timer = struct {
17341794/// This lets us support both Mac and non-Mac platforms.
17351795const Kevent = switch (builtin .os .tag ) {
17361796 .ios , .macos = > posix .system .kevent64_s ,
1797+ .freebsd = > std .c .Kevent ,
17371798 else = > @compileError ("kqueue not supported yet for target OS" ),
17381799};
17391800
@@ -2416,6 +2477,7 @@ test "kqueue: socket accept/connect/send/recv/close" {
24162477}
24172478
24182479test "kqueue: file IO on thread pool" {
2480+ if (builtin .os .tag != .macos ) return error .SkipZigTest ;
24192481 const testing = std .testing ;
24202482
24212483 var tpool = main .ThreadPool .init (.{});
0 commit comments