diff --git a/src/watcher/file.zig b/src/watcher/file.zig index d916bd1..0eae944 100644 --- a/src/watcher/file.zig +++ b/src/watcher/file.zig @@ -167,7 +167,7 @@ fn FileStream(comptime xev: type) type { ) xev.CallbackAction, ) void { // Initialize our completion - req.* = .{}; + req.* = .{ .full_write_buffer = buf }; self.pwriteInit(&req.completion, buf, offset); req.completion.userdata = q; req.completion.callback = (struct { @@ -313,6 +313,133 @@ fn FileStream(comptime xev: type) type { test { _ = FileTests(xev, Self); } + + test "queuePWrite" { + // wasi: local files don't work with poll (always ready) + if (builtin.os.tag == .wasi) return error.SkipZigTest; + // windows: std.fs.File is not opened with OVERLAPPED flag. + if (builtin.os.tag == .windows) return error.SkipZigTest; + if (builtin.os.tag == .freebsd) return error.SkipZigTest; + + const testing = std.testing; + + var tpool = main.ThreadPool.init(.{}); + defer tpool.deinit(); + defer tpool.shutdown(); + var loop = try xev.Loop.init(.{ .thread_pool = &tpool }); + defer loop.deinit(); + + // Create our file + const path = "test_watcher_file"; + const f = try std.fs.cwd().createFile(path, .{ + .read = true, + .truncate = true, + }); + defer f.close(); + defer std.fs.cwd().deleteFile(path) catch {}; + + const file = try Self.init(f); + var write_queue: xev.WriteQueue = .{}; + var write_req: [2]xev.WriteRequest = undefined; + + // Perform a write and then a read + file.queueWrite( + &loop, + &write_queue, + &write_req[0], + .{ .slice = "1234" }, + void, + null, + (struct { + fn callback( + _: ?*void, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + _: xev.WriteBuffer, + r: xev.WriteError!usize, + ) xev.CallbackAction { + _ = r catch unreachable; + return .disarm; + } + }).callback, + ); + file.queueWrite( + &loop, + &write_queue, + &write_req[1], + .{ .slice = "5678" }, + void, + null, + (struct { + fn callback( + _: ?*void, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + _: xev.WriteBuffer, + r: xev.WriteError!usize, + ) xev.CallbackAction { + _ = r catch unreachable; + return .disarm; + } + }).callback, + ); + + file.queuePWrite( + &loop, + &write_queue, + &write_req[1], + .{ .slice = "000" }, + 3, + void, + null, + (struct { + fn callback( + _: ?*void, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + _: xev.WriteBuffer, + r: xev.WriteError!usize, + ) xev.CallbackAction { + _ = r catch unreachable; + return .disarm; + } + }).callback, + ); + + // Wait for the write + try loop.run(.until_done); + + // Make sure the data is on disk + try f.sync(); + + const f2 = try std.fs.cwd().openFile(path, .{}); + defer f2.close(); + const file2 = try Self.init(f2); + + // Read + var read_buf: [128]u8 = undefined; + var read_len: usize = 0; + var c_read: xev.Completion = undefined; + file2.read(&loop, &c_read, .{ .slice = &read_buf }, usize, &read_len, (struct { + fn callback( + ud: ?*usize, + _: *xev.Loop, + _: *xev.Completion, + _: Self, + _: xev.ReadBuffer, + r: xev.ReadError!usize, + ) xev.CallbackAction { + ud.?.* = r catch unreachable; + return .disarm; + } + }).callback); + + try loop.run(.until_done); + try testing.expectEqualSlices(u8, "123000", read_buf[0..read_len]); + } }; }