Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 128 additions & 1 deletion src/watcher/file.zig
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ fn FileStream(comptime xev: type) type {
) xev.CallbackAction,
) void {
// Initialize our completion
req.* = .{};
req.* = .{ .full_write_buffer = buf };
self.pwriteInit(&req.completion, buf, offset);
req.completion.userdata = q;
req.completion.callback = (struct {
Expand Down Expand Up @@ -313,6 +313,133 @@ fn FileStream(comptime xev: type) type {
test {
_ = FileTests(xev, Self);
}

test "queuePWrite" {
// wasi: local files don't work with poll (always ready)
if (builtin.os.tag == .wasi) return error.SkipZigTest;
// windows: std.fs.File is not opened with OVERLAPPED flag.
if (builtin.os.tag == .windows) return error.SkipZigTest;
if (builtin.os.tag == .freebsd) return error.SkipZigTest;

const testing = std.testing;

var tpool = main.ThreadPool.init(.{});
defer tpool.deinit();
defer tpool.shutdown();
var loop = try xev.Loop.init(.{ .thread_pool = &tpool });
defer loop.deinit();

// Create our file
const path = "test_watcher_file";
const f = try std.fs.cwd().createFile(path, .{
.read = true,
.truncate = true,
});
defer f.close();
defer std.fs.cwd().deleteFile(path) catch {};

const file = try Self.init(f);
var write_queue: xev.WriteQueue = .{};
var write_req: [2]xev.WriteRequest = undefined;

// Perform a write and then a read
file.queueWrite(
&loop,
&write_queue,
&write_req[0],
.{ .slice = "1234" },
void,
null,
(struct {
fn callback(
_: ?*void,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
_: xev.WriteBuffer,
r: xev.WriteError!usize,
) xev.CallbackAction {
_ = r catch unreachable;
return .disarm;
}
}).callback,
);
file.queueWrite(
&loop,
&write_queue,
&write_req[1],
.{ .slice = "5678" },
void,
null,
(struct {
fn callback(
_: ?*void,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
_: xev.WriteBuffer,
r: xev.WriteError!usize,
) xev.CallbackAction {
_ = r catch unreachable;
return .disarm;
}
}).callback,
);

file.queuePWrite(
&loop,
&write_queue,
&write_req[1],
.{ .slice = "000" },
3,
void,
null,
(struct {
fn callback(
_: ?*void,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
_: xev.WriteBuffer,
r: xev.WriteError!usize,
) xev.CallbackAction {
_ = r catch unreachable;
return .disarm;
}
}).callback,
);

// Wait for the write
try loop.run(.until_done);

// Make sure the data is on disk
try f.sync();

const f2 = try std.fs.cwd().openFile(path, .{});
defer f2.close();
const file2 = try Self.init(f2);

// Read
var read_buf: [128]u8 = undefined;
var read_len: usize = 0;
var c_read: xev.Completion = undefined;
file2.read(&loop, &c_read, .{ .slice = &read_buf }, usize, &read_len, (struct {
fn callback(
ud: ?*usize,
_: *xev.Loop,
_: *xev.Completion,
_: Self,
_: xev.ReadBuffer,
r: xev.ReadError!usize,
) xev.CallbackAction {
ud.?.* = r catch unreachable;
return .disarm;
}
}).callback);

try loop.run(.until_done);
try testing.expectEqualSlices(u8, "123000", read_buf[0..read_len]);
}
};
}

Expand Down