diff --git a/.gitignore b/.gitignore index e73c965..a745fe5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -zig-cache/ -zig-out/ +.zig-cache +zig-out/ \ No newline at end of file diff --git a/build.zig b/build.zig index 67074cb..887ed71 100644 --- a/build.zig +++ b/build.zig @@ -1,36 +1,57 @@ const std = @import("std"); pub fn build(b: *std.Build) void { - // Standard release options allow the person running `zig build` to select - // between Debug, ReleaseSafe, ReleaseFast, and ReleaseSmall. const mode = b.standardOptimizeOption(.{}); const target = b.standardTargetOptions(.{}); - const local_module = b.addModule("antiphony", .{ - .root_source_file = .{ .path = "src/antiphony.zig" }, - }); - const s2s_dep = b.dependency("s2s", .{}); - const s2s_mod = s2s_dep.module("s2s"); - local_module.addImport("s2s",s2s_mod ); - const linux_example = b.addExecutable(.{ - .name = "socketpair-example", - .root_source_file = .{ - .path = "examples/linux.zig", - }, - .optimize = mode, + // Main antiphony module + const antiphony_mod = b.addModule("antiphony", .{ + .root_source_file = b.path("src/antiphony.zig"), .target = target, + .optimize = mode, }); - linux_example.root_module.addImport("antiphony", local_module); + // Add s2s dependency + const s2s_mod = b.dependency("s2s", .{}).module("s2s"); + antiphony_mod.addImport("s2s", s2s_mod); + + // Linux example + const linux_example = b.addExecutable(.{ + .name = "linux-socketpair-example", + .root_module = b.createModule(.{ + .root_source_file = b.path("examples/linux.zig"), + .optimize = mode, + .target = target, + .imports = &.{.{ .name = "antiphony", .module = antiphony_mod }}, + }), + }); b.installArtifact(linux_example); + // macOS example + const macos_example = b.addExecutable(.{ + .name = "macos-socketpair-example", + .root_module = b.createModule(.{ + .root_source_file = b.path("examples/macos.zig"), + .optimize = mode, + .target = target, + .imports = &.{.{ .name = "antiphony", .module = antiphony_mod }}, + }), + }); + b.installArtifact(macos_example); + + // Run steps for examples + const run_linux = b.addRunArtifact(linux_example); + const run_linux_step = b.step("run-linux", "Run the Linux socketpair example"); + run_linux_step.dependOn(&run_linux.step); + + const run_macos = b.addRunArtifact(macos_example); + const run_macos_step = b.step("run-macos", "Run the macOS machport example"); + run_macos_step.dependOn(&run_macos.step); + + // Tests const main_tests = b.addTest(.{ .name = "antiphony", - .root_source_file = .{ - .path = "src/antiphony.zig" - }, - .target = target, - .optimize = mode, + .root_module = antiphony_mod, }); main_tests.root_module.addImport("s2s", s2s_mod); diff --git a/build.zig.zon b/build.zig.zon index b17f34b..48d2634 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -1,10 +1,10 @@ .{ - .name = "antiphony", + .name = .antiphony, .version = "0.0.0", .dependencies = .{ .s2s = .{ - .url = "https://github.com/ziglibs/s2s/archive/b30205d5e9204899fb6d0fdf28d00ed4d18fe9c9.tar.gz", - .hash = "12202c39c98f05041f1052c268132669dbfcda87e4dbb0353cd84a6070924c8ac0e3", + .url = "git+https://github.com/burakssen/s2s#c48757afb221c003156e143787326c8e80875ee1", + .hash = "s2s-0.0.1-qt8ajZ98AACx_APAb4QftlxP9Ap3O6-lNejjB3YttSc1", }, }, .paths = .{ @@ -14,4 +14,5 @@ //"LICENSE", //"README.md", }, + .fingerprint = 0xc9c0a5e5ff3f18f0, } diff --git a/examples/macos.zig b/examples/macos.zig new file mode 100644 index 0000000..eadc036 --- /dev/null +++ b/examples/macos.zig @@ -0,0 +1,168 @@ +const std = @import("std"); +const rpc = @import("antiphony"); + +const CreateError = error{ OutOfMemory, UnknownCounter }; +const UsageError = error{ OutOfMemory, UnknownCounter }; + +// Define our RPC service via function signatures. +const RcpDefinition = rpc.CreateDefinition(.{ + .host = .{ + .createCounter = fn () CreateError!u32, + .destroyCounter = fn (u32) void, + .increment = fn (u32, u32) UsageError!u32, + .getCount = fn (u32) UsageError!u32, + }, + .client = .{ + .signalError = fn (msg: []const u8) void, + }, +}); + +fn clientImplementation(file: std.fs.File) !void { + var gpa = std.heap.GeneralPurposeAllocator(.{}){}; + defer _ = gpa.deinit(); + + const ClientImpl = struct { + pub fn signalError(msg: []const u8) void { + std.log.err("remote error: {s}", .{msg}); + } + }; + + const EndPoint = RcpDefinition.ClientEndPoint(ClientImpl); + + var reader_buffer: [1024]u8 = undefined; + var file_reader = file.reader(&reader_buffer); + + var writer_buffer: [1024]u8 = undefined; + var file_writer = file.writer(&writer_buffer); + + var end_point = EndPoint.init( + gpa.allocator(), + &file_reader.interface, + &file_writer.interface, + ); + defer end_point.destroy(); + + var impl = ClientImpl{}; + std.debug.print("Client connecting to host\n", .{}); + try end_point.connect(&impl); // establish RPC handshake + std.debug.print("Client connected to host\n", .{}); + const handle = try end_point.invoke("createCounter", .{}); + + std.log.info("first increment: {}", .{try end_point.invoke("increment", .{ handle, 5 })}); + std.log.info("second increment: {}", .{try end_point.invoke("increment", .{ handle, 3 })}); + std.log.info("third increment: {}", .{try end_point.invoke("increment", .{ handle, 7 })}); + std.log.info("final count: {}", .{try end_point.invoke("getCount", .{handle})}); + + try end_point.invoke("destroyCounter", .{handle}); + + _ = end_point.invoke("getCount", .{handle}) catch |err| std.log.info("error while calling getCount() with invalid handle: {s}", .{@errorName(err)}); + + try end_point.shutdown(); +} + +fn hostImplementation(file: std.fs.File) !void { + var gpa = std.heap.GeneralPurposeAllocator(.{}){}; + defer _ = gpa.deinit(); + + const HostImpl = struct { + const Self = @This(); + + const EndPoint = RcpDefinition.HostEndPoint(Self); + + allocator: std.mem.Allocator, + counters: std.ArrayList(?u32), + end_point: *EndPoint, + + fn sendErrorMessage(self: Self, msg: []const u8) void { + self.end_point.invoke("signalError", .{msg}) catch |err| std.log.err("failed to send error message: {s}", .{@errorName(err)}); + } + + pub fn createCounter(self: *Self) CreateError!u32 { + const index: u32 = @truncate(self.counters.items.len); + try self.counters.append(self.allocator, 0); + return index; + } + pub fn destroyCounter(self: *Self, handle: u32) void { + if (handle >= self.counters.items.len) { + self.sendErrorMessage("unknown counter"); + return; + } + self.counters.items[handle] = null; + } + pub fn increment(self: *Self, handle: u32, count: u32) UsageError!u32 { + if (handle >= self.counters.items.len) { + self.sendErrorMessage("This counter does not exist!"); + return error.UnknownCounter; + } + if (self.counters.items[handle]) |*counter| { + const previous = counter.*; + counter.* +%= count; + return previous; + } else { + self.sendErrorMessage("This counter was already deleted!"); + return error.UnknownCounter; + } + } + pub fn getCount(self: *Self, handle: u32) UsageError!u32 { + if (handle >= self.counters.items.len) { + self.sendErrorMessage("This counter does not exist!"); + return error.UnknownCounter; + } + if (self.counters.items[handle]) |value| { + return value; + } else { + self.sendErrorMessage("This counter was already deleted!"); + return error.UnknownCounter; + } + } + }; + + var reader_buffer: [1024]u8 = undefined; + var file_reader = file.reader(&reader_buffer); + + var writer_buffer: [1024]u8 = undefined; + var file_writer = file.writer(&writer_buffer); + + var end_point = HostImpl.EndPoint.init( + gpa.allocator(), // we need some basic session management + &file_reader.interface, // data input + &file_writer.interface, // data output + ); + defer end_point.destroy(); + + var impl = HostImpl{ + .allocator = gpa.allocator(), + .counters = .empty, + .end_point = &end_point, + }; + defer impl.counters.deinit(gpa.allocator()); + + try end_point.connect(&impl); // establish RPC handshake + + try end_point.acceptCalls(); // blocks until client exits. +} + +// This main function just creates a socket pair and hands them off to two threads that perform some RPC calls. +pub fn main() !void { + const c = std.c; + var sockets: [2]c.fd_t = undefined; + + // Use C socketpair which works on macOS + if (c.socketpair(c.AF.UNIX, c.SOCK.STREAM, 0, &sockets) != 0) { + return error.SocketPairError; + } + + var socket_a = std.fs.File{ .handle = sockets[0] }; + defer socket_a.close(); + + var socket_b = std.fs.File{ .handle = sockets[1] }; + defer socket_b.close(); + + { + var client_thread = try std.Thread.spawn(.{}, clientImplementation, .{socket_a}); + defer client_thread.join(); + + var host_thread = try std.Thread.spawn(.{}, hostImplementation, .{socket_b}); + defer host_thread.join(); + } +} diff --git a/src/antiphony.zig b/src/antiphony.zig index fef2d59..44e9f7a 100644 --- a/src/antiphony.zig +++ b/src/antiphony.zig @@ -49,15 +49,15 @@ pub fn CreateDefinition(comptime spec: anytype) type { return struct { const Self = @This(); - pub fn HostEndPoint(comptime Reader: type, comptime Writer: type, comptime Implementation: type) type { - return CreateEndPoint(.host, Reader, Writer, Implementation); + pub fn HostEndPoint(comptime Implementation: type) type { + return CreateEndPoint(.host, Implementation); } - pub fn ClientEndPoint(comptime Reader: type, comptime Writer: type, comptime Implementation: type) type { - return CreateEndPoint(.client, Reader, Writer, Implementation); + pub fn ClientEndPoint(comptime Implementation: type) type { + return CreateEndPoint(.client, Implementation); } - pub fn CreateEndPoint(comptime role: Role, comptime ReaderType: type, comptime WriterType: type, comptime ImplementationType: type) type { + pub fn CreateEndPoint(comptime role: Role, comptime ImplementationType: type) type { const inbound_spec = switch (role) { .host => host_spec, .client => client_spec, @@ -78,23 +78,21 @@ pub fn CreateDefinition(comptime spec: anytype) type { return struct { const EndPoint = @This(); - pub const Reader = ReaderType; - pub const Writer = WriterType; pub const Implementation = ImplementationType; - pub const IoError = Reader.Error || Writer.Error || error{EndOfStream}; + pub const IoError = std.Io.Reader.Error || std.Io.Writer.Error || error{EndOfStream}; pub const ProtocolError = error{ ProtocolViolation, InvalidProtocol, ProtocolMismatch }; pub const InvokeError = IoError || ProtocolError || std.mem.Allocator.Error; pub const ConnectError = IoError || ProtocolError; allocator: std.mem.Allocator, - reader: Reader, - writer: Writer, + reader: *std.Io.Reader, + writer: *std.Io.Writer, sequence_id: u32 = 0, impl: ?*Implementation = null, - pub fn init(allocator: std.mem.Allocator, reader: Reader, writer: Writer) EndPoint { + pub fn init(allocator: std.mem.Allocator, reader: *std.Io.Reader, writer: *std.Io.Writer) EndPoint { return EndPoint{ .allocator = allocator, .reader = reader, @@ -112,17 +110,16 @@ pub fn CreateDefinition(comptime spec: anytype) type { /// Performs the initial handshake with the remote peer. /// Both agree on the used RPC version and that they use the same protocol. pub fn connect(self: *EndPoint, impl: *Implementation) ConnectError!void { - std.debug.assert(self.impl == null); // do not call twice - try self.writer.writeAll(&protocol_magic); try self.writer.writeByte(current_version); // version byte + try self.writer.flush(); var remote_magic: [4]u8 = undefined; - try self.reader.readNoEof(&remote_magic); + try self.reader.readSliceAll(&remote_magic); if (!std.mem.eql(u8, &protocol_magic, &remote_magic)) return error.InvalidProtocol; - const remote_version = try self.reader.readByte(); + const remote_version = try self.reader.takeByte(); if (remote_version != current_version) return error.ProtocolMismatch; @@ -135,6 +132,7 @@ pub fn CreateDefinition(comptime spec: anytype) type { pub fn shutdown(self: *EndPoint) IoError!void { std.debug.assert(self.impl != null); // call these only after a successful connection! try self.writer.writeByte(@intFromEnum(CommandId.shutdown)); + try self.writer.flush(); } /// Waits for incoming calls and handles them till the client shuts down the connection. @@ -143,7 +141,7 @@ pub fn CreateDefinition(comptime spec: anytype) type { pub fn acceptCalls(self: *EndPoint) InvokeError!void { std.debug.assert(self.impl != null); // call these only after a successful connection! while (true) { - const cmd_id = try self.reader.readByte(); + const cmd_id = try self.reader.takeByte(); const cmd = std.meta.intToEnum(CommandId, cmd_id) catch return error.ProtocolViolation; switch (cmd) { .call => { @@ -157,16 +155,16 @@ pub fn CreateDefinition(comptime spec: anytype) type { pub fn InvokeReturnType(comptime func_name: []const u8) type { const FuncPrototype = @field(outbound_spec, func_name); - const func_info = @typeInfo(FuncPrototype).Fn; + const func_info = @typeInfo(FuncPrototype).@"fn"; const FuncReturnType = func_info.return_type.?; if (config.merge_error_sets) { switch (@typeInfo(FuncReturnType)) { // We merge error sets, but still return the original function payload - .ErrorUnion => |eu| return (InvokeError || eu.error_set)!eu.payload, + .error_union => |eu| return (InvokeError || eu.error_set)!eu.payload, // we just merge error sets, the result will be `void` in *no* case (but makes handling easier) - .ErrorSet => return (InvokeError || FuncReturnType)!void, + .error_set => return (InvokeError || FuncReturnType)!void, // The function doesn't return an error, so we just return InvokeError *or* the function return value. else => return InvokeError!FuncReturnType, @@ -199,7 +197,7 @@ pub fn CreateDefinition(comptime spec: anytype) type { std.debug.assert(self.impl != null); // call these only after a successful connection! const FuncPrototype = @field(outbound_spec, func_name); const ArgsTuple = std.meta.ArgsTuple(FuncPrototype); - const func_info = @typeInfo(FuncPrototype).Fn; + const func_info = @typeInfo(FuncPrototype).@"fn"; var arg_list: ArgsTuple = undefined; @@ -221,6 +219,8 @@ pub fn CreateDefinition(comptime spec: anytype) type { try self.writer.writeAll(func_name); try s2s.serialize(self.writer, ArgsTuple, arg_list); + try self.writer.flush(); + try self.waitForResponse(sequence_id); const FuncReturnType = func_info.return_type.?; @@ -245,9 +245,9 @@ pub fn CreateDefinition(comptime spec: anytype) type { const result = s2s.deserialize(self.reader, InvocationResult(FuncReturnType)) catch return error.ProtocolViolation; if (config.merge_error_sets) { - if (@typeInfo(FuncReturnType) == .ErrorUnion) { + if (@typeInfo(FuncReturnType) == .error_union) { return try result.unwrap(); - } else if (@typeInfo(FuncReturnType) == .ErrorSet) { + } else if (@typeInfo(FuncReturnType) == .error_set) { return result.unwrap(); } else { return result.unwrap(); @@ -264,14 +264,14 @@ pub fn CreateDefinition(comptime spec: anytype) type { /// Leaves the reader in a state so the response can be deserialized directly from the stream. fn waitForResponse(self: *EndPoint, sequence_id: SequenceID) !void { while (true) { - const cmd_id = try self.reader.readByte(); + const cmd_id = try self.reader.takeByte(); const cmd = std.meta.intToEnum(CommandId, cmd_id) catch return error.ProtocolViolation; switch (cmd) { .call => { try self.processCall(); }, .response => { - const seq = @as(SequenceID,@enumFromInt(try self.reader.readInt(u32, .little))); + const seq = @as(SequenceID, @enumFromInt(try self.reader.takeInt(u32, .little))); if (seq != sequence_id) return error.ProtocolViolation; return; @@ -283,13 +283,13 @@ pub fn CreateDefinition(comptime spec: anytype) type { /// Deserializes call information fn processCall(self: *EndPoint) !void { - const sequence_id = @as(SequenceID,@enumFromInt(try self.reader.readInt(u32, .little))); - const name_length = try self.reader.readInt(u32, .little); + const sequence_id = @as(SequenceID, @enumFromInt(try self.reader.takeInt(u32, .little))); + const name_length = try self.reader.takeInt(u32, .little); if (name_length > max_received_func_name_len) return error.ProtocolViolation; var name_buffer: [max_received_func_name_len]u8 = undefined; const function_name = name_buffer[0..name_length]; - try self.reader.readNoEof(function_name); + try self.reader.readSliceAll(function_name); inline for (std.meta.fields(InboundSpec)) |fld| { if (std.mem.eql(u8, fld.name, function_name)) { @@ -309,13 +309,13 @@ pub fn CreateDefinition(comptime spec: anytype) type { const ImplFunc = @TypeOf(impl_func); - const SpecReturnType = @typeInfo(FuncSpec).Fn.return_type.?; + const SpecReturnType = @typeInfo(FuncSpec).@"fn".return_type.?; const impl_func_info = @typeInfo(ImplFunc); - if (impl_func_info != .Fn) + if (impl_func_info != .@"fn") @compileError(@typeName(Implementation) ++ "." ++ function_name ++ " must be a function with invocable signature " ++ @typeName(FuncSpec)); - const impl_func_fn = impl_func_info.Fn; + const impl_func_fn = impl_func_info.@"fn"; var invocation_args: FuncArgs = s2s.deserializeAlloc(self.reader, FuncArgs, self.allocator) catch |err| switch (err) { error.UnexpectedData => return error.ProtocolViolation, @@ -345,7 +345,7 @@ pub fn CreateDefinition(comptime spec: anytype) type { break :b @call(.auto, impl_func, .{proxy} ++ invocation_args); } // invocation with self - else if (@typeInfo(Arg0Type) == .Pointer) + else if (@typeInfo(Arg0Type) == .pointer) @call(.auto, impl_func, .{self.impl.?} ++ invocation_args) else @call(.auto, impl_func, .{self.impl.?.*} ++ invocation_args); @@ -354,6 +354,7 @@ pub fn CreateDefinition(comptime spec: anytype) type { try self.writer.writeByte(@intFromEnum(CommandId.response)); try self.writer.writeInt(u32, @intFromEnum(sequence_id), .little); try s2s.serialize(self.writer, InvocationResult(SpecReturnType), invocationResult(SpecReturnType, result)); + try self.writer.flush(); } fn nextSequenceID(self: *EndPoint) SequenceID { @@ -412,10 +413,10 @@ fn validateSpec(comptime funcs: anytype) void { @compileError("All fields of .host or .client must be function types!"); const field_info = @typeInfo(@field(funcs, fld.name)); - if (field_info != .Fn) + if (field_info != .@"fn") @compileError("All fields of .host or .client must be function types!"); - const func_info: std.builtin.Type.Fn = field_info.Fn; + const func_info: std.builtin.Type.Fn = field_info.@"fn"; if (func_info.is_generic) @compileError("Cannot handle generic functions"); for (func_info.params) |param| { if (param.is_generic) @compileError("Cannot handle generic functions"); @@ -453,30 +454,27 @@ test "invoke function (emulated host)" { const ClientImpl = struct {}; - var output_stream = std.ArrayList(u8).init(std.testing.allocator); + var output_stream: std.Io.Writer.Allocating = .init(std.testing.allocator); defer output_stream.deinit(); - const input_data = comptime blk: { - var buffer: [4096]u8 = undefined; - var stream = std.io.fixedBufferStream(&buffer); - var writer = stream.writer(); - - try writer.writeAll(&protocol_magic); - try writer.writeByte(current_version); + var buffer: [4096]u8 = undefined; + var writer: std.Io.Writer = .fixed(&buffer); - try writer.writeByte(@intFromEnum(CommandId.response)); - try writer.writeInt(u32, 0, .little); // first sequence id + try writer.writeAll(&protocol_magic); + try writer.writeByte(current_version); - try s2s.serialize(writer, InvocationResult(void), invocationResult(void, {})); + try writer.writeByte(@intFromEnum(CommandId.response)); + try writer.writeInt(u32, 0, .little); // first sequence id - break :blk stream.getWritten(); - }; - var input_stream = std.io.fixedBufferStream(@as([]const u8, input_data)); + try s2s.serialize(&writer, InvocationResult(void), invocationResult(void, {})); + try writer.flush(); + const input_data = writer.buffered(); - const EndPoint = RcpDefinition.ClientEndPoint(std.io.FixedBufferStream([]const u8).Reader, std.ArrayList(u8).Writer, ClientImpl); + var input_stream: std.Io.Reader = .fixed(input_data); - var end_point = EndPoint.init(std.testing.allocator, input_stream.reader(), output_stream.writer()); + const EndPoint = RcpDefinition.ClientEndPoint(ClientImpl); + var end_point = EndPoint.init(std.testing.allocator, &input_stream, &output_stream.writer); var impl = ClientImpl{}; try end_point.connect(&impl); @@ -497,41 +495,41 @@ test "invoke function (emulated client, no self parameter)" { const HostImpl = struct { fn some(a: u32, b: f32, c: []const u8) u32 { std.debug.print("some({}, {d}, \"{s}\");\n", .{ a, b, c }); - return a + @as(u32,@intFromFloat(b)); + return a + @as(u32, @intFromFloat(b)); } }; - var output_stream = std.ArrayList(u8).init(std.testing.allocator); + var output_stream: std.Io.Writer.Allocating = .init(std.testing.allocator); defer output_stream.deinit(); - const input_data = comptime blk: { - var buffer: [4096]u8 = undefined; - var stream = std.io.fixedBufferStream(&buffer); - var writer = stream.writer(); + var buffer: [4096]u8 = undefined; + var writer: std.Io.Writer = .fixed(&buffer); - try writer.writeAll(&protocol_magic); - try writer.writeByte(current_version); + try writer.writeAll(&protocol_magic); + try writer.writeByte(current_version); - try writer.writeByte(@intFromEnum(CommandId.call)); - try writer.writeInt(u32, 1337, .little); // first sequence id - try writer.writeInt(u32, "some".len, .little); - try writer.writeAll("some"); + try writer.writeByte(@intFromEnum(CommandId.call)); + try writer.writeInt(u32, 1337, .little); // first sequence id + try writer.writeInt(u32, "some".len, .little); + try writer.writeAll("some"); - try s2s.serialize(writer, std.meta.Tuple(&.{ u32, f32, []const u8 }), .{ - .@"0" = 1334, - .@"1" = std.math.pi, - .@"2" = "Hello, Host!", - }); + try s2s.serialize(&writer, std.meta.Tuple(&.{ u32, f32, []const u8 }), .{ + .@"0" = 1334, + .@"1" = std.math.pi, + .@"2" = "Hello, Host!", + }); - try writer.writeByte(@intFromEnum(CommandId.shutdown)); + try writer.writeByte(@intFromEnum(CommandId.shutdown)); - break :blk stream.getWritten(); - }; - var input_stream = std.io.fixedBufferStream(@as([]const u8, input_data)); + try writer.flush(); + + const input_data = writer.buffered(); - const EndPoint = RcpDefinition.HostEndPoint(std.io.FixedBufferStream([]const u8).Reader, std.ArrayList(u8).Writer, HostImpl); + var input_stream: std.Io.Reader = .fixed(input_data); - var end_point = EndPoint.init(std.testing.allocator, input_stream.reader(), output_stream.writer()); + const EndPoint = RcpDefinition.HostEndPoint(HostImpl); + + var end_point = EndPoint.init(std.testing.allocator, &input_stream, &output_stream.writer); var impl = HostImpl{}; try end_point.connect(&impl); @@ -555,41 +553,41 @@ test "invoke function (emulated client, with self parameter)" { fn some(self: @This(), a: u32, b: f32, c: []const u8) u32 { std.debug.print("some({}, {}, {d}, \"{s}\");\n", .{ self.dummy, a, b, c }); - return a + @as(u32,@intFromFloat(b)); + return a + @as(u32, @intFromFloat(b)); } }; - var output_stream = std.ArrayList(u8).init(std.testing.allocator); + var output_stream: std.Io.Writer.Allocating = .init(std.testing.allocator); defer output_stream.deinit(); - const input_data = comptime blk: { - var buffer: [4096]u8 = undefined; - var stream = std.io.fixedBufferStream(&buffer); - var writer = stream.writer(); + var buffer: [4096]u8 = undefined; + var writer: std.Io.Writer = .fixed(&buffer); - try writer.writeAll(&protocol_magic); - try writer.writeByte(current_version); + try writer.writeAll(&protocol_magic); + try writer.writeByte(current_version); - try writer.writeByte(@intFromEnum(CommandId.call)); - try writer.writeInt(u32, 1337, .little); // first sequence id - try writer.writeInt(u32, "some".len, .little); - try writer.writeAll("some"); + try writer.writeByte(@intFromEnum(CommandId.call)); + try writer.writeInt(u32, 1337, .little); // first sequence id + try writer.writeInt(u32, "some".len, .little); + try writer.writeAll("some"); - try s2s.serialize(writer, std.meta.Tuple(&.{ u32, f32, []const u8 }), .{ - .@"0" = 1334, - .@"1" = std.math.pi, - .@"2" = "Hello, Host!", - }); + try s2s.serialize(&writer, std.meta.Tuple(&.{ u32, f32, []const u8 }), .{ + .@"0" = 1334, + .@"1" = std.math.pi, + .@"2" = "Hello, Host!", + }); - try writer.writeByte(@intFromEnum(CommandId.shutdown)); + try writer.writeByte(@intFromEnum(CommandId.shutdown)); - break :blk stream.getWritten(); - }; - var input_stream = std.io.fixedBufferStream(@as([]const u8, input_data)); + try writer.flush(); + + const input_data = writer.buffered(); + + var input_stream: std.Io.Reader = .fixed(input_data); - const EndPoint = RcpDefinition.HostEndPoint(std.io.FixedBufferStream([]const u8).Reader, std.ArrayList(u8).Writer, HostImpl); + const EndPoint = RcpDefinition.HostEndPoint(HostImpl); - var end_point = EndPoint.init(std.testing.allocator, input_stream.reader(), output_stream.writer()); + var end_point = EndPoint.init(std.testing.allocator, &input_stream, &output_stream.writer); var impl = HostImpl{ .dummy = 123 }; try end_point.connect(&impl); @@ -616,39 +614,38 @@ test "invoke function with callback (emulated host, no self parameter)" { } }; - var output_stream = std.ArrayList(u8).init(std.testing.allocator); + var output_stream: std.Io.Writer.Allocating = .init(std.testing.allocator); defer output_stream.deinit(); - const input_data = comptime blk: { - var buffer: [4096]u8 = undefined; - var stream = std.io.fixedBufferStream(&buffer); - var writer = stream.writer(); + var buffer: [4096]u8 = undefined; + var writer: std.Io.Writer = .fixed(&buffer); - try writer.writeAll(&protocol_magic); - try writer.writeByte(current_version); + try writer.writeAll(&protocol_magic); + try writer.writeByte(current_version); - try writer.writeByte(@intFromEnum(CommandId.call)); - try writer.writeInt(u32, 1337, .little); // first sequence id - try writer.writeInt(u32, "callback".len, .little); - try writer.writeAll("callback"); + try writer.writeByte(@intFromEnum(CommandId.call)); + try writer.writeInt(u32, 1337, .little); // first sequence id + try writer.writeInt(u32, "callback".len, .little); + try writer.writeAll("callback"); - try s2s.serialize(writer, std.meta.Tuple(&.{[]const u8}), .{ - .@"0" = "Hello, World!", - }); + try s2s.serialize(&writer, std.meta.Tuple(&.{[]const u8}), .{ + .@"0" = "Hello, World!", + }); - try writer.writeByte(@intFromEnum(CommandId.response)); - try writer.writeInt(u32, 0, .little); // first sequence id + try writer.writeByte(@intFromEnum(CommandId.response)); + try writer.writeInt(u32, 0, .little); // first sequence id - try s2s.serialize(writer, InvocationResult(void), invocationResult(void, {})); + try s2s.serialize(&writer, InvocationResult(void), invocationResult(void, {})); - break :blk stream.getWritten(); - }; - var input_stream = std.io.fixedBufferStream(@as([]const u8, input_data)); + try writer.flush(); - const EndPoint = RcpDefinition.ClientEndPoint(std.io.FixedBufferStream([]const u8).Reader, std.ArrayList(u8).Writer, ClientImpl); + const input_data = writer.buffered(); - var end_point = EndPoint.init(std.testing.allocator, input_stream.reader(), output_stream.writer()); + var input_stream: std.Io.Reader = .fixed(input_data); + const EndPoint = RcpDefinition.ClientEndPoint(ClientImpl); + + var end_point = EndPoint.init(std.testing.allocator, &input_stream, &output_stream.writer); var impl = ClientImpl{}; try end_point.connect(&impl); @@ -676,38 +673,38 @@ test "invoke function with callback (emulated host, with self parameter)" { } }; - var output_stream = std.ArrayList(u8).init(std.testing.allocator); + var output_stream: std.Io.Writer.Allocating = .init(std.testing.allocator); defer output_stream.deinit(); - const input_data = comptime blk: { - var buffer: [4096]u8 = undefined; - var stream = std.io.fixedBufferStream(&buffer); - var writer = stream.writer(); + var buffer: [4096]u8 = undefined; + var writer: std.Io.Writer = .fixed(&buffer); - try writer.writeAll(&protocol_magic); - try writer.writeByte(current_version); + try writer.writeAll(&protocol_magic); + try writer.writeByte(current_version); - try writer.writeByte(@intFromEnum(CommandId.call)); - try writer.writeInt(u32, 1337, .little); // first sequence id - try writer.writeInt(u32, "callback".len, .little); - try writer.writeAll("callback"); + try writer.writeByte(@intFromEnum(CommandId.call)); + try writer.writeInt(u32, 1337, .little); // first sequence id + try writer.writeInt(u32, "callback".len, .little); + try writer.writeAll("callback"); - try s2s.serialize(writer, std.meta.Tuple(&.{[]const u8}), .{ - .@"0" = "Hello, World!", - }); + try s2s.serialize(&writer, std.meta.Tuple(&.{[]const u8}), .{ + .@"0" = "Hello, World!", + }); - try writer.writeByte(@intFromEnum(CommandId.response)); - try writer.writeInt(u32, 0, .little); // first sequence id + try writer.writeByte(@intFromEnum(CommandId.response)); + try writer.writeInt(u32, 0, .little); // first sequence id - try s2s.serialize(writer, InvocationResult(void), invocationResult(void, {})); + try s2s.serialize(&writer, InvocationResult(void), invocationResult(void, {})); - break :blk stream.getWritten(); - }; - var input_stream = std.io.fixedBufferStream(@as([]const u8, input_data)); + try writer.flush(); + + const input_data = writer.buffered(); + + var input_stream: std.Io.Reader = .fixed(input_data); - const EndPoint = RcpDefinition.ClientEndPoint(std.io.FixedBufferStream([]const u8).Reader, std.ArrayList(u8).Writer, ClientImpl); + const EndPoint = RcpDefinition.ClientEndPoint(ClientImpl); - var end_point = EndPoint.init(std.testing.allocator, input_stream.reader(), output_stream.writer()); + var end_point = EndPoint.init(std.testing.allocator, &input_stream, &output_stream.writer); var impl = ClientImpl{}; try end_point.connect(&impl);