Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions src/watcher/file.zig
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,19 @@ fn FileStream(comptime xev: type) type {
/// The underlying file
fd: FdType,

pub usingnamespace stream.Stream(xev, Self, .{
const S = stream.Stream(xev, Self, .{
.close = true,
.poll = true,
.read = .read,
.write = .write,
.threadpool = true,
});
pub const close = S.close;
pub const poll = S.poll;
pub const read = S.read;
pub const write = S.write;
pub const writeInit = S.writeInit;
pub const queueWrite = S.queueWrite;

/// Initialize a File from a std.fs.File.
pub fn init(file: std.fs.File) !Self {
Expand Down Expand Up @@ -318,14 +324,19 @@ fn FileDynamic(comptime xev: type) type {

pub const Union = xev.Union(&.{"File"});

pub usingnamespace stream.Stream(xev, Self, .{
const S = stream.Stream(xev, Self, .{
.close = true,
.poll = true,
.read = .read,
.write = .write,
.threadpool = true,
.type = "File",
});
pub const close = S.close;
pub const poll = S.poll;
pub const read = S.read;
pub const write = S.write;
pub const queueWrite = S.queueWrite;

pub fn init(file: std.fs.File) !Self {
return .{ .backend = switch (xev.backend) {
Expand Down Expand Up @@ -492,6 +503,18 @@ fn FileTests(
comptime Impl: type,
) type {
return struct {
test "File: Stream decls" {
if (!@hasDecl(Impl, "S")) return;
const Stream = Impl.S;
inline for (@typeInfo(Stream).@"struct".decls) |decl| {
const Decl = @TypeOf(@field(Stream, decl.name));
if (Decl == void) continue;
if (!@hasDecl(Impl, decl.name)) {
@compileError("missing decl: " ++ decl.name);
}
}
}

test "kqueue: zero-length read for readiness" {
if (builtin.os.tag != .macos) return error.SkipZigTest;

Expand Down
65 changes: 51 additions & 14 deletions src/watcher/stream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ const queue = @import("../queue.zig");
/// Options for creating a stream type. Each of the options makes the
/// functionality available for the stream.
pub const Options = struct {
read: ReadMethod,
write: WriteMethod,
close: bool,
poll: bool,
read: ReadMethod = .none,
write: WriteMethod = .none,
close: bool = false,
poll: bool = false,

/// True to schedule the read/write on the threadpool.
threadpool: bool = false,
Expand Down Expand Up @@ -104,21 +104,35 @@ pub fn Shared(comptime xev: type) type {
};
}

/// Creates a stream type that is meant to be embedded within other
/// types using "usingnamespace". A stream is something that supports read,
/// write, close, etc. The exact operations supported are defined by the
/// "options" struct.
/// Creates a stream type that is meant to be embedded within other types.
/// A stream is something that supports read, write, close, etc. The exact
/// operations supported are defined by the "options" struct.
///
/// T requirements:
/// - field named "fd" of type fd_t or socket_t
/// - decl named "initFd" to initialize a new T from a fd
///
pub fn Stream(comptime xev: type, comptime T: type, comptime options: Options) type {
return struct {
pub usingnamespace if (options.close) Closeable(xev, T, options) else struct {};
pub usingnamespace if (options.poll) Pollable(xev, T, options) else struct {};
pub usingnamespace if (options.read != .none) Readable(xev, T, options) else struct {};
pub usingnamespace if (options.write != .none) Writeable(xev, T, options) else struct {};
const C_: ?type = if (options.close) Closeable(xev, T, options) else null;
pub const close = if (C_) |C| C.close else {};

const P_: ?type = if (options.poll) Pollable(xev, T, options) else null;
pub const poll = if (P_) |P| poll: {
if (!@hasDecl(P, "poll")) break :poll {};
break :poll P.poll;
} else {};

const R_: ?type = if (options.read != .none) Readable(xev, T, options) else null;
pub const read = if (R_) |R| R.read else {};

const W_: ?type = if (options.write != .none) Writeable(xev, T, options) else null;
pub const writeInit = if (W_) |W| writeInit: {
if (xev.dynamic) break :writeInit {};
break :writeInit W.writeInit;
} else {};
pub const write = if (W_) |W| W.write else null;
pub const queueWrite = if (W_) |W| W.queueWrite else {};
};
}

Expand Down Expand Up @@ -1034,13 +1048,19 @@ pub fn GenericStream(comptime xev: type) type {

pub const Union = xev.Union(&.{"Stream"});

pub usingnamespace Stream(xev, Self, .{
const S = Stream(xev, Self, .{
.close = true,
.poll = true,
.read = .read,
.write = .write,
.type = "Stream",
});
pub const close = S.close;
pub const poll = S.poll;
pub const read = S.read;
pub const write = S.write;
pub const writeInit = S.writeInit;
pub const queueWrite = S.queueWrite;

pub fn initFd(fd: std.posix.pid_t) Self {
return .{ .backend = switch (xev.backend) {
Expand Down Expand Up @@ -1075,12 +1095,18 @@ pub fn GenericStream(comptime xev: type) type {
/// The underlying file
fd: std.posix.fd_t,

pub usingnamespace Stream(xev, Self, .{
const S = Stream(xev, Self, .{
.close = true,
.poll = true,
.read = .read,
.write = .write,
});
pub const close = S.close;
pub const poll = S.poll;
pub const read = S.read;
pub const write = S.write;
pub const writeInit = S.writeInit;
pub const queueWrite = S.queueWrite;

/// Initialize a generic stream from a file descriptor.
pub fn initFd(fd: std.posix.fd_t) Self {
Expand All @@ -1104,6 +1130,17 @@ pub fn GenericStream(comptime xev: type) type {

fn GenericStreamTests(comptime xev: type, comptime Impl: type) type {
return struct {
test "Stream decls" {
if (!@hasDecl(Impl, "S")) return;
inline for (@typeInfo(Impl.S).@"struct".decls) |decl| {
const Decl = @TypeOf(@field(Impl.S, decl.name));
if (Decl == void) continue;
if (!@hasDecl(Impl, decl.name)) {
@compileError("missing decl: " ++ decl.name);
}
}
}

test "pty: child to parent" {
const testing = std.testing;
switch (builtin.os.tag) {
Expand Down
27 changes: 25 additions & 2 deletions src/watcher/tcp.zig
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@ fn TCPStream(comptime xev: type) type {

fd: FdType,

pub usingnamespace stream.Stream(xev, Self, .{
const S = stream.Stream(xev, Self, .{
.close = true,
.poll = true,
.read = .recv,
.write = .send,
});
pub const close = S.close;
pub const poll = S.poll;
pub const read = S.read;
pub const write = S.write;
pub const writeInit = S.writeInit;
pub const queueWrite = S.queueWrite;

/// Initialize a new TCP with the family from the given address. Only
/// the family is used, the actual address has no impact on the created
Expand Down Expand Up @@ -249,14 +255,19 @@ fn TCPDynamic(comptime xev: type) type {

pub const Union = xev.Union(&.{"TCP"});

pub usingnamespace stream.Stream(xev, Self, .{
const S = stream.Stream(xev, Self, .{
.close = true,
.poll = true,
.read = .read,
.write = .write,
.threadpool = true,
.type = "TCP",
});
pub const close = S.close;
pub const poll = S.poll;
pub const read = S.read;
pub const write = S.write;
pub const queueWrite = S.queueWrite;

pub fn init(addr: std.net.Address) !Self {
return .{ .backend = switch (xev.backend) {
Expand Down Expand Up @@ -492,6 +503,18 @@ fn TCPDynamic(comptime xev: type) type {

fn TCPTests(comptime xev: type, comptime Impl: type) type {
return struct {
test "TCP: Stream decls" {
if (!@hasDecl(Impl, "S")) return;
const Stream = Impl.S;
inline for (@typeInfo(Stream).@"struct".decls) |decl| {
const Decl = @TypeOf(@field(Stream, decl.name));
if (Decl == void) continue;
if (!@hasDecl(Impl, decl.name)) {
@compileError("missing decl: " ++ decl.name);
}
}
}

test "TCP: accept/connect/send/recv/close" {
// We have no way to get a socket in WASI from a WASI context.
if (builtin.os.tag == .wasi) return error.SkipZigTest;
Expand Down
36 changes: 23 additions & 13 deletions src/watcher/udp.zig
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ fn UDPSendto(comptime xev: type) type {
userdata: ?*anyopaque,
};

pub usingnamespace stream.Stream(xev, Self, .{
const S = stream.Stream(xev, Self, .{
.close = true,
.poll = true,
.read = .none,
.write = .none,
});
pub const close = S.close;
pub const poll = S.poll;

/// Initialize a new UDP with the family from the given address. Only
/// the family is used, the actual address has no impact on the created
Expand Down Expand Up @@ -226,12 +226,10 @@ fn UDPSendtoIOCP(comptime xev: type) type {
userdata: ?*anyopaque,
};

pub usingnamespace stream.Stream(xev, Self, .{
const S = stream.Stream(xev, Self, .{
.close = true,
.poll = false,
.read = .none,
.write = .none,
});
pub const close = S.close;

/// Initialize a new UDP with the family from the given address. Only
/// the family is used, the actual address has no impact on the created
Expand Down Expand Up @@ -423,12 +421,12 @@ fn UDPSendMsg(comptime xev: type) type {
},
};

pub usingnamespace stream.Stream(xev, Self, .{
const S = stream.Stream(xev, Self, .{
.close = true,
.poll = true,
.read = .none,
.write = .none,
});
pub const close = S.close;
pub const poll = S.poll;

/// Initialize a new UDP with the family from the given address. Only
/// the family is used, the actual address has no impact on the created
Expand Down Expand Up @@ -694,13 +692,13 @@ fn UDPDynamic(comptime xev: type) type {
pub const Union = xev.Union(&.{"UDP"});
pub const State = xev.Union(&.{ "UDP", "State" });

pub usingnamespace stream.Stream(xev, Self, .{
const S = stream.Stream(xev, Self, .{
.close = true,
.poll = true,
.read = .none,
.write = .none,
.type = "UDP",
});
pub const close = S.close;
pub const poll = S.poll;

pub fn init(addr: std.net.Address) !Self {
return .{ .backend = switch (xev.backend) {
Expand Down Expand Up @@ -901,6 +899,18 @@ fn UDPDynamic(comptime xev: type) type {

fn UDPTests(comptime xev: type, comptime Impl: type) type {
return struct {
test "UDP: Stream decls" {
if (!@hasDecl(Impl, "S")) return;
const Stream = Impl.S;
inline for (@typeInfo(Stream).@"struct".decls) |decl| {
const Decl = @TypeOf(@field(Stream, decl.name));
if (Decl == void) continue;
if (!@hasDecl(Impl, decl.name)) {
@compileError("missing decl: " ++ decl.name);
}
}
}

test "UDP: read/write" {
if (builtin.os.tag == .freebsd) return error.SkipZigTest;
const testing = std.testing;
Expand Down
36 changes: 32 additions & 4 deletions src/windows.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,35 @@ const std = @import("std");
const windows = std.os.windows;
const posix = std.posix;

pub usingnamespace std.os.windows;
// Forwarded declarations of std.os.windows.
pub const DWORD = windows.DWORD;
pub const FALSE = windows.FALSE;
pub const TRUE = windows.TRUE;
pub const INFINITE = windows.INFINITE;
pub const HANDLE = windows.HANDLE;
pub const INVALID_HANDLE_VALUE = windows.INVALID_HANDLE_VALUE;
pub const OVERLAPPED = windows.OVERLAPPED;
pub const OVERLAPPED_ENTRY = windows.OVERLAPPED_ENTRY;
pub const DUPLICATE_SAME_ACCESS = windows.DUPLICATE_SAME_ACCESS;
pub const GENERIC_READ = windows.GENERIC_READ;
pub const GENERIC_WRITE = windows.GENERIC_WRITE;
pub const OPEN_ALWAYS = windows.OPEN_ALWAYS;
pub const FILE_FLAG_OVERLAPPED = windows.FILE_FLAG_OVERLAPPED;
pub const ReadFileError = windows.ReadFileError;
pub const WriteFileError = windows.WriteFileError;
pub const Win32Error = windows.Win32Error;
pub const WSASocketW = windows.WSASocketW;
pub const kernel32 = windows.kernel32;
pub const ws2_32 = windows.ws2_32;
pub const unexpectedWSAError = windows.unexpectedWSAError;
pub const unexpectedError = windows.unexpectedError;
pub const sliceToPrefixedFileW = windows.sliceToPrefixedFileW;
pub const CloseHandle = windows.CloseHandle;
pub const QueryPerformanceCounter = windows.QueryPerformanceCounter;
pub const QueryPerformanceFrequency = windows.QueryPerformanceFrequency;
pub const GetQueuedCompletionStatusEx = windows.GetQueuedCompletionStatusEx;
pub const PostQueuedCompletionStatus = windows.PostQueuedCompletionStatus;
pub const CreateIoCompletionPort = windows.CreateIoCompletionPort;

pub extern "kernel32" fn DeleteFileW(lpFileName: [*:0]const u16) callconv(windows.WINAPI) windows.BOOL;

Expand Down Expand Up @@ -182,7 +210,7 @@ pub const exp = struct {
lpSecurityAttributes: ?*windows.SECURITY_ATTRIBUTES,
lpName: ?windows.LPCSTR,
) !windows.HANDLE {
const handle = kernel32.CreateJobObjectA(lpSecurityAttributes, lpName);
const handle = exp.kernel32.CreateJobObjectA(lpSecurityAttributes, lpName);
return switch (windows.kernel32.GetLastError()) {
.SUCCESS => handle,
.ALREADY_EXISTS => CreateJobObjectError.AlreadyExists,
Expand All @@ -191,7 +219,7 @@ pub const exp = struct {
}

pub fn AssignProcessToJobObject(hJob: windows.HANDLE, hProcess: windows.HANDLE) posix.UnexpectedError!void {
const result: windows.BOOL = kernel32.AssignProcessToJobObject(hJob, hProcess);
const result: windows.BOOL = exp.kernel32.AssignProcessToJobObject(hJob, hProcess);
if (result == windows.FALSE) {
const err = windows.kernel32.GetLastError();
return switch (err) {
Expand All @@ -206,7 +234,7 @@ pub const exp = struct {
lpJobObjectInformation: windows.LPVOID,
cbJobObjectInformationLength: windows.DWORD,
) posix.UnexpectedError!void {
const result: windows.BOOL = kernel32.SetInformationJobObject(
const result: windows.BOOL = exp.kernel32.SetInformationJobObject(
hJob,
JobObjectInformationClass,
lpJobObjectInformation,
Expand Down
Loading